提交 99492efd authored 作者: blu's avatar blu

refactor of delta config

上级 f981a53a
...@@ -43,8 +43,11 @@ private: ...@@ -43,8 +43,11 @@ private:
AVFormatContext *pAVFormatInput = nullptr; AVFormatContext *pAVFormatInput = nullptr;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thPing; thread thMsgHandler;
string drport = "5549"; string drport = "5549";
condition_variable cvMsg;
mutex mutMsg;
bool gotFormat = false;
int init() int init()
{ {
...@@ -64,7 +67,7 @@ private: ...@@ -64,7 +67,7 @@ private:
break; break;
} }
} }
if(evpusher.size() != 0) { if(evpusher.size() != 0) {
ipc = j; ipc = j;
break; break;
...@@ -154,6 +157,57 @@ private: ...@@ -154,6 +157,57 @@ private:
return ret; return ret;
} }
int handleMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
string peerId, metaType, metaValue, msg;
json data;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
bool bProcessed = false;
if(v.size() == 3) {
try {
peerId = body2str(v[0]);
json meta = json::parse(body2str(v[1]));
metaType = meta["type"];
if(meta.count("value") != 0) {
metaValue = meta["value"];
}
// msg from cluster mgr
string clusterMgrId = mgrSn + ":evmgr:0";
if(peerId == clusterMgrId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evpusher {} received {} cmd from cluster mgr {}", selfId, metaValue, clusterMgrId);
bProcessed = true;
exit(0);
}
}
else if(peerId == pullerGid) {
if(metaType == EV_MSG_META_AVFORMATCTX) {
lock_guard<mutex> lock(this->mutMsg);
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
bProcessed = true;
cvMsg.notify_one();
}
}
}
catch(exception &e) {
spdlog::error("evpusher {} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evpusher {} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
}
int getInputFormat() int getInputFormat()
{ {
int ret = 0; int ret = 0;
...@@ -166,48 +220,17 @@ private: ...@@ -166,48 +220,17 @@ private:
meta["type"] = EV_MSG_META_AVFORMATCTX; meta["type"] = EV_MSG_META_AVFORMATCTX;
body.push_back(str2body(meta.dump())); body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
bool gotFormat = false; this->gotFormat = false;
uint64_t failedCnt = 0; uint64_t failedCnt = 0;
while(!gotFormat) { // TODO: notification
ret = z_send_multiple(pDealer, body);
if(ret < 0) { ret = z_send_multiple(pDealer, body);
spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno())); if(ret < 0) {
continue; spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
}
// expect response with avformatctx
auto v = z_recv_multiple(pDealer);
if(v.size() != 3) {
ret = zmq_errno();
if(ret != 0) {
if(failedCnt % 100 == 0) {
spdlog::error("evpusher {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret));
spdlog::info("evpusher {} {} retry connect to peers", devSn, iid);
}
this_thread::sleep_for(chrono::seconds(5));
failedCnt++;
}
else {
spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
}
}
else if(body2str(v[0]) != pullerGid) {
spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
}
else {
try {
auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
}
}
catch(exception &e) {
spdlog::error("evpusher {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what());
}
}
} }
unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
return ret; return ret;
} }
...@@ -428,6 +451,12 @@ public: ...@@ -428,6 +451,12 @@ public:
} }
init(); init();
thMsgHandler = thread(
);
thMsgHandler.detach();
getInputFormat(); getInputFormat();
setupStream(); setupStream();
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论