提交 82dffb16 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 0ea4290c
...@@ -29,8 +29,8 @@ using namespace zmqhelper; ...@@ -29,8 +29,8 @@ using namespace zmqhelper;
#define EV_FILE_LVDB_DAEMON "/opt/lvldb/daemon" #define EV_FILE_LVDB_DAEMON "/opt/lvldb/daemon"
class EvDaemon{ class EvDaemon {
private: private:
Server svr; Server svr;
json config; json config;
json deltaCfg; json deltaCfg;
...@@ -67,17 +67,19 @@ class EvDaemon{ ...@@ -67,17 +67,19 @@ class EvDaemon{
/// tracking sub-systems: evmgr, evpuller, evpusher, evml*, evslicer etc. /// tracking sub-systems: evmgr, evpuller, evpusher, evml*, evslicer etc.
json mapSubSystems; json mapSubSystems;
int reloadCfg(string subModGid = "") { int reloadCfg(string subModGid = "")
{
int bootType = 0; int bootType = 0;
if(subModGid == "ALL") { if(subModGid == "ALL") {
bootType = 1; bootType = 1;
}else if(subModGid.empty()){ }
else if(subModGid.empty()) {
bootType = 2; bootType = 2;
}else{ }
else {
bootType = 3; bootType = 3;
} }
spdlog::info("evadmon {} reloading config: {}", devSn, this->config.dump());
int ret = LVDB::getSn(this->info); int ret = LVDB::getSn(this->info);
if(ret < 0) { if(ret < 0) {
...@@ -87,9 +89,9 @@ class EvDaemon{ ...@@ -87,9 +89,9 @@ class EvDaemon{
this->devSn = this->info["sn"]; this->devSn = this->info["sn"];
this->daemonId = this->devSn + ":evdaemon:0"; this->daemonId = this->devSn + ":evdaemon:0";
// apply config // apply config
try{ try {
// lock_guard<mutex> lock(cacheLock); // lock_guard<mutex> lock(cacheLock);
json &data = this->config; json &data = this->config;
string peerId; string peerId;
...@@ -110,9 +112,10 @@ class EvDaemon{ ...@@ -110,9 +112,10 @@ class EvDaemon{
return -3; return -3;
} }
this->peerData["pids"][peerId] = pid; this->peerData["pids"][peerId] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, peerId); spdlog::info("evdaemon {} created subsystem {}", devSn, peerId);
} }
}else{ }
else {
// TODO // TODO
} }
} }
...@@ -128,7 +131,8 @@ class EvDaemon{ ...@@ -128,7 +131,8 @@ class EvDaemon{
} }
if(m.count("enabled") == 0 || m["enabled"] == 0) { if(m.count("enabled") == 0 || m["enabled"] == 0) {
spdlog::warn("evdaemon {} {} was disabled, ignore", this->devSn, mn); spdlog::warn("evdaemon {} {} was disabled, ignore", this->devSn, mn);
}else{ }
else {
string peerName; string peerName;
ret = cfgutils::getPeerId(mn, m, peerId, peerName); ret = cfgutils::getPeerId(mn, m, peerId, peerName);
if(ret != 0) { if(ret != 0) {
...@@ -138,17 +142,18 @@ class EvDaemon{ ...@@ -138,17 +142,18 @@ class EvDaemon{
this->peerData["config"][peerId] = v; this->peerData["config"][peerId] = v;
if(this->peerData["status"].count(peerId) == 0||this->peerData["status"][peerId] == 0) { if(this->peerData["status"].count(peerId) == 0||this->peerData["status"][peerId] == 0) {
this->peerData["status"][peerId] = 0; this->peerData["status"][peerId] = 0;
if(bootType == 1 || (bootType == 3 && subModGid == peerId)){ if(bootType == 1 || (bootType == 3 && subModGid == peerId)) {
ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid); ret = zmqhelper::forkSubsystem(devSn, peerId, portRouter, pid);
if(ret != 0) { if(ret != 0) {
spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId); spdlog::error("evdaemon {} failed to fork subsystem: {}", devSn, peerId);
// TODO: cleanup and reload // TODO: cleanup and reload
return -2; return -2;
} }
this->peerData["pids"][peerId] = pid; this->peerData["pids"][peerId] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, peerId); spdlog::info("evdaemon {} created subsystem {}", devSn, peerId);
} }
}else{ }
else {
// TODO: // TODO:
} }
} }
...@@ -156,7 +161,8 @@ class EvDaemon{ ...@@ -156,7 +161,8 @@ class EvDaemon{
} }
} }
} }
}catch(exception &e) { }
catch(exception &e) {
spdlog::error("evdaemon {} exception reload and apply configuration: {}:\n{}", this->devSn, e.what(), this->config.dump()); spdlog::error("evdaemon {} exception reload and apply configuration: {}:\n{}", this->devSn, e.what(), this->config.dump());
return -1; return -1;
} }
...@@ -164,12 +170,13 @@ class EvDaemon{ ...@@ -164,12 +170,13 @@ class EvDaemon{
return 0; return 0;
} }
void cleanupSubSystems(){ void cleanupSubSystems()
{
spdlog::info("evdaemon {} peerData {}", this->devSn, this->peerData.dump()); spdlog::info("evdaemon {} peerData {}", this->devSn, this->peerData.dump());
json &pd = this->peerData; json &pd = this->peerData;
for(auto &[k,v]: pd["pids"].items()){ for(auto &[k,v]: pd["pids"].items()) {
//kill(v, SIGTERM); //kill(v, SIGTERM);
if(this->peerData["status"].count(k) != 0){ if(this->peerData["status"].count(k) != 0) {
this->peerData["status"].erase(k); this->peerData["status"].erase(k);
} }
...@@ -182,14 +189,14 @@ class EvDaemon{ ...@@ -182,14 +189,14 @@ class EvDaemon{
} }
} }
int startSubSystems() { int startSubSystems()
{
// check status and startup // check status and startup
int ret = 0; int ret = 0;
vector<string> tmp; vector<string> tmp;
string info; string info;
int cnt = 0; int cnt = 0;
for(auto &[k,v]: this->peerData["config"].items()) { for(auto &[k,v]: this->peerData["config"].items()) {
spdlog::info("evdaemon {} submodule {}, config {}", devSn, k, v.dump());
if(this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0) { if(this->peerData["status"].count(k) == 0 || this->peerData["status"][k] == 0) {
tmp.push_back(k); tmp.push_back(k);
info += (cnt == 0? "" : string(", ")) + k; info += (cnt == 0? "" : string(", ")) + k;
...@@ -205,7 +212,8 @@ class EvDaemon{ ...@@ -205,7 +212,8 @@ class EvDaemon{
this->peerData["status"][e] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["status"][e] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
this->peerData["pids"][e] = pid; this->peerData["pids"][e] = pid;
spdlog::info("evdaemon {} created subsystem {}", devSn, e); spdlog::info("evdaemon {} created subsystem {}", devSn, e);
}else{ }
else {
spdlog::info("evdaemon {} failed to create subsystem {}", devSn, e); spdlog::info("evdaemon {} failed to create subsystem {}", devSn, e);
} }
} }
...@@ -249,16 +257,18 @@ class EvDaemon{ ...@@ -249,16 +257,18 @@ class EvDaemon{
spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId); spdlog::warn("evdaemon {} peer disconnected: {}", devSn, selfId);
if(bBootstrap){ spdlog::info("evadmon {} reloading config for {}", devSn, selfId);
if(bBootstrap) {
ret = reloadCfg(selfId); ret = reloadCfg(selfId);
}else{ }
else {
ret = reloadCfg(""); ret = reloadCfg("");
} }
if(ret != 0) { if(ret != 0) {
cleanupSubSystems(); cleanupSubSystems();
} }
} }
// event // event
...@@ -345,7 +355,8 @@ class EvDaemon{ ...@@ -345,7 +355,8 @@ class EvDaemon{
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
spdlog::error("evdaemon {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evdaemon {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}else{ }
else {
spdlog::info("evdaemon {} cached msg sent from {} to {} of type: {}, content: {}", body2str(v[1]), body2str(v[0]), body2str(v[2]), body2str(v[3])); spdlog::info("evdaemon {} cached msg sent from {} to {} of type: {}, content: {}", body2str(v[1]), body2str(v[0]), body2str(v[2]), body2str(v[3]));
} }
} }
...@@ -361,7 +372,8 @@ class EvDaemon{ ...@@ -361,7 +372,8 @@ class EvDaemon{
return ret; return ret;
} }
int handleCloudMsg(vector<vector<uint8_t> > &v) { int handleCloudMsg(vector<vector<uint8_t> > &v)
{
int ret = 0; int ret = 0;
zmq_msg_t msg; zmq_msg_t msg;
// ID_SENDER, meta ,MSG // ID_SENDER, meta ,MSG
...@@ -372,8 +384,9 @@ class EvDaemon{ ...@@ -372,8 +384,9 @@ class EvDaemon{
msg += body2str(s) + ";"; msg += body2str(s) + ";";
} }
spdlog::error("evdaemon {} received invalid msg from cloud {}", devSn, msg); spdlog::error("evdaemon {} received invalid msg from cloud {}", devSn, msg);
}else{ }
try{ else {
try {
string meta = json::parse(v[1])["type"]; string meta = json::parse(v[1])["type"];
string peerId = body2str(v[0]); string peerId = body2str(v[0]);
json data = json::parse(body2str(v[2])); json data = json::parse(body2str(v[2]));
...@@ -384,11 +397,13 @@ class EvDaemon{ ...@@ -384,11 +397,13 @@ class EvDaemon{
if(meta == EV_MSG_META_CONFIG) { if(meta == EV_MSG_META_CONFIG) {
if(data.size() == 0) { if(data.size() == 0) {
spdlog::error("evdaemon {} received invalid empty config", devSn); spdlog::error("evdaemon {} received invalid empty config", devSn);
}else{ }
else {
this->deltaCfg = json::diff(this->config, data); this->deltaCfg = json::diff(this->config, data);
spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump()); spdlog::info("evdaemon {} received cloud config diff: {}\nnew: {}", devSn, this->deltaCfg.dump(4), data.dump());
if(this->deltaCfg.size() != 0 || this->bColdStart) { if(this->deltaCfg.size() != 0 || this->bColdStart) {
this->config = data; this->config = data;
spdlog::info("evadmon {} reloading config from cloud", devSn);
ret = reloadCfg(); ret = reloadCfg();
if(ret != 0) { if(ret != 0) {
spdlog::error("evdameon {} failed to parse new config: {}", devSn, data.dump()); spdlog::error("evdameon {} failed to parse new config: {}", devSn, data.dump());
...@@ -400,43 +415,48 @@ class EvDaemon{ ...@@ -400,43 +415,48 @@ class EvDaemon{
return ret; return ret;
} }
this->bColdStart = false; this->bColdStart = false;
}else{ }
else {
} }
startSubSystems(); startSubSystems();
} }
} }
}else{ }
else {
// from peer // from peer
spdlog::info("evdaemon {} msg from peer {}: {}", devSn, peerId, data.dump()); spdlog::info("evdaemon {} msg from peer {}: {}", devSn, peerId, data.dump());
} }
}catch(exception &e) { }
catch(exception &e) {
spdlog::error("evdaemon {} file {}:{} exception {}", devSn, __FILE__, __LINE__, e.what()); spdlog::error("evdaemon {} file {}:{} exception {}", devSn, __FILE__, __LINE__, e.what());
} }
} }
return 0; return 0;
} }
protected: protected:
public: public:
void run(){ void run()
{
//setupSubsystems(); //setupSubsystems();
// get config // get config
svr.Get("/info", [this](const Request& req, Response& res){ svr.Get("/info", [this](const Request& req, Response& res) {
LVDB::getSn(this->info); LVDB::getSn(this->info);
res.set_content(this->info.dump(), "text/json"); res.set_content(this->info.dump(), "text/json");
}); });
svr.Post("/info", [this](const Request& req, Response& res){ svr.Post("/info", [this](const Request& req, Response& res) {
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
ret["msg"] = "ok"; ret["msg"] = "ok";
string sn = req.get_param_value("sn"); string sn = req.get_param_value("sn");
if(sn.empty()){ if(sn.empty()) {
ret["code"] = 1; ret["code"] = 1;
ret["msg"] = "no sn in param"; ret["msg"] = "no sn in param";
}else{ }
else {
json info; json info;
info["sn"] = sn; info["sn"] = sn;
// TODO: // TODO:
...@@ -447,25 +467,26 @@ class EvDaemon{ ...@@ -447,25 +467,26 @@ class EvDaemon{
res.set_content(this->info.dump(), "text/json"); res.set_content(this->info.dump(), "text/json");
}); });
svr.Get("/config", [this](const Request& req, Response& res){ svr.Get("/config", [this](const Request& req, Response& res) {
LVDB::getLocalConfig(this->config); LVDB::getLocalConfig(this->config);
res.set_content(this->config.dump(), "text/json"); res.set_content(this->config.dump(), "text/json");
}); });
svr.Post("/config", [this](const Request& req, Response& res){ svr.Post("/config", [this](const Request& req, Response& res) {
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
ret["msg"] = "ok"; ret["msg"] = "ok";
ret["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); ret["time"] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
try{ try {
json newConfig; json newConfig;
newConfig["data"] = json::parse(req.body)["data"]; newConfig["data"] = json::parse(req.body)["data"];
LVDB::setLocalConfig(newConfig); LVDB::setLocalConfig(newConfig);
spdlog::info("evmgr new config: {}", newConfig.dump()); spdlog::info("evmgr new config: {}", newConfig.dump());
// TODO: restart other components // TODO: restart other components
// //
}catch(exception &e) { }
catch(exception &e) {
ret.clear(); ret.clear();
ret["code"] = -1; ret["code"] = -1;
ret["msg"] = e.what(); ret["msg"] = e.what();
...@@ -474,7 +495,7 @@ class EvDaemon{ ...@@ -474,7 +495,7 @@ class EvDaemon{
res.set_content(ret.dump(), "text/json"); res.set_content(ret.dump(), "text/json");
}); });
svr.Get("/sync-cloud", [this](const Request& req, Response& res){ svr.Get("/sync-cloud", [this](const Request& req, Response& res) {
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
ret["msg"] = "syncing ..."; ret["msg"] = "syncing ...";
...@@ -482,7 +503,7 @@ class EvDaemon{ ...@@ -482,7 +503,7 @@ class EvDaemon{
this->bReload = true; this->bReload = true;
}); });
svr.Get("/reset", [](const Request& req, Response& res){ svr.Get("/reset", [](const Request& req, Response& res) {
json ret; json ret;
ret["code"] = 0; ret["code"] = 0;
ret["msg"] = "resetting ..."; ret["msg"] = "resetting ...";
...@@ -493,23 +514,25 @@ class EvDaemon{ ...@@ -493,23 +514,25 @@ class EvDaemon{
svr.listen("0.0.0.0", 8088); svr.listen("0.0.0.0", 8088);
} }
EvDaemon(){ EvDaemon()
{
int ret = 0; int ret = 0;
string dir_ = string("mkdir -p ") + EV_FILE_LVDB_DAEMON; string dir_ = string("mkdir -p ") + EV_FILE_LVDB_DAEMON;
system(dir_.c_str()); system(dir_.c_str());
// get sn of device // get sn of device
json info; json info;
try{ try {
LVDB::getSn(info); LVDB::getSn(info);
}catch(exception &e) { }
catch(exception &e) {
spdlog::error("evdaemon failed to get sn: {}", e.what()); spdlog::error("evdaemon failed to get sn: {}", e.what());
exit(1); exit(1);
} }
spdlog::info("evdaemon boot \n{}",info.dump()); spdlog::info("evdaemon boot \n{}",info.dump());
devSn = info["sn"]; devSn = info["sn"];
char* strEnv = getenv("BOOTSTRAP"); char* strEnv = getenv("BOOTSTRAP");
if(strEnv != nullptr && memcmp(strEnv, "false", 5) == 0) { if(strEnv != nullptr && memcmp(strEnv, "false", 5) == 0) {
bBootstrap = false; bBootstrap = false;
...@@ -525,7 +548,8 @@ class EvDaemon{ ...@@ -525,7 +548,8 @@ class EvDaemon{
ret = LVDB::getLocalConfig(cfg, "", EV_FILE_LVDB_DAEMON); ret = LVDB::getLocalConfig(cfg, "", EV_FILE_LVDB_DAEMON);
if(ret < 0) { if(ret < 0) {
spdlog::info("evdaemon {} no local config", devSn); spdlog::info("evdaemon {} no local config", devSn);
}else{ }
else {
this->config = cfg; this->config = cfg;
spdlog::info("evdaemon {} local config: {}", devSn, cfg.dump()); spdlog::info("evdaemon {} local config: {}", devSn, cfg.dump());
} }
...@@ -545,12 +569,13 @@ class EvDaemon{ ...@@ -545,12 +569,13 @@ class EvDaemon{
exit(1); exit(1);
} }
// setup edge msg processor // setup edge msg processor
thRouter = thread([this](){ thRouter = thread([this]() {
while(true){ while(true) {
auto v = zmqhelper::z_recv_multiple(this->pRouter); auto v = zmqhelper::z_recv_multiple(this->pRouter);
if(v.size() == 0) { if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno())); spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno()));
}else{ }
else {
handleEdgeMsg(v); handleEdgeMsg(v);
} }
} }
...@@ -574,12 +599,13 @@ class EvDaemon{ ...@@ -574,12 +599,13 @@ class EvDaemon{
} }
spdlog::info("evdaemon {} connected to cloud {}", devSn, cloudAddr); spdlog::info("evdaemon {} connected to cloud {}", devSn, cloudAddr);
// setup cloud msg processor // setup cloud msg processor
thCloud = thread([this](){ thCloud = thread([this]() {
while(true){ while(true) {
auto v = zmqhelper::z_recv_multiple(this->pDealer); auto v = zmqhelper::z_recv_multiple(this->pDealer);
if(v.size() == 0) { if(v.size() == 0) {
spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno())); spdlog::error("evdaemon {} failed to receive msg {}", this->devSn, zmq_strerror(zmq_errno()));
}else{ }
else {
handleCloudMsg(v); handleCloudMsg(v);
} }
} }
...@@ -595,15 +621,17 @@ class EvDaemon{ ...@@ -595,15 +621,17 @@ class EvDaemon{
peerData["pids"] = json(); peerData["pids"] = json();
peerData["config"] = json(); peerData["config"] = json();
}; };
~EvDaemon(){}; ~EvDaemon() {};
}; };
void cleanup(int signal) { void cleanup(int signal)
int status; {
while (waitpid((pid_t) (-1), 0, WNOHANG) > 0) {} int status;
while (waitpid((pid_t) (-1), 0, WNOHANG) > 0) {}
} }
int main(){ int main()
{
signal(SIGCHLD, cleanup); signal(SIGCHLD, cleanup);
//sigignore(SIGCHLD); //sigignore(SIGCHLD);
EvDaemon srv; EvDaemon srv;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论