提交 fce56b13 authored 作者: blu's avatar blu

getSn confliction

上级 12da497e
...@@ -283,7 +283,7 @@ private: ...@@ -283,7 +283,7 @@ private:
if(this->peerData["contConn"].count(k) != 0 && this->peerData["contConn"][k] > 4) { // 4 times if(this->peerData["contConn"].count(k) != 0 && this->peerData["contConn"][k] > 4) { // 4 times
json meta; json meta;
json data; json data;
string msg = fmt::format("evdaemon {} detects module {} is restarting frequently, slow down for 10s", this->devSn, k); string msg = fmt::format("evdaemon {} detects module {} is restarting frequently, should tabke action", this->devSn, k);
spdlog::error(msg); spdlog::error(msg);
data["msg"] = msg; data["msg"] = msg;
data["modId"] = k; data["modId"] = k;
...@@ -295,7 +295,7 @@ private: ...@@ -295,7 +295,7 @@ private:
meta["type"] = EV_MSG_META_TYPE_REPORT; meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDealer, "evcloudsvc", meta.dump(), data.dump()); z_send(pDealer, "evcloudsvc", meta.dump(), data.dump());
this_thread::sleep_for(chrono::seconds(10)); // this_thread::sleep_for(chrono::seconds(10));
} }
this->peerData["status"][k] = 0; this->peerData["status"][k] = 0;
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
...@@ -771,6 +771,8 @@ private: ...@@ -771,6 +771,8 @@ private:
if(v.size() == 1) { if(v.size() == 1) {
if(data["metaValue"] == EV_MSG_META_VALUE_CMD_REVESETUN) { if(data["metaValue"] == EV_MSG_META_VALUE_CMD_REVESETUN) {
manageReverseTun(true, data["data"]); manageReverseTun(true, data["data"]);
}else if(data["metaValue"] == EV_MSG_META_VALUE_CMD_UPDATE){
//
} }
else { else {
spdlog::info("evdaemon {} received msg {} from cloud to itself. but has no implementation for", devSn, data.dump()); spdlog::info("evdaemon {} received msg {} from cloud to itself. but has no implementation for", devSn, data.dump());
......
...@@ -72,7 +72,7 @@ private: ...@@ -72,7 +72,7 @@ private:
int ret = 0; int ret = 0;
string peerId, meta; string peerId, meta;
json data; json data;
string msg; string msg, metaType, metaValue;
for(auto &b:v) { for(auto &b:v) {
msg +=body2str(b) + ";"; msg +=body2str(b) + ";";
} }
...@@ -81,23 +81,35 @@ private: ...@@ -81,23 +81,35 @@ private:
if(v.size() == 3) { if(v.size() == 3) {
try { try {
peerId = body2str(v[0]); peerId = body2str(v[0]);
meta = json::parse(body2str(v[1]))["type"]; json meta = json::parse(body2str(v[1]));
if(meta == EV_MSG_META_AVFORMATCTX) { metaType = meta["type"];
lock_guard<mutex> lock(this->mutMsg); if(meta.count("value") != 0) {
if(pAVFormatInput == nullptr) { metaValue = meta["value"];
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); }
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true; // msg from cluster mgr
cvMsg.notify_one(); string clusterMgrId = this->mgrSn + ":evmgr:0";
spdlog::info("evslicer {} got avformat from {}", selfId, peerId); if(peerId == clusterMgrId) {
} //
else { }
spdlog::warn("evslicer {} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId); else if(peerId == pullerGid) {
spdlog::error("evslicer {} restart since reinit", selfId); if(metaType == EV_MSG_META_AVFORMATCTX) {
exit(0); lock_guard<mutex> lock(this->mutMsg);
if(pAVFormatInput == nullptr) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
cvMsg.notify_one();
spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
}
else {
spdlog::warn("evslicer {} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("evslicer {} restart since reinit", selfId);
exit(0);
}
} }
} }
else if(meta == EV_MSG_META_EVENT) { else if(metaType == EV_MSG_META_EVENT) {
data = json::parse(body2str(v[2])); data = json::parse(body2str(v[2]));
/// evslicer has two msg interfaces to subsystems on edge side /// evslicer has two msg interfaces to subsystems on edge side
...@@ -107,7 +119,7 @@ private: ...@@ -107,7 +119,7 @@ private:
spdlog::info("evslicer {} received invalid msg from {}: {}", selfId, peerId, msg); spdlog::info("evslicer {} received invalid msg from {}: {}", selfId, peerId, msg);
} }
else { else {
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump()); spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, metaType, data.dump());
if(data["type"] == "event") { if(data["type"] == "event") {
lock_guard<mutex> lock(this->mutEvent); lock_guard<mutex> lock(this->mutEvent);
eventQueue.push(data.dump()); eventQueue.push(data.dump());
......
...@@ -171,33 +171,33 @@ int setupDealer(void **ctx, void **s, string addr, string ident, int timeout) { ...@@ -171,33 +171,33 @@ int setupDealer(void **ctx, void **s, string addr, string ident, int timeout) {
/// @return 0 success, otherwise failed. /// @return 0 success, otherwise failed.
int recvConfigMsg(void *s, json &config, string addr, string ident){ int recvConfigMsg(void *s, json &config, string addr, string ident){
bool bConfigGot = false; bool bConfigGot = false;
auto t = thread([&](){ // auto t = thread([&](){
this_thread::sleep_for(chrono::seconds(5)); // this_thread::sleep_for(chrono::seconds(5));
if(bConfigGot == false){ // if(bConfigGot == false){
spdlog::error("{} failed receive config from evdaemon", ident); // spdlog::error("{} failed receive config from evdaemon", ident);
exit(1); // exit(1);
} // }
}); // });
auto v = zmqhelper::z_recv_multiple(s); while(!bConfigGot) {
if(v.size() != 3) { auto v = zmqhelper::z_recv_multiple(s);
if(v.size() != 3) {
spdlog::error("{} received invalid msg from evdaemon", ident); spdlog::error("{} received invalid msg from evdaemon", ident);
exit(1); continue;
} }
spdlog::debug("{} configuration msg received: {} {} {}", ident, body2str(v[0]), body2str(v[1]), body2str(v[2])); spdlog::debug("{} configuration msg received: {} {} {}", ident, body2str(v[0]), body2str(v[1]), body2str(v[2]));
try{ try{
string sMeta = json::parse(body2str(v[1]))["type"]; string sMeta = json::parse(body2str(v[1]))["type"];
if(sMeta != EV_MSG_META_CONFIG) { if(sMeta != EV_MSG_META_CONFIG) {
throw StrException("meta type is:" + sMeta + ", but expecting " + EV_MSG_META_CONFIG); throw StrException("meta type is:" + sMeta + ", but expecting " + EV_MSG_META_CONFIG);
}
config = json::parse(body2str(v[2]));
bConfigGot = true;
}catch(exception &e) {
spdlog::error("{} invalid config msg from daemon {}, {}", ident, addr, e.what());
} }
config = json::parse(body2str(v[2]));
bConfigGot = true;
}catch(exception &e) {
spdlog::error("{} invalid config msg from daemon {}, {}", ident, addr, e.what());
} }
t.join();
return bConfigGot? 0: -1; return bConfigGot? 0: -1;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论