/* module: evmlmotion description: author: Bruce.Lu created: 2019/08/23 update: 2019/09/10 */ #include #include #include #include #include #include #include #include #include #include #include #include "zmqhelper.hpp" #include "tinythread.hpp" #include "common.hpp" #include "avcvhelpers.hpp" #include "database.h" #include "singprocess.hpp" using namespace std; using namespace zmqhelper; #define URLOUT_DEFAULT "frames" #define NUM_PKT_IGNORE 18*5 #define FRAME_SIZE 500 #define AREA_1080P 200 #define DEBUG #ifdef DEBUG cv::Mat matShow1, matShow2, matShow3; #endif bool gFirst = true; struct DetectParam { int thre; int area; int fpsIn; int fpsProc; int pre; int post; float entropy; int maxDuration; // max event video length in minutes float resize; // float size; cv::Point2f region[2]; }; enum EventState { NONE, PRE, IN, POST }; class EvMLMotion: public TinyThread { private: string selfName = "evmlmotion"; void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr; string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid, slicerGid; int iid; AVFormatContext *pAVFormatInput = nullptr; AVCodecContext *pCodecCtx = nullptr; DetectParam detPara = {25, 0, -1, 3, 3, 30, 0.3, 25, 0, 540, {{0,0},{0,0}}}; EventState evtState = EventState::NONE; // chrono::system_clock::time_point packetTm, evtStartTmLast, evtStartTmOrig; long long packetTm = 0, evtStartTmLast = 0, evtStartTmOrig = 0, evtStartTmPre = 0; queue *evtQueue; int streamIdx = -1; json config; thread thEdgeMsgHandler, thCloudMsgHandler, thHealdChecker; thread thEvent; string drport = "5549"; condition_variable cvMsg; mutex mutMsg; bool gotFormat = false; long long packetTs = 0; long long packetTsDelta = 0; float pps = 0; int pktLag = 0; int fps = 0; uint64_t pktCnt = 0; // int handleCloudMsg(vector > v) { int ret = 0; string peerId, metaType, metaValue, msg; json data; for(auto &b:v) { msg +=body2str(b) + ";"; } msg = msg.substr(0, msg.size()> 100? 15:msg.size()); bool bProcessed = false; if(v.size() == 3) { try { peerId = body2str(v[0]); json meta = json::parse(body2str(v[1])); metaType = meta["type"]; if(meta.count("value") != 0) { metaValue = meta["value"]; } // msg from cluster mgr string daemonId = this->devSn + ":evdaemon:0"; if(peerId == daemonId) { if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) { spdlog::info("{} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId); bProcessed = true; exit(0); } } } catch(exception &e) { spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what()); } } if(!bProcessed) { spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg); } return ret; } int handleEdgeMsg(vector > v) { int ret = 0; string peerId, metaType, metaValue, msg; json data; for(auto &b:v) { msg +=body2str(b) + ";"; } msg = msg.substr(0, msg.size()> EV_MSG_DEBUG_LEN? EV_MSG_DEBUG_LEN:msg.size()); spdlog::info("{} received edge msg: {}", selfId, msg); bool bProcessed = false; if(v.size() == 3) { try { peerId = body2str(v[0]); json meta = json::parse(body2str(v[1])); metaType = meta["type"]; if(meta.count("value") != 0) { metaValue = meta["value"]; } // msg from cluster mgr string clusterMgrId = this->mgrSn + ":evmgr:0"; if(peerId == clusterMgrId) { // } else if(peerId == pullerGid) { if(metaType == EV_MSG_META_AVFORMATCTX) { lock_guard lock(this->mutMsg); if(pAVFormatInput == nullptr) { pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput); gotFormat = true; cvMsg.notify_one(); } else { spdlog::warn("{} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId); spdlog::error("{} restart since reinit", selfId); exit(0); } bProcessed = true; } } } catch(exception &e) { spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what()); } } if(!bProcessed) { spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg); } return ret; } int init() { int ret = 0; bool found = false; try { spdlog::info("{} boot config: {}", selfId, config.dump()); json evmlmotion; json &evmgr = this->config; json ipc; json ipcs = evmgr["ipcs"]; for(auto &j: ipcs) { json mls = j["modules"]["evml"]; for(auto &p:mls) { if(p["sn"] == devSn && p["type"] == "motion" && p["iid"] == iid && p["enabled"] != 0) { evmlmotion = p; iid = p["iid"]; break; } } if(evmlmotion.size() != 0) { ipc = j; break; } } if(ipc.size()!=0 && evmlmotion.size()!=0) { found = true; } if(!found) { spdlog::error("{}: no valid config found. retrying load config...", devSn); exit(1); } // TODO: currently just take the first puller, but should test connectivity json evpuller = ipc["modules"]["evpuller"][0]; pullerGid = evpuller["sn"].get() + ":evpuller:" + to_string(evpuller["iid"]); mgrSn = evmgr["sn"]; // TODO: connect to the first slicer json evslicer = ipc["modules"]["evslicer"][0]; slicerGid = evslicer["sn"].get()+":evslicer:" + to_string(evslicer["iid"]); int portPub = 5556; if(evpuller.count("portPub") != 0 && evpuller["portPub"].is_number_integer()) { portPub = evpuller["portPub"]; } else if(evpuller.count("port-pub") != 0 && evpuller["port-pub"].is_number_integer()) { portPub = evpuller["port-pub"]; } int portRouter = 5550; if(evmgr.count("portRouter") != 0 && evmgr["portRouter"].is_number_integer()) { portRouter = evmgr["portRouter"]; } else if(evmgr.count("port-router") != 0 && evmgr["port-router"].is_number_integer()) { portRouter = evmgr["port-router"]; } urlPub = string("tcp://") + evpuller["addr"].get() + ":" + to_string(portPub); urlRouter = string("tcp://") + evmgr["addr"].get() + ":" + to_string(portRouter); spdlog::info("{} will connect to {} for sub, {} for router", selfId, urlPub, urlRouter); // TODO: multiple protocols support if(evmlmotion.count("path") == 0) { spdlog::info("{} no params for path, using default: {}", selfId, URLOUT_DEFAULT); urlOut = URLOUT_DEFAULT; } else { urlOut = evmlmotion["path"]; } ret = system(("mkdir -p " +urlOut).c_str()); // if(ret == -1) { // spdlog::error("failed mkdir {}", urlOut); // return -1; // } // detection params if(evmlmotion.count("thresh") == 0||evmlmotion["thresh"] < 10 ||evmlmotion["thresh"] >= 255) { spdlog::info("{} invalid thresh value. should be in (10,255), default to {}", selfId, detPara.thre); } else { detPara.thre = evmlmotion["thresh"]; } if(evmlmotion.count("area") == 0||evmlmotion["area"] < 5 ||evmlmotion["area"] >= int(FRAME_SIZE*FRAME_SIZE)*9/10) { spdlog::info("{} invalid area value. should be in (10, 500*500*9/10), default to {}, {}", selfId, detPara.area, AREA_1080P); } else { detPara.area = evmlmotion["area"]; } if(evmlmotion.count("pre") == 0||evmlmotion["pre"] < 1 ||evmlmotion["pre"] >= 120) { spdlog::info("{} invalid pre value. should be in (1, 120), default to {}", selfId, detPara.pre); } else { detPara.pre = evmlmotion["pre"]; } if(evmlmotion.count("post") == 0||evmlmotion["post"] < 6 ||evmlmotion["post"] >= 120) { spdlog::info("{} invalid post value. should be in (6, 120), default to {}", selfId, detPara.post); } else { detPara.post = evmlmotion["post"]; } if(evmlmotion.count("entropy") == 0||evmlmotion["entropy"] < 0 || evmlmotion["entropy"] >= 10) { spdlog::info("{} invalid entropy value. should be in (0, 10), default to {}", selfId, detPara.entropy); } else { detPara.entropy = evmlmotion["entropy"]; } if(evmlmotion.count("fpsProc") == 0|| !evmlmotion["fpsProc"].is_number_integer() ||evmlmotion["fpsProc"] < 0 || evmlmotion["fpsProc"] >= 40) { spdlog::info("{} invalid fpsProc value. should be in (0, 40) as int, default to {}", selfId, detPara.fpsProc); } else { detPara.fpsProc = evmlmotion["fpsProc"]; } if(evmlmotion.count("maxDuration") == 0|| !evmlmotion["maxDuration"].is_number_integer() ||evmlmotion["maxDuration"] <= 1 || evmlmotion["maxDuration"] >= 100) { spdlog::info("{} invalid maxDuration value. should be in (0, 100) as int(minutes), default to {}", selfId, detPara.maxDuration); } else { detPara.maxDuration = evmlmotion["maxDuration"]; } if(evmlmotion.count("resize") == 0|| !evmlmotion["resize"].is_number_float() ||evmlmotion["resize"] < 0 || evmlmotion["resize"] > 1) { spdlog::info("{} invalid resize value. should be in [0, 1] as float value, default to {}", selfId, detPara.resize); } else { detPara.resize = evmlmotion["resize"]; if(detPara.resize < 0.001) { detPara.resize = 0; }else if (detPara.resize > 0.99) { detPara.resize = 1; } } if(evmlmotion.count("size") == 0|| !evmlmotion["size"].is_number_integer() ||evmlmotion["size"] < 100 || evmlmotion["size"] > 540) { spdlog::info("{} invalid size value. should be in [100, 540] as int value, default to {}", selfId, detPara.size); } else { detPara.size = evmlmotion["size"]; } if(evmlmotion.count("region") != 0) { json ®ion = evmlmotion["region"]; if(region.count("minX") == 0|| !region["minX"].is_number() || region.count("minY") == 0||!region["minY"].is_number() || region.count("maxX") == 0||!region["maxX"].is_number() || region.count("maxY") == 0||!region["maxY"].is_number()){ spdlog::error("{} invalid region config: format. ignored", selfId); }else{ cv::Point2f p1, p2; const float EVML_MARGIN_F = 0.0001; try{ p1.x = region["minX"].get(); p1.y = region["minY"].get(); p2.x = region["maxX"].get(); p2.y = region["maxY"].get(); if(p1.x < 0 || p1.x>=1 || p1.y <=0 || p1.y > 1|| p2.x <=0 || p2.x > 1 || p2.y <=0 || p2.y >1 || p1.x >= p2.x || p1.y >= p2.y) { spdlog::error("{} invalid region config: invalid value range. ignored", selfId); }else{ detPara.region[0] = p1; detPara.region[1] = p2; spdlog::info("{} region: {} {}, {} {}", selfId, p1.x, p1.y, p2.x, p2.y); } }catch(exception &e) { spdlog::error("{} failed to parse regoin config: {}. ignored", selfId, e.what()); } } }else{ spdlog::error("{} no/invalid region config. ignored", selfId); } spdlog::info("{} detection params: entropy {}, area {}, thresh {}, fpsProc {}", selfId, detPara.entropy, detPara.area, detPara.thre, detPara.fpsProc); // setup sub pSubCtx = zmq_ctx_new(); pSub = zmq_socket(pSubCtx, ZMQ_SUB); //ZMQ_TCP_KEEPALIVE //ZMQ_TCP_KEEPALIVE_IDLE //ZMQ_TCP_KEEPALIVE_INTVL ret = 1; zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret)); ret = 20; zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret)); zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret)); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); if(ret != 0) { spdlog::error("{} failed set setsockopt: {}", selfId, urlPub); exit(1); } ret = zmq_connect(pSub, urlPub.c_str()); if(ret != 0) { spdlog::error("{} failed connect pub: {}", selfId, urlPub); exit(1); } // setup dealer pDealerCtx = zmq_ctx_new(); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER); spdlog::info("{} connect to router {}", selfId, urlRouter); ret = 1; zmq_setsockopt(pDealer, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret)); ret = 5; zmq_setsockopt(pDealer, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret)); zmq_setsockopt(pDealer, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret)); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); if(ret < 0) { spdlog::error("{} {} failed setsockopts router: {}", selfId, urlRouter); exit(1); } if(ret < 0) { spdlog::error("{} failed setsockopts router: {}", selfId, urlRouter); exit(1); } ret = zmq_connect(pDealer, urlRouter.c_str()); if(ret != 0) { spdlog::error("{} failed connect dealer: {}", selfId, urlRouter); exit(1); } //ping ret = ping(); } catch(exception &e) { spdlog::error("{} exception in EvPuller.init {:s} retrying", selfId, e.what()); exit(1); } return 0; } int ping() { // send hello to router int ret = 0; vector >body; // since identity is auto set body.push_back(str2body(mgrSn+":evmgr:0")); body.push_back(str2body(EV_MSG_META_PING)); // blank meta body.push_back(str2body(MSG_HELLO)); ret = z_send_multiple(pDealer, body); if(ret < 0) { spdlog::error("{} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno())); } else { spdlog::info("{} sent hello to router: {}", selfId, mgrSn); } return ret; } int getInputFormat() { int ret = 0; // req avformatcontext packet // send hello to puller spdlog::info("{} send hello to puller: {}", selfId, pullerGid); vector > body; body.push_back(str2body(pullerGid)); json meta; meta["type"] = EV_MSG_META_AVFORMATCTX; body.push_back(str2body(meta.dump())); body.push_back(str2body(MSG_HELLO)); gotFormat = false; ret = z_send_multiple(pDealer, body); if(ret < 0) { spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno())); // TODO: message report to cloud exit(1); } unique_lock lk(this->mutMsg); bool got = this->cvMsg.wait_for(lk, 30s, [this] {return this->gotFormat;}); if(got) { } else { // restart spdlog::error("{} failed wait for avformatctx for {}s, restart", devSn, 30); exit(1); } return ret; } int setupStream() { int ret = 0; // find video for (int i = 0; i < pAVFormatInput->nb_streams; i++) { AVStream *in_stream = pAVFormatInput->streams[i]; AVCodecParameters *in_codecpar = in_stream->codecpar; if (in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO) { continue; } streamIdx = i; break; } if(streamIdx == -1) { spdlog::error("{} no video stream found.", selfId); return -1; } AVStream *pStream = pAVFormatInput->streams[streamIdx]; detPara.fpsIn = (int)(pStream->r_frame_rate.num/pStream->r_frame_rate.den); AVCodec *pCodec = avcodec_find_decoder(pStream->codecpar->codec_id); if (pCodec==NULL) { spdlog::error("{} ERROR unsupported codec!", selfId); return -1; } pCodecCtx = avcodec_alloc_context3(pCodec); if (!pCodecCtx) { spdlog::error("{} failed to allocated memory for AVCodecContext", selfId); return -1; } if (avcodec_parameters_to_context(pCodecCtx, pStream->codecpar) < 0) { spdlog::error("{} failed to copy codec params to codec context", selfId); return -1; } if (avcodec_open2(pCodecCtx, pCodec, NULL) < 0) { spdlog::error("{} failed to open codec through avcodec_open2", selfId); return -1; } return ret; } void freeStream() { if(pAVFormatInput != nullptr) { AVFormatCtxSerializer::freeCtx(pAVFormatInput); pAVFormatInput = nullptr; } pAVFormatInput = nullptr; } int decode_packet(bool detect, AVPacket *pPacket, AVCodecContext *pCodecContext, AVFrame *pFrame) { int response = avcodec_send_packet(pCodecContext, pPacket); if (response < 0) { spdlog::error("{} Error while sending a packet to the decoder: {}", selfId, av_err2str(response)); return response; } static long long called = 0; static int factor = 0; while (response >= 0) { response = avcodec_receive_frame(pCodecContext, pFrame); if (response == AVERROR(EAGAIN) || response == AVERROR_EOF) { break; } if (response < 0) { spdlog::error("{} Error while receiving a frame from the decoder: {}", selfId, av_err2str(response)); return response; } else { spdlog::debug( "Frame {} (type={}, size={} bytes) pts {} key_frame {} [DTS {}]", pCodecContext->frame_number, av_get_picture_type_char(pFrame->pict_type), pFrame->pkt_size, pFrame->pts, pFrame->key_frame, pFrame->coded_picture_number ); break; } } bool proc = false; if(this->fps != 0 && pktCnt % (int(this->fps / detPara.fpsProc)) == 0) { proc = true; } if(!proc) { if(pktCnt % 60 == 0) spdlog::info("{} pps {}, fps {}, lag {}, skip processing", this->selfId, this->pps, this->fps, this->pktLag); } else { if(detect) { detectMotion(pCodecContext->pix_fmt, pFrame, detect); } } return 0; } void makeEvent(string evtType, int ts) { json p; p["type"] = EV_MSG_TYPE_AI_MOTION; p["gid"] = selfId; p["event"] = evtType; //EV_MSG_EVENT_MOTION_END; p["ts"] = ts; spdlog::info("{} packet ts delta: {}", selfId, packetTsDelta); evtQueue->push(p.dump()); if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE*2) { evtQueue->pop(); } } void detectMotion(AVPixelFormat format,AVFrame *pFrame, bool detect = true) { static bool first = true; static cv::Mat avg; static double _area = 0; static double area = 0; static int frameProcessed = 0; vector > cnts; cv::Mat origin; avcvhelpers::frame2mat(format, pFrame, origin); // check region auto w = origin.size().width; auto h = origin.size().height; // auto config if(detPara.resize == 0) { detPara.resize = detPara.size*1.0/h; }else{ detPara.size = int(h * detPara.resize); } if(detPara.resize > 0 && detPara.resize < 1){ // resize. new w, h caculated w = int(w * detPara.resize); h = int(h * detPara.resize); cv::resize(origin, origin, cv::Size(w,h)); }else{ detPara.resize = 1; } if(detPara.region[0].y == 0 || (detPara.region[0].x == 0 && detPara.region[0].y == 0 && detPara.region[1].x == 1 && detPara.region[1].y == 1)) { // do nothing }else { // crop auto x = (int)(w * detPara.region[0].x); auto y = (int)(h * detPara.region[0].y); w = w * (detPara.region[1].x - detPara.region[0].x); h = h * (detPara.region[1].y - detPara.region[0].y); cv::Rect crop(x,y,w,h); origin = origin(crop); } cv::Mat gray; cv::cvtColor(origin, gray, cv::COLOR_BGR2GRAY); // float fent = avcvhelpers::getEntropy(thresh); cv::GaussianBlur(gray, gray, cv::Size(21, 21), cv::THRESH_BINARY); if(first) { // avg = cv::Mat::zeros(gray.size(), CV_32FC3); avg.release(); avg = gray.clone(); first = false; area = detPara.area == 0 ? (AREA_1080P * detPara.size * detPara.size * 1.0/1080/1080) : detPara.area; spdlog::info("{} resize: {}, area: {}, origin area: {}", selfId, detPara.resize, area, detPara.area); return; } #ifdef DEBUG matShow3 = avg; matShow2 = origin; #endif // packetTm = chrono::system_clock::now(); packetTm = packetTs; // TODO: AVG // cv::accumulateWeighted(gray, avg, 0.5); cv::Mat diff; cv::absdiff(gray, avg, diff); // if(!detect || fent < detPara.entropy) { // return; // } cv::Mat thresh; cv::threshold(diff, thresh, detPara.thre, 255, cv::THRESH_BINARY); //cv::dilate(gray, thresh, cv::Mat(), cv::Point(-1,-1), 2); #ifdef DEBUG matShow1 = thresh.clone(); #endif cv::findContours(thresh, cnts, cv::RETR_EXTERNAL, cv::CHAIN_APPROX_SIMPLE); bool hasEvent = false; static int evtCnt = 0; int i = 0; // avoid vage cases if(area < 5) { area = 5; } for(; i < cnts.size(); i++) { _area = cv::contourArea(cnts[i]); if(_area < area) { // nothing } else { hasEvent = true; evtCnt++; #ifdef DEBUG cv::putText(origin, "motion detected", cv::Point(10, 20), cv::FONT_HERSHEY_SIMPLEX, 0.75, cv::Scalar(0,0,255),2); #endif //spdlog::info("{} motion detected: idx {}, area: {}", selfId, i, _area); break; } } //end for avg.release(); avg = gray.clone(); frameProcessed++; spdlog::debug("{} contours {} area {}, thresh {} hasEvent {}", selfId, cnts.size(), hasEvent? cv::contourArea(cnts[i]):0, detPara.area, hasEvent); // business logic for event // auto dura = chrono::duration_cast(packetTm - evtStartTmLast).count(); long long dura = 0; if(evtStartTmLast != 0) { dura = packetTm - evtStartTmLast; }else{ evtStartTmLast = packetTm; return; } static long long pktDelt = 0; if(pktCnt %18 == 0){ auto tmp = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); pktDelt = tmp - packetTm; } switch(evtState) { case NONE: case PRE: if(dura >= detPara.pre) { auto thr = frameProcessed/2; if(evtCnt <= thr) { // if(evtState == PRE) { // spdlog::info("{} state: PRE->NONE ({}, {}, {}, {}, {}, {})", selfId, this->fps, dura, frameProcessed, evtCnt, thr, _area); // evtState = NONE; // evtStartTmOrig = packetTm - detPara.pre; // }else{ // spdlog::info("state: NONE->NONE ({}, {}, {}, {}, {}, {})", this->fps, dura, frameProcessed, evtCnt, thr, _area); // } spdlog::info("state: NONE->NONE ({}, {}, {}, {}, {}, {}, {})", pktDelt, this->fps, dura, frameProcessed, evtCnt, thr, _area); }else{ // if(evtState == PRE) { // json p; // evtState = IN; // spdlog::info("{} state: PRE->IN ({}, {}, {}, {}, {}, {})", selfId, this->fps, dura, frameProcessed, evtCnt, thr, _area); // makeEvent(EV_MSG_EVENT_MOTION_START, evtStartTmOrig); // auto tmp = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); // packetTsDelta = tmp - packetTm; // spdlog::info("{} packet ts delta: {}", selfId, packetTsDelta); // }else{ // evtState = PRE; // spdlog::info("{} state: NONE->PRE ({}, {}, {}, {}, {}, {})", selfId, this->fps, dura, frameProcessed, evtCnt, thr, _area); // evtStartTmOrig = packetTm; // } json p; evtState = IN; evtStartTmOrig = packetTm - detPara.pre; spdlog::info("{} state: NONE->IN ({}, {}, {}, {}, {}, {}, {})", selfId, pktDelt, this->fps, dura, frameProcessed, evtCnt, thr, _area); makeEvent(EV_MSG_EVENT_MOTION_START, evtStartTmOrig); } evtCnt = 0; frameProcessed = 0; evtStartTmLast = packetTm; } break; case IN: { if( packetTm - evtStartTmOrig > 60 *detPara.maxDuration ) { evtStartTmOrig = packetTm; makeEvent(EV_MSG_EVENT_MOTION_END, packetTs); makeEvent(EV_MSG_EVENT_MOTION_START, packetTs); spdlog::warn("{} event video continued over {} minutes, force segmenting and continue", devSn, detPara.maxDuration); spdlog::debug("{} state: IN->IN ({}, {})",selfId, dura, evtCnt); evtCnt = 0; frameProcessed = 0; evtStartTmLast = packetTm; }else if(dura >= detPara.post/2){ auto thr = frameProcessed/5; if(evtCnt <= thr){ evtState = POST; spdlog::info("{} state: IN->POST ({}, {}, {}, {}, {}, {}, {})", selfId, pktDelt, this->fps, dura, frameProcessed, evtCnt, thr, _area); }else{ spdlog::info("{} state: IN->IN ({}, {}, {}, {}, {}, {}, {})", selfId, pktDelt, this->fps, dura, frameProcessed, evtCnt, thr, _area); } evtCnt = 0; frameProcessed = 0; evtStartTmLast = packetTm; } break; } case POST: { if(dura >= detPara.post/2){ auto thr = frameProcessed/5; if(evtCnt <= thr){ spdlog::info("{} state: POST->NONE ({}, {}, {}, {}, {}, {}, {})",selfId, pktDelt, this->fps, dura, frameProcessed, evtCnt, thr, _area); evtState = NONE; makeEvent(EV_MSG_EVENT_MOTION_END, packetTs - detPara.post/2); }else{ spdlog::info("{} state: POST->IN ({}, {}, {}, {}, {}, {}, {})",selfId, pktDelt, this->fps, dura, frameProcessed, evtCnt, thr, _area); evtState = IN; } evtCnt = 0; frameProcessed = 0; evtStartTmLast = packetTm; } break; } } } protected: void run() { bool bStopSig = false; zmq_msg_t msg; AVPacket packet; json eventToSlicer; // eventToSlicer["type"] = "event"; // eventTOSlicer["extraInfo"] = json(); //array // eventToSlicer["start"] // eventToSlicer["end"] eventToSlicer["sender"] = selfId; //event relay thread: motion to slicer and sn:evdaemon:0 thEvent = thread([&,this]() { json meta; meta["type"] = EV_MSG_META_EVENT; string metaType = meta.dump(); string daemonId = this->devSn + ":evdaemon:0"; int ret = 0; vector > v = {str2body(this->slicerGid), str2body(metaType), str2body("")}; vector > v1 = {str2body(daemonId), str2body(metaType), str2body("")}; while(true) { if(!this->evtQueue->empty()) { // send to evslicer string evt = this->evtQueue->front(); json jevt = json::parse(evt); this->evtQueue->pop(); if(jevt["event"] == EV_MSG_EVENT_MOTION_START) { eventToSlicer["type"] = "event"; eventToSlicer["start"] = jevt["ts"]; eventToSlicer["extraInfo"] = json(); //array eventToSlicer["extraInfo"].push_back(jevt); // TODO: save and load saved evt on crash } else if(jevt["event"] == EV_MSG_EVENT_MOTION_END) { eventToSlicer["end"] = jevt["ts"]; eventToSlicer["extraInfo"].push_back(jevt); v[2] = str2body(eventToSlicer.dump()); ret = z_send_multiple(this->pDealer, v); if(ret < 0) { spdlog::error("{} failed to send event {} to {}: {}", this->selfId, eventToSlicer.dump(), this->slicerGid, zmq_strerror(zmq_errno())); } else { spdlog::info("{} sent event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump()); } eventToSlicer.clear(); } else { spdlog::error("{} unknown event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump()); } // send to evdaemon v1[2] = str2body(evt); ret = z_send_multiple(this->pDaemon, v1); if(ret < 0) { spdlog::error("{} failed to send event {} to {}: {}", this->selfId, evt, daemonId, zmq_strerror(zmq_errno())); } else { spdlog::info("{} sent event to {}: {}", this->selfId, daemonId, evt); } } else { this_thread::sleep_for(chrono::seconds(3)); } } }); thEvent.detach(); AVFrame *pFrame = av_frame_alloc(); if (!pFrame) { spdlog::error("{} failed to allocated memory for AVFrame", selfId); // TODO: message report to cloud exit(1); } auto start = chrono::system_clock::now(); auto pktCntLast = pktCnt; bool bStatsSent = false; while(true) { if(checkStop() == true) { bStopSig = true; break; } // business logic int ret =zmq_msg_init(&msg); ret = zmq_recvmsg(pSub, &msg, 0); if(ret < 0) { spdlog::error("{} failed to recv zmq msg: {}", selfId, zmq_strerror(ret)); continue; } ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet, &packetTs); { if (ret < 0) { spdlog::error("{} packet decode failed: {}", selfId, ret); continue; } } zmq_msg_close(&msg); static int _fpsDetectCnt = 0; static auto _pts = packet.pts; if(pAVFormatInput->streams[packet.stream_index]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && (this->fps == 0|| pktCnt % 250 == 0)){ // static auto num = pAVFormatInput->streams[packet.stream_index]->time_base.num;s // static auto den = pAVFormatInput->streams[packet.stream_index]->time_base.den; // _fpsDetectCnt++; // if(_fpsDetectCnt < 20){ // if(_fpsDetectCnt == 1) { // _pts = packet.pts; // } // } // else{ // _pts = packet.pts - _pts; // auto ts = _pts * 1.0* num/den; // this->fps = int(20/ts)+5; // if(this->fps <=5) { // this->fps = 18; // } // spdlog::debug("{} calc fps in 20 frame: ts: {}, fps: {}", selfId, ts, this->fps); // _fpsDetectCnt = 0; // _pts = packet.pts; // } auto _fps = av_q2d(pAVFormatInput->streams[packet.stream_index]->r_frame_rate); this->fps = this->fps > _fps? this->fps:_fps; } if(pktCnt % EV_LOG_PACKET_CNT == 0) { spdlog::info("{} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index); } pktCnt++; if (packet.stream_index == streamIdx) { spdlog::debug("AVPacket.pts {}", packet.pts); if(pktCnt < NUM_PKT_IGNORE && gFirst) { // protecting against overflow ret = decode_packet(false, &packet, pCodecCtx, pFrame); } else { gFirst = false; ret = decode_packet(true, &packet, pCodecCtx, pFrame); } } av_packet_unref(&packet); if (ret < 0) { // TODO: report message to cloud string msg = fmt::format("{} failed to decode packet", selfId); json meta; json data; data["msg"] = msg; data["modId"] = selfId; data["type"] = EV_MSG_META_TYPE_REPORT; data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE; data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; data["time"] = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); data["status"] = "active"; meta["type"] = EV_MSG_META_TYPE_REPORT; meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump()); spdlog::error(msg); } else { if(!bStatsSent) { bStatsSent = true; string msg = fmt::format("{} successfully decode packet", selfId); json meta; json data; data["msg"] = msg; data["modId"] = selfId; data["type"] = EV_MSG_META_TYPE_REPORT; data["catId"] = EV_MSG_REPORT_CATID_AVWRITEPIPE; data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; data["time"] = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); data["status"] = "recover"; meta["type"] = EV_MSG_META_TYPE_REPORT; meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump()); spdlog::info(msg); } } if((pktCnt - pktCntLast ) == 18) { auto now = chrono::system_clock::now(); auto delta = chrono::duration_cast(now - start).count(); pktLag = chrono::duration_cast(now.time_since_epoch()).count() - this->packetTs; this->pps = 18.0 * 1000/delta; // if(pktCnt % (180 * 5) == 0) { // spdlog::info("{} metering: 18 packet in {}ms, pps: {}, lag:{}", selfId, delta, pps, pktLag); // } pktCntLast = pktCnt; start = now; } } av_frame_free(&pFrame); } public: EvMLMotion() = delete; EvMLMotion(queue *queue) { evtQueue = queue; const char *strEnv = getenv("DR_PORT"); if(strEnv != nullptr) { drport = strEnv; } strEnv = getenv("PEERID"); if(strEnv != nullptr) { selfId = strEnv; auto v = strutils::split(selfId, ':'); if(v.size() != 3||v[1] != "evmlmotion") { spdlog::error("received invalid gid: {}", selfId); exit(1); } devSn = v[0]; iid = stoi(v[2]); } else { spdlog::error("failed to start. no SN set"); exit(1); } spdlog::info("evmlmotion {} boot", selfId); SingletonProcess self(selfName, iid); if(!self()){ spdlog::error("{} already running. ignore this instance", selfId); exit(0); } // string addr = string("tcp://127.0.0.1:") + drport; int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId); if(ret != 0) { spdlog::error("{} failed to setup dealer {}", selfId, addr); exit(1); } // setup heath checker thHealdChecker = thread([this]{ int cnt = 0; uint64_t pktCntLast = pktCnt; while(true){ if(cnt >=3) { kill(getpid(), SIGTERM); exit(1); } this_thread::sleep_for(chrono::seconds(30)); if(pktCntLast == pktCnt){ cnt++; }else{ pktCntLast = pktCnt; cnt = 0; } } }); if(thHealdChecker.joinable()){ thHealdChecker.detach(); } ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId); if(ret != 0) { spdlog::error("{} failed to receive configration message {}", selfId, addr); } init(); thCloudMsgHandler = thread([this] { while(true) { auto body = z_recv_multiple(pDaemon,false); if(body.size() == 0) { spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno())); } else { // full proto msg received. this->handleCloudMsg(body); } } }); thCloudMsgHandler.detach(); thEdgeMsgHandler = thread([this] { while(true) { auto body = z_recv_multiple(pDealer,false); if(body.size() == 0) { spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno())); } else { // full proto msg received. this->handleEdgeMsg(body); } } }); thEdgeMsgHandler.detach(); getInputFormat(); setupStream(); }; ~EvMLMotion() { if(pSub != nullptr) { int i = 0; zmq_setsockopt(pSub, ZMQ_LINGER, &i, sizeof(i)); zmq_close(pSub); pSub = nullptr; } if(pSubCtx != nullptr) { zmq_ctx_destroy(pSubCtx); pSubCtx = nullptr; } if(pDealer != nullptr) { int i = 0; zmq_setsockopt(pDealer, ZMQ_LINGER, &i, sizeof(i)); zmq_close(pDealer); pDealer = nullptr; } if(pDealerCtx != nullptr) { zmq_ctx_destroy(pSub); pDealerCtx = nullptr; } if(pDaemon != nullptr) { int i = 0; zmq_setsockopt(pDaemon, ZMQ_LINGER, &i, sizeof(i)); zmq_close(pDaemon); pDaemon = nullptr; } if(pDaemonCtx != nullptr) { zmq_ctx_destroy(pDaemonCtx); pDaemonCtx = nullptr; } }; }; int main(int argc, const char *argv[]) { spdlog::set_level(spdlog::level::info); av_log_set_level(AV_LOG_ERROR); queue evtQueue; EvMLMotion es(&evtQueue); #ifdef DEBUG es.detach(); cv::namedWindow( "Display window", cv::WINDOW_AUTOSIZE ); spdlog::info("running in debug mode"); while(true) { if(gFirst) { this_thread::sleep_for(chrono::seconds(5)); spdlog::info("not initized, waiting"); continue; } cv::imshow("evmlmotion1", matShow1); cv::imshow("evmlmotion2", matShow2); cv::imshow("evmlmotion3", matShow3); if(cv::waitKey(200) == 27) { break; } } #else es.join(); #endif return 0; }