提交 6634040b authored 作者: blu's avatar blu

big refacting of communitation architect

上级 10860e6a
...@@ -55,6 +55,7 @@ private: ...@@ -55,6 +55,7 @@ private:
queue<string> eventQue; queue<string> eventQue;
mutex eventQLock; mutex eventQLock;
mutex cfgLock; mutex cfgLock;
mutex mutSubsystem;
/// module gid to process id /// module gid to process id
json mapModsToPids; json mapModsToPids;
...@@ -100,9 +101,12 @@ private: ...@@ -100,9 +101,12 @@ private:
if(k == this->devSn) { if(k == this->devSn) {
// startup evmgr // startup evmgr
peerId = v["sn"].get<string>() + ":evmgr:0"; peerId = v["sn"].get<string>() + ":evmgr:0";
// offline
this->peerData["config"][peerId] = v; this->peerData["config"][peerId] = v;
this->peerData["status"][peerId] = 0; if(this->peerData["status"].count(peerId) == 0) {
this->peerData["status"][peerId] = -1; // unkown
}else{
// nop
}
} }
// startup other submodules // startup other submodules
...@@ -113,7 +117,7 @@ private: ...@@ -113,7 +117,7 @@ private:
for(auto &m : ml) { for(auto &m : ml) {
if(m["sn"] != this->devSn) { if(m["sn"] != this->devSn) {
continue; continue;
} }
string peerName; string peerName;
ret = cfgutils::getPeerId(mn, m, peerId, peerName); ret = cfgutils::getPeerId(mn, m, peerId, peerName);
...@@ -129,7 +133,11 @@ private: ...@@ -129,7 +133,11 @@ private:
} }
this->peerData["config"][peerId] = v; this->peerData["config"][peerId] = v;
this->peerData["status"][peerId] = 0; if(this->peerData["status"].count(peerId) == 0) {
this->peerData["status"][peerId] = -1; // unkown
}else{
// nop
}
} }
} }
} }
...@@ -165,17 +173,27 @@ private: ...@@ -165,17 +173,27 @@ private:
int startSubSystems() int startSubSystems()
{ {
// check status and startup // check status and startup
std::lock_guard<std::mutex> lock(mutSubsystem);
int ret = 0; int ret = 0;
vector<string> tmp; vector<string> tmp;
json unkown;
vector<string> terms;
string info; string info;
int cnt = 0; int cnt = 0;
for(auto &[k,v]: this->peerData["config"].items()) { for(auto &[k,v]: this->peerData["config"].items()) {
if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0 && (this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) { if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
tmp.push_back(k); if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)){
info += (cnt == 0? "" : string(", ")) + k; tmp.push_back(k);
info += (cnt == 0? "" : string(", ")) + k;
}else if(this->peerData["status"][k] == -1) {
unkown[k] = -1;
}
}else{
terms.push_back(k);
} }
cnt++; cnt++;
} }
spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info); spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info);
// //
for(string &e : tmp) { for(string &e : tmp) {
...@@ -191,6 +209,36 @@ private: ...@@ -191,6 +209,36 @@ private:
} }
} }
for(string &e: terms) {
if(this->peerData["pids"].count(e) != 0) {
kill(this->peerData["pids"][e], SIGTERM);
}
}
while(unkown.size() != 0) {
this_thread::sleep_for(chrono::seconds(3));
for(auto &[k,v]: unkown.items()) {
if(this->peerData["status"][k] != -1 && this->peerData["status"][k] != 0) {
// no need to start
unkown.erase(k);
}
}
cnt++;
}
for(auto &[k,v]: unkown.items()) {
pid_t pid = 0;
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
if(0 == ret) {
this->peerData["status"][k] = 0;
this->peerData["pids"][k] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, k);
}
else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k);
}
}
return ret; return ret;
} }
...@@ -227,15 +275,8 @@ private: ...@@ -227,15 +275,8 @@ private:
if(peerData["pids"].count(selfId) != 0) { if(peerData["pids"].count(selfId) != 0) {
peerData["pids"].erase(selfId); peerData["pids"].erase(selfId);
} }
spdlog::warn("evdaemon {} peer {} disconnected. reloading config", devSn, selfId); spdlog::warn("evdaemon {} peer {} disconnected. reloading config", devSn, selfId);
ret = reloadCfg();
if(ret != 0) {
spdlog::error("evdaemon {} failed to reload config. please check configuration", devSn);
return -1;
}
if(this->bBootstrap) { if(this->bBootstrap) {
startSubSystems(); startSubSystems();
} }
...@@ -278,7 +319,7 @@ private: ...@@ -278,7 +319,7 @@ private:
// message to other peer // message to other peer
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) { if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0) {
spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId); spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
...@@ -389,6 +430,7 @@ private: ...@@ -389,6 +430,7 @@ private:
} }
if(bBootstrap) { if(bBootstrap) {
// TODO: wait for previous started modules to connecting
startSubSystems(); startSubSystems();
}else{ }else{
spdlog::info("evdaemon {} skip startup subsystems since BOOTSTRAP is set to false", devSn); spdlog::info("evdaemon {} skip startup subsystems since BOOTSTRAP is set to false", devSn);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论