提交 82e45de2 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 64152ca9
...@@ -556,8 +556,13 @@ protected: ...@@ -556,8 +556,13 @@ protected:
uint64_t pktCnt = 0; uint64_t pktCnt = 0;
zmq_msg_t msg; zmq_msg_t msg;
AVPacket packet; AVPacket packet;
json eventToSlicer;
// eventToSlicer["type"] = "event";
// eventTOSlicer["extraInfo"] = json(); //array
// eventToSlicer["start"]
// eventToSlicer["end"]
//event relay thread: motion to slicer //event relay thread: motion to slicer and sn:evdaemon:0
thEvent = thread([&,this]() { thEvent = thread([&,this]() {
json meta; json meta;
meta["type"] = EV_MSG_META_EVENT; meta["type"] = EV_MSG_META_EVENT;
...@@ -566,17 +571,44 @@ protected: ...@@ -566,17 +571,44 @@ protected:
vector<vector<uint8_t> > v = {str2body(this->slicerGid), str2body(metaType), str2body("")}; vector<vector<uint8_t> > v = {str2body(this->slicerGid), str2body(metaType), str2body("")};
while(true) { while(true) {
if(!this->evtQueue->empty()) { if(!this->evtQueue->empty()) {
// send to evslicer
string evt = this->evtQueue->front(); string evt = this->evtQueue->front();
v[2] = str2body(evt); json jevt = json::parse(evt);
this->evtQueue->pop(); this->evtQueue->pop();
ret = z_send_multiple(this->pDealer, v); if(jevt["event"] == EV_MSG_EVENT_MOTION_START){
eventToSlicer["type"] = "event";
eventToSlicer["start"] = jevt["ts"];
eventToSlicer["extraInfo"] = json(); //array
eventToSlicer["extraInfo"].push_back(jevt);
// TODO: save and load saved evt on crash
}else if(jevt["event"] == EV_MSG_EVENT_MOTION_END){
eventToSlicer["end"] = jevt["ts"];
eventToSlicer["extraInfo"].push_back(jevt);
v[2] = str2body(eventToSlicer.dump());
ret = z_send_multiple(this->pDealer, v);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, this->slicerGid, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evmlmotion {} sent event to {}: {}", this->selfId, this->slicerGid, evt);
}
eventToSlicer.clear();
}else{
spdlog::error("evmlmotion {} unknown event to {}: {}", this->selfId, this->slicerGid, evt);
}
// send to evdaemon
v[2] = str2body(evt);
string daemonId = this->devSn + ":evdaemon:0";
v[0] = str2body(daemonId);
ret = z_send_multiple(this->pDealer, v);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, this->slicerGid, zmq_strerror(zmq_errno())); spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, daemonId, zmq_strerror(zmq_errno()));
} }
else { else {
spdlog::info("evmlmotion {} sent event: {}", this->selfId, evt); spdlog::info("evmlmotion {} sent event to {}: {}", this->selfId, daemonId, evt);
} }
} }
else { else {
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
......
...@@ -22,6 +22,7 @@ update: 2019/09/10 ...@@ -22,6 +22,7 @@ update: 2019/09/10
#include <vector> #include <vector>
#include <ctime> #include <ctime>
#include <functional> #include <functional>
#include <queue>
#include <cstdlib> #include <cstdlib>
#include "inc/zmqhelper.hpp" #include "inc/zmqhelper.hpp"
...@@ -60,10 +61,14 @@ private: ...@@ -60,10 +61,14 @@ private:
mutex mutTsOld; mutex mutTsOld;
vector<long> vTsActive; vector<long> vTsActive;
mutex mutTsActive; mutex mutTsActive;
map<long, string> mapTs2BaseName; queue<string> eventQueue;
mutex mutTs2BaseName; condition_variable cvEvent;
json eventState; mutex mutEvent;
thread thEventHandler;
bool validMsg(json &msg) {
return true;
}
int handleMsg(vector<vector<uint8_t> > v) int handleMsg(vector<vector<uint8_t> > v)
{ {
int ret = 0; int ret = 0;
...@@ -86,25 +91,23 @@ private: ...@@ -86,25 +91,23 @@ private:
} }
else if(meta == EV_MSG_META_EVENT){ else if(meta == EV_MSG_META_EVENT){
data = json::parse(body2str(v[2])); data = json::parse(body2str(v[2]));
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump());
/// evslicer has two msg interfaces to subsystems on edge side
json eventMem; /// 1. type = "event"; start: timestamp; end: timestamp
json event; /// 2. type = "media"; duration: seconds
if(eventState.count(data["type"] != 0)) { if(!validMsg(data)){
eventMem = eventState[data["type"]]; spdlog::info("evslicer {} received invalid msg from {}: {}", selfId, peerId, msg);
} }else{
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump());
if(data["type"] == EV_MSG_TYPE_AI_MOTION) { if(data["type"] == "event") {
if(eventMem.size() == 0){ lock_guard<mutex> lock(mutEvent);
if(data["event"] == "end" || data["event"] != "start") { eventQueue.push(data);
spdlog::error("evslicer {} invalid event state:{}, ignored", selfId, data["event"].get<string>()); if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
}else{ eventQueue.pop();
}
} cvEvent.notify_one();
} }
} }
}else{ }else{
spdlog::info("evslicer {} received unkown msg from {}: {}", selfId, peerId, msg); spdlog::info("evslicer {} received unkown msg from {}: {}", selfId, peerId, msg);
} }
...@@ -494,7 +497,7 @@ protected: ...@@ -494,7 +497,7 @@ protected:
return mktime(&t); return mktime(&t);
} }
vector<long> LoadVideoFiles(string path, int days, int maxSlices, map<long, string> &ts2fileName, vector<long> &tsNeedUpload) vector<long> LoadVideoFiles(string path, int days, int maxSlices, vector<long> &tsNeedUpload)
{ {
vector<long> v = vector<long>(maxSlices); vector<long> v = vector<long>(maxSlices);
tsNeedUpload = vector<long>(maxSlices); tsNeedUpload = vector<long>(maxSlices);
...@@ -682,7 +685,7 @@ public: ...@@ -682,7 +685,7 @@ public:
// thread for slicer maintenace // thread for slicer maintenace
thSliceMgr = thread([this]() { thSliceMgr = thread([this]() {
// get old and active slices // get old and active slices
this->vTsActive = this->LoadVideoFiles(this->urlOut, this->days, this->numSlices, this->mapTs2BaseName, this->vTsOld); this->vTsActive = this->LoadVideoFiles(this->urlOut, this->days, this->numSlices, this->vTsOld);
spdlog::info("evslicer {} will store slice from index: {}", selfId, this->segHead); spdlog::info("evslicer {} will store slice from index: {}", selfId, this->segHead);
monitor * m = nullptr; monitor * m = nullptr;
...@@ -693,6 +696,26 @@ public: ...@@ -693,6 +696,26 @@ public:
// thread for uploading slices // thread for uploading slices
getInputFormat(); getInputFormat();
setupStream(); setupStream();
// event thread
thEventHandler = thread([this]{
while(true){
unique_lock<mutex> lk(this->mutEvent);
while(this->eventQueue.empty()){
this->cvEvent.wait(lk);
}
if(this->eventQueue.empty()){
continue;
}
auto evt = this->eventQueue.front();
this->eventQueue.pop();
// TODO: upload video
spdlog::info("evslicer processing event: {}", evt);
}
});
}; };
~EvSlicer() ~EvSlicer()
{ {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论