提交 0b360f23 authored 作者: blu's avatar blu

refactor communitation archtecture to use evdamon only

上级 69badb46
...@@ -386,7 +386,6 @@ togo_end: ...@@ -386,7 +386,6 @@ togo_end:
spdlog::warn("no local sn set. create a new one: {}", sn); spdlog::warn("no local sn set. create a new one: {}", sn);
auto tsNow = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); auto tsNow = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
info["lastboot"] = tsNow; info["lastboot"] = tsNow;
info["updatetime"] = tsNow;
ret = setSn(info, fileName); ret = setSn(info, fileName);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to save new generated sn"); spdlog::error("failed to save new generated sn");
......
...@@ -52,29 +52,55 @@ class EvDaemon{ ...@@ -52,29 +52,55 @@ class EvDaemon{
this->devSn = this->info["sn"]; this->devSn = this->info["sn"];
/// req config /// req config
json jret = cloudutils::reqConfig(this->info); json jret = cloudutils::reqConfig(this->info);
spdlog::info("evmgr {} got cloud config:\n{}", devSn, jret.dump(4));
// apply config // apply config
try{ try{
if(jret["code"] != 0) { if(jret["code"] != 0) {
spdlog::error("evdaemon {} reqConfig error: {}", this->devSn, jret["msg"].get<string>()); spdlog::error("evdaemon {} request cloud configration error: {}", this->devSn, jret["msg"].get<string>());
return 2; return 2;
} }
spdlog::info("evmgr {} got cloud config:\n{}", devSn, jret.dump(4));
json &data = jret["data"]; json &data = jret["data"];
for(auto &[k,v]:data.items()) { for(auto &[k,v]:data.items()) {
if(k == this->devSn) { if(k == this->devSn) {
// startup evmgr // startup evmgr
pid_t pid = fork(); pid_t pid;
if( (pid = fork()) == -1 ) {
spdlog::error("evdamon {} failed to fork subsytem - evmgr", this->devSn);
}else if(pid == 0) {
// child
// execl("./evmgr", "arg1", "arg2", (char *)0);
ret = setenv("ADDR", v["addr"].get<string>().c_str(), 1);
ret += setenv("SN", v["sn"].get<string>().c_str(), 1);
ret += setenv("PORT_ROUTER", to_string(v["port-router"].get<int>()).c_str(), 1);
ret += setenv("PORT_CLOUD", to_string(v["port-cloud"].get<int>()).c_str(), 1);
ret += setenv("ADDR_CLOUD", v["mqtt-cloud"].get<string>().c_str(), 1);
if(ret < 0) {
spdlog::error("evdaemon {} failed to set env", this->devSn);
return -3;
}
execl("./evmgr", NULL, NULL, NULL);
spdlog::error("evdaemon {} failed to startup evmgr", this->devSn);
}else{
// parent
spdlog::info("evdaemon {} created evmgr", this->devSn);
}
}
ret = system("./evmgr"); // startup other submodules
if(ret == -1) {
spdlog::error("evdaemon {} failed to start evmgr", this->devSn); json &ipcs = v["ipcs"];
break; for(auto &ipc : ipcs) {
json &modules = ipc["modules"];
for(auto &[mn, ml] : modules.items()) {
//
if()
} }
} }
} }
}catch(exception &e) { }catch(exception &e) {
spdlog::error("evdaemon {} exception to reload and apply configuration:\n{}", this->devSn, jret.dump(4)); spdlog::error("evdaemon {} exception {} to reload and apply configuration:\n{}", this->devSn, e.what(), jret.dump(4));
return -1; return -1;
} }
...@@ -89,6 +115,7 @@ class EvDaemon{ ...@@ -89,6 +115,7 @@ class EvDaemon{
spdlog::error("evdaemon {} failed to setup subsystems, please check log for more info", this->devSn); spdlog::error("evdaemon {} failed to setup subsystems, please check log for more info", this->devSn);
} }
this_thread::sleep_for(chrono::seconds(5)); this_thread::sleep_for(chrono::seconds(5));
break;
} }
}); });
} }
...@@ -184,7 +211,13 @@ class EvDaemon{ ...@@ -184,7 +211,13 @@ class EvDaemon{
~EvDaemon(){}; ~EvDaemon(){};
}; };
void cleanup(int signal) {
int status;
while (waitpid((pid_t) (-1), 0, WNOHANG) > 0) {}
}
int main(){ int main(){
signal(SIGCHLD, cleanup);
json info; json info;
LVDB::getSn(info); LVDB::getSn(info);
spdlog::info("evdaemon: \n{}",info.dump(4)); spdlog::info("evdaemon: \n{}",info.dump(4));
......
...@@ -41,7 +41,6 @@ private: ...@@ -41,7 +41,6 @@ private:
json config; json config;
string devSn; string devSn;
json peerStatus; json peerStatus;
json jmgr;
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg; unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock; mutex cacheLock;
queue<string> eventQue; queue<string> eventQue;
...@@ -53,80 +52,29 @@ private: ...@@ -53,80 +52,29 @@ private:
{ {
int ret; int ret;
json jret; json jret;
bool inited = false; bool inited = false;
// TODO: load config from local db
json info;
ret = LVDB::getSn(info);
if(ret < 0) {
spdlog::error("failed to get sn");
exit(1);
}
tsLastBoot = info["lastboot"];
tsUpdateTime=info["updatetime"];
spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
devSn = info["sn"];
ret = LVDB::getLocalConfig(config);
if(ret < 0) {
spdlog::error("failed to get local configuration");
exit(1);
}
spdlog::info("evmgr info: sn = {}, boot on {}", config["sn"].get<string>(), chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count());
devSn = config["sn"];
spdlog::info("evmgr local config:\n{}", config.dump(4)); spdlog::info("evmgr local config:\n{}", config.dump(4));
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT; int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
string proto, addr; string addr;
while(!inited) {
try {
// register
jret = cloudutils::registry(config, info["sn"], "evmgr");
spdlog::info("evmgr {} get config from cloud:\n{}", devSn, jret.dump(4));
if(jret["code"] != 0) {
spdlog::error("evmgr {} failed to registry: {}", devSn, jret["msg"].get<string>());
goto togo_sleep_continue;
}else{
if(jret["msg"] == "diff") {
json &data = jret["data"];
if(data.size() == 1 && data[0].at("path") == "/lastupdated") {
// no diff
spdlog::info("evmgr {} no change in config", devSn);
}else{
// patch
spdlog::info("evmgr config changed in cloud, merge patch:\n{}", jret["data"].dump(4));
config.merge_patch(jret["data"]);
ret = LVDB::setLocalConfig(config);
if(ret < 0) {
spdlog::error("evmgr {} failed to update local config:\n{}", devSn, config.dump(4));
goto togo_sleep_continue;
}
}
}
}
spdlog::info("new config dumps: \n{}", config.dump(4)); try {
// TODO: verify sn //
if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")) { spdlog::info("evmgr {} starting with configuration:\n{}", devSn, config.dump(4));
spdlog::error("evmgr {} invalid config. reload now...", devSn);
goto togo_sleep_continue;
}
jmgr = config["data"][devSn];
proto = jmgr["proto"];
if(proto != "zmq") { if(config["proto"] != "zmq") {
spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, proto); spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, config["proto"].get<string>());
} }
// //
if(jmgr["addr"].get<string>() == "*" || jmgr["addr"].get<string>() == "0.0.0.0") { if(config["addr"].get<string>() == "*" || config["addr"].get<string>() == "0.0.0.0") {
spdlog::error("invalid mgr address: {} in config:\n{}", jmgr["addr"].get<string>(), jmgr.dump(4)); spdlog::error("invalid mgr address: {} in config:\n{}", config["addr"].get<string>(), config.dump(4));
goto togo_sleep_continue; goto error_exit;
} }
//addr = "tcp://" + jmgr["addr"].get<string>() + ":" + to_string(jmgr["port-router"]); addr = "tcp://*:" + to_string(config["port-router"]);
addr = "tcp://*:" + to_string(jmgr["port-router"]);
// setup zmq // setup zmq
// TODO: connect to cloud // TODO: connect to cloud
...@@ -137,25 +85,131 @@ private: ...@@ -137,25 +85,131 @@ private:
ret = zmq_bind(pRouter, addr.c_str()); ret = zmq_bind(pRouter, addr.c_str());
if(ret < 0) { if(ret < 0) {
spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
goto togo_sleep_continue; goto error_exit;
} }
spdlog::info("evmgr {} bind success to {}", devSn, addr); spdlog::info("evmgr {} bind success to {}", devSn, addr);
inited = true; inited = true;
break; error_exit:
if(inited) {
togo_sleep_continue: }else{
this_thread::sleep_for(chrono::seconds(3)); exit(1);
//continue; }
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evmgr {} exception on init() for: {}, retrying load configuration...", devSn, e.what()); spdlog::error("evmgr {} exception on init() for: {}, retrying load configuration...", devSn, e.what());
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
continue;
}
} }
spdlog::info("evmgr {} successfuly inited", devSn); spdlog::info("evmgr {} successfuly inited", devSn);
} }
// // TODO: deprecated
// void _init()
// {
// int ret;
// json jret;
// bool inited = false;
// // TODO: load config from local db
// json info;
// ret = LVDB::getSn(info);
// if(ret < 0) {
// spdlog::error("failed to get sn");
// exit(1);
// }
// tsLastBoot = info["lastboot"];
// tsUpdateTime=info["updatetime"];
// spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", config["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
// devSn = config["sn"];
// ret = LVDB::getLocalConfig(config);
// if(ret < 0) {
// spdlog::error("evmgr failed to get local configuration");
// exit(1);
// }
// spdlog::info("evmgr local config:\n{}", config.dump(4));
// int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
// string proto, addr;
// while(!inited) {
// try {
// //
// jret = cloudutils::registry(this->config, info["sn"], "evmgr");
// if(jret["code"] != 0) {
// spdlog::error("evmgr {} failed to registry: {}", devSn, jret["msg"].get<string>());
// goto error_exit;
// }else{
// if(jret["msg"] == "diff") {
// json &data = jret["data"];
// if(data.size() == 1 && data[0].at("path") == "/lastupdated") {
// // no diff
// spdlog::info("evmgr {} no change in config", devSn);
// }else{
// // patch
// spdlog::info("evmgr config changed in cloud, merge patch:\n{}", jret["data"].dump(4));
// config.merge_patch(jret["data"]);
// ret = LVDB::setLocalConfig(config);
// if(ret < 0) {
// spdlog::error("evmgr {} failed to update local config:\n{}", devSn, config.dump(4));
// goto error_exit;
// }
// }
// }
// }
// // TODO: verify sn
// if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")) {
// spdlog::error("evmgr {} invalid config. reload now...", devSn);
// goto error_exit;
// }
// jmgr = config["data"][devSn];
// proto = jmgr["proto"];
// if(proto != "zmq") {
// spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, proto);
// }
// //
// if(jmgr["addr"].get<string>() == "*" || jmgr["addr"].get<string>() == "0.0.0.0") {
// spdlog::error("invalid mgr address: {} in config:\n{}", jmgr["addr"].get<string>(), jmgr.dump(4));
// goto error_exit;
// }
// //addr = "tcp://" + jmgr["addr"].get<string>() + ":" + to_string(jmgr["port-router"]);
// addr = "tcp://*:" + to_string(jmgr["port-router"]);
// // setup zmq
// // TODO: connect to cloud
// // router service
// pRouterCtx = zmq_ctx_new();
// pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
// zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
// ret = zmq_bind(pRouter, addr.c_str());
// if(ret < 0) {
// spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
// goto error_exit;
// }
// spdlog::info("evmgr {} bind success to {}", devSn, addr);
// inited = true;
// break;
// error_exit:
// this_thread::sleep_for(chrono::seconds(3));
// //continue;
// }
// catch(exception &e) {
// spdlog::error("evmgr {} exception on init() for: {}, retrying load configuration...", devSn, e.what());
// this_thread::sleep_for(chrono::seconds(3));
// continue;
// }
// }
// spdlog::info("evmgr {} successfuly inited", devSn);
// }
int mqErrorMsg(string cls, string devSn, string extraInfo, int ret) int mqErrorMsg(string cls, string devSn, string extraInfo, int ret)
{ {
if(ret < 0) { if(ret < 0) {
...@@ -328,6 +382,42 @@ public: ...@@ -328,6 +382,42 @@ public:
EvMgr& operator=(EvMgr &&) = delete; EvMgr& operator=(EvMgr &&) = delete;
EvMgr() EvMgr()
{ {
config["addr"] = "127.0.0.1";
// config["api-cloud"] = "http://127.0.0.1:8089";
config["port-daemon"] = 5549;
// config["mqtt-cloud"] = "<cloud_addr>";
config["addr-cloud"] = "127.0.0.1";
config["port-cloud"] = 5556;
config["proto"] = "zmq";
config["sn"] = "none";
config["port-router"] = 5550;
//
const char *strEnv = getenv("ADDR");
if(strEnv != NULL) {
config["addr"] = strEnv;
}
strEnv = getenv("PORT_DAEMON");
if(strEnv != NULL) {
config["port-daemon"] = atoi(strEnv);
}
strEnv = getenv("PORT_CLOUD");
if(strEnv != NULL) {
config["port-cloud"] = atoi(strEnv);
}else{
// TODO:
}
strEnv = getenv("SN");
if(strEnv != NULL) {
config["sn"] = strEnv;
}else{
spdlog::error("evmgr failed to start. no sn set");
exit(1);
}
init(); init();
} }
~EvMgr() ~EvMgr()
......
...@@ -73,7 +73,7 @@ json reqConfig(json &info){ ...@@ -73,7 +73,7 @@ json reqConfig(json &info){
Uri uri=Uri::Parse(api); Uri uri=Uri::Parse(api);
string sn = info.at("sn").get<string>(); string sn = info.at("sn").get<string>();
if(uri.Host.empty()||uri.Port.empty()||uri.Protocol.find("http") == string::npos) { if(uri.Host.empty()||uri.Port.empty()||uri.Protocol.find("http") == string::npos) {
string msg = "reqConfig error. invalid api-cloud in info: " + api; string msg = string(__FILE__) +":" + to_string(__LINE__) + ": request cloud configuration error. invalid api-cloud in info: " + api;
ret["code"] = 1; ret["code"] = 1;
ret["msg"] = msg; ret["msg"] = msg;
spdlog::error(msg); spdlog::error(msg);
...@@ -85,8 +85,21 @@ json reqConfig(json &info){ ...@@ -85,8 +85,21 @@ json reqConfig(json &info){
Client cli(uri.Host.c_str(), stoi(uri.Port)); Client cli(uri.Host.c_str(), stoi(uri.Port));
auto res = cli.Get("/config", Headers(), params); auto res = cli.Get("/config", Headers(), params);
if(res == nullptr || res->status != 200) {
const char *msg = NULL;
if(res == nullptr) {
msg = (string("error to connect to server: ") + api + "/config").c_str();
ret["code"] = -2;
}else{
msg = httplib::detail::status_message(res->status);
ret["code"] = res->status;
}
spdlog::debug("failed to reqConfig. {}", msg);
ret["msg"] = msg;
}else{
spdlog::debug("{} {} registry res from cloud : {}", __FILE__, __LINE__, res->body); spdlog::debug("{} {} registry res from cloud : {}", __FILE__, __LINE__, res->body);
ret = json::parse(res->body); ret = json::parse(res->body);
}
}catch(exception &e) { }catch(exception &e) {
ret["code"] = -1; ret["code"] = -1;
string msg = string(__FILE__) + ":" + to_string(__LINE__) + string(": registry exception - ") + e.what(); string msg = string(__FILE__) + ":" + to_string(__LINE__) + string(": registry exception - ") + e.what();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论