提交 30d84b0e authored 作者: blu's avatar blu

big refacting of communitation architect

上级 cda9bce5
...@@ -242,7 +242,6 @@ private: ...@@ -242,7 +242,6 @@ private:
int handleMsg(vector<vector<uint8_t> > &body) int handleMsg(vector<vector<uint8_t> > &body)
{ {
int ret = 0; int ret = 0;
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG // ID_SENDER, ID_TARGET, meta ,MSG
string selfId, peerId, meta; string selfId, peerId, meta;
if(body.size() == 2 && body[1].size() == 0) { if(body.size() == 2 && body[1].size() == 0) {
......
...@@ -251,8 +251,6 @@ protected: ...@@ -251,8 +251,6 @@ protected:
{ {
bool bStopSig = false; bool bStopSig = false;
int ret = 0; int ret = 0;
zmq_msg_t msg;
while (true) { while (true) {
if(checkStop() == true) { if(checkStop() == true) {
bStopSig = true; bStopSig = true;
......
...@@ -462,7 +462,7 @@ private: ...@@ -462,7 +462,7 @@ private:
} }
} //end for } //end for
spdlog::debug("evmlmotion {} contours {} area {}, thresh {} hasEvent {}", selfId, cnts.size(), cnts.size() == 0? 0:cv::contourArea(cnts[i]), detPara.area, hasEvent); spdlog::debug("evmlmotion {} contours {} area {}, thresh {} hasEvent {}", selfId, cnts.size(), hasEvent? cv::contourArea(cnts[i]):0, detPara.area, hasEvent);
// business logic for event // business logic for event
auto dura = chrono::duration_cast<chrono::seconds>(evtStartTm - evtStartTmLast).count(); auto dura = chrono::duration_cast<chrono::seconds>(evtStartTm - evtStartTmLast).count();
...@@ -579,7 +579,7 @@ protected: ...@@ -579,7 +579,7 @@ protected:
spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, this->slicerGid, zmq_strerror(zmq_errno())); spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, this->slicerGid, zmq_strerror(zmq_errno()));
} }
else { else {
spdlog::info("evmlmotion {} send event: {}", this->selfId, evt); spdlog::info("evmlmotion {} sent event: {}", this->selfId, evt);
} }
} }
else { else {
......
...@@ -57,9 +57,33 @@ private: ...@@ -57,9 +57,33 @@ private:
int *streamList = nullptr; int *streamList = nullptr;
time_t tsLastBoot, tsUpdateTime; time_t tsLastBoot, tsUpdateTime;
json config; json config;
thread thPing; thread thMsgProcessor;
string drport = "5549"; string drport = "5549";
int handleMsg(vector<vector<uint8_t> > v){
int ret = 0;
string peerId, meta;
json data;
if(v.size() == 3) {
try{
peerId = body2str(v[0]);
meta = json::parse(body2str(v[1]))["type"];
data = json::parse(body2str(v[2]));
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, meta, data.dump());
}catch(exception &e){
spdlog::error("evslicer {} failed to process msg:{}", body2str(v[1]), body2str(v[2]));
}
}else{
string msg;
for(auto &b:v) {
msg +=body2str(b) + ";";
}
spdlog::error("evslicer {} get invalid msg with size {}: {}", selfId, v.size(), msg);
}
return ret;
}
int init() int init()
{ {
int ret = 0; int ret = 0;
...@@ -179,6 +203,19 @@ private: ...@@ -179,6 +203,19 @@ private:
exit(1); exit(1);
} }
thMsgProcessor = thread([this](){
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
handleMsg(body);
}
});
thMsgProcessor.detach();
return ret; return ret;
} }
......
...@@ -230,9 +230,11 @@ int encode(AVFormatContext *ctx, char **bytes) ...@@ -230,9 +230,11 @@ int encode(AVFormatContext *ctx, char **bytes)
got += sizeof(wholeSize); got += sizeof(wholeSize);
memcpy((*bytes) + got, PS_MARK_E, strlen(PS_MARK_E)); memcpy((*bytes) + got, PS_MARK_E, strlen(PS_MARK_E));
got += strlen(PS_MARK_E); got += strlen(PS_MARK_E);
if(wholeSize != got){
spdlog::error("encode wholesize: {}, should be {}", got, wholeSize);
}
assert(wholeSize == got); assert(wholeSize == got);
spdlog::info("encode wholesize: {}", got);
return wholeSize; return wholeSize;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论