提交 9ea27184 authored 作者: blu's avatar blu

init

上级 de4bc4ab
......@@ -101,53 +101,55 @@ private:
spdlog::warn("evmgr {} dropped a message, since its size is incorrect: {}", devSn, body.size());
return 0;
}
string meta = body2str(body[2]);
string selfId = body2str(body[0]);
string peerId = body2str(body[1]);
// update status;
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) {
// message to other peer
// check peer status
string gid = body2str(body[1]);
if(peerStatus.count(gid)!= 0) {
auto t = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count() - peerStatus[gid].get<long long>();
if(t > EV_HEARTBEAT_SECONDS*5/4){
peerStatus[gid] = 0;
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerStatus.count(peerId)!= 0) {
auto t = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count() - peerStatus[peerId].get<long long>();
if(t > EV_HEARTBEAT_SECONDS){
peerStatus[peerId] = 0;
// need cache
}else{
spdlog::info("evmgr {} route msg from {} to {}", devSn, body2str(body[0]), body2str(body[1]));
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
spdlog::info("evmgr {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}else{
peerStatus[gid] = 0;
peerStatus[peerId] = 0;
// need cache
}
if(peerStatus[gid] == 0) {
if(peerStatus[peerId] == 0) {
// cache
spdlog::warn("evmgr {} cached msg from {} to {}", devSn, body2str(body[0]), gid);
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
spdlog::warn("evmgr {} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
cachedMsg[gid].push(v);
if(cachedMsg[gid].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[gid].pop();
cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[peerId].pop();
}
}
}else{
// message to mgr
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
string meta = body2str(body[2]);
string gid = body2str(body[0]);
if(meta == "pong"||meta == "ping") {
// update status
this->peerStatus[gid] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evmgr {}, ping msg from {}", devSn, selfId);
if(meta=="ping") {
if(cachedMsg.find(gid) != cachedMsg.end()) {
while(!cachedMsg[gid].empty()){
if(cachedMsg.find(selfId) != cachedMsg.end()) {
while(!cachedMsg[selfId].empty()){
lock_guard<mutex> lock(cacheLock);
auto v = cachedMsg[gid].front();
cachedMsg[gid].pop();
auto v = cachedMsg[selfId].front();
cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
......@@ -157,7 +159,7 @@ private:
}
}else{
// TODO:
spdlog::warn("evmgr {} received unknown meta {} from {}", devSn, meta, gid);
spdlog::warn("evmgr {} received unknown meta {} from {}", devSn, meta, selfId);
}
}
......@@ -171,37 +173,37 @@ protected:
zmq_msg_t msg;
// health checking thread
auto thHealth = thread([&,this](){
auto ipcs = this->jmgr["ipcs"];
json jmeta; jmeta["type"] = "ping";
auto meta = str2body(jmeta.dump());
auto mgrId = str2body(this->devSn + ":0:0");
while(true) {
for(auto &j:ipcs) {
if(j.count("modules") != 0) {
for(auto &[k, v]: j["modules"].items()) {
// k = module name
for(auto &m: v) {
if(!m.count("sn") && !m.count("iid")) {
// construct gid for module
string gid = m["sn"].get<string>() + ":" + k + ":" + to_string(m["iid"]);
// build ping msg
vector<vector<uint8_t> > v = {str2body(gid), mgrId, meta, str2body("hello")};
ret = z_send_multiple(this->pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send ping to module {}", devSn, gid);
}else{
//
}
}
}
}
}
}
// TODO:
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS));
}
});
// auto thHealth = thread([&,this](){
// auto ipcs = this->jmgr["ipcs"];
// json jmeta; jmeta["type"] = "ping";
// auto meta = str2body(jmeta.dump());
// auto mgrId = str2body(this->devSn + ":0:0");
// while(true) {
// for(auto &j:ipcs) {
// if(j.count("modules") != 0) {
// for(auto &[k, v]: j["modules"].items()) {
// // k = module name
// for(auto &m: v) {
// if(!m.count("sn") && !m.count("iid")) {
// // construct gid for module
// string gid = m["sn"].get<string>() + ":" + k + ":" + to_string(m["iid"]);
// // build ping msg
// vector<vector<uint8_t> > v = {str2body(gid), mgrId, meta, str2body("hello")};
// ret = z_send_multiple(this->pRouter, v);
// if(ret < 0) {
// spdlog::error("evmgr {} failed to send ping to module {}", devSn, gid);
// }else{
// //
// }
// }
// }
// }
// }
// }
// // TODO:
// this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS));
// }
// });
while (true) {
if(checkStop() == true) {
......
......@@ -39,7 +39,9 @@ private:
const char * bytes;
int len;
void *pDealer=NULL;
void sendPing(){
thread thPing;
int ping(){
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
......@@ -50,8 +52,9 @@ private:
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpuller {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
return;
}
return ret;
}
protected:
void run()
......@@ -59,7 +62,16 @@ protected:
int ret = 0;
bool bStopSig = false;
// declare ready to router
sendPing();
ping();
thPing = thread([&,this](){
while(true) {
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
ping();
}
});
thPing.detach();
// init response msg
auto msgBody = data2body(const_cast<char*>(bytes), len);
while (true) {
......@@ -88,7 +100,7 @@ protected:
spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno()));
}
}else if(meta["type"].get<string>() == EV_MSG_META_PING){
sendPing();
ping();
}
else{
spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
......
......@@ -35,6 +35,7 @@ private:
AVFormatContext *pAVFormatRemux = NULL;
AVFormatContext *pAVFormatInput = NULL;
json config;
thread thPing;
int init()
{
......@@ -106,6 +107,27 @@ private:
return 0;
}
int ping(){
// send hello to router
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
}else{
spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn);
}
return ret;
}
int setupMq()
{
int ret = 0;
......@@ -138,24 +160,18 @@ private:
spdlog::error("evpusher {} {} failed connect dealer: {}", devSn, iid, urlDealer);
return -4;
}
//ping
ret = ping();
thPing = thread([&,this](){
while(true) {
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
ping();
}
});
// send hello to router
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
return -1;
}
spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn);
thPing.detach();
return 0;
return ret;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论