提交 4a60847c authored 作者: blu's avatar blu

init

上级 1735612e
......@@ -54,7 +54,7 @@ private:
config = json::parse(cloudutils::config);
spdlog::info("config dumps: \n{}", config.dump());
// TODO: verify sn
if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")){
if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")) {
spdlog::error("evmgr {} invalid config. reload now...", devSn);
this_thread::sleep_for(chrono::seconds(3));
continue;
......@@ -62,8 +62,8 @@ private:
jmgr = config["data"][devSn];
string proto = jmgr["proto"];
string addr;
if(proto != "zmq"){
if(proto != "zmq") {
spdlog::error("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, proto);
}
addr = "tcp://" + jmgr["addr"].get<string>() + ":" + to_string(jmgr["port-router"]);
......@@ -92,33 +92,55 @@ private:
spdlog::info("evmgr {} successfuly inited", devSn);
}
int mqErrorMsg(string cls, string devSn, string extraInfo, int ret) {
int mqErrorMsg(string cls, string devSn, string extraInfo, int ret)
{
if(ret < 0) {
spdlog::error("{} {} {}:{} ", cls, devSn, extraInfo, zmq_strerror(zmq_errno()));
}
return ret;
}
int handleMsg(vector<vector<uint8_t> > &body) {
int handleMsg(vector<vector<uint8_t> > &body)
{
int ret = 0;
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG
string selfId, peerId, meta;
if(body.size() == 2 && body[1].size() == 0) {
selfId = body2str(body[0]);
spdlog::warn("evmgr {} peer disconnected: {}", devSn, selfId);
if(peerStatus.count(selfId) == 0) {
bool eventConn = false;
if(peerStatus.count(selfId) == 0||peerStatus[selfId] == 0) {
peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
}else{
spdlog::info("evmgr {} peer connected: {}", devSn, selfId);
eventConn = true;
}
else {
peerStatus[selfId] = 0;
spdlog::warn("evmgr {} peer disconnected: {}", devSn, selfId);
}
// event
json jEvt;
jEvt["type"] = EV_MSG_TYPE_CONN_STAT;
jEvt["gid"] = selfId;
if(eventConn) {
jEvt["event"] = EV_MSG_EVENT_CONN_CONN;
}
else {
jEvt["event"] = EV_MSG_EVENT_CONN_DISCONN;
}
eventQue.push(jEvt.dump());
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
}
return 0;
}else if(body.size() != 4) {
}
else if(body.size() != 4) {
spdlog::warn("evmgr {} dropped an invalid message, size: {}", devSn, body.size());
return 0;
}
meta = body2str(body[2]);
selfId = body2str(body[0]);
peerId = body2str(body[1]);
......@@ -130,13 +152,14 @@ private:
// message to other peer
// check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerStatus.count(peerId)!= 0 && peerStatus[peerId] != 0) {
if(peerStatus.count(peerId)!= 0 && peerStatus[peerId] != 0) {
spdlog::info("evmgr {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}else{
}
}
else {
// cache
spdlog::warn("evmgr {} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
......@@ -147,7 +170,7 @@ private:
}
// check if event
try{
try {
string metaType = json::parse(meta)["type"];
if(metaType == EV_MSG_META_EVENT) {
eventQue.push(body2str(body[3]));
......@@ -155,10 +178,12 @@ private:
eventQue.pop();
}
}
}catch(exception &e) {
}
catch(exception &e) {
spdlog::error("evmgr {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
}else{
}
else {
// message to mgr
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
if(meta == "pong"||meta == "ping") {
......@@ -166,18 +191,19 @@ private:
spdlog::info("evmgr {}, ping msg from {}", devSn, selfId);
if(meta=="ping") {
if(cachedMsg.find(selfId) != cachedMsg.end()) {
while(!cachedMsg[selfId].empty()){
while(!cachedMsg[selfId].empty()) {
lock_guard<mutex> lock(cacheLock);
auto v = cachedMsg[selfId].front();
cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}
}
}
}else{
}
else {
// TODO:
spdlog::warn("evmgr {} received unknown meta {} from {}", devSn, meta, selfId);
}
......@@ -186,43 +212,44 @@ private:
return ret;
}
int get_monitor_event (void *monitor, int *value, char **address)
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
// Second frame in message contains event address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (!zmq_msg_more (&msg));
// Second frame in message contains event address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (!zmq_msg_more (&msg));
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
(*address)[size] = 0;
}
return event;
}
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
(*address)[size] = 0;
}
return event;
}
protected:
void run(){
void run()
{
bool bStopSig = false;
int ret = 0;
zmq_msg_t msg;
// TODO: don't need this anymore, since I've used the draft feature of ZOUTER_NOTIFICATION instead
// disabled because:
// 1. it can't determine which peer disconnected, but only the underline socket FD.
// disabled because:
// 1. it can't determine which peer disconnected, but only the underline socket FD.
// 2. used the draft feature of ZMQ_ROUTER_NOTIFY instead to capture peer module disconnections such as evpuser, evmlmotion.
// thread thMon = thread([&,this](){
// int ret = 0;
......@@ -256,7 +283,7 @@ protected:
auto body = z_recv_multiple(pRouter,false);
if(body.size() == 0) {
spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno()));
continue;
continue;
}
// full proto msg received.
handleMsg(body);
......@@ -278,7 +305,7 @@ public:
zmq_close(pRouter);
pRouter = NULL;
}
if(pRouterCtx != NULL){
if(pRouterCtx != NULL) {
zmq_ctx_destroy(pRouterCtx);
pRouterCtx = NULL;
}
......
......@@ -453,9 +453,9 @@ private:
evtState = IN;
json p;
spdlog::info("state: PRE->IN");
p["type"] = "motion";
p["type"] = EV_MSG_TYPE_AI_MOTION;
p["gid"] = selfId;
p["event"] = "end";
p["event"] = EV_MSG_EVENT_MOTION_START;
p["ts"] = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count();
//p["frame"] = origin.clone();
evtQueue->push(p.dump());
......@@ -489,9 +489,9 @@ private:
spdlog::info("state: POST->NONE");
evtState = NONE;
json p;
p["type"] = "motion";
p["type"] = EV_MSG_TYPE_AI_MOTION;
p["gid"] = selfId;
p["event"] = "end";
p["event"] = EV_MSG_EVENT_MOTION_END;
p["ts"] = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count() + (int)(detPara.post/2);
evtQueue->push(p.dump());
if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE*2) {
......
......@@ -20,8 +20,18 @@ namespace zmqhelper {
#define EV_MSG_META_UPDATE "update"
#define EV_MSG_META_EVENT "event"
#define EV_MSG_META_AVFORMATCTX "afctx"
#define EV_MSG_TYPE_AI_MOTION "ai_motion"
#define EV_MSG_TYPE_CONN_STAT "connstat"
#define EV_MSG_TYPE_SYS_STAT "sysstat"
#define EV_MSG_EVENT_MOTION_START "start"
#define EV_MSG_EVENT_MOTION_END "end"
#define EV_MSG_EVENT_CONN_CONN "connect"
#define EV_MSG_EVENT_CONN_DISCONN "disconnect"
#define EV_NUM_CACHE_PERPEER 100
#define MAX_EVENT_QUEUE_SIZE 20
#define MAX_EVENT_QUEUE_SIZE 50
//
string body2str(vector<uint8_t> body)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论