提交 327bb284 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 5e79deb3
...@@ -37,6 +37,7 @@ class EvDaemon{ ...@@ -37,6 +37,7 @@ class EvDaemon{
thread thRouter; thread thRouter;
json peerData; json peerData;
bool bReload = true; bool bReload = true;
bool bBootstrap = true;
// peerData["status"]; // peerData["status"];
// peerData["pids"]; // peerData["pids"];
// peerData["config"]; // peerData["config"];
...@@ -91,6 +92,7 @@ class EvDaemon{ ...@@ -91,6 +92,7 @@ class EvDaemon{
this->peerData["config"][peerId] = v; this->peerData["config"][peerId] = v;
if(this->peerData["status"].count(peerId) == 0||this->peerData["status"][peerId] == 0) { if(this->peerData["status"].count(peerId) == 0||this->peerData["status"][peerId] == 0) {
this->peerData["status"][peerId] = 0; this->peerData["status"][peerId] = 0;
if(bBootstrap) {
ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid);
if(ret != 0) { if(ret != 0) {
spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId); spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId);
...@@ -99,6 +101,7 @@ class EvDaemon{ ...@@ -99,6 +101,7 @@ class EvDaemon{
} }
this->peerData["pids"][peerId] = pid; this->peerData["pids"][peerId] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, peerId); spdlog::info("evdaemon {} created subsystem {}", devSn, peerId);
}
}else{ }else{
// TODO: // TODO:
} }
...@@ -127,6 +130,7 @@ class EvDaemon{ ...@@ -127,6 +130,7 @@ class EvDaemon{
if(this->peerData["status"].count(peerId) == 0||this->peerData["status"][peerId] == 0) { if(this->peerData["status"].count(peerId) == 0||this->peerData["status"][peerId] == 0) {
this->peerData["status"][peerId] = 0; this->peerData["status"][peerId] = 0;
if(bBootstrap) {
ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid);
if(ret != 0) { if(ret != 0) {
spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId); spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId);
...@@ -135,6 +139,7 @@ class EvDaemon{ ...@@ -135,6 +139,7 @@ class EvDaemon{
} }
this->peerData["pids"][peerId] = pid; this->peerData["pids"][peerId] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, peerId); spdlog::info("evdaemon {} created subsystem {}", devSn, peerId);
}
}else{ }else{
// TODO: // TODO:
} }
...@@ -165,9 +170,11 @@ class EvDaemon{ ...@@ -165,9 +170,11 @@ class EvDaemon{
if(this->peerData["config"].count(k) != 0){ if(this->peerData["config"].count(k) != 0){
this->peerData["config"].erase(k); this->peerData["config"].erase(k);
} }
if(this->peerData["pids"].count(k) != 0) {
this->peerData["pids"].erase(k); this->peerData["pids"].erase(k);
} }
}
} }
void setupSubSystems() { void setupSubSystems() {
...@@ -271,8 +278,13 @@ class EvDaemon{ ...@@ -271,8 +278,13 @@ class EvDaemon{
} }
else { else {
peerData["status"][selfId] = 0; peerData["status"][selfId] = 0;
if(peerData["pids"].count(selfId) != 0) {
peerData["pids"].erase(selfId); peerData["pids"].erase(selfId);
}
if(peerData["config"].count(selfId) != 0) {
peerData["config"].erase(selfId); peerData["config"].erase(selfId);
}
spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId); spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId);
// restart this module // restart this module
startSubModule(selfId); startSubModule(selfId);
...@@ -456,16 +468,21 @@ class EvDaemon{ ...@@ -456,16 +468,21 @@ class EvDaemon{
EvDaemon(){ EvDaemon(){
int ret = 0; int ret = 0;
char* strEnv = getenv("BOOTSTRAP");
if(strEnv != NULL && memcmp(strEnv, "false", 5) == 0) {
bBootstrap = false;
}
// http port // http port
char* strPort = getenv("DAEMON_PORT"); strEnv = getenv("DAEMON_PORT");
if(strPort != NULL) { if(strEnv != NULL) {
port = stoi(strPort); port = stoi(strEnv);
} }
// zmq router port // zmq router port
strPort = getenv("ROUTER_PORT"); strEnv = getenv("ROUTER_PORT");
if(strPort != NULL) { if(strEnv != NULL) {
portRouter = stoi(strPort); portRouter = stoi(strEnv);
} }
string addr = string("tcp://*:") + to_string(portRouter); string addr = string("tcp://*:") + to_string(portRouter);
......
...@@ -47,6 +47,7 @@ private: ...@@ -47,6 +47,7 @@ private:
queue<string> eventQue; queue<string> eventQue;
mutex eventQLock; mutex eventQLock;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
string drport = "5549";
// //
void init() void init()
...@@ -276,16 +277,12 @@ public: ...@@ -276,16 +277,12 @@ public:
EvMgr& operator=(EvMgr &&) = delete; EvMgr& operator=(EvMgr &&) = delete;
EvMgr() EvMgr()
{ {
string drport;
const char *strEnv = getenv("DR_PORT"); const char *strEnv = getenv("DR_PORT");
if(strEnv != NULL) { if(strEnv != NULL) {
drport = strEnv; drport = strEnv;
}else{
spdlog::error("evmgr failed to start. no DR_PORT set");
exit(1);
} }
strEnv = getenv("GID"); strEnv = getenv("PEERID");
if(strEnv != NULL) { if(strEnv != NULL) {
ident = strEnv; ident = strEnv;
auto v = strutils::split(ident, ':'); auto v = strutils::split(ident, ':');
......
...@@ -84,6 +84,7 @@ private: ...@@ -84,6 +84,7 @@ private:
json config; json config;
thread thPing; thread thPing;
thread thEvent; thread thEvent;
string drport = "5549";
// //
int init() int init()
...@@ -634,16 +635,12 @@ public: ...@@ -634,16 +635,12 @@ public:
EvMLMotion(queue<string> *queue) EvMLMotion(queue<string> *queue)
{ {
evtQueue = queue; evtQueue = queue;
string drport;
const char *strEnv = getenv("DR_PORT"); const char *strEnv = getenv("DR_PORT");
if(strEnv != NULL) { if(strEnv != NULL) {
drport = strEnv; drport = strEnv;
}else{
spdlog::error("evmlmotion failed to start. no DR_PORT set");
exit(1);
} }
strEnv = getenv("GID"); strEnv = getenv("PEERID");
if(strEnv != NULL) { if(strEnv != NULL) {
selfId = strEnv; selfId = strEnv;
auto v = strutils::split(selfId, ':'); auto v = strutils::split(selfId, ':');
......
...@@ -43,23 +43,6 @@ private: ...@@ -43,23 +43,6 @@ private:
const char * bytes; const char * bytes;
int len; int len;
void *pDealer=NULL; void *pDealer=NULL;
thread thPing;
int ping()
{
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn + ":0:0"));
body.push_back(str2body(EV_MSG_META_PING));
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpuller {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
}
return ret;
}
int handleMsg(vector<vector<uint8_t> > v) int handleMsg(vector<vector<uint8_t> > v)
{ {
...@@ -99,18 +82,6 @@ protected: ...@@ -99,18 +82,6 @@ protected:
{ {
int ret = 0; int ret = 0;
bool bStopSig = false; bool bStopSig = false;
// declare ready to router
ping();
// TODO: don't need this anymore, since I've used the draft feature of ZOUTER_NOTIFICATION instead
// thPing = thread([&,this]() {
// while(true) {
// this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
// ping();
// }
// });
// thPing.detach();
// init response msg // init response msg
while (true) { while (true) {
...@@ -155,6 +126,19 @@ private: ...@@ -155,6 +126,19 @@ private:
int *streamList = NULL, numStreams = 0, iid; int *streamList = NULL, numStreams = 0, iid;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
string drport = "5549";
int ping()
{
int ret = 0;
vector<vector<uint8_t> >body = {str2body(mgrSn + ":0:0"), str2body(EV_MSG_META_PING), str2body(MSG_HELLO)};
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpuller {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
}
return ret;
}
int init() int init()
{ {
...@@ -242,6 +226,8 @@ private: ...@@ -242,6 +226,8 @@ private:
spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer); spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer);
exit(1); exit(1);
} }
ping();
} }
catch(exception &e) { catch(exception &e) {
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
...@@ -356,16 +342,12 @@ protected: ...@@ -356,16 +342,12 @@ protected:
public: public:
EvPuller() EvPuller()
{ {
string drport;
const char *strEnv = getenv("DR_PORT"); const char *strEnv = getenv("DR_PORT");
if(strEnv != NULL) { if(strEnv != NULL) {
drport = strEnv; drport = strEnv;
}else{
spdlog::error("evpusher failed to start. no DR_PORT set");
exit(1);
} }
strEnv = getenv("GID"); strEnv = getenv("PEERID");
if(strEnv != NULL) { if(strEnv != NULL) {
selfId = strEnv; selfId = strEnv;
auto v = strutils::split(selfId, ':'); auto v = strutils::split(selfId, ':');
......
...@@ -48,6 +48,7 @@ private: ...@@ -48,6 +48,7 @@ private:
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thPing; thread thPing;
string drport = "5549";
int init() int init()
{ {
...@@ -403,16 +404,12 @@ protected: ...@@ -403,16 +404,12 @@ protected:
public: public:
EvPusher() EvPusher()
{ {
string drport;
const char *strEnv = getenv("DR_PORT"); const char *strEnv = getenv("DR_PORT");
if(strEnv != NULL) { if(strEnv != NULL) {
drport = strEnv; drport = strEnv;
}else{
spdlog::error("evpusher failed to start. no DR_PORT set");
exit(1);
} }
strEnv = getenv("GID"); strEnv = getenv("PEERID");
if(strEnv != NULL) { if(strEnv != NULL) {
selfId = strEnv; selfId = strEnv;
auto v = strutils::split(selfId, ':'); auto v = strutils::split(selfId, ':');
......
...@@ -56,6 +56,7 @@ private: ...@@ -56,6 +56,7 @@ private:
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thPing; thread thPing;
string drport = "5549";
int init() int init()
{ {
...@@ -440,16 +441,12 @@ protected: ...@@ -440,16 +441,12 @@ protected:
public: public:
EvSlicer() EvSlicer()
{ {
string drport;
const char *strEnv = getenv("DR_PORT"); const char *strEnv = getenv("DR_PORT");
if(strEnv != NULL) { if(strEnv != NULL) {
drport = strEnv; drport = strEnv;
}else{
spdlog::error("evslicer failed to start. no DR_PORT set");
exit(1);
} }
strEnv = getenv("GID"); strEnv = getenv("PEERID");
if(strEnv != NULL) { if(strEnv != NULL) {
selfId = strEnv; selfId = strEnv;
auto v = strutils::split(selfId, ':'); auto v = strutils::split(selfId, ':');
......
...@@ -200,7 +200,7 @@ int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid){ ...@@ -200,7 +200,7 @@ int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid){
spdlog::error("evdamon {} failed to fork subsytem - evmgr", devSn); spdlog::error("evdamon {} failed to fork subsytem - evmgr", devSn);
return -1; return -1;
}else if(pid == 0) { }else if(pid == 0) {
ret += setenv("GID", peerId.c_str(), 1); ret += setenv("PEERID", peerId.c_str(), 1);
ret += setenv("DR_PORT", to_string(drPort).c_str(), 1); ret += setenv("DR_PORT", to_string(drPort).c_str(), 1);
if(ret < 0) { if(ret < 0) {
spdlog::error("evdaemon {} failed to set env", devSn); spdlog::error("evdaemon {} failed to set env", devSn);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论