提交 194d7abd authored 作者: blu's avatar blu

init

上级 f25c04a6
......@@ -39,7 +39,10 @@ private:
json jmgr;
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock;
queue<json> *eventQue;
queue<string> eventQue;
mutex eventQLock;
//
void init()
{
int ret;
......@@ -109,6 +112,7 @@ private:
// update status;
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) {
// message to other peer
// check peer status
......@@ -139,6 +143,19 @@ private:
cachedMsg[peerId].pop();
}
}
// check if event
try{
string metaType = json::parse(meta)["type"];
if(metaType == EV_MSG_META_EVENT) {
eventQue.push(body2str(body[3]));
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
}
}
}catch(exception &e) {
spdlog::error("evmgr {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
}else{
// message to mgr
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
......@@ -188,13 +205,12 @@ protected:
}
}
public:
EvMgr() = delete;
EvMgr(EvMgr &&) = delete;
EvMgr(EvMgr &) = delete;
EvMgr(const EvMgr &) = delete;
EvMgr& operator=(const EvMgr &) = delete;
EvMgr& operator=(EvMgr &&) = delete;
EvMgr(queue<json> *queue):eventQue(queue)
EvMgr()
{
init();
}
......@@ -215,8 +231,7 @@ int main(int argc, const char *argv[])
{
av_log_set_level(AV_LOG_ERROR);
spdlog::set_level(spdlog::level::debug);
queue<json> queue;
EvMgr mgr(&queue);
EvMgr mgr;
mgr.join();
return 0;
}
\ No newline at end of file
......@@ -68,6 +68,7 @@ private:
int streamIdx = -1;
json config;
thread thPing;
thread thEvent;
//
int init()
......@@ -165,11 +166,11 @@ private:
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
spdlog::error("evmlmotion {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
}
else {
spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn);
spdlog::info("evmlmotion {} {} sent hello to router: {}", devSn, iid, mgrSn);
}
return ret;
......@@ -184,27 +185,27 @@ private:
pSub = zmq_socket(pSubCtx, ZMQ_SUB);
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) {
spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub);
spdlog::error("evmlmotion {} {} failed set setsockopt: {}", devSn, iid, urlPub);
return -1;
}
ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) {
spdlog::error("evpusher {} {} failed connect pub: {}", devSn, iid, urlPub);
spdlog::error("evmlmotion {} {} failed connect pub: {}", devSn, iid, urlPub);
return -2;
}
// setup dealer
pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evpusher {} {} try create req to {}", devSn, iid, urlRouter);
spdlog::info("evmlmotion {} {} try create req to {}", devSn, iid, urlRouter);
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", devSn, iid, urlRouter);
spdlog::error("evmlmotion {} {} failed setsockopts router: {}", devSn, iid, urlRouter);
return -3;
}
ret = zmq_connect(pDealer, urlRouter.c_str());
if(ret != 0) {
spdlog::error("evpusher {} {} failed connect dealer: {}", devSn, iid, urlRouter);
spdlog::error("evmlmotion {} {} failed connect dealer: {}", devSn, iid, urlRouter);
return -4;
}
//ping
......@@ -226,7 +227,7 @@ private:
int ret = 0;
// req avformatcontext packet
// send hello to puller
spdlog::info("evpusher {} {} send hello to puller: {}", devSn, iid, pullerGid);
spdlog::info("evmlmotion {} {} send hello to puller: {}", devSn, iid, pullerGid);
vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid));
json meta;
......@@ -238,7 +239,7 @@ private:
while(!gotFormat) {
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
spdlog::error("evmlmotion {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
continue;
}
......@@ -248,18 +249,18 @@ private:
ret = zmq_errno();
if(ret != 0) {
if(failedCnt % 100 == 0) {
spdlog::error("evpusher {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret));
spdlog::info("evpusher {} {} retry connect to peers", devSn, iid);
spdlog::error("evmlmotion {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret));
spdlog::info("evmlmotion {} {} retry connect to peers", devSn, iid);
}
this_thread::sleep_for(chrono::seconds(5));
failedCnt++;
}
else {
spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
spdlog::error("evmlmotion {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
}
}
else if(body2str(v[0]) != pullerGid) {
spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
spdlog::error("evmlmotion {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
}
else {
try {
......@@ -271,7 +272,7 @@ private:
}
}
catch(exception &e) {
spdlog::error("evpusher {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what());
spdlog::error("evmlmotion {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what());
}
}
}
......@@ -446,10 +447,15 @@ private:
evtState = IN;
json p;
spdlog::info("state: PRE->IN");
p["type"] = "start";
p["type"] = "motion";
p["gid"] = selfId;
p["event"] = "end";
p["ts"] = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count();
//p["frame"] = origin.clone();
evtQueue->push(p.dump());
if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE * 2) {
evtQueue->pop();
}
}
}else{
if(dura > detPara.pre){
......@@ -477,9 +483,14 @@ private:
spdlog::info("state: POST->NONE");
evtState = NONE;
json p;
p["type"] = "end";
p["type"] = "motion";
p["gid"] = selfId;
p["event"] = "end";
p["ts"] = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count() + (int)(detPara.post/2);
evtQueue->push(p.dump());
if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE*2) {
evtQueue->pop();
}
}
}else{
spdlog::info("state: POST->IN");
......@@ -501,6 +512,23 @@ protected:
zmq_msg_t msg;
AVPacket packet;
//event thread
thEvent = thread([&,this](){
json meta; meta["type"] = EV_MSG_META_EVENT;
string metaType = meta.dump();
int ret = 0;
vector<vector<uint8_t> > v = {str2body(this->pullerGid), str2body(metaType), str2body("")};
while(!this->evtQueue->empty()){
string evt = this->evtQueue->front();
v[2] = str2body(evt);
this->evtQueue->pop();
ret = z_send_multiple(this->pDealer, v);
if(ret < 0) {
spdlog::error("evmlmotion {} {} failed to send event: {}, {}", this->devSn, this->iid, evt, zmq_strerror(zmq_errno()));
}
}
});
AVFrame *pFrame = av_frame_alloc();
if (!pFrame) {
spdlog::error("failed to allocated memory for AVFrame");
......@@ -600,11 +628,6 @@ int main(int argc, const char *argv[])
cv::imshow("evmlmotion1", matShow1);
cv::imshow("evmlmotion2", matShow2);
cv::imshow("evmlmotion3", matShow3);
if(evtQueue.size() > 0) {
string p = evtQueue.front();
spdlog::info("event: {}", p);
evtQueue.pop();
}
if(cv::waitKey(200) == 27) {
break;
}
......
......@@ -17,6 +17,7 @@ namespace zmqhelper {
#define EV_MSG_META_EVENT "event"
#define EV_MSG_META_AVFORMATCTX "afctx"
#define EV_NUM_CACHE_PERPEER 100
#define MAX_EVENT_QUEUE_SIZE 20
//
string body2str(vector<uint8_t> body){
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论