提交 b25a9252 authored 作者: blu's avatar blu

refactor of delta config

上级 7a0233f4
...@@ -303,7 +303,7 @@ private: ...@@ -303,7 +303,7 @@ private:
} }
else { else {
peerData["status"][selfId] = 0; peerData["status"][selfId] = 0;
spdlog::warn("evcloudsvc {} peer disconnected: {}", devSn, selfId); spdlog::warn("{} peer disconnected: {}", devSn, selfId);
} }
return ret; return ret;
} }
......
...@@ -77,11 +77,103 @@ private: ...@@ -77,11 +77,103 @@ private:
int streamIdx = -1; int streamIdx = -1;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thPing; thread thEdgeMsgHandler, thCloudMsgHandler;
thread thEvent; thread thEvent;
string drport = "5549"; string drport = "5549";
condition_variable cvMsg;
mutex mutMsg;
bool gotFormat = false;
// //
int handleCloudMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
string peerId, metaType, metaValue, msg;
json data;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
bool bProcessed = false;
if(v.size() == 3) {
try {
peerId = body2str(v[0]);
json meta = json::parse(body2str(v[1]));
metaType = meta["type"];
if(meta.count("value") != 0) {
metaValue = meta["value"];
}
// msg from cluster mgr
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evmlmotion {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evmlmotion {} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
}
int handleEdgeMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
string peerId, metaType, metaValue, msg;
json data;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
bool bProcessed = false;
if(v.size() == 3) {
try {
peerId = body2str(v[0]);
json meta = json::parse(body2str(v[1]));
metaType = meta["type"];
if(meta.count("value") != 0) {
metaValue = meta["value"];
}
// msg from cluster mgr
string clusterMgrId = this->mgrSn + ":evmgr:0";
if(peerId == clusterMgrId) {
//
}
else if(peerId == pullerGid) {
if(metaType == EV_MSG_META_AVFORMATCTX) {
lock_guard<mutex> lock(this->mutMsg);
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
bProcessed = true;
cvMsg.notify_one();
}
}
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evmlmotion {} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
}
int init() int init()
{ {
int ret = 0; int ret = 0;
...@@ -259,48 +351,14 @@ private: ...@@ -259,48 +351,14 @@ private:
meta["type"] = EV_MSG_META_AVFORMATCTX; meta["type"] = EV_MSG_META_AVFORMATCTX;
body.push_back(str2body(meta.dump())); body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
bool gotFormat = false; gotFormat = false;
uint64_t failedCnt = 0; ret = z_send_multiple(pDealer, body);
while(!gotFormat) { if(ret < 0) {
ret = z_send_multiple(pDealer, body); spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno()));
if(ret < 0) { exit(1);
spdlog::error("evmlmotion {}, failed to send hello to puller: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// expect response with avformatctx
auto v = z_recv_multiple(pDealer);
if(v.size() != 3) {
ret = zmq_errno();
if(ret != 0) {
if(failedCnt % 100 == 0) {
spdlog::error("evmlmotion {}, error receive avformatctx: {}, {}", selfId, v.size(), zmq_strerror(ret));
spdlog::info("evmlmotion {} retry connect to peers", selfId);
}
this_thread::sleep_for(chrono::seconds(5));
failedCnt++;
}
else {
spdlog::error("evmlmotion {}, received bad size zmq msg for avformatctx: {}", selfId, v.size());
}
}
else if(body2str(v[0]) != pullerGid) {
spdlog::error("evmlmotion {}, invalid sender for avformatctx: {}, should be: {}", selfId, body2str(v[0]), pullerGid);
}
else {
try {
auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
}
}
catch(exception &e) {
spdlog::error("evmlmotion {}, exception in parsing avformatctx packet: {}", selfId, e.what());
}
}
} }
unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
return ret; return ret;
} }
...@@ -715,6 +773,33 @@ public: ...@@ -715,6 +773,33 @@ public:
} }
init(); init();
thCloudMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleCloudMsg(body);
}
});
thCloudMsgHandler.detach();
thEdgeMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleEdgeMsg(body);
}
});
thEdgeMsgHandler.detach();
getInputFormat(); getInputFormat();
setupStream(); setupStream();
}; };
......
...@@ -258,12 +258,11 @@ private: ...@@ -258,12 +258,11 @@ private:
body.push_back(str2body(meta.dump())); body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
this->gotFormat = false; this->gotFormat = false;
uint64_t failedCnt = 0;
// TODO: notification
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno()));
exit(1);
} }
unique_lock<mutex> lk(this->mutMsg); unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;}); this->cvMsg.wait(lk, [this] {return this->gotFormat;});
...@@ -497,7 +496,6 @@ public: ...@@ -497,7 +496,6 @@ public:
continue; continue;
} }
// full proto msg received. // full proto msg received.
spdlog::info("evpusher {} received cloud msg: {}", this->selfId, "aaa");
this->handleCloudMsg(body); this->handleCloudMsg(body);
} }
}); });
......
...@@ -55,9 +55,11 @@ private: ...@@ -55,9 +55,11 @@ private:
int *streamList = nullptr; int *streamList = nullptr;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thMsgProcessor, thSliceMgr; thread thEdgeMsgHandler, thCloudMsgHandler, thSliceMgr;
string drport = "5549"; string drport = "5549";
json slices; json slices;
condition_variable cvMsg;
mutex mutMsg;
bool gotFormat = false; bool gotFormat = false;
vector<long> vTsOld; vector<long> vTsOld;
mutex mutTsOld; mutex mutTsOld;
...@@ -73,7 +75,7 @@ private: ...@@ -73,7 +75,7 @@ private:
{ {
return true; return true;
} }
int handleMsg(vector<vector<uint8_t> > v) int handleEdgeMsg(vector<vector<uint8_t> > v)
{ {
int ret = 0; int ret = 0;
string peerId, meta; string peerId, meta;
...@@ -88,9 +90,11 @@ private: ...@@ -88,9 +90,11 @@ private:
peerId = body2str(v[0]); peerId = body2str(v[0]);
meta = json::parse(body2str(v[1]))["type"]; meta = json::parse(body2str(v[1]))["type"];
if(meta == EV_MSG_META_AVFORMATCTX) { if(meta == EV_MSG_META_AVFORMATCTX) {
lock_guard<mutex> lock(this->mutMsg);
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput); AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true; gotFormat = true;
cvMsg.notify_one();
spdlog::info("evslicer {} got avformat from {}", selfId, peerId); spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
} }
else if(meta == EV_MSG_META_EVENT) { else if(meta == EV_MSG_META_EVENT) {
...@@ -133,6 +137,47 @@ private: ...@@ -133,6 +137,47 @@ private:
return ret; return ret;
} }
int handleCloudMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
string peerId, metaType, metaValue, msg;
json data;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
bool bProcessed = false;
if(v.size() == 3) {
try {
peerId = body2str(v[0]);
json meta = json::parse(body2str(v[1]));
metaType = meta["type"];
if(meta.count("value") != 0) {
metaValue = meta["value"];
}
// msg from cluster mgr
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evslicer {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evslicer {} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evslicer {} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
}
int init() int init()
{ {
int ret = 0; int ret = 0;
...@@ -242,7 +287,7 @@ private: ...@@ -242,7 +287,7 @@ private:
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", selfId, urlRouter); spdlog::error("evslicer {} {} failed setsockopts router: {}", selfId, urlRouter);
exit(1); exit(1);
} }
if(ret < 0) { if(ret < 0) {
...@@ -296,16 +341,16 @@ private: ...@@ -296,16 +341,16 @@ private:
body.push_back(str2body(meta.dump())); body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
uint64_t failedCnt = 0; this->gotFormat = false;
// TODO: change to notification style
while(!gotFormat) { ret = z_send_multiple(pDealer, body);
ret = z_send_multiple(pDealer, body); if(ret < 0) {
if(ret < 0) { spdlog::error("evslicer {}, failed to send hello to puller: {}. exiting ...", selfId, zmq_strerror(zmq_errno()));
spdlog::error("evslicer {}, failed to send hello to puller: {}", selfId, zmq_strerror(zmq_errno())); exit(1);
continue;
}
this_thread::sleep_for(chrono::seconds(20));
} }
unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
return ret; return ret;
} }
...@@ -813,7 +858,7 @@ public: ...@@ -813,7 +858,7 @@ public:
init(); init();
// thread for msg // thread for msg
thMsgProcessor = thread([this]() { thEdgeMsgHandler = thread([this]() {
while(true) { while(true) {
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
...@@ -821,10 +866,25 @@ public: ...@@ -821,10 +866,25 @@ public:
continue; continue;
} }
// full proto msg received. // full proto msg received.
handleMsg(body); handleEdgeMsg(body);
} }
}); });
thMsgProcessor.detach(); thEdgeMsgHandler.detach();
thCloudMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
this->handleCloudMsg(body);
}
});
thCloudMsgHandler.detach();
// thread for slicer maintenace // thread for slicer maintenace
thSliceMgr = thread([this]() { thSliceMgr = thread([this]() {
......
...@@ -282,6 +282,9 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st ...@@ -282,6 +282,9 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
ret["msg"] = "ok"; ret["msg"] = "ok";
ret["data"] = json(); ret["data"] = json();
bool hasError = false; bool hasError = false;
if(diff.size() == 0) {
return ret;
}
spdlog::info("getModulesOperFromConfDiff for sn {}. diff: {}, newConfig: {}", sn, diff.dump(), newConfig.dump()); spdlog::info("getModulesOperFromConfDiff for sn {}. diff: {}, newConfig: {}", sn, diff.dump(), newConfig.dump());
try { try {
for(auto &d: diff) { for(auto &d: diff) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论