提交 1735612e authored 作者: blu's avatar blu

init

上级 1c5414b2
......@@ -73,7 +73,7 @@ private:
// router service
pRouterCtx = zmq_ctx_new();
pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
int opt_notify = ZMQ_NOTIFY_DISCONNECT;
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
ret = zmq_bind(pRouter, addr.c_str());
if(ret < 0) {
......@@ -103,19 +103,24 @@ private:
int ret = 0;
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG
string peerId;
string selfId, peerId, meta;
if(body.size() == 2 && body[1].size() == 0) {
peerId = body2str(body[0]);
spdlog::warn("evmgr {} peer disconnected: {}", devSn, peerId);
peerStatus[peerId] = 0;
selfId = body2str(body[0]);
spdlog::warn("evmgr {} peer disconnected: {}", devSn, selfId);
if(peerStatus.count(selfId) == 0) {
peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
}else{
peerStatus[selfId] = 0;
}
return 0;
}else if(body.size() != 4) {
spdlog::warn("evmgr {} dropped an invalid message, size: {}", devSn, body.size());
return 0;
}
string meta = body2str(body[2]);
string selfId = body2str(body[0]);
meta = body2str(body[2]);
selfId = body2str(body[0]);
peerId = body2str(body[1]);
// update status;
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
......@@ -125,24 +130,13 @@ private:
// message to other peer
// check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerStatus.count(peerId)!= 0) {
auto t = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count() - peerStatus[peerId].get<long long>();
if(t > EV_HEARTBEAT_SECONDS){
peerStatus[peerId] = 0;
// need cache
}else{
spdlog::info("evmgr {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
if(peerStatus.count(peerId)!= 0 && peerStatus[peerId] != 0) {
spdlog::info("evmgr {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}else{
peerStatus[peerId] = 0;
// need cache
}
if(peerStatus[peerId] == 0) {
// cache
spdlog::warn("evmgr {} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
......@@ -226,7 +220,7 @@ protected:
bool bStopSig = false;
int ret = 0;
zmq_msg_t msg;
// TODO: don't need this anymore, since I've used the draft feature of ZOUTER_NOTIFICATION instead
// disabled because:
// 1. it can't determine which peer disconnected, but only the underline socket FD.
// 2. used the draft feature of ZMQ_ROUTER_NOTIFY instead to capture peer module disconnections such as evpuser, evmlmotion.
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论