提交 62e44ec2 authored 作者: blu's avatar blu

refactor communitation archtecture to use evdamon only

上级 0b360f23
......@@ -9,6 +9,7 @@ update: 2019/08/30
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-lambda-capture"
#include <queue>
#include <cstdlib>
#include "inc/tinythread.hpp"
#include "inc/httplib.h"
......@@ -21,6 +22,7 @@ update: 2019/08/30
using namespace std;
using namespace httplib;
using namespace nlohmann;
using namespace zmqhelper;
class EvDaemon{
private:
......@@ -29,9 +31,16 @@ class EvDaemon{
json info;
int port = 8088;
thread thMon;
string devSn;
string devSn, daemonId;
int portRouter = 5549;
thread::id thIdMain;
thread thRouter;
json peerStatus;
json peerConfig;
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock;
queue<string> eventQue;
mutex eventQLock;
/// module gid to process id
json mapModsToPids;
......@@ -50,6 +59,7 @@ class EvDaemon{
}
this->devSn = this->info["sn"];
this->daemonId = this->devSn + ":evdaemon:0";
/// req config
json jret = cloudutils::reqConfig(this->info);
// apply config
......@@ -59,27 +69,29 @@ class EvDaemon{
return 2;
}
spdlog::info("evmgr {} got cloud config:\n{}", devSn, jret.dump(4));
spdlog::info("evdaemon {} got cloud config:\n{}", devSn, jret.dump(4));
json &data = jret["data"];
for(auto &[k,v]:data.items()) {
if(k == this->devSn) {
// startup evmgr
string peerId = v["sn"].get<string>() + ":evmgr:0";
// offline
this->peerStatus[peerId] = 0;
this->peerConfig[peerId] = v;
pid_t pid;
if( (pid = fork()) == -1 ) {
spdlog::error("evdamon {} failed to fork subsytem - evmgr", this->devSn);
}else if(pid == 0) {
// child
// execl("./evmgr", "arg1", "arg2", (char *)0);
ret = setenv("ADDR", v["addr"].get<string>().c_str(), 1);
ret += setenv("SN", v["sn"].get<string>().c_str(), 1);
ret += setenv("PORT_ROUTER", to_string(v["port-router"].get<int>()).c_str(), 1);
ret += setenv("PORT_CLOUD", to_string(v["port-cloud"].get<int>()).c_str(), 1);
ret += setenv("ADDR_CLOUD", v["mqtt-cloud"].get<string>().c_str(), 1);
ret += setenv("DR_PORT", to_string(portRouter).c_str(), 1);
if(ret < 0) {
spdlog::error("evdaemon {} failed to set env", this->devSn);
return -3;
}
execl("./evmgr", NULL, NULL, NULL);
spdlog::error("evdaemon {} failed to startup evmgr", this->devSn);
}else{
......@@ -95,7 +107,6 @@ class EvDaemon{
json &modules = ipc["modules"];
for(auto &[mn, ml] : modules.items()) {
//
if()
}
}
}
......@@ -120,6 +131,147 @@ class EvDaemon{
});
}
int handleMsg(vector<vector<uint8_t> > &body)
{
int ret = 0;
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG
string selfId, peerId, meta;
if(body.size() == 2 && body[1].size() == 0) {
selfId = body2str(body[0]);
bool eventConn = false;
// XTF2BJR9:evslicer:1
auto sp = cloudutils::split(selfId, ':');
if(sp.size() != 3) {
spdlog::warn("evdaemon {} inproper peer id: {}", devSn, selfId);
return -1;
}
if(peerStatus.count(selfId) == 0) {
spdlog::warn("evdaemon {} unkown module with id: {}, peerStats {}", devSn, selfId, peerStatus.dump(4));
return -1;
}
if(peerStatus[selfId] == 0) {
peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evdaemon {} peer connected: {}", devSn, selfId);
eventConn = true;
spdlog::debug("evdaemon {} update status of {} to 1 and send config", devSn, selfId);
string cfg = peerConfig[selfId].dump();
json j;
j["type"] = EV_MSG_META_CONFIG;
string meta = j.dump();
vector<vector<uint8_t> > v = {str2body(selfId), str2body(this->daemonId), str2body(meta), str2body(cfg)};
z_send_multiple(pRouter, v);
}
else {
peerStatus[selfId] = 0;
spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId);
}
//update config
ret = LVDB::setLocalConfig(config);
if(ret < 0) {
spdlog::error("evdaemon {} failed to update localconfig", devSn);
}
// event
json jEvt;
jEvt["type"] = EV_MSG_TYPE_CONN_STAT;
jEvt["gid"] = selfId;
jEvt["ts"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(eventConn) {
jEvt["event"] = EV_MSG_EVENT_CONN_CONN;
}
else {
jEvt["event"] = EV_MSG_EVENT_CONN_DISCONN;
}
eventQue.push(jEvt.dump());
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
}
return 0;
}
else if(body.size() != 4) {
spdlog::warn("evdaemon {} dropped an invalid message, size: {}", devSn, body.size());
return 0;
}
meta = body2str(body[2]);
selfId = body2str(body[0]);
peerId = body2str(body[1]);
// update status;
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) {
// message to other peer
// check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerStatus.count(peerId)!= 0 && peerStatus[peerId] != 0) {
spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evdaemon {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
else {
// cache
spdlog::warn("evdaemon {} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[peerId].pop();
}
}
// check if event
try {
string metaType = json::parse(meta)["type"];
if(metaType == EV_MSG_META_EVENT) {
eventQue.push(body2str(body[3]));
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
}
}
}
catch(exception &e) {
spdlog::error("evdaemon {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
}
else {
// message to mgr
// spdlog::info("evdaemon {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
if(meta == "pong"||meta == "ping") {
// update status
spdlog::info("evdaemon {}, ping msg from {}", devSn, selfId);
if(meta=="ping") {
if(cachedMsg.find(selfId) != cachedMsg.end()) {
while(!cachedMsg[selfId].empty()) {
lock_guard<mutex> lock(cacheLock);
auto v = cachedMsg[selfId].front();
cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evdaemon {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}
}
}
else {
// TODO:
spdlog::warn("evdaemon {} received unknown meta {} from {}", devSn, meta, selfId);
}
}
return ret;
}
protected:
public:
void run(){
......@@ -185,28 +337,42 @@ class EvDaemon{
}
EvDaemon(){
int ret = 0;
// http port
char* strPort = getenv("DAEMON_PORT");
if(strPort != NULL) {
port = stoi(strPort);
}
// zmq router port
strPort = getenv("ROUTER_PORT");
if(strPort != NULL) {
portRouter = stoi(strPort);
}
string addr = string("tcp://*:") + to_string(portRouter);
// setup zmq
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
pRouterCtx = zmq_ctx_new();
pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
string addr = "tcp://127.0.0.1:" + to_string(portRouter);
int ret = zmq_bind(pRouter, addr.c_str());
ret = zmqhelper::setupRouter(&pRouterCtx, &pRouter, addr);
if(ret < 0) {
spdlog::error("evdaemon {} failed to bind port: {}", this->devSn, addr);
spdlog::error("evdaemon {} setup router: {}", this->devSn, addr);
exit(1);
}
this->thIdMain = this_thread::get_id();
// setup msg processor
thRouter = thread([this](){
while(true){
auto v = zmqhelper::z_recv_multiple(this->pRouter);
if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno()));
}else{
handleMsg(v);
}
}
});
thRouter.detach();
};
~EvDaemon(){};
};
......
......@@ -38,6 +38,7 @@ class EvMgr:public TinyThread {
private:
void *pRouterCtx = NULL;
void *pRouter = NULL;
void *pCtxDealer = NULL, *pDealer = NULL;
json config;
string devSn;
json peerStatus;
......@@ -103,113 +104,6 @@ private:
spdlog::info("evmgr {} successfuly inited", devSn);
}
// // TODO: deprecated
// void _init()
// {
// int ret;
// json jret;
// bool inited = false;
// // TODO: load config from local db
// json info;
// ret = LVDB::getSn(info);
// if(ret < 0) {
// spdlog::error("failed to get sn");
// exit(1);
// }
// tsLastBoot = info["lastboot"];
// tsUpdateTime=info["updatetime"];
// spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", config["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
// devSn = config["sn"];
// ret = LVDB::getLocalConfig(config);
// if(ret < 0) {
// spdlog::error("evmgr failed to get local configuration");
// exit(1);
// }
// spdlog::info("evmgr local config:\n{}", config.dump(4));
// int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
// string proto, addr;
// while(!inited) {
// try {
// //
// jret = cloudutils::registry(this->config, info["sn"], "evmgr");
// if(jret["code"] != 0) {
// spdlog::error("evmgr {} failed to registry: {}", devSn, jret["msg"].get<string>());
// goto error_exit;
// }else{
// if(jret["msg"] == "diff") {
// json &data = jret["data"];
// if(data.size() == 1 && data[0].at("path") == "/lastupdated") {
// // no diff
// spdlog::info("evmgr {} no change in config", devSn);
// }else{
// // patch
// spdlog::info("evmgr config changed in cloud, merge patch:\n{}", jret["data"].dump(4));
// config.merge_patch(jret["data"]);
// ret = LVDB::setLocalConfig(config);
// if(ret < 0) {
// spdlog::error("evmgr {} failed to update local config:\n{}", devSn, config.dump(4));
// goto error_exit;
// }
// }
// }
// }
// // TODO: verify sn
// if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")) {
// spdlog::error("evmgr {} invalid config. reload now...", devSn);
// goto error_exit;
// }
// jmgr = config["data"][devSn];
// proto = jmgr["proto"];
// if(proto != "zmq") {
// spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, proto);
// }
// //
// if(jmgr["addr"].get<string>() == "*" || jmgr["addr"].get<string>() == "0.0.0.0") {
// spdlog::error("invalid mgr address: {} in config:\n{}", jmgr["addr"].get<string>(), jmgr.dump(4));
// goto error_exit;
// }
// //addr = "tcp://" + jmgr["addr"].get<string>() + ":" + to_string(jmgr["port-router"]);
// addr = "tcp://*:" + to_string(jmgr["port-router"]);
// // setup zmq
// // TODO: connect to cloud
// // router service
// pRouterCtx = zmq_ctx_new();
// pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
// zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
// ret = zmq_bind(pRouter, addr.c_str());
// if(ret < 0) {
// spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
// goto error_exit;
// }
// spdlog::info("evmgr {} bind success to {}", devSn, addr);
// inited = true;
// break;
// error_exit:
// this_thread::sleep_for(chrono::seconds(3));
// //continue;
// }
// catch(exception &e) {
// spdlog::error("evmgr {} exception on init() for: {}, retrying load configuration...", devSn, e.what());
// this_thread::sleep_for(chrono::seconds(3));
// continue;
// }
// }
// spdlog::info("evmgr {} successfuly inited", devSn);
// }
int mqErrorMsg(string cls, string devSn, string extraInfo, int ret)
{
if(ret < 0) {
......@@ -320,6 +214,7 @@ private:
}
}
}
catch(exception &e) {
spdlog::error("evmgr {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
......@@ -382,42 +277,50 @@ public:
EvMgr& operator=(EvMgr &&) = delete;
EvMgr()
{
config["addr"] = "127.0.0.1";
// config["api-cloud"] = "http://127.0.0.1:8089";
config["port-daemon"] = 5549;
// config["mqtt-cloud"] = "<cloud_addr>";
config["addr-cloud"] = "127.0.0.1";
config["port-cloud"] = 5556;
config["proto"] = "zmq";
config["sn"] = "none";
config["port-router"] = 5550;
//
const char *strEnv = getenv("ADDR");
const char *strEnv = getenv("DR_PORT");
if(strEnv != NULL) {
config["addr"] = strEnv;
}
strEnv = getenv("PORT_DAEMON");
if(strEnv != NULL) {
config["port-daemon"] = atoi(strEnv);
}
strEnv = getenv("PORT_CLOUD");
if(strEnv != NULL) {
config["port-cloud"] = atoi(strEnv);
config["dr-port"] = atoi(strEnv);
}else{
// TODO:
spdlog::error("evmgr failed to start. no DR_PORT set");
exit(1);
}
strEnv = getenv("SN");
if(strEnv != NULL) {
config["sn"] = strEnv;
devSn = strEnv;
}else{
spdlog::error("evmgr failed to start. no sn set");
spdlog::error("evmgr failed to start. no SN set");
exit(1);
}
//
string addr = string("tcp://127.0.0.1:") + to_string(config["dr-port"]);
string ident = config["sn"].get<string>() + ":evmgr:0";
int ret = zmqhelper::setupDealer(&pCtxDealer, &pDealer, addr, ident);
bool bConfigGot = false;
while(!bConfigGot){
auto v = zmqhelper::z_recv_multiple(pDealer);
if(v.size() != 3) {
spdlog::error("evmgr {} invalid msg from daemon: {}", ident, addr);
continue;
}
spdlog::info("evmgr {} msg received: {} {} {}", ident, body2str(v[0]), body2str(v[1]), body2str(v[2]));
try{
string sMeta = json::parse(body2str(v[1]))["type"];
if(sMeta != EV_MSG_META_CONFIG) {
throw StrException("meta type is:" + sMeta + ", but expecting " + EV_MSG_META_CONFIG);
}
config = json::parse(body2str(v[2]));
bConfigGot = true;
}catch(exception &e) {
spdlog::error("evmgr {} invalid config msg from daemon {}, {}", ident, addr, e.what());
}
}
init();
}
~EvMgr()
......
......@@ -24,6 +24,7 @@ namespace zmqhelper {
#define EV_MSG_META_PONG "pong"
#define EV_MSG_META_EVENT "event"
#define EV_MSG_META_CMD "cmd"
#define EV_MSG_META_CONFIG "config"
#define EV_MSG_META_AVFORMATCTX "afctx"
#define EV_MSG_TYPE_AI_MOTION "ai_motion"
......@@ -122,6 +123,42 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body)
}
return ret;
}
/// setup router
int setupRouter(void **ctx, void **s, string addr){
int ret = 0;
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_ROUTER);
zmq_setsockopt (*s, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
ret = zmq_bind(*s, addr.c_str());
if(ret < 0) {
spdlog::debug("failed to bind zmq at {} for reason: {}, retrying load configuration...", addr, zmq_strerror(zmq_errno()));
}
return ret;
}
/// setup dealer
int setupDealer(void **ctx, void **s, string addr, string ident) {
int ret = 0;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_DEALER);
ret = zmq_setsockopt(*s, ZMQ_IDENTITY, ident.c_str(), ident.size());
ret += zmq_setsockopt (*s, ZMQ_ROUTING_ID, ident.c_str(), ident.size());
if(ret < 0) {
spdlog::debug("{} failed setsockopts ZMQ_ROUTING_ID to {}: {}", ident, addr, zmq_strerror(zmq_errno()));
}else{
ret = zmq_connect(*s, addr.c_str());
if(ret != 0) {
spdlog::error("{} failed connect dealer: {}", ident, addr);
}
}
return ret;
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论