提交 4cb09d06 authored 作者: blu's avatar blu

refactor of delta config

上级 99492efd
...@@ -43,7 +43,7 @@ private: ...@@ -43,7 +43,7 @@ private:
AVFormatContext *pAVFormatInput = nullptr; AVFormatContext *pAVFormatInput = nullptr;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thMsgHandler; thread thCloudMsgHandler, thEdgeMsgHandler;
string drport = "5549"; string drport = "5549";
condition_variable cvMsg; condition_variable cvMsg;
mutex mutMsg; mutex mutMsg;
...@@ -157,7 +157,7 @@ private: ...@@ -157,7 +157,7 @@ private:
return ret; return ret;
} }
int handleMsg(vector<vector<uint8_t> > v) int handleCloudMsg(vector<vector<uint8_t> > v)
{ {
int ret = 0; int ret = 0;
string peerId, metaType, metaValue, msg; string peerId, metaType, metaValue, msg;
...@@ -177,14 +177,51 @@ private: ...@@ -177,14 +177,51 @@ private:
} }
// msg from cluster mgr // msg from cluster mgr
string clusterMgrId = mgrSn + ":evmgr:0"; string daemonId = this->devSn + ":evdaemon:0";
if(peerId == clusterMgrId) { if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) { if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evpusher {} received {} cmd from cluster mgr {}", selfId, metaValue, clusterMgrId); spdlog::info("evpusher {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true; bProcessed = true;
exit(0); exit(0);
} }
} }
}
catch(exception &e) {
spdlog::error("evpusher {} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evpusher {} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
}
int handleEdgeMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
string peerId, metaType, metaValue, msg;
json data;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
bool bProcessed = false;
if(v.size() == 3) {
try {
peerId = body2str(v[0]);
json meta = json::parse(body2str(v[1]));
metaType = meta["type"];
if(meta.count("value") != 0) {
metaValue = meta["value"];
}
// msg from cluster mgr
string clusterMgrId = this->mgrSn + ":evmgr:0";
if(peerId == clusterMgrId) {
//
}
else if(peerId == pullerGid) { else if(peerId == pullerGid) {
if(metaType == EV_MSG_META_AVFORMATCTX) { if(metaType == EV_MSG_META_AVFORMATCTX) {
lock_guard<mutex> lock(this->mutMsg); lock_guard<mutex> lock(this->mutMsg);
...@@ -452,10 +489,33 @@ public: ...@@ -452,10 +489,33 @@ public:
init(); init();
thMsgHandler = thread( thCloudMsgHandler = thread([this]{
while(true) {
); auto body = z_recv_multiple(pDaemon,false);
thMsgHandler.detach(); if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleCloudMsg(body);
}
});
thCloudMsgHandler.detach();
thEdgeMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleEdgeMsg(body);
}
});
thEdgeMsgHandler.detach();
getInputFormat(); getInputFormat();
setupStream(); setupStream();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论