提交 99790eb1 authored 作者: blu's avatar blu

evdameon, evcloudsvc: subsystem monitor thread and defered uploading

上级 2f6d48ce
...@@ -83,6 +83,12 @@ private: ...@@ -83,6 +83,12 @@ private:
json mapSubSystems; json mapSubSystems;
json jsonIPs; json jsonIPs;
/// monitoring thread
condition_variable cvSubMonitor;
mutex mutSubMonitor;
thread thSubMonitor;
int ping(void *s) int ping(void *s)
{ {
int ret = 0; int ret = 0;
...@@ -562,13 +568,15 @@ private: ...@@ -562,13 +568,15 @@ private:
peerData["pids"].erase(selfId); peerData["pids"].erase(selfId);
} }
if(this->bBootstrap) { spdlog::warn("evdaemon {} peer {} disconnected. will restarting it.", devSn, selfId);
spdlog::warn("evdaemon {} peer {} disconnected. restarting it.", devSn, selfId); /// let thSubSystem do the following job
startSubSystems({selfId}); // if(this->bBootstrap) {
} // spdlog::warn("evdaemon {} peer {} disconnected. restarting it.", devSn, selfId);
else { // startSubSystems({selfId});
spdlog::warn("evdaemon {} peer {} disconnected. won't restart it since BOOTSTRAP=false", devSn, selfId); // }
} // else {
// spdlog::warn("evdaemon {} peer {} disconnected. won't restart it since BOOTSTRAP=false", devSn, selfId);
// }
} }
} }
...@@ -1064,6 +1072,82 @@ public: ...@@ -1064,6 +1072,82 @@ public:
}); });
thHeartBeat.detach(); thHeartBeat.detach();
thSubMonitor = thread([this](){
while(true){
{
unique_lock<mutex> lk(this->mutSubMonitor);
this->cvSubMonitor.wait_for(lk, 5s, [this] {return !this->bColdStart;});
vector<string> tmp;
json unkown;
vector<string> terms;
string info;
json ret;
int cnt = 0;
for(auto &[k,v]: this->peerData["config"].items()) {
if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) {
tmp.push_back(k);
info += (cnt == 0? "" : string(", ")) + k;
}
else if(this->peerData["status"][k] == -1) {
unkown[k] = -1;
}
}
else {
terms.push_back(k);
}
cnt++;
}
spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info);
//
for(string &e : tmp) {
pid_t pid = 0;
this->peerData["status"][e] = 0;
ret = zmqhelper::forkSubsystem(devSn, e, portRouter, pid);
if(0 == ret) {
this->peerData["pids"][e] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, e);
}
else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, e);
}
}
for(string &e: terms) {
if(this->peerData["pids"].count(e) != 0) {
kill(this->peerData["pids"][e], SIGTERM);
}
}
while(unkown.size() != 0 && cnt < 3) {
this_thread::sleep_for(chrono::seconds(3));
for(auto &[k,v]: unkown.items()) {
if(this->peerData["status"][k] != -1 && this->peerData["status"][k] != 0) {
// no need to start
unkown.erase(k);
}
}
cnt++;
}
for(auto &[k,v]: unkown.items()) {
pid_t pid = 0;
this->peerData["status"][k] = 0;
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
if(0 == ret) {
this->peerData["pids"][k] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, k);
}
else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k);
}
}
}
}
});
this->thIdMain = this_thread::get_id(); this->thIdMain = this_thread::get_id();
}; };
~EvDaemon() {}; ~EvDaemon() {};
......
...@@ -1044,6 +1044,7 @@ public: ...@@ -1044,6 +1044,7 @@ public:
if(!this->eventQueue.empty()) { if(!this->eventQueue.empty()) {
evt = this->eventQueue.front(); evt = this->eventQueue.front();
this->eventQueue.pop(); this->eventQueue.pop();
spdlog::error("{} msg queue is full!", selfId);
} }
} }
...@@ -1122,23 +1123,24 @@ public: ...@@ -1122,23 +1123,24 @@ public:
if(hasError) { if(hasError) {
continue; continue;
} }
/// let the upload script do the following jobs
spdlog::info("{} file upload range:({},{}) = ({}, {}), url: {}", selfId, tss, tse, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), this->videoFileServerApi); spdlog::info("{} file upload range:({},{}) = ({}, {}), url: {}", selfId, tss, tse, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), this->videoFileServerApi);
string strResp; // string strResp;
ret = netutils::postFiles(this->videoFileServerApi, params, fileNames, strResp); // ret = netutils::postFiles(this->videoFileServerApi, params, fileNames, strResp);
if( ret != 0 ) { // if( ret != 0 ) {
bUploadFailed = true; // bUploadFailed = true;
spdlog::error("{} failed uploaded ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf); // spdlog::error("{} failed uploaded ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
reportUploadFailure(selfId, true, strResp); // reportUploadFailure(selfId, true, strResp);
if(ret > 0) { // if(ret > 0) {
if(jEvt.count("cnt") == 0) { // if(jEvt.count("cnt") == 0) {
jEvt["cnt"] = ret; // jEvt["cnt"] = ret;
} // }
if(jEvt["cnt"].get<int>() <= 0) { // if(jEvt["cnt"].get<int>() <= 0) {
// TODO: report message to cloud // TODO: report message to cloud
string msg = fmt::format("{} failed to upload videos over N times: {}", selfId, strResp); // string msg = fmt::format("{} failed to upload videos over N times: {}", selfId, strResp);
spdlog::error(msg); // spdlog::error(msg);
// TODO: move to failed folder // TODO: move to failed folder
string dirDest = "/var/data/evsuits/failed_events/"; string dirDest = "/var/data/evsuits/failed_events/";
system((string("mkdir -p ") + dirDest).c_str()); system((string("mkdir -p ") + dirDest).c_str());
...@@ -1162,71 +1164,71 @@ public: ...@@ -1162,71 +1164,71 @@ public:
catch(exception &e) { catch(exception &e) {
spdlog::error("evcloudsvc {} {}:{} exception: {}", selfId, __FILE__, __LINE__, e.what()); spdlog::error("evcloudsvc {} {}:{} exception: {}", selfId, __FILE__, __LINE__, e.what());
} }
} // }
else { // else {
spdlog::info("{} retrying upload", selfId); // spdlog::info("{} retrying upload", selfId);
jEvt["cnt"] = jEvt["cnt"].get<int>() - 1; // jEvt["cnt"] = jEvt["cnt"].get<int>() - 1;
lock_guard<mutex> lock(this->mutEvent); // lock_guard<mutex> lock(this->mutEvent);
this->eventQueue.push(jEvt.dump()); // this->eventQueue.push(jEvt.dump());
if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) { // if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQueue.pop(); // eventQueue.pop();
} // }
cvEvent.notify_one(); // cvEvent.notify_one();
} // }
} // }
} // }
else { // ret == 0 // else { // ret == 0
spdlog::info("{} upload ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf); // spdlog::info("{} upload ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
try { // try {
auto resp = json::parse(strResp); // auto resp = json::parse(strResp);
//TODO: open this swith when video server has implemented this functionality // //TODO: open this swith when video server has implemented this functionality
if(resp.count("code") == 0 || resp["code"] != 0) { // if(resp.count("code") == 0 || resp["code"] != 0) {
bUploadFailed = true; // bUploadFailed = true;
reportUploadFailure(selfId, true, strResp); // reportUploadFailure(selfId, true, strResp);
} // }
if(resp.count("code") != 0 && resp["code"] != 0) { // if(resp.count("code") != 0 && resp["code"] != 0) {
bUploadFailed = true; // bUploadFailed = true;
if(resp["code"] == 4|| resp["code"] == 7) { // if(resp["code"] == 4|| resp["code"] == 7) {
if(jEvt.count("cnt") == 0) { // if(jEvt.count("cnt") == 0) {
jEvt["cnt"] = 2; // jEvt["cnt"] = 2;
} // }
else { // else {
if(jEvt["cnt"].get<int>() <= 0) { // if(jEvt["cnt"].get<int>() <= 0) {
string msg = fmt::format("{} failed to upload videos over N times. reason: {}", selfId, strResp); // string msg = fmt::format("{} failed to upload videos over N times. reason: {}", selfId, strResp);
spdlog::error(msg); // spdlog::error(msg);
} // }
else { // else {
jEvt["cnt"] = jEvt["cnt"].get<int>() - 1; // jEvt["cnt"] = jEvt["cnt"].get<int>() - 1;
lock_guard<mutex> lock(this->mutEvent); // lock_guard<mutex> lock(this->mutEvent);
this->eventQueue.push(jEvt.dump()); // this->eventQueue.push(jEvt.dump());
if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) { // if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQueue.pop(); // eventQueue.pop();
} // }
cvEvent.notify_one(); // cvEvent.notify_one();
} // }
} // }
} // }
else if(resp["code"] == 6) { // else if(resp["code"] == 6) {
// TODO: cloud storage issue. need stratigy policy // // TODO: cloud storage issue. need stratigy policy
spdlog::warn("{} TODO: handle cloud storage", this->selfId); // spdlog::warn("{} TODO: handle cloud storage", this->selfId);
} // }
else { // else {
spdlog::error("{} failed to upload videos. abort retry.", this->selfId); // spdlog::error("{} failed to upload videos. abort retry.", this->selfId);
} // }
} // }
if(resp.count("code") != 0 && resp["code"] == 0) { // if(resp.count("code") != 0 && resp["code"] == 0) {
if(bUploadFailed) { // if(bUploadFailed) {
bUploadFailed = false; // bUploadFailed = false;
reportUploadFailure(selfId, false, strResp); // reportUploadFailure(selfId, false, strResp);
} // }
} // }
} // }
catch(exception &e) { // catch(exception &e) {
spdlog::error("{} {}:{} exception: {}", this->selfId, __FILE__, __LINE__, e.what()); // spdlog::error("{} {}:{} exception: {}", this->selfId, __FILE__, __LINE__, e.what());
} // }
} //}
} }
} }
else { else {
......
...@@ -207,6 +207,7 @@ int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid){ ...@@ -207,6 +207,7 @@ int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid){
auto v = strutils::split(peerId, ':'); auto v = strutils::split(peerId, ':');
string modName = v[1]; string modName = v[1];
string sn = v[0]; string sn = v[0];
system(string("killall " + modName).c_str());
if( (pid = fork()) == -1 ) { if( (pid = fork()) == -1 ) {
spdlog::error("evdamon {} failed to fork subsytem: {}", devSn, peerId); spdlog::error("evdamon {} failed to fork subsytem: {}", devSn, peerId);
return -1; return -1;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论