提交 883ef81f authored 作者: blu's avatar blu

big refacting of communitation architect

上级 c5f22628
...@@ -35,8 +35,10 @@ class EvDaemon{ ...@@ -35,8 +35,10 @@ class EvDaemon{
int portRouter = 5549; int portRouter = 5549;
thread::id thIdMain; thread::id thIdMain;
thread thRouter; thread thRouter;
json peerStatus; json peerData;
json peerConfig; // peerData["status"];
// peerData["pids"];
// peerData["config"];
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg; unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock; mutex cacheLock;
queue<string> eventQue; queue<string> eventQue;
...@@ -72,13 +74,14 @@ class EvDaemon{ ...@@ -72,13 +74,14 @@ class EvDaemon{
spdlog::info("evdaemon {} got cloud config:\n{}", devSn, jret.dump(4)); spdlog::info("evdaemon {} got cloud config:\n{}", devSn, jret.dump(4));
json &data = jret["data"]; json &data = jret["data"];
string peerId;
for(auto &[k,v]:data.items()) { for(auto &[k,v]:data.items()) {
if(k == this->devSn) { if(k == this->devSn) {
// startup evmgr // startup evmgr
string peerId = v["sn"].get<string>() + ":evmgr:0"; peerId = v["sn"].get<string>() + ":evmgr:0";
// offline // offline
this->peerStatus[peerId] = 0; this->peerData["status"][peerId] = 0;
this->peerConfig[peerId] = v; this->peerData["config"][peerId] = v;
pid_t pid; pid_t pid;
if( (pid = fork()) == -1 ) { if( (pid = fork()) == -1 ) {
spdlog::error("evdamon {} failed to fork subsytem - evmgr", this->devSn); spdlog::error("evdamon {} failed to fork subsytem - evmgr", this->devSn);
...@@ -94,6 +97,7 @@ class EvDaemon{ ...@@ -94,6 +97,7 @@ class EvDaemon{
spdlog::error("evdaemon {} failed to startup evmgr", this->devSn); spdlog::error("evdaemon {} failed to startup evmgr", this->devSn);
}else{ }else{
// parent // parent
this->peerData["pids"][peerId] = pid;
spdlog::info("evdaemon {} created evmgr", this->devSn); spdlog::info("evdaemon {} created evmgr", this->devSn);
} }
} }
...@@ -153,17 +157,17 @@ class EvDaemon{ ...@@ -153,17 +157,17 @@ class EvDaemon{
return -1; return -1;
} }
if(peerStatus.count(selfId) == 0) { if(peerData["status"].count(selfId) == 0) {
spdlog::warn("evdaemon {} unkown module with id: {}, peerStats {}", devSn, selfId, peerStatus.dump(4)); spdlog::warn("evdaemon {} unkown module with id: {}, peerStats {}", devSn, selfId, peerData["status"].dump(4));
return -1; return -1;
} }
if(peerStatus[selfId] == 0) { if(peerData["status"][selfId] == 0) {
peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evdaemon {} peer connected: {}", devSn, selfId); spdlog::info("evdaemon {} peer connected: {}", devSn, selfId);
eventConn = true; eventConn = true;
spdlog::debug("evdaemon {} update status of {} to 1 and send config", devSn, selfId); spdlog::debug("evdaemon {} update status of {} to 1 and send config", devSn, selfId);
string cfg = peerConfig[selfId].dump(); string cfg = peerData["config"][selfId].dump();
json j; json j;
j["type"] = EV_MSG_META_CONFIG; j["type"] = EV_MSG_META_CONFIG;
string meta = j.dump(); string meta = j.dump();
...@@ -171,7 +175,7 @@ class EvDaemon{ ...@@ -171,7 +175,7 @@ class EvDaemon{
z_send_multiple(pRouter, v); z_send_multiple(pRouter, v);
} }
else { else {
peerStatus[selfId] = 0; peerData["status"][selfId] = 0;
spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId); spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId);
} }
...@@ -209,14 +213,14 @@ class EvDaemon{ ...@@ -209,14 +213,14 @@ class EvDaemon{
selfId = body2str(body[0]); selfId = body2str(body[0]);
peerId = body2str(body[1]); peerId = body2str(body[1]);
// update status; // update status;
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer // msg to peer
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) { if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) {
// message to other peer // message to other peer
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerStatus.count(peerId)!= 0 && peerStatus[peerId] != 0) { if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) {
spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId); spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
...@@ -378,6 +382,11 @@ class EvDaemon{ ...@@ -378,6 +382,11 @@ class EvDaemon{
} }
}); });
thRouter.detach(); thRouter.detach();
//
/// peerId -> value
peerData["status"] = json();
peerData["pids"] = json();
peerData["config"] = json();
}; };
~EvDaemon(){}; ~EvDaemon(){};
}; };
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论