提交 f10da790 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 da155b43
...@@ -165,7 +165,8 @@ class EvDaemon{ ...@@ -165,7 +165,8 @@ class EvDaemon{
void setupSubSystems() { void setupSubSystems() {
thMon = thread([this](){ thMon = thread([this](){
while(true) { while(true) {
if(bReload) { if(this->bReload) {
cleanupSubSystems();
int ret = reloadCfg(); int ret = reloadCfg();
if(ret != 0) { if(ret != 0) {
cleanupSubSystems(); cleanupSubSystems();
...@@ -181,6 +182,46 @@ class EvDaemon{ ...@@ -181,6 +182,46 @@ class EvDaemon{
thMon.detach(); thMon.detach();
} }
int startSubModule(string peerId){
int ret = 0;
if(peerData["status"].count(peerId) == 0 || peerData["status"][peerId] == 0) {
//
}else if(peerData["pids"].count(peerId) != 0){
kill(peerData["pids"][peerId], SIGTERM);
peerData["pids"].erase(peerId);
peerData["status"][peerId] = 0;
}
json jret = cloudutils::reqConfig(this->info);
// apply config
try{
if(jret["code"] != 0) {
spdlog::error("evdaemon {} request cloud configration error: {}", this->devSn, jret["msg"].get<string>());
return 2;
}
json *cfg = cfgutils::findModuleConfig(peerId, jret["data"]);
if(cfg == NULL) {
spdlog::error("evdaemon failed to find module {} in config {}", peerId, jret["data"].dump());
return 1;
}
peerData["config"][peerId] = *cfg;
peerData["status"][peerId] = 0;
pid_t pid;
ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid);
if(ret != 0) {
spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId);
// TODO: clean up and reload config
return -3;
}
this->peerData["pids"][peerId] = pid;
}catch(exception &e) {
spdlog::error("evdaemon {} exception : {}", devSn, e.what());
return -1;
}
return 0;
}
int handleMsg(vector<vector<uint8_t> > &body) int handleMsg(vector<vector<uint8_t> > &body)
{ {
...@@ -217,7 +258,11 @@ class EvDaemon{ ...@@ -217,7 +258,11 @@ class EvDaemon{
} }
else { else {
peerData["status"][selfId] = 0; peerData["status"][selfId] = 0;
peerData["pids"].erase(selfId);
peerData["config"].erase(selfId);
spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId); spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId);
// restart this module
startSubModule(selfId);
} }
if(ret < 0) { if(ret < 0) {
...@@ -377,6 +422,14 @@ class EvDaemon{ ...@@ -377,6 +422,14 @@ class EvDaemon{
res.set_content(ret.dump(), "text/json"); res.set_content(ret.dump(), "text/json");
}); });
svr.Get("/sync-cloud", [this](const Request& req, Response& res){
json ret;
ret["code"] = 0;
ret["msg"] = "syncing ...";
res.set_content(ret.dump(), "text/json");
this->bReload = true;
});
svr.Get("/reset", [](const Request& req, Response& res){ svr.Get("/reset", [](const Request& req, Response& res){
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
......
...@@ -93,7 +93,7 @@ private: ...@@ -93,7 +93,7 @@ private:
// TODO: req config // TODO: req config
bool found = false; bool found = false;
try { try {
spdlog::info("evmlmotion {} config: {}", devSn, config.dump(4)); spdlog::info("evmlmotion {} config: {}", devSn, config.dump());
json evmlmotion; json evmlmotion;
json &evmgr = this->config; json &evmgr = this->config;
json ipc; json ipc;
...@@ -585,6 +585,10 @@ protected: ...@@ -585,6 +585,10 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
if(1 == getppid()) {
spdlog::error("evmlmotion {} exit since evdaemon is dead", selfId);
exit(1);
}
// business logic // business logic
int ret =zmq_msg_init(&msg); int ret =zmq_msg_init(&msg);
......
...@@ -305,6 +305,12 @@ protected: ...@@ -305,6 +305,12 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
if(1 == getppid()) {
spdlog::error("evpuller {} exit since evdaemon is dead", selfId);
exit(1);
}
AVStream *in_stream; AVStream *in_stream;
AVPacket packet; AVPacket packet;
zmq_msg_t msg; zmq_msg_t msg;
......
...@@ -320,6 +320,12 @@ protected: ...@@ -320,6 +320,12 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
if(1 == getppid()) {
spdlog::error("evpusher {} exit since evdaemon is dead", selfId);
exit(1);
}
ret =zmq_msg_init(&msg); ret =zmq_msg_init(&msg);
if(ret != 0) { if(ret != 0) {
spdlog::error("failed to init zmq msg"); spdlog::error("failed to init zmq msg");
......
...@@ -361,6 +361,12 @@ protected: ...@@ -361,6 +361,12 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
if(1 == getppid()) {
spdlog::error("evmgr {} exit since evdaemon is dead", selfId);
exit(1);
}
// business logic // business logic
int ret =zmq_msg_init(&msg); int ret =zmq_msg_init(&msg);
ret = zmq_recvmsg(pSub, &msg, 0); ret = zmq_recvmsg(pSub, &msg, 0);
......
...@@ -143,7 +143,63 @@ namespace cfgutils { ...@@ -143,7 +143,63 @@ namespace cfgutils {
return 0; return 0;
} }
}
json *findModuleConfig(string peerId, json &data) {
json *ret = NULL;
auto pp = strutils::split(peerId, ':');
if(pp.size() != 3) {
spdlog::error("invalid peerId: {}", peerId);
return ret;
}
string sn = pp[0];
string modName = pp[1];
string iid = pp[2];
//
string subMn = modName.substr(0,4);
if(subMn == "evml") {
subMn = modName.substr(4, modName.size());
}else{
subMn = "";
}
for(auto &[k,v]: data.items()) {
// it's evmgr
if(modName == "evmgr") {
if(k == sn) {
ret = &v;
break;
}
}else{
json &ipcs = v["ipcs"];
for(auto &ipc: ipcs) {
json &modules = ipc["modules"];
for(auto &[mn, ml]: modules.items()) {
for(auto &m: ml) {
if(subMn.empty()){
if(mn == modName && m["sn"] == sn && m["iid"] == iid && m["enabled"] != 0) {
ret = &v;
break;
}
}else{
if(subMn == m["type"] && m[iid] == iid && m["sn"] == sn && m["enabled"] != 0) {
ret = &v;
break;
}
}
}
if(ret != NULL) break;
}
if(ret != NULL) break;
}
}
}
return ret;
}
} // cfgutils
struct StrException : public std::exception struct StrException : public std::exception
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论