提交 ed80379a authored 作者: blu's avatar blu

init

上级 a1b5acad
#DEBUG=-g DEBUG=-g
CC = gcc CC = gcc
CPP = g++ CPP = g++
CPPFLAGS = $(DEBUG) -Wall -std=gnu++1z CPPFLAGS = $(DEBUG) -Wall -std=gnu++1z
......
...@@ -56,12 +56,26 @@ namespace LVDB { ...@@ -56,12 +56,26 @@ namespace LVDB {
break; break;
}else{ }else{
json &modules = ipc["modules"]; json &modules = ipc["modules"];
if(modules.count(moduleName) == 0){ string modname = moduleName.substr(0,4);
string sub;
if(modname == "evml") {
sub = moduleName.substr(4, moduleName.size());
}else{
modname = moduleName;
}
if(modules.count(modname) == 0){
break; break;
}else{ }else{
json &module = modules[moduleName]; json &module = modules[modname];
for(auto &inst: module) { for(auto &inst: module) {
if(inst.count("sn") != 0 && inst["sn"] == sn && inst.count("iid") != 0 && inst["iid"] == iid) { if(inst.count("sn") != 0 && inst["sn"] == sn && inst.count("iid") != 0 && inst["iid"] == iid) {
if(!sub.empty()) {
if(inst.count("type") != 0 && inst["type"] == sub) {
return &inst;
}
// continue
}else{
return &inst; return &inst;
} }
} }
...@@ -69,10 +83,77 @@ namespace LVDB { ...@@ -69,10 +83,77 @@ namespace LVDB {
} }
} }
} }
}
}
return ret;
}
int traverseConfigureModules(json &config, cb_traverse_configration_module cb, string moduleName){
int ret = 0;
if(config.count("data") == 0) {
return -1;
}
if(cb == NULL) {
return -2;
}
json &data = config["data"];
for(auto &[k,v]: data.items()){
json &mgr = data[k];
if(mgr.count("ipcs") == 0) {
break;
}else{
json &ipcs = mgr["ipcs"];
for(auto &ipc:ipcs) {
if(ipc.count("modules") == 0) {
continue;
}else{
string modname, sub;
if(!moduleName.empty()){
modname = moduleName.substr(0,4);
if(modname == "evml") {
sub = moduleName.substr(4, moduleName.size());
}else{
modname = moduleName;
}
}
json &modules = ipc["modules"];
if(!modname.empty()) {
if(modules.count(modname) == 0) {
return -3;
}else{
json &module = modules[modname];
if(!sub.empty()) {
for(auto &m:module) {
if(m.count("type") != 0 && m["type"] == sub) {
ret = cb(modname, m);
if(ret <0) {
spdlog::error("failed to traverse and callback config on module: {}", m.dump());
return ret;
}
}
} }
}
}
}else{
for(auto &[mn, mod]:modules.items()) {
for(auto &m:mod) {
ret = cb(mn,m);
if(ret <0) {
spdlog::error("failed to traverse and callback config on module: {}", m.dump());
return ret; return ret;
} }
}
}
}
}
}
}
}
return 0;
}
int _getDB(string fileName, DB** pdb) { int _getDB(string fileName, DB** pdb) {
static bool bmk = false; static bool bmk = false;
......
...@@ -72,6 +72,21 @@ private: ...@@ -72,6 +72,21 @@ private:
spdlog::error("failed to get local configuration"); spdlog::error("failed to get local configuration");
exit(1); exit(1);
} }
// set all module status to 0
ret = LVDB::traverseConfigureModules(config, [](string modname, json &m)->int{
if(m.count("status") != 0)
{
cout << modname <<" ," << m.dump() << endl;
m["status"] = 0;
}
return 0;
});
if(ret < 0) {
spdlog::error("evmgr {} failed to set module status to 0", devSn);
}else{
//spdlog::info("new config: {}", config.dump());
LVDB::setLocalConfig(config);
}
int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT; int opt_notify = ZMQ_NOTIFY_DISCONNECT|ZMQ_NOTIFY_CONNECT;
string proto, addr; string proto, addr;
...@@ -243,7 +258,7 @@ togo_sleep_continue: ...@@ -243,7 +258,7 @@ togo_sleep_continue:
} }
else { else {
// message to mgr // message to mgr
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2])); // spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
if(meta == "pong"||meta == "ping") { if(meta == "pong"||meta == "ping") {
// update status // update status
spdlog::info("evmgr {}, ping msg from {}", devSn, selfId); spdlog::info("evmgr {}, ping msg from {}", devSn, selfId);
......
...@@ -99,19 +99,18 @@ private: ...@@ -99,19 +99,18 @@ private:
tsLastBoot = info["lastboot"]; tsLastBoot = info["lastboot"];
tsUpdateTime=info["updatetime"]; tsUpdateTime=info["updatetime"];
spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime)); spdlog::info("evmlmotion info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
devSn = info["sn"]; devSn = info["sn"];
while(!inited) {
// TODO: req config
bool found = false;
try {
ret = LVDB::getLocalConfig(config); ret = LVDB::getLocalConfig(config);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to get local configuration"); spdlog::error("failed to get local configuration");
exit(1); exit(1);
} }
while(!inited) {
// TODO: req config
bool found = false;
try {
spdlog::info("config: {:s}", config.dump()); spdlog::info("config: {:s}", config.dump());
json evmlmotion; json evmlmotion;
json evmgr; json evmgr;
...@@ -162,7 +161,7 @@ private: ...@@ -162,7 +161,7 @@ private:
// TODO: multiple protocols support // TODO: multiple protocols support
if(evmlmotion.count("path") == 0) { if(evmlmotion.count("path") == 0) {
spdlog::warn("evslicer {} {} no params for path, using default: {}", selfId, URLOUT_DEFAULT); spdlog::warn("evslicer {} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
urlOut = URLOUT_DEFAULT; urlOut = URLOUT_DEFAULT;
} }
else { else {
...@@ -174,44 +173,6 @@ private: ...@@ -174,44 +173,6 @@ private:
spdlog::error("failed mkdir {}", urlOut); spdlog::error("failed mkdir {}", urlOut);
return -1; return -1;
} }
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception in EvPuller.init {:s} retrying", selfId, e.what());
this_thread::sleep_for(chrono::seconds(3));
continue;
}
inited = true;
}
return 0;
}
int ping()
{
// send hello to router
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
//TODO:
}
else {
spdlog::info("evmlmotion {} sent hello to router: {}", selfId, mgrSn);
}
return ret;
}
int setupMq()
{
int ret = 0;
// setup sub // setup sub
pSubCtx = zmq_ctx_new(); pSubCtx = zmq_ctx_new();
...@@ -230,7 +191,7 @@ private: ...@@ -230,7 +191,7 @@ private:
// setup dealer // setup dealer
pDealerCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evmlmotion {} try create req to {}", selfId, urlRouter); spdlog::info("evmlmotion {} connect to router {}", selfId, urlRouter);
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
...@@ -248,15 +209,37 @@ private: ...@@ -248,15 +209,37 @@ private:
} }
//ping //ping
ret = ping(); ret = ping();
// TODO: don't need this anymore, since I've used the draft feature of ZOUTER_NOTIFICATION instead }
// thPing = thread([&,this]() { catch(exception &e) {
// while(true) { spdlog::error("evmlmotion {} exception in EvPuller.init {:s} retrying", selfId, e.what());
// this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2)); this_thread::sleep_for(chrono::seconds(3));
// ping(); continue;
// } }
// });
inited = true;
}
return 0;
}
// thPing.detach(); int ping()
{
// send hello to router
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
//TODO:
}
else {
spdlog::info("evmlmotion {} sent hello to router: {}", selfId, mgrSn);
}
return ret; return ret;
} }
...@@ -639,7 +622,6 @@ public: ...@@ -639,7 +622,6 @@ public:
{ {
evtQueue = queue; evtQueue = queue;
init(); init();
setupMq();
getInputFormat(); getInputFormat();
setupStream(); setupStream();
}; };
......
...@@ -168,17 +168,16 @@ private: ...@@ -168,17 +168,16 @@ private:
spdlog::info("evpuller info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime)); spdlog::info("evpuller info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
devSn = info["sn"]; devSn = info["sn"];
ret = LVDB::getLocalConfig(config);
if(ret < 0) {
spdlog::error("failed to get local configuration");
exit(1);
}
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
string user, passwd, addr; string user, passwd, addr;
try { try {
ret = LVDB::getLocalConfig(config);
if(ret < 0) {
spdlog::error("failed to get local configuration");
exit(1);
}
spdlog::info("config dump: {:s}", config.dump()); spdlog::info("config dump: {:s}", config.dump());
json data = config["data"]; json data = config["data"];
// first try to check mgr with same sn // first try to check mgr with same sn
......
...@@ -63,19 +63,18 @@ private: ...@@ -63,19 +63,18 @@ private:
tsLastBoot = info["lastboot"]; tsLastBoot = info["lastboot"];
tsUpdateTime=info["updatetime"]; tsUpdateTime=info["updatetime"];
spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime)); spdlog::info("evpusher info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
devSn = info["sn"]; devSn = info["sn"];
while(!inited) {
// TODO: req config
bool found = false;
try {
ret = LVDB::getLocalConfig(config); ret = LVDB::getLocalConfig(config);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to get local configuration"); spdlog::error("failed to get local configuration");
exit(1); exit(1);
} }
while(!inited) {
// TODO: req config
bool found = false;
try {
spdlog::info("config: {:s}", config.dump()); spdlog::info("config: {:s}", config.dump());
json evpusher; json evpusher;
json evmgr; json evmgr;
......
...@@ -72,19 +72,19 @@ private: ...@@ -72,19 +72,19 @@ private:
tsLastBoot = info["lastboot"]; tsLastBoot = info["lastboot"];
tsUpdateTime=info["updatetime"]; tsUpdateTime=info["updatetime"];
spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime)); spdlog::info("evslicer info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
devSn = info["sn"]; devSn = info["sn"];
ret = LVDB::getLocalConfig(config);
if(ret < 0) {
spdlog::error("failed to get local configuration");
exit(1);
}
// TODO: read local slices list and last index // TODO: read local slices list and last index
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
try { try {
ret = LVDB::getLocalConfig(config);
if(ret < 0) {
spdlog::error("failed to get local configuration");
exit(1);
}
spdlog::info("config: {:s}", config.dump()); spdlog::info("config: {:s}", config.dump());
json evslicer; json evslicer;
json evmgr; json evmgr;
......
...@@ -42,6 +42,9 @@ namespace LVDB { ...@@ -42,6 +42,9 @@ namespace LVDB {
// log // log
int getLog(json &log, json &writeOptions, string fileName); int getLog(json &log, json &writeOptions, string fileName);
int setLog(json &log, json &readOptions, string fileName); int setLog(json &log, json &readOptions, string fileName);
typedef int(*cb_traverse_configration_module)(string modname, json &mod);
int traverseConfigureModules(json &config, cb_traverse_configration_module cb, string moduleName = "");
} }
#endif #endif
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论