提交 4a2331f9 authored 作者: blu's avatar blu

potential bugfix: reverse tunnel timeout for avoiding dead threads

上级 aa6de6e9
...@@ -87,7 +87,8 @@ private: ...@@ -87,7 +87,8 @@ private:
if(ret <0 || relBund.size() == 0) { if(ret <0 || relBund.size() == 0) {
this->releaseBundle["bundles"] = json(); this->releaseBundle["bundles"] = json();
this->releaseBundle["activeIdx"] = -1; this->releaseBundle["activeIdx"] = -1;
}else{ }
else {
this->releaseBundle = relBund; this->releaseBundle = relBund;
} }
} }
...@@ -101,7 +102,7 @@ private: ...@@ -101,7 +102,7 @@ private:
string meta = j.dump(); string meta = j.dump();
vector<vector<uint8_t> > v = {str2body(sn), str2body(devSn), str2body(meta), str2body(cfg)}; vector<vector<uint8_t> > v = {str2body(sn), str2body(devSn), str2body(meta), str2body(cfg)};
// if(peerData["status"].count(sn) == 0||peerData["status"][sn] == 0) { // if(peerData["online"].count(sn) == 0||peerData["online"][sn] == 0) {
// spdlog::warn("evcloudsvc {} cached config to {}", devSn, sn); // spdlog::warn("evcloudsvc {} cached config to {}", devSn, sn);
// lock_guard<mutex> lock(cacheLock); // lock_guard<mutex> lock(cacheLock);
// cachedMsg[sn].push(v); // cachedMsg[sn].push(v);
...@@ -377,11 +378,11 @@ private: ...@@ -377,11 +378,11 @@ private:
bool ret = false; bool ret = false;
int state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size()); int state = zmq_socket_get_peer_state(pRouter, selfId.data(), selfId.size());
spdlog::info("evcloudsvc peer {} state: {}", selfId, state); spdlog::info("evcloudsvc peer {} state: {}", selfId, state);
if(peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0) { if(peerData["online"].count(selfId) == 0 || peerData["online"][selfId] == 0) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); peerData["online"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evcloudsvc peer connected: {}", selfId); spdlog::info("evcloudsvc peer connected: {}", selfId);
ret = true; ret = true;
spdlog::debug("evcloudsvc update status of {} to 1 and send config", selfId); spdlog::debug("evcloudsvc update online status of {} to 1 and send config", selfId);
json data = getConfigForDevice(selfId); json data = getConfigForDevice(selfId);
if(data["code"] != 0) { if(data["code"] != 0) {
json resp; json resp;
...@@ -395,7 +396,7 @@ private: ...@@ -395,7 +396,7 @@ private:
} }
} }
else { else {
peerData["status"][selfId] = 0; peerData["online"][selfId] = 0;
spdlog::warn("{} peer disconnected: {}", devSn, selfId); spdlog::warn("{} peer disconnected: {}", devSn, selfId);
} }
return ret; return ret;
...@@ -439,14 +440,14 @@ private: ...@@ -439,14 +440,14 @@ private:
meta = body2str(body[2]); meta = body2str(body[2]);
selfId = body2str(body[0]); selfId = body2str(body[0]);
peerId = body2str(body[1]); peerId = body2str(body[1]);
// update status; // update online status;
this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["online"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
int minLen = std::min(body[1].size(), devSn.size()); int minLen = std::min(body[1].size(), devSn.size());
if(memcmp((void*)(body[1].data()), devSn.data(), minLen) != 0) { if(memcmp((void*)(body[1].data()), devSn.data(), minLen) != 0) {
// message to other peer // message to other peer
// check peer status // check peer online status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) { if(peerData["online"].count(peerId)!= 0 && peerData["online"][peerId] != 0) {
spdlog::info("{} route msg from {} to {}", devSn, selfId, peerId); spdlog::info("{} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
...@@ -508,15 +509,17 @@ private: ...@@ -508,15 +509,17 @@ private:
} }
} }
else { else {
try{ try {
json jmeta = json::parse(meta); json jmeta = json::parse(meta);
if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) { if(jmeta["type"] == EV_MSG_META_TYPE_REPORT) {
// TODO: handle report msg // TODO: handle report msg
spdlog::warn("{} received report msg from {}: {}", devSn, selfId, body2str(body[3])); spdlog::warn("{} received report msg from {}: {}", devSn, selfId, body2str(body[3]));
}else{ }
else {
spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId); spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId);
} }
} catch(exception &e) { }
catch(exception &e) {
spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId); spdlog::warn("{} received unknown msg {} from {}", devSn, meta, selfId);
} }
} }
...@@ -572,9 +575,10 @@ private: ...@@ -572,9 +575,10 @@ private:
} }
auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(peerData["status"].count(sn) != 0 && ((now - peerData["status"][sn].get<decltype(now)>()) < 60) ){ if(peerData["online"].count(sn) != 0 && ((now - peerData["online"][sn].get<decltype(now)>()) < 60) ) {
ret["online"] = true; ret["online"] = true;
}else{ }
else {
ret["online"] = false; ret["online"] = false;
} }
} }
...@@ -615,7 +619,7 @@ private: ...@@ -615,7 +619,7 @@ private:
} }
body["sender"] = devSn; body["sender"] = devSn;
if(peerData["status"].count(v[0]) == 0 || peerData["status"][v[0]] == 0) { if(peerData["online"].count(v[0]) == 0 || peerData["online"][v[0]] == 0) {
spdlog::warn("evcloudsvc sent msg {} to {}, but it was offline", body.dump(), v[0]); spdlog::warn("evcloudsvc sent msg {} to {}, but it was offline", body.dump(), v[0]);
} }
else { else {
...@@ -663,10 +667,12 @@ private: ...@@ -663,10 +667,12 @@ private:
int stackId = -1; int stackId = -1;
if(bid.empty()) { if(bid.empty()) {
ret = this->releaseBundle; ret = this->releaseBundle;
}else{ }
try{ else {
try {
stackId = stoi(bid); stackId = stoi(bid);
}catch(exception &e) { }
catch(exception &e) {
stackId = -1; stackId = -1;
} }
...@@ -676,7 +682,8 @@ private: ...@@ -676,7 +682,8 @@ private:
if(stackId >=0 && idx >= 0) { if(stackId >=0 && idx >= 0) {
// idx style // idx style
ret = bunds[idx]; ret = bunds[idx];
}else{ }
else {
// releaseId style // releaseId style
for(auto &r: bunds) { for(auto &r: bunds) {
if(r["releaseId"] == bid) { if(r["releaseId"] == bid) {
...@@ -691,7 +698,8 @@ private: ...@@ -691,7 +698,8 @@ private:
return ret; return ret;
} }
string enableRelease(string bid, bool enable){ string enableRelease(string bid, bool enable)
{
string ret; string ret;
int stackId = -1; int stackId = -1;
bool handled = false; bool handled = false;
...@@ -703,15 +711,18 @@ private: ...@@ -703,15 +711,18 @@ private:
handled = true; handled = true;
// TODO: send release to edge // TODO: send release to edge
} }
}else{ }
else {
if(this->releaseBundle["bundles"].size() <= 1) { if(this->releaseBundle["bundles"].size() <= 1) {
ret = "no release to disable. (maybe only one or none release bundle configured)"; ret = "no release to disable. (maybe only one or none release bundle configured)";
} }
} }
}else{ }
try{ else {
try {
stackId = stoi(bid); stackId = stoi(bid);
}catch(exception &e) { }
catch(exception &e) {
isNumber = false; isNumber = false;
} }
if(this->releaseBundle.size() != 0 && this->releaseBundle.count("bundles") != 0) { if(this->releaseBundle.size() != 0 && this->releaseBundle.count("bundles") != 0) {
...@@ -726,20 +737,24 @@ private: ...@@ -726,20 +737,24 @@ private:
if(this->releaseBundle["activeIdx"] == idx) { if(this->releaseBundle["activeIdx"] == idx) {
if(enable) { if(enable) {
spdlog::info("evcloudsvc release {} is already in active. nop.", idx); spdlog::info("evcloudsvc release {} is already in active. nop.", idx);
}else{ }
else {
return enableRelease(to_string(idx - 1), true); return enableRelease(to_string(idx - 1), true);
// TODO: send release to edge // TODO: send release to edge
} }
}else{ }
else {
if(enable) { if(enable) {
this->releaseBundle["activeIdx"] = idx; this->releaseBundle["activeIdx"] = idx;
handled = true; handled = true;
// TODO: send release to edge // TODO: send release to edge
}else{ }
else {
ret = "this release is not in active. nop."; ret = "this release is not in active. nop.";
} }
} }
}else{ }
else {
// releaseId style // releaseId style
int idx = 0; int idx = 0;
for(auto &r: bunds) { for(auto &r: bunds) {
...@@ -755,17 +770,20 @@ private: ...@@ -755,17 +770,20 @@ private:
return ret; return ret;
} }
string delReleaseBundle(string bid) { string delReleaseBundle(string bid)
{
string ret; string ret;
int stackId = -1; int stackId = -1;
bool handled = false; bool handled = false;
bool isNumber = true; bool isNumber = true;
if(bid.empty()) { if(bid.empty()) {
ret = "empty release bundle id"; ret = "empty release bundle id";
}else{ }
try{ else {
try {
stackId = stoi(bid); stackId = stoi(bid);
}catch(exception &e) { }
catch(exception &e) {
isNumber = false; isNumber = false;
} }
...@@ -779,9 +797,10 @@ private: ...@@ -779,9 +797,10 @@ private:
} }
spdlog::info("idx: {}", idx); spdlog::info("idx: {}", idx);
if(idx == this->releaseBundle["activeIdx"].get<int>()){ if(idx == this->releaseBundle["activeIdx"].get<int>()) {
ret = "can't delete active release bundle"; ret = "can't delete active release bundle";
}else{ }
else {
bunds.erase(idx); bunds.erase(idx);
if(idx < this->releaseBundle["activeIdx"].get<int>()) { if(idx < this->releaseBundle["activeIdx"].get<int>()) {
this->releaseBundle["activeIdx"] = this->releaseBundle["activeIdx"].get<int>() -1; this->releaseBundle["activeIdx"] = this->releaseBundle["activeIdx"].get<int>() -1;
...@@ -789,7 +808,7 @@ private: ...@@ -789,7 +808,7 @@ private:
handled = true; handled = true;
} }
} }
else{ else {
// releaseId style // releaseId style
int idx = 0; int idx = 0;
for(auto r: bunds) { for(auto r: bunds) {
...@@ -819,7 +838,8 @@ private: ...@@ -819,7 +838,8 @@ private:
string ret; string ret;
if(bundle.count("releaseId") == 0) { if(bundle.count("releaseId") == 0) {
ret = "no releaseId field"; ret = "no releaseId field";
}else { }
else {
for(auto &b: this->releaseBundle["bundles"]) { for(auto &b: this->releaseBundle["bundles"]) {
if(b["releaseId"] == bundle["releaseId"]) { if(b["releaseId"] == bundle["releaseId"]) {
ret = "releaseId already exist: " + b.dump(); ret = "releaseId already exist: " + b.dump();
...@@ -849,7 +869,8 @@ private: ...@@ -849,7 +869,8 @@ private:
return ret; return ret;
} }
json getClusterInfo(set<string> sns) { json getClusterInfo(set<string> sns)
{
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
ret["msg"] = "ok"; ret["msg"] = "ok";
...@@ -865,9 +886,10 @@ private: ...@@ -865,9 +886,10 @@ private:
auto conf = getConfigForDevice(k); auto conf = getConfigForDevice(k);
auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(peerData["status"].count(k) != 0 && ((now - peerData["status"][k].get<decltype(now)>()) < 60) ){ if(peerData["online"].count(k) != 0 && ((now - peerData["online"][k].get<decltype(now)>()) < 60) ) {
conf["online"] = true; conf["online"] = true;
}else{ }
else {
conf["online"] = false; conf["online"] = false;
} }
ret["data"][k] = conf; ret["data"][k] = conf;
...@@ -1033,8 +1055,8 @@ public: ...@@ -1033,8 +1055,8 @@ public:
this->configMap.erase(sn); this->configMap.erase(sn);
if(this->peerData["config"].contains(sn)) if(this->peerData["config"].contains(sn))
this->peerData["config"].erase(sn); this->peerData["config"].erase(sn);
if(this->peerData["status"].contains(sn)) if(this->peerData["online"].contains(sn))
this->peerData["status"].erase(sn); this->peerData["online"].erase(sn);
spdlog::info("evcloudsvc removed sn: {}", sn); spdlog::info("evcloudsvc removed sn: {}", sn);
LVDB::setValue(this->configMap, KEY_CONFIG_MAP); LVDB::setValue(this->configMap, KEY_CONFIG_MAP);
...@@ -1054,7 +1076,8 @@ public: ...@@ -1054,7 +1076,8 @@ public:
if(bundle.size() == 0) { if(bundle.size() == 0) {
ret["code"] = 1; ret["code"] = 1;
ret["msg"] = "not found"; ret["msg"] = "not found";
}else{ }
else {
ret["data"] = bundle; ret["data"] = bundle;
} }
} }
...@@ -1076,7 +1099,7 @@ public: ...@@ -1076,7 +1099,7 @@ public:
try { try {
auto body = json::parse(req.body); auto body = json::parse(req.body);
auto s = this->addReleaseBundle(body); auto s = this->addReleaseBundle(body);
if(!s.empty()){ if(!s.empty()) {
ret["code"] = 1; ret["code"] = 1;
ret["msg"] = s; ret["msg"] = s;
} }
...@@ -1123,6 +1146,7 @@ public: ...@@ -1123,6 +1146,7 @@ public:
this->peerData["info"] = json(); this->peerData["info"] = json();
this->peerData["info"]["ips"] = json(); this->peerData["info"]["ips"] = json();
this->peerData["config"] = json(); this->peerData["config"] = json();
this->peerData["online"] = json();
this->peerData["status"] = json(); this->peerData["status"] = json();
spdlog::info("evcloudsvc boot"); spdlog::info("evcloudsvc boot");
......
...@@ -477,7 +477,7 @@ private: ...@@ -477,7 +477,7 @@ private:
if(peerId == "evcloudsvc") { if(peerId == "evcloudsvc") {
zmqhelper::z_send(pDealer, peerId, body[2], body[3]); zmqhelper::z_send(pDealer, peerId, body[2], body[3]);
} }
else{ else {
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0 && this->peerData["status"][peerId][peerId] != -1) { if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0 && this->peerData["status"][peerId][peerId] != -1) {
spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId); spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId);
...@@ -545,7 +545,8 @@ private: ...@@ -545,7 +545,8 @@ private:
} }
int manageReverseTun(bool bStart, json &tunCfg) { int manageReverseTun(bool bStart, json &tunCfg)
{
int ret = 0; int ret = 0;
if(tunCfg.count("port") == 0 || tunCfg.count("host") == 0 || tunCfg.count("user") == 0 || tunCfg.count("password") == 0) { if(tunCfg.count("port") == 0 || tunCfg.count("host") == 0 || tunCfg.count("user") == 0 || tunCfg.count("password") == 0) {
spdlog::error("evcdaemon {} invalid reverse tunnel settings, shall have host, port, user, password fields"); spdlog::error("evcdaemon {} invalid reverse tunnel settings, shall have host, port, user, password fields");
...@@ -554,7 +555,7 @@ private: ...@@ -554,7 +555,7 @@ private:
thread th; thread th;
string host = tunCfg["host"], user = tunCfg["user"], password = tunCfg["password"]; string host = tunCfg["host"], user = tunCfg["user"], password = tunCfg["password"];
int port = tunCfg["port"]; int port = tunCfg["port"];
th = thread([host, port, user, password](){ th = thread([host, port, user, password]() {
createReverseTun(host, port, user, password); createReverseTun(host, port, user, password);
}); });
...@@ -631,7 +632,8 @@ private: ...@@ -631,7 +632,8 @@ private:
if(v.size() == 1) { if(v.size() == 1) {
if(data["metaValue"] == EV_MSG_META_VALUE_CMD_REVESETUN) { if(data["metaValue"] == EV_MSG_META_VALUE_CMD_REVESETUN) {
manageReverseTun(true, data["data"]); manageReverseTun(true, data["data"]);
}else{ }
else {
spdlog::info("evdaemon {} received msg {} from cloud to itself. but has no implementation for", devSn, data.dump()); spdlog::info("evdaemon {} received msg {} from cloud to itself. but has no implementation for", devSn, data.dump());
} }
} }
...@@ -690,7 +692,8 @@ private: ...@@ -690,7 +692,8 @@ private:
return 0; return 0;
} }
void setUpDealer(){ void setUpDealer()
{
lock_guard<mutex> lg(mutHeartBeat); lock_guard<mutex> lg(mutHeartBeat);
if(pDealer != nullptr) { if(pDealer != nullptr) {
int i = 0; int i = 0;
...@@ -906,8 +909,8 @@ public: ...@@ -906,8 +909,8 @@ public:
thCloud.detach(); thCloud.detach();
spdlog::info("evdaemon {} cloud message processor had setup {}", devSn, cloudAddr); spdlog::info("evdaemon {} cloud message processor had setup {}", devSn, cloudAddr);
thHeartBeat = thread([this](){ thHeartBeat = thread([this]() {
while(true){ while(true) {
{ {
lock_guard<mutex> lg(this->mutHeartBeat); lock_guard<mutex> lg(this->mutHeartBeat);
if(this->pDealer != nullptr) if(this->pDealer != nullptr)
......
...@@ -141,7 +141,8 @@ error_exit: ...@@ -141,7 +141,8 @@ error_exit:
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", this->devSn, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", this->devSn, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleCloudMsg(body); this->handleCloudMsg(body);
} }
......
...@@ -494,11 +494,12 @@ private: ...@@ -494,11 +494,12 @@ private:
factor = int(int(this->pps) / this->detPara.fpsProc); // regulator to 0 if it was set inresonably high factor = int(int(this->pps) / this->detPara.fpsProc); // regulator to 0 if it was set inresonably high
} }
if(factor != 0 ){ if(factor != 0 ) {
if(called % factor == 0) { if(called % factor == 0) {
proc = true; proc = true;
//factor = 0; // reset it open to change //factor = 0; // reset it open to change
}else{ }
else {
proc = false; proc = false;
} }
} }
...@@ -508,8 +509,9 @@ private: ...@@ -508,8 +509,9 @@ private:
spdlog::info("evmlmotion {} pps {}, fpsFactor {}, called {}, lag {}, skip processing", this->selfId, this->pps, factor, called, this->pktLag); spdlog::info("evmlmotion {} pps {}, fpsFactor {}, called {}, lag {}, skip processing", this->selfId, this->pps, factor, called, this->pktLag);
} }
// detectMotion(pCodecContext->pix_fmt, pFrame, false); // detectMotion(pCodecContext->pix_fmt, pFrame, false);
}else{ }
if((called % (180*4)) == 0){ else {
if((called % (180*4)) == 0) {
spdlog::info("evmlmotion {} pps {}, fpsFactor {}, called {}, lag {}", this->selfId, this->pps, factor, called, this->pktLag); spdlog::info("evmlmotion {} pps {}, fpsFactor {}, called {}, lag {}", this->selfId, this->pps, factor, called, this->pktLag);
} }
detectMotion(pCodecContext->pix_fmt, pFrame, detect); detectMotion(pCodecContext->pix_fmt, pFrame, detect);
...@@ -816,7 +818,8 @@ protected: ...@@ -816,7 +818,8 @@ protected:
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump()); z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error(msg); spdlog::error(msg);
}else{ }
else {
if(!bStatsSent) { if(!bStatsSent) {
bStatsSent = true; bStatsSent = true;
string msg = fmt::format("evmlmotion {} successfully decode packet", selfId); string msg = fmt::format("evmlmotion {} successfully decode packet", selfId);
...@@ -901,7 +904,8 @@ public: ...@@ -901,7 +904,8 @@ public:
auto body = z_recv_multiple(pDaemon,false); auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleCloudMsg(body); this->handleCloudMsg(body);
} }
...@@ -915,7 +919,8 @@ public: ...@@ -915,7 +919,8 @@ public:
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleEdgeMsg(body); this->handleEdgeMsg(body);
} }
......
...@@ -327,7 +327,8 @@ protected: ...@@ -327,7 +327,8 @@ protected:
spdlog::error(msg); spdlog::error(msg);
// TODO: message report to cloud // TODO: message report to cloud
exit(1); exit(1);
}else{ }
else {
string msg = fmt::format("evpuller {} successfully openned input stream {}", selfId, urlIn); string msg = fmt::format("evpuller {} successfully openned input stream {}", selfId, urlIn);
json meta; json meta;
json data; json data;
...@@ -411,7 +412,7 @@ protected: ...@@ -411,7 +412,7 @@ protected:
pktCnt++; pktCnt++;
packet.stream_index = streamList[packet.stream_index]; packet.stream_index = streamList[packet.stream_index];
// skip first 5 packets avoid pusher and slicer exception // skip first 5 packets avoid pusher and slicer exception
if(pktCnt <= 5){ if(pktCnt <= 5) {
if(pktCnt == 5) { if(pktCnt == 5) {
// serialize formatctx to bytes // serialize formatctx to bytes
// be attention to the scope of lock guard! // be attention to the scope of lock guard!
...@@ -501,7 +502,8 @@ public: ...@@ -501,7 +502,8 @@ public:
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
string msg; string msg;
for(auto &v: body) { for(auto &v: body) {
...@@ -524,7 +526,8 @@ public: ...@@ -524,7 +526,8 @@ public:
auto body = z_recv_multiple(pDaemon,false); auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleCloudMsg(body); this->handleCloudMsg(body);
} }
......
...@@ -306,8 +306,7 @@ private: ...@@ -306,8 +306,7 @@ private:
AVDictionary *pOptsRemux = nullptr; AVDictionary *pOptsRemux = nullptr;
string proto = urlOut.substr(0, 4); string proto = urlOut.substr(0, 4);
int cnt = 0; int cnt = 0;
while(ret < 0) while(ret < 0) {
{
if(cnt > 3) { if(cnt > 3) {
string msg = fmt::format("evpusher {} failed to write output header \"{}\": {}, {}", selfId, urlOut, ret, av_err2str(ret)); string msg = fmt::format("evpusher {} failed to write output header \"{}\": {}, {}", selfId, urlOut, ret, av_err2str(ret));
json meta; json meta;
...@@ -405,7 +404,8 @@ private: ...@@ -405,7 +404,8 @@ private:
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump()); z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error(msg); spdlog::error(msg);
exit(1); exit(1);
}else{ }
else {
string msg = fmt::format("evpusher {} successfully open output \"{}\"", selfId, urlOut); string msg = fmt::format("evpusher {} successfully open output \"{}\"", selfId, urlOut);
json meta; json meta;
json data; json data;
...@@ -557,8 +557,9 @@ protected: ...@@ -557,8 +557,9 @@ protected:
pktCnt = 0; pktCnt = 0;
continue; continue;
} }
}else{ }
if(!bStatsSent){ else {
if(!bStatsSent) {
bStatsSent = true; bStatsSent = true;
string msg = fmt::format("evpusher {} start pushing {}", selfId, urlOut); string msg = fmt::format("evpusher {} start pushing {}", selfId, urlOut);
json meta; json meta;
...@@ -626,7 +627,8 @@ public: ...@@ -626,7 +627,8 @@ public:
auto body = z_recv_multiple(pDaemon,false); auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleCloudMsg(body); this->handleCloudMsg(body);
} }
...@@ -640,7 +642,8 @@ public: ...@@ -640,7 +642,8 @@ public:
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleEdgeMsg(body); this->handleEdgeMsg(body);
} }
......
...@@ -613,7 +613,8 @@ protected: ...@@ -613,7 +613,8 @@ protected:
pktCnt = 0; pktCnt = 0;
break; break;
} }
}else{ }
else {
if(!bStatsSent) { if(!bStatsSent) {
bStatsSent = true; bStatsSent = true;
string msg = fmt::format("evslicer {} starting write file", selfId); string msg = fmt::format("evslicer {} starting write file", selfId);
...@@ -911,7 +912,8 @@ public: ...@@ -911,7 +912,8 @@ public:
auto body = z_recv_multiple(pDealer,false); auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
handleEdgeMsg(body); handleEdgeMsg(body);
} }
...@@ -926,7 +928,8 @@ public: ...@@ -926,7 +928,8 @@ public:
auto body = z_recv_multiple(pDaemon,false); auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno()));
}else{ }
else {
// full proto msg received. // full proto msg received.
this->handleCloudMsg(body); this->handleCloudMsg(body);
} }
......
...@@ -119,6 +119,9 @@ int createReverseTun(string host, int port, string user, string _password) ...@@ -119,6 +119,9 @@ int createReverseTun(string host, int port, string user, string _password)
closeFun(); return -1; closeFun(); return -1;
} }
// bugfix: timeout
libssh2_session_set_timeout( session, 15 * 1000);
/* At this point we havn't yet authenticated. The first thing to do /* At this point we havn't yet authenticated. The first thing to do
* is check the hostkey's fingerprint against our known hosts Your app * is check the hostkey's fingerprint against our known hosts Your app
* may have it hard coded, may go to a file, may present it to the * may have it hard coded, may go to a file, may present it to the
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论