提交 d32dc755 authored 作者: blu's avatar blu

feature heartbeat: evcloudsvc - evdaemon

上级 573759d8
...@@ -1024,7 +1024,7 @@ public: ...@@ -1024,7 +1024,7 @@ public:
this->peerData["config"].erase(sn); this->peerData["config"].erase(sn);
if(this->peerData["status"].contains(sn)) if(this->peerData["status"].contains(sn))
this->peerData["status"].erase(sn); this->peerData["status"].erase(sn);
spdlog::info("evcloudsvc removed sn: {}", sn); spdlog::info("evcloudsvc removed sn: {}", sn);
LVDB::setValue(this->configMap, KEY_CONFIG_MAP); LVDB::setValue(this->configMap, KEY_CONFIG_MAP);
} }
......
...@@ -59,12 +59,13 @@ private: ...@@ -59,12 +59,13 @@ private:
mutex cacheLock; mutex cacheLock;
queue<string> eventQue; queue<string> eventQue;
mutex eventQLock; mutex eventQLock;
mutex cfgLock;
mutex mutSubsystem; mutex mutSubsystem;
thread thHeartBeat; thread thHeartBeat;
mutex mutHeartBeat; mutex mutHeartBeat;
bool bHeartBeatLive = false; bool bGotHeartBeat = false;
int heartBeatTimeout = 60 * 1000;
/// module gid to process id /// module gid to process id
...@@ -660,6 +661,29 @@ private: ...@@ -660,6 +661,29 @@ private:
return 0; return 0;
} }
void setUpDealer(){
lock_guard<mutex> lg(mutHeartBeat);
if(pDealer != nullptr) {
int i = 0;
zmq_setsockopt(pDealer, ZMQ_LINGER, &i, sizeof(i));
zmq_close(pDealer);
pDealer = nullptr;
}
if(pDealerCtx != nullptr) {
zmq_ctx_destroy(pDealerCtx);
pDealerCtx = nullptr;
}
int ret = 0;
ret = zmqhelper::setupDealer(&pDealerCtx, &pDealer, cloudAddr, devSn, heartBeatTimeout);
if(ret != 0) {
spdlog::error("evdaemon {} failed to setup dealer", devSn);
exit(1);
}
spdlog::info("evdaemon {} connecting to cloud {}", devSn, cloudAddr);
}
protected: protected:
public: public:
void run() void run()
...@@ -823,33 +847,45 @@ public: ...@@ -823,33 +847,45 @@ public:
cloudAddr = strEnv; cloudAddr = strEnv;
} }
// setup dealer setUpDealer();
ret = zmqhelper::setupDealer(&pDealerCtx, &pDealer, cloudAddr, devSn);
if(ret != 0) {
spdlog::error("evdaemon {} failed to setup dealer", devSn);
exit(1);
}
spdlog::info("evdaemon {} connecting to cloud {}", devSn, cloudAddr);
// setup cloud msg processor
// setup cloud msg processor
thCloud = thread([this]() { thCloud = thread([this]() {
int cnt = 0;
while(true) { while(true) {
spdlog::info("evdaemon {} receiving evcloudsvc", this->devSn); spdlog::info("evdaemon {} waiting msg from evcloudsvc", this->devSn);
auto v = zmqhelper::z_recv_multiple(this->pDealer, true); auto v = zmqhelper::z_recv_multiple(this->pDealer, false);
if(v.size() == 0) { if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}, retrying", this->devSn, zmq_strerror(zmq_errno())); if(cnt > 1) {
this_thread::sleep_for(chrono::seconds(3)); // TODO: reset dealer socket;
spdlog::error("evdaemon {} failed to receive from evcloudsvc, resetting connection: {}", this->devSn, zmq_strerror(zmq_errno()));
this->setUpDealer();
cnt = 0;
continue;
}
cnt++;
} }
else { else {
handleCloudMsg(v); cnt = 0;
this->handleCloudMsg(v);
spdlog::info("evdaemon {} successfully handled msg from evcloudsvc", this->devSn); spdlog::info("evdaemon {} successfully handled msg from evcloudsvc", this->devSn);
} }
} }
}); });
thCloud.detach(); thCloud.detach();
spdlog::info("evdaemon {} cloud message processor had setup {}", devSn, cloudAddr); spdlog::info("evdaemon {} cloud message processor had setup {}", devSn, cloudAddr);
ping(pDealer);
thHeartBeat = thread([this](){
while(true){
{
lock_guard<mutex> lg(this->mutHeartBeat);
if(this->pDealer != nullptr)
this->ping(this->pDealer);
}
this_thread::sleep_for(chrono::milliseconds(this->heartBeatTimeout));
}
});
thHeartBeat.detach();
this->thIdMain = this_thread::get_id(); this->thIdMain = this_thread::get_id();
}; };
......
...@@ -640,6 +640,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st ...@@ -640,6 +640,7 @@ json getModulesOperFromConfDiff(json& oldConfig, json &newConfig, json &diff, st
newMgr[mgrSn] = json(); newMgr[mgrSn] = json();
newMgr[mgrSn] = newConfig[mgrSn]; newMgr[mgrSn] = newConfig[mgrSn];
} }
if(newMgr.size() != 0) { if(newMgr.size() != 0) {
json jret = cfgutils::getModuleGidsFromCfg(sn, newMgr, ""); json jret = cfgutils::getModuleGidsFromCfg(sn, newMgr, "");
spdlog::info("{}:{} getModuleGidsFromCfg dump: {}", __FILE__, __LINE__, jret.dump()); spdlog::info("{}:{} getModuleGidsFromCfg dump: {}", __FILE__, __LINE__, jret.dump());
......
...@@ -117,17 +117,24 @@ int setupRouter(void **ctx, void **s, string addr){ ...@@ -117,17 +117,24 @@ int setupRouter(void **ctx, void **s, string addr){
/// setup dealer /// setup dealer
/// @return 0 success, otherwise failed /// @return 0 success, otherwise failed
int setupDealer(void **ctx, void **s, string addr, string ident) { int setupDealer(void **ctx, void **s, string addr, string ident, int timeout) {
int ret = 0; int ret = 0;
*ctx = zmq_ctx_new(); *ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_DEALER); *s = zmq_socket(*ctx, ZMQ_DEALER);
ret = 1; ret = 1;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret)); zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
ret = 5; ret = 20;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret)); zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret));
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret)); zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = 2; ret = 2;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_CNT, &ret, sizeof (ret)); zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_CNT, &ret, sizeof (ret));
if(timeout != 0) {
if(timeout == -1) {
timeout = 10*1000;
}
zmq_setsockopt(*s, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
}
ret = zmq_setsockopt(*s, ZMQ_IDENTITY, ident.c_str(), ident.size()); ret = zmq_setsockopt(*s, ZMQ_IDENTITY, ident.c_str(), ident.size());
ret += zmq_setsockopt (*s, ZMQ_ROUTING_ID, ident.c_str(), ident.size()); ret += zmq_setsockopt (*s, ZMQ_ROUTING_ID, ident.c_str(), ident.size());
if(ret < 0) { if(ret < 0) {
......
...@@ -63,7 +63,7 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body); ...@@ -63,7 +63,7 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body);
int setupRouter(void **ctx, void **s, string addr); int setupRouter(void **ctx, void **s, string addr);
/// setup dealer /// setup dealer
/// @return 0 success, otherwise failed /// @return 0 success, otherwise failed
int setupDealer(void **ctx, void **s, string addr, string ident); int setupDealer(void **ctx, void **s, string addr, string ident, int timeout = 0);
/// recv config msg: /// recv config msg:
/// @return 0 success, otherwise failed. /// @return 0 success, otherwise failed.
int recvConfigMsg(void *s, json &config, string addr, string ident); int recvConfigMsg(void *s, json &config, string addr, string ident);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论