提交 3bbee587 authored 作者: blu's avatar blu

init

上级 47edd465
...@@ -54,7 +54,7 @@ namespace LVDB { ...@@ -54,7 +54,7 @@ namespace LVDB {
} }
Status s = DB::Open(options, fileName, &pdb); Status s = DB::Open(options, fileName, &pdb);
if(!s.ok() && s.) { if(!s.ok()) {
spdlog::error("failed to open db {}: {}", fileName, s.ToString()); spdlog::error("failed to open db {}: {}", fileName, s.ToString());
} }
......
...@@ -108,7 +108,6 @@ private: ...@@ -108,7 +108,6 @@ private:
exit(1); exit(1);
} }
selfId = devSn + ":evmlmotion:" + to_string(iid);
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
...@@ -126,8 +125,9 @@ private: ...@@ -126,8 +125,9 @@ private:
for(auto &j: ipcs) { for(auto &j: ipcs) {
json mls = j["modules"]["evml"]; json mls = j["modules"]["evml"];
for(auto &p:mls) { for(auto &p:mls) {
if(p["sn"] == devSn && p["iid"] == iid && p["type"] == "motion") { if(p["sn"] == devSn && p["status"] == 0 && p["type"] == "motion") {
evmlmotion = p; evmlmotion = p;
iid = p["iid"];
break; break;
} }
} }
...@@ -144,11 +144,13 @@ private: ...@@ -144,11 +144,13 @@ private:
} }
if(!found) { if(!found) {
spdlog::error("evmlmotion {} {}: no valid config found. retrying load config...", devSn, iid); spdlog::error("evmlmotion {}: no valid config found. retrying load config...", devSn);
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
continue; continue;
} }
selfId = devSn + ":evmlmotion:" + to_string(iid);
// TODO: currently just take the first puller, but should test connectivity // TODO: currently just take the first puller, but should test connectivity
json evpuller = ipc["modules"]["evpuller"][0]; json evpuller = ipc["modules"]["evpuller"][0];
pullerGid = evpuller["sn"].get<string>() + ":evpuller:" + to_string(evpuller["iid"]); pullerGid = evpuller["sn"].get<string>() + ":evpuller:" + to_string(evpuller["iid"]);
...@@ -156,11 +158,11 @@ private: ...@@ -156,11 +158,11 @@ private:
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(evpuller["port-pub"]); urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(evpuller["port-pub"]);
urlRouter = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(evmgr["port-router"]); urlRouter = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(evmgr["port-router"]);
spdlog::info("evmlmotion {} {} will connect to {} for sub, {} for router", devSn, iid, urlPub, urlRouter); spdlog::info("evmlmotion {} will connect to {} for sub, {} for router", selfId, urlPub, urlRouter);
// TODO: multiple protocols support // TODO: multiple protocols support
if(evmlmotion.count("path") == 0) { if(evmlmotion.count("path") == 0) {
spdlog::warn("evslicer {} {} no params for path, using default: {}", devSn, iid, URLOUT_DEFAULT); spdlog::warn("evslicer {} {} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
urlOut = URLOUT_DEFAULT; urlOut = URLOUT_DEFAULT;
} }
else { else {
...@@ -174,7 +176,7 @@ private: ...@@ -174,7 +176,7 @@ private:
} }
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evmlmotion {} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what()); spdlog::error("evmlmotion {} exception in EvPuller.init {:s} retrying", selfId, e.what());
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
continue; continue;
} }
...@@ -197,11 +199,11 @@ private: ...@@ -197,11 +199,11 @@ private:
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmlmotion {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evmlmotion {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
//TODO: //TODO:
} }
else { else {
spdlog::info("evmlmotion {} {} sent hello to router: {}", devSn, iid, mgrSn); spdlog::info("evmlmotion {} sent hello to router: {}", selfId, mgrSn);
} }
return ret; return ret;
...@@ -216,32 +218,32 @@ private: ...@@ -216,32 +218,32 @@ private:
pSub = zmq_socket(pSubCtx, ZMQ_SUB); pSub = zmq_socket(pSubCtx, ZMQ_SUB);
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) { if(ret != 0) {
spdlog::error("evmlmotion {} {} failed set setsockopt: {}", devSn, iid, urlPub); spdlog::error("evmlmotion {} failed set setsockopt: {}", selfId, urlPub);
return -1; return -1;
} }
ret = zmq_connect(pSub, urlPub.c_str()); ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evmlmotion {} {} failed connect pub: {}", devSn, iid, urlPub); spdlog::error("evmlmotion {} failed connect pub: {}", selfId, urlPub);
return -2; return -2;
} }
// setup dealer // setup dealer
pDealerCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evmlmotion {} {} try create req to {}", devSn, iid, urlRouter); spdlog::info("evmlmotion {} try create req to {}", selfId, urlRouter);
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", devSn, iid, urlRouter); spdlog::error("evpusher {} {} failed setsockopts router: {}", selfId, urlRouter);
return -3; return -3;
} }
if(ret < 0) { if(ret < 0) {
spdlog::error("evmlmotion {} {} failed setsockopts router: {}", devSn, iid, urlRouter); spdlog::error("evmlmotion {} failed setsockopts router: {}", selfId, urlRouter);
return -3; return -3;
} }
ret = zmq_connect(pDealer, urlRouter.c_str()); ret = zmq_connect(pDealer, urlRouter.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evmlmotion {} {} failed connect dealer: {}", devSn, iid, urlRouter); spdlog::error("evmlmotion {} failed connect dealer: {}", selfId, urlRouter);
return -4; return -4;
} }
//ping //ping
...@@ -264,7 +266,7 @@ private: ...@@ -264,7 +266,7 @@ private:
int ret = 0; int ret = 0;
// req avformatcontext packet // req avformatcontext packet
// send hello to puller // send hello to puller
spdlog::info("evmlmotion {} {} send hello to puller: {}", devSn, iid, pullerGid); spdlog::info("evmlmotion {} send hello to puller: {}", selfId, pullerGid);
vector<vector<uint8_t> > body; vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid)); body.push_back(str2body(pullerGid));
json meta; json meta;
...@@ -276,7 +278,7 @@ private: ...@@ -276,7 +278,7 @@ private:
while(!gotFormat) { while(!gotFormat) {
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmlmotion {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evmlmotion {}, failed to send hello to puller: {}", selfId, zmq_strerror(zmq_errno()));
continue; continue;
} }
...@@ -286,18 +288,18 @@ private: ...@@ -286,18 +288,18 @@ private:
ret = zmq_errno(); ret = zmq_errno();
if(ret != 0) { if(ret != 0) {
if(failedCnt % 100 == 0) { if(failedCnt % 100 == 0) {
spdlog::error("evmlmotion {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret)); spdlog::error("evmlmotion {}, error receive avformatctx: {}, {}", selfId, v.size(), zmq_strerror(ret));
spdlog::info("evmlmotion {} {} retry connect to peers", devSn, iid); spdlog::info("evmlmotion {} retry connect to peers", selfId);
} }
this_thread::sleep_for(chrono::seconds(5)); this_thread::sleep_for(chrono::seconds(5));
failedCnt++; failedCnt++;
} }
else { else {
spdlog::error("evmlmotion {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size()); spdlog::error("evmlmotion {}, received bad size zmq msg for avformatctx: {}", selfId, v.size());
} }
} }
else if(body2str(v[0]) != pullerGid) { else if(body2str(v[0]) != pullerGid) {
spdlog::error("evmlmotion {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid); spdlog::error("evmlmotion {}, invalid sender for avformatctx: {}, should be: {}", selfId, body2str(v[0]), pullerGid);
} }
else { else {
try { try {
...@@ -309,7 +311,7 @@ private: ...@@ -309,7 +311,7 @@ private:
} }
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evmlmotion {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what()); spdlog::error("evmlmotion {}, exception in parsing avformatctx packet: {}", selfId, e.what());
} }
} }
} }
...@@ -567,9 +569,9 @@ protected: ...@@ -567,9 +569,9 @@ protected:
v[2] = str2body(evt); v[2] = str2body(evt);
this->evtQueue->pop(); this->evtQueue->pop();
ret = z_send_multiple(this->pDealer, v); ret = z_send_multiple(this->pDealer, v);
spdlog::info("evmlmotion {} {} send event: {}", this->devSn, this->iid, evt); spdlog::info("evmlmotion {} send event: {}", this->devSn, this->iid, evt);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmlmotion {} {} failed to send event: {}, {}", this->devSn, this->iid, evt, zmq_strerror(zmq_errno())); spdlog::error("evmlmotion {} failed to send event: {}, {}", this->devSn, this->iid, evt, zmq_strerror(zmq_errno()));
} }
} }
else { else {
......
...@@ -33,19 +33,11 @@ namespace fs = std::filesystem; ...@@ -33,19 +33,11 @@ namespace fs = std::filesystem;
using namespace std; using namespace std;
using namespace zmqhelper; using namespace zmqhelper;
int mqErrorMsg(string cls, string devSn, int iid, string extraInfo, int ret)
{
if(ret < 0) {
spdlog::error("{} {} {}, {}: {} ", cls, devSn, iid, extraInfo, zmq_strerror(zmq_errno()));
}
return ret;
}
class RepSrv: public TinyThread { class RepSrv: public TinyThread {
private: private:
string mgrSn; string mgrSn;
string devSn; string devSn;
string selfId;
int iid; int iid;
string urlRep; string urlRep;
const char * bytes; const char * bytes;
...@@ -64,7 +56,7 @@ private: ...@@ -64,7 +56,7 @@ private:
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpuller {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
} }
return ret; return ret;
} }
...@@ -81,19 +73,19 @@ private: ...@@ -81,19 +73,19 @@ private:
vector<vector<uint8_t> > rep = {v[0], v[1], msgBody}; vector<vector<uint8_t> > rep = {v[0], v[1], msgBody};
ret = z_send_multiple(pDealer, rep); ret = z_send_multiple(pDealer, rep);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno())); spdlog::error("evpuller {} failed send rep to requester {}: {}", selfId, body2str(v[0]), zmq_strerror(zmq_errno()));
} }
} }
else if(meta["type"].get<string>() == EV_MSG_META_EVENT) { else if(meta["type"].get<string>() == EV_MSG_META_EVENT) {
// event msg // event msg
spdlog::info("evpuller {} {} received event: {}", devSn, iid, body2str(v[2])); spdlog::info("evpuller {} received event: {}", selfId, body2str(v[2]));
} }
else { else {
spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1])); spdlog::error("evpuller {} unknown meta from {}: {}", selfId, body2str(v[0]), body2str(v[1]));
} }
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evpuller {} {} excpetion parse request from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1])); spdlog::error("evpuller {} excpetion parse request from {}: {}", selfId, body2str(v[0]), body2str(v[1]));
} }
return ret; return ret;
...@@ -128,7 +120,7 @@ protected: ...@@ -128,7 +120,7 @@ protected:
auto v = z_recv_multiple(pDealer, false); auto v = z_recv_multiple(pDealer, false);
if(v.size() != 3) { if(v.size() != 3) {
//TODO: //TODO:
spdlog::error("evpuller {} {}, repSrv received invalid message: {}", devSn, iid, v.size()); spdlog::error("evpuller {}, repSrv received invalid message: {}", selfId, v.size());
continue; continue;
} }
handleMsg(v); handleMsg(v);
...@@ -140,7 +132,9 @@ public: ...@@ -140,7 +132,9 @@ public:
RepSrv(RepSrv&&) = delete; RepSrv(RepSrv&&) = delete;
RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes, RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes,
int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes), int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes),
len(len), pDealer(pDealer) {}; len(len), pDealer(pDealer) {
selfId = devSn+":evpuller:" + to_string(iid);
};
~RepSrv() {}; ~RepSrv() {};
}; };
...@@ -171,7 +165,7 @@ private: ...@@ -171,7 +165,7 @@ private:
tsLastBoot = info["lastboot"]; tsLastBoot = info["lastboot"];
tsUpdateTime=info["updatetime"]; tsUpdateTime=info["updatetime"];
spdlog::info("evmgr info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime)); spdlog::info("evpuller info: sn = {}, lastboot = {}, updatetime = {}", info["sn"].get<string>(), ctime(&tsLastBoot), ctime(&tsUpdateTime));
devSn = info["sn"]; devSn = info["sn"];
ret = LVDB::getLocalConfig(config); ret = LVDB::getLocalConfig(config);
...@@ -180,7 +174,6 @@ private: ...@@ -180,7 +174,6 @@ private:
exit(1); exit(1);
} }
selfId = devSn+":evpuller:" + to_string(iid);
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
...@@ -200,8 +193,9 @@ private: ...@@ -200,8 +193,9 @@ private:
for(auto &j: ipcs) { for(auto &j: ipcs) {
json pullers = j["modules"]["evpuller"]; json pullers = j["modules"]["evpuller"];
for(auto &p:pullers) { for(auto &p:pullers) {
if(p["sn"] == devSn && p["iid"] == iid) { if(p["sn"] == devSn && p["status"] == 0) {
evpuller = p; evpuller = p;
iid = p["iid"];
break; break;
} }
} }
...@@ -217,48 +211,55 @@ private: ...@@ -217,48 +211,55 @@ private:
} }
if(!found) { if(!found) {
spdlog::error("evpuller {} {} no valid config found. retrying load config...", devSn, iid); spdlog::error("evpuller {} no valid config found", devSn);
goto togo_sleep_continue; goto togo_sleep_continue;
} }
selfId = devSn+":evpuller:" + to_string(iid);
mgrSn = evmgr["sn"]; mgrSn = evmgr["sn"];
user = ipc["user"]; user = ipc["user"];
passwd = ipc["password"]; passwd = ipc["password"];
urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + "/h264/ch1/sub/av_stream"; urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + "/h264/ch1/sub/av_stream";
addr = evpuller["addr"].get<string>(); addr = evpuller["addr"].get<string>();
if(addr == "*" || addr == "0.0.0.0") { if(addr == "*" || addr == "0.0.0.0") {
spdlog::error("evpuller {} {} invalid addr {} for pub", devSn, iid, evpuller.dump()); spdlog::error("evpuller {} invalid addr {} for pub", selfId, evpuller.dump());
goto togo_sleep_continue; goto togo_sleep_continue;
} }
urlPub = string("tcp://*:") + to_string(evpuller["port-pub"]); urlPub = string("tcp://*:") + to_string(evpuller["port-pub"]);
// urlRep = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-rep"]); // urlRep = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-rep"]);
urlDealer = "tcp://" + evmgr["addr"].get<string>() + string(":") + to_string(evmgr["port-router"]); urlDealer = "tcp://" + evmgr["addr"].get<string>() + string(":") + to_string(evmgr["port-router"]);
spdlog::info("evpuller {} {} bind on {} for pub, connect to {} for dealer", devSn, iid, urlPub, urlDealer); spdlog::info("evpuller {} bind on {} for pub, connect to {} for dealer", selfId, urlPub, urlDealer);
pPubCtx = zmq_ctx_new(); pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB); pPub = zmq_socket(pPubCtx, ZMQ_PUB);
ret = mqErrorMsg("evpuller", devSn, iid, "failed to bind zmq", zmq_bind(pPub, urlPub.c_str())); ret = zmq_bind(pPub, urlPub.c_str());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} failed to bind to {}", selfId, urlPub);
goto togo_sleep_continue; goto togo_sleep_continue;
} }
pDealerCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
ret += mqErrorMsg("evpuller", devSn, iid, "failed to set socksopt", zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size())); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evpuller {} failed to set identity", selfId);
goto togo_sleep_continue;
}
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", devSn, iid, urlDealer); spdlog::error("evpusher {} {} failed setsockopts router: {}", selfId, urlDealer);
goto togo_sleep_continue; goto togo_sleep_continue;
} }
ret += mqErrorMsg("evpuller", devSn, iid, "failed to connect to router " + urlDealer, zmq_connect(pDealer, urlDealer.c_str())); ret = zmq_connect(pDealer, urlDealer.c_str());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} {} zmq setup failed. retrying load config...", devSn, iid); spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer);
goto togo_sleep_continue; goto togo_sleep_continue;
} }
} }
catch(exception &e) { catch(exception &e) {
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
spdlog::error("evpuller {} {} exception in EvPuller.init {:s}, retrying... ",devSn, iid, e.what()); spdlog::error("evpuller {} exception in EvPuller.init {:s}, retrying... ", selfId, e.what());
continue; continue;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论