提交 79a7afbd authored 作者: blu's avatar blu

big refacting of communitation architect

上级 350b4af2
...@@ -8,6 +8,9 @@ update: 2019/09/10 ...@@ -8,6 +8,9 @@ update: 2019/09/10
#include <chrono> #include <chrono>
#include <set> #include <set>
#include <queue>
#include <mutex>
#include <algorithm>
#include "inc/tinythread.hpp" #include "inc/tinythread.hpp"
#include "inc/httplib.h" #include "inc/httplib.h"
#include "inc/database.h" #include "inc/database.h"
...@@ -19,6 +22,7 @@ update: 2019/09/10 ...@@ -19,6 +22,7 @@ update: 2019/09/10
using namespace std; using namespace std;
using namespace httplib; using namespace httplib;
using namespace nlohmann; using namespace nlohmann;
using namespace zmqhelper;
// //
#define KEY_CONFIG_MAP "configmap" #define KEY_CONFIG_MAP "configmap"
...@@ -28,10 +32,18 @@ private: ...@@ -28,10 +32,18 @@ private:
void *pRouterCtx = NULL, *pRouter = NULL; void *pRouterCtx = NULL, *pRouter = NULL;
string httpPort = "8089"; string httpPort = "8089";
string msgPort = "5048"; string msgPort = "5048";
string devSn = "evcloudsvc";
// sn:module -> sn_of_evmgr
json configMap; json configMap;
// peer data
json peerData;
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock;
queue<string> eventQue;
mutex eventQLock;
thread thMsgProcessor;
json config(json &newConfig) json config(json &newConfig)
{ {
json ret; json ret;
...@@ -138,6 +150,15 @@ private: ...@@ -138,6 +150,15 @@ private:
ret["code"] = iret; ret["code"] = iret;
ret["msg"] = msg; ret["msg"] = msg;
} }
// update in memory peerData
if(this->peerData["config"].count(k) != 0) {
json diff = json::diff(this->peerData["config"][k], v);
spdlog::info("evcloudsvc peer {} config diff:\n{}", k, diff.dump(4));
}else{
this->peerData["config"][k] = v;
}
// TODO: trigger msg
} // for evmgr } // for evmgr
// save configmap // save configmap
...@@ -160,13 +181,140 @@ private: ...@@ -160,13 +181,140 @@ private:
catch(exception &e) { catch(exception &e) {
ret.clear(); ret.clear();
ret["code"] = -1; ret["code"] = -1;
ret["msg"] = e.what(); ret["msg"] = string("evcloudsvc exception: ") + e.what();
} }
return ret; return ret;
} }
int handleMsg(vector<vector<uint8_t> > &body)
{
int ret = 0;
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG
string selfId, peerId, meta;
if(body.size() == 2 && body[1].size() == 0) {
selfId = body2str(body[0]);
bool eventConn = false;
if(peerData["status"].count(selfId) == 0 || peerData["status"][selfId] == 0) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evcloudsvc {} peer connected: {}", devSn, selfId);
eventConn = true;
spdlog::debug("evcloudsvc {} update status of {} to 1 and send config", devSn, selfId);
string cfg = peerData["config"][selfId].dump();
json j;
j["type"] = EV_MSG_META_CONFIG;
string meta = j.dump();
vector<vector<uint8_t> > v = {str2body(selfId), str2body(devSn), str2body(meta), str2body(cfg)};
z_send_multiple(pRouter, v);
}
else {
peerData["status"][selfId] = 0;
spdlog::warn("evcloudsvc {} peer disconnected: {}", devSn, selfId);
}
if(ret < 0) {
spdlog::error("evcloudsvc {} failed to update localconfig", devSn);
}
// event
json jEvt;
jEvt["type"] = EV_MSG_TYPE_CONN_STAT;
jEvt["gid"] = selfId;
jEvt["ts"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(eventConn) {
jEvt["event"] = EV_MSG_EVENT_CONN_CONN;
}
else {
jEvt["event"] = EV_MSG_EVENT_CONN_DISCONN;
}
eventQue.push(jEvt.dump());
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
}
return 0;
}
else if(body.size() != 4) {
spdlog::warn("evcloudsvc {} dropped an invalid message, size: {}", devSn, body.size());
return 0;
}
// msg to peer
meta = body2str(body[2]);
selfId = body2str(body[0]);
peerId = body2str(body[1]);
// update status;
this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
int minLen = std::min(body[1].size(), devSn.size());
if(memcmp((void*)(body[1].data()), devSn.data(), minLen) != 0) {
// message to other peer
// check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) {
spdlog::info("evcloudsvc {} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evcloudsvc {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
else {
// cache
spdlog::warn("evcloudsvc {} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[peerId].pop();
}
}
// check if event
try {
string metaType = json::parse(meta)["type"];
if(metaType == EV_MSG_META_EVENT) {
eventQue.push(body2str(body[3]));
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
}
}
}
catch(exception &e) {
spdlog::error("evcloudsvc {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
}
else {
// message to evcloudsvc
// spdlog::info("evcloudsvc {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
if(meta == "pong"||meta == "ping") {
// update status
spdlog::info("evcloudsvc {}, ping msg from {}", devSn, selfId);
if(meta=="ping") {
if(cachedMsg.find(selfId) != cachedMsg.end()) {
while(!cachedMsg[selfId].empty()) {
lock_guard<mutex> lock(cacheLock);
auto v = cachedMsg[selfId].front();
cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evcloudsvc {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}
}
}
else {
// TODO:
spdlog::warn("evcloudsvc {} received unknown meta {} from {}", devSn, meta, selfId);
}
}
return ret;
}
protected: protected:
public: public:
void run() void run()
...@@ -311,7 +459,6 @@ public: ...@@ -311,7 +459,6 @@ public:
} }
json data; json data;
for(auto &key : s) { for(auto &key : s) {
json cfg; json cfg;
int iret = LVDB::getLocalConfig(cfg, key); int iret = LVDB::getLocalConfig(cfg, key);
...@@ -408,6 +555,7 @@ public: ...@@ -408,6 +555,7 @@ public:
EvCloudSvc() EvCloudSvc()
{ {
int ret = 0; int ret = 0;
spdlog::info("evcloudsvc boot");
char *strEnv = getenv("HTTP_PORT"); char *strEnv = getenv("HTTP_PORT");
if(strEnv != NULL) { if(strEnv != NULL) {
httpPort = strEnv; httpPort = strEnv;
......
...@@ -12,6 +12,7 @@ update: 2019/09/10 ...@@ -12,6 +12,7 @@ update: 2019/09/10
#include <queue> #include <queue>
#include <cstdlib> #include <cstdlib>
#include <algorithm>
#include "inc/tinythread.hpp" #include "inc/tinythread.hpp"
#include "inc/httplib.h" #include "inc/httplib.h"
#include "inc/zmqhelper.hpp" #include "inc/zmqhelper.hpp"
...@@ -36,12 +37,13 @@ class EvDaemon{ ...@@ -36,12 +37,13 @@ class EvDaemon{
int portRouter = 5549; int portRouter = 5549;
thread::id thIdMain; thread::id thIdMain;
thread thRouter; thread thRouter;
json peerData; thread thCloud;
bool bReload = true; bool bReload = true;
bool bBootstrap = true; bool bBootstrap = true;
// peerData["status"]; // peerData["status"];
// peerData["pids"]; // peerData["pids"];
// peerData["config"]; // peerData["config"];
json peerData;
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg; unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock; mutex cacheLock;
queue<string> eventQue; queue<string> eventQue;
...@@ -260,7 +262,7 @@ class EvDaemon{ ...@@ -260,7 +262,7 @@ class EvDaemon{
return 0; return 0;
} }
int handleMsg(vector<vector<uint8_t> > &body) int handleEdgeMsg(vector<vector<uint8_t> > &body)
{ {
int ret = 0; int ret = 0;
zmq_msg_t msg; zmq_msg_t msg;
...@@ -343,7 +345,9 @@ class EvDaemon{ ...@@ -343,7 +345,9 @@ class EvDaemon{
this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer // msg to peer
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) { string myId = devSn + ":0:0";
int minLen = std::min(body[1].size(), myId.size());
if(memcmp((void*)(body[1].data()), myId.data(), minLen) != 0) {
// message to other peer // message to other peer
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
...@@ -408,10 +412,60 @@ class EvDaemon{ ...@@ -408,10 +412,60 @@ class EvDaemon{
return ret; return ret;
} }
int handleCloudMsg(vector<vector<uint8_t> > &v) {
int ret = 0;
zmq_msg_t msg;
// ID_SENDER, meta ,MSG
string peerId, meta;
if(v.size() != 3) {
string msg;
for(auto &s:v) {
msg += body2str(s) + ";";
}
spdlog::error("evdaemon {} received invalid msg from cloud {}", devSn, msg);
}else{
try{
string meta = json::parse(v[1])["type"];
string peerId = body2str(v[0]);
json data = json::parse(body2str(v[2]));
// from cloudsvc
if(peerId == "evcloudsvc") {
// its configuration message
if(meta == EV_MSG_META_CONFIG) {
json diff = json::diff(config, data);
spdlog::info("evdaemon {} received cloud config {}", devSn, diff.dump());
}
}else{
// from peer
spdlog::info("evdaemon {} msg from peer {}: {}", devSn, peerId, data.dump());
}
}catch(exception &e) {
spdlog::error("evdaemon {} file {}:{} exception {}", devSn, __FILE__, __LINE__, e.what());
}
}
return 0;
}
protected: protected:
public: public:
void run(){ void run(){
setupSubSystems();
// setup cloud msg processor
thCloud = thread([this](){
while(true){
auto v = zmqhelper::z_recv_multiple(this->pDealer);
if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno()));
}else{
handleCloudMsg(v);
}
}
});
thCloud.detach();
//setupSubSystems();
// get config // get config
svr.Get("/info", [this](const Request& req, Response& res){ svr.Get("/info", [this](const Request& req, Response& res){
...@@ -541,18 +595,19 @@ class EvDaemon{ ...@@ -541,18 +595,19 @@ class EvDaemon{
this->thIdMain = this_thread::get_id(); this->thIdMain = this_thread::get_id();
// setup msg processor // setup edge msg processor
thRouter = thread([this](){ thRouter = thread([this](){
while(true){ while(true){
auto v = zmqhelper::z_recv_multiple(this->pRouter); auto v = zmqhelper::z_recv_multiple(this->pRouter);
if(v.size() == 0) { if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno())); spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno()));
}else{ }else{
handleMsg(v); handleEdgeMsg(v);
} }
} }
}); });
thRouter.detach(); thRouter.detach();
/// peerId -> value /// peerId -> value
peerData["status"] = json(); peerData["status"] = json();
......
...@@ -77,7 +77,7 @@ private: ...@@ -77,7 +77,7 @@ private:
addr = "tcp://*:" + to_string(config["port-router"]); addr = "tcp://*:" + to_string(config["port-router"]);
// setup zmq // setup zmq
// TODO: connect to cloud // TODO: connect to cloud
// router service // router service
pRouterCtx = zmq_ctx_new(); pRouterCtx = zmq_ctx_new();
pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER); pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
...@@ -180,7 +180,9 @@ private: ...@@ -180,7 +180,9 @@ private:
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer // msg to peer
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) { string myId = devSn + ":0:0";
int minLen = std::min(body[1].size(), myId.size());
if(memcmp((void*)(body[1].data()), myId.data(), minLen) != 0) {
// message to other peer // message to other peer
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论