提交 8dedbfd0 authored 作者: blu's avatar blu

refactor of delta config

上级 48518029
...@@ -37,7 +37,7 @@ class EvDaemon { ...@@ -37,7 +37,7 @@ class EvDaemon {
private: private:
Server svr; Server svr;
json config; json config;
json lastConfig; json oldConfig;
json deltaCfg; json deltaCfg;
json info; json info;
int port = 8088; int port = 8088;
...@@ -185,50 +185,48 @@ private: ...@@ -185,50 +185,48 @@ private:
{ {
int ret = 0; int ret = 0;
std::lock_guard<std::mutex> lock(mutSubsystem); std::lock_guard<std::mutex> lock(mutSubsystem);
vector<string> tmp;
json unkown;
vector<string> terms;
int cnt = 0;
string info;
for(auto &[k,v]: this->peerData["config"].items()) {
if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) {
tmp.push_back(k);
info += (cnt == 0? "" : string(", ")) + k;
}
else if(this->peerData["status"][k] == -1) {
unkown[k] = -1;
}
}
else {
terms.push_back(k);
}
cnt++;
}
if(subs.size() != 0) { if(subs.size() != 0) {
for(auto &k: subs) { for(auto &k: subs) {
pid_t pid = 0; if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0) && this->peerData["config"].count(k) != 0 && this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0){
if(std::find(terms.begin(), terms.end(), k) != terms.end()) { pid_t pid;
// ignore
spdlog::warn("evdaemon {} skip startup {} since it's disabled.", this->devSn, k);
}else{
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
if(0 == ret) { if(0 == ret) {
this->peerData["status"][k] = 0; this->peerData["status"][k] = 0;
this->peerData["pids"][k] = pid; this->peerData["pids"][k] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, k); spdlog::info("evdaemon {} created subsystem {}", this->devSn, k);
} }
else { else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k); spdlog::info("evdaemon {} failed to create subsystem {}", this->devSn, k);
} }
} }else{
spdlog::warn("evdaemon {} refuse to start subsystem {}, maybe it's disabled", this->devSn, k);
}
} }
}else{ }else{
// cold startup, ignore diff.
if(this->bColdStart) { if(this->bColdStart) {
vector<string> tmp;
json unkown;
vector<string> terms;
string info;
int cnt = 0;
for(auto &[k,v]: this->peerData["config"].items()) {
if(this->peerData["enabled"].count(k) != 0 && this->peerData["enabled"][k] != 0) {
if((this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0)) {
tmp.push_back(k);
info += (cnt == 0? "" : string(", ")) + k;
}
else if(this->peerData["status"][k] == -1) {
unkown[k] = -1;
}
}
else {
terms.push_back(k);
}
cnt++;
}
spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info); spdlog::info("evdaemon {} will start following subsystems: {}", devSn, info);
//
for(string &e : tmp) { for(string &e : tmp) {
pid_t pid = 0; pid_t pid = 0;
ret = zmqhelper::forkSubsystem(devSn, e, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, e, portRouter, pid);
...@@ -242,6 +240,12 @@ private: ...@@ -242,6 +240,12 @@ private:
} }
} }
for(string &e: terms) {
if(this->peerData["pids"].count(e) != 0) {
kill(this->peerData["pids"][e], SIGTERM);
}
}
while(unkown.size() != 0 && cnt < 3) { while(unkown.size() != 0 && cnt < 3) {
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
for(auto &[k,v]: unkown.items()) { for(auto &[k,v]: unkown.items()) {
...@@ -253,15 +257,6 @@ private: ...@@ -253,15 +257,6 @@ private:
cnt++; cnt++;
} }
for(string &e: terms) {
// if(this->peerData["pids"].count(e) != 0) {
// kill(this->peerData["pids"][e], SIGTERM);
// }
if(this->peerData["pids"].count(e) != 0 && this->peerData["pids"].count(e) != -1) {
}
}
for(auto &[k,v]: unkown.items()) { for(auto &[k,v]: unkown.items()) {
pid_t pid = 0; pid_t pid = 0;
ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, k, portRouter, pid);
...@@ -273,25 +268,31 @@ private: ...@@ -273,25 +268,31 @@ private:
else { else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k); spdlog::info("evdaemon {} failed to create subsystem {}", devSn, k);
} }
} }
this->bColdStart = false;
}else{ }else{
// TODO: load delta config // calc diff
json mods; auto mods = cfgutils::getModulesOperFromConfDiff(this->oldConfig, this->config, this->deltaCfg, this->devSn);
set<int> ipcs; this->deltaCfg = json();
for(auto &[k,v]: mods.items()) {
if(v == 0) {
// send stop msg
}
}
} }
} }
this->bColdStart = false;
this->deltaCfg = json();
return ret; return ret;
} }
void sendMsgToPeer(string peerId, string meta, string msg) { void sendCmd2Peer(string peerId, string cmdVal, string msg) {
json meta;
meta["type"] = EV_MSG_META_TYPE_CMD;
meta["value"] = cmdVal;
int ret = z_send(pDealer, this->daemonId, peerId, meta, msg); int ret = z_send(pDealer, this->daemonId, peerId, meta, msg);
if(ret < 0) { if(ret < 0) {
spdlog::error("evcloudsvc {} failed to send msg to peer {}: {} - {}", devSn, peerId, meta, msg); spdlog::error("evcloudsvc {} failed to send msg to peer {}: {} - {}", devSn, peerId, meta.dump(), msg);
}else{ }else{
spdlog::info("evcloudsvc {} successfully send msg to peer {}: {} - {}", devSn, peerId, meta, msg); spdlog::info("evcloudsvc {} successfully send msg to peer {}: {} - {}", devSn, peerId, meta.dump(), msg);
} }
} }
...@@ -469,7 +470,7 @@ private: ...@@ -469,7 +470,7 @@ private:
this->deltaCfg = json::diff(this->config, data); this->deltaCfg = json::diff(this->config, data);
spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump()); spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump());
if(this->deltaCfg.size() != 0 || this->bColdStart) { if(this->deltaCfg.size() != 0 || this->bColdStart) {
this->lastConfig = this->config; this->oldConfig = this->config;
this->config = data; this->config = data;
spdlog::info("evdaemon {} reloading config from cloud", devSn); spdlog::info("evdaemon {} reloading config from cloud", devSn);
ret = reloadCfg(); ret = reloadCfg();
......
...@@ -85,10 +85,8 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body) ...@@ -85,10 +85,8 @@ int z_send_multiple(void *s, vector<vector<uint8_t> >&body)
return ret; return ret;
} }
int z_send(void *s, string peerId, string selfId, string sMeta, string body) int z_send(void *s, string peerId, string selfId, const json &meta, string body)
{ {
json meta;
meta["type"] = sMeta;
vector<vector<uint8_t> > v{str2body(peerId), str2body(selfId), str2body(meta.dump()), str2body(body)}; vector<vector<uint8_t> > v{str2body(peerId), str2body(selfId), str2body(meta.dump()), str2body(body)};
return z_send_multiple(s, v); return z_send_multiple(s, v);
} }
......
...@@ -27,15 +27,18 @@ namespace zmqhelper { ...@@ -27,15 +27,18 @@ namespace zmqhelper {
#define EV_MSG_META_PING "ping" #define EV_MSG_META_PING "ping"
#define EV_MSG_META_PONG "pong" #define EV_MSG_META_PONG "pong"
#define EV_MSG_META_EVENT "event" #define EV_MSG_META_EVENT "event"
#define EV_MSG_META_CMD "cmd"
#define EV_MSG_META_TYPE_CMD "cmd"
#define EV_MSG_META_VALUE_CMD_RESTART "restart"
#define EV_MSG_META_VALUE_CMD_UPDATE "update"
#define EV_MSG_META_VALUE_CMD_STOP "stop"
#define EV_MSG_META_CONFIG "config" #define EV_MSG_META_CONFIG "config"
#define EV_MSG_META_AVFORMATCTX "afctx" #define EV_MSG_META_AVFORMATCTX "afctx"
#define EV_MSG_TYPE_AI_MOTION "ai_motion" #define EV_MSG_TYPE_AI_MOTION "ai_motion"
#define EV_MSG_TYPE_CONN_STAT "connstat" #define EV_MSG_TYPE_CONN_STAT "connstat"
#define EV_MSG_TYPE_SYS_STAT "sysstat" #define EV_MSG_TYPE_SYS_STAT "sysstat"
#define EV_MSG_CMD_RESTART "restart"
// #define EV_MSG_CMD_UPDATE "update"
#define EV_MSG_EVENT_MOTION_START "start" #define EV_MSG_EVENT_MOTION_START "start"
#define EV_MSG_EVENT_MOTION_END "end" #define EV_MSG_EVENT_MOTION_END "end"
...@@ -63,7 +66,7 @@ int setupDealer(void **ctx, void **s, string addr, string ident); ...@@ -63,7 +66,7 @@ int setupDealer(void **ctx, void **s, string addr, string ident);
/// @return 0 success, otherwise failed. /// @return 0 success, otherwise failed.
int recvConfigMsg(void *s, json &config, string addr, string ident); int recvConfigMsg(void *s, json &config, string addr, string ident);
int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid); int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid);
int z_send(void *s, string peerId, string selfId, string sMeta, string body); int z_send(void *s, string peerId, string selfId, const json &meta, string body);
} }
#endif #endif
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论