提交 001c6a1e authored 作者: blu's avatar blu

bugfix: singleton process

上级 99764402
......@@ -19,12 +19,14 @@ update: 2019/09/10
#include "inc/tinythread.hpp"
#include "inc/common.hpp"
#include "inc/database.h"
#include "singprocess.hpp"
using namespace std;
using namespace zmqhelper;
class EvMgr:public TinyThread {
private:
string selfName = "evmgr";
void *pRouterCtx = nullptr;
void *pRouter = nullptr;
void *pCtxDealer = nullptr, *pDealer = nullptr;
......@@ -61,19 +63,19 @@ private:
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evmgr {} received {} cmd from cluster mgr {}", devSn, metaValue, daemonId);
spdlog::info("{} received {} cmd from cluster mgr {}", devSn, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evmgr {} exception to process msg {}: {}", devSn, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", devSn, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evmgr {} received msg having no implementation from peer: {}", devSn, msg);
spdlog::error("{} received msg having no implementation from peer: {}", devSn, msg);
}
return ret;
......@@ -91,15 +93,15 @@ private:
try {
//
spdlog::info("evmgr boot configuration: {} -> {}", devSn, config.dump());
spdlog::info("boot configuration: {} -> {}", devSn, config.dump());
if(config["proto"] != "zmq") {
spdlog::warn("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, config["proto"].get<string>());
spdlog::warn("{} unsupported protocol: {}, try fallback to zmq instead now...", devSn, config["proto"].get<string>());
}
//
if(config["addr"].get<string>() == "*" || config["addr"].get<string>() == "0.0.0.0") {
spdlog::error("evmgr invalid mgr address: {} in config:\n{}", config["addr"].get<string>(), config.dump());
spdlog::error("invalid mgr address: {} in config:\n{}", config["addr"].get<string>(), config.dump());
goto error_exit;
}
......@@ -126,10 +128,10 @@ private:
zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
ret = zmq_bind(pRouter, addr.c_str());
if(ret < 0) {
spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
goto error_exit;
}
spdlog::info("evmgr {} bind success to {}", devSn, addr);
spdlog::info("{} bind success to {}", devSn, addr);
inited = true;
error_exit:
if(inited) {
......@@ -139,7 +141,7 @@ error_exit:
}
}
catch(exception &e) {
spdlog::error("evmgr {} exception on init() for: {}. abort booting up.", devSn, e.what());
spdlog::error("{} exception on init() for: {}. abort booting up.", devSn, e.what());
exit(1);
}
......@@ -158,7 +160,7 @@ error_exit:
});
thCloudMsgHandler.detach();
spdlog::info("evmgr {} successfuly inited", devSn);
spdlog::info("{} successfuly inited", devSn);
}
int mqErrorMsg(string cls, string devSn, string extraInfo, int ret)
......@@ -188,41 +190,41 @@ error_exit:
}
json *mod = LVDB::findConfigModule(config, sp[0], sp[1], stoi(sp[2]));
if(mod == nullptr) {
spdlog::warn("evmgr {} failed to find the connecting/disconnecting module with id {} in config. please check if it was terminated correctly", devSn, selfId);
spdlog::warn("{} failed to find the connecting/disconnecting module with id {} in config. please check if it was terminated correctly", devSn, selfId);
return -1;
}
if(peerData["status"].count(selfId) == 0||peerData["status"][selfId] == 0) {
peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evmgr {} peer connected: {}", devSn, selfId);
spdlog::info("{} peer connected: {}", devSn, selfId);
eventConn = true;
if(cachedMsg.count(selfId) != 0) {
spdlog::info("evmgr {}, send cached msg to {}", devSn, selfId);
spdlog::info("{}, send cached msg to {}", devSn, selfId);
while(!cachedMsg[selfId].empty()) {
lock_guard<mutex> lock(cacheLock);
auto v = cachedMsg[selfId].front();
cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}
}
else {
peerData["status"][selfId] = 0;
spdlog::warn("evmgr {} peer disconnected: {}", devSn, selfId);
spdlog::warn("{} peer disconnected: {}", devSn, selfId);
}
if(ret < 0) {
spdlog::error("evmgr {} failed to update localconfig", devSn);
spdlog::error("{} failed to update localconfig", devSn);
}
return 0;
}
else if(body.size() != 4) {
spdlog::warn("evmgr {} dropped an invalid message, size: {}", devSn, body.size());
spdlog::warn("{} dropped an invalid message, size: {}", devSn, body.size());
return 0;
}
......@@ -240,20 +242,20 @@ error_exit:
// check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) {
spdlog::info("evmgr {} route msg from {} to {}", devSn, selfId, peerId);
spdlog::info("{} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
else {
// cache
spdlog::warn("evmgr {} cached msg from {} to {}", devSn, selfId, peerId);
spdlog::warn("{} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[peerId].pop();
spdlog::info("evmgr {} max msg queue size {} reached for {}, dropped the oldest one.", this->devSn, MAX_EVENT_QUEUE_SIZE, peerId);
spdlog::info("{} max msg queue size {} reached for {}, dropped the oldest one.", this->devSn, MAX_EVENT_QUEUE_SIZE, peerId);
}
}
......@@ -264,7 +266,7 @@ error_exit:
eventQue.push(body2str(body[3]));
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
spdlog::info("evmgr {} max event queue size {} reached, dropped the oldest one.", this->devSn, MAX_EVENT_QUEUE_SIZE);
spdlog::info("{} max event queue size {} reached, dropped the oldest one.", this->devSn, MAX_EVENT_QUEUE_SIZE);
}
}
......@@ -272,14 +274,14 @@ error_exit:
}
catch(exception &e) {
bProcessed = false;
spdlog::error("evmgr {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
spdlog::error("{} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
}
else {
// message to mgr
// spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
// spdlog::info("{} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
if(meta == "pong"||meta == "ping") {
spdlog::info("evmgr {}, ping msg from {}", devSn, selfId);
spdlog::info("{}, ping msg from {}", devSn, selfId);
if(meta=="ping") {
if(cachedMsg.find(selfId) != cachedMsg.end()) {
while(!cachedMsg[selfId].empty()) {
......@@ -288,7 +290,7 @@ error_exit:
cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}
......@@ -309,10 +311,10 @@ error_exit:
broadCastMsg[0] = str2body(k);
ret = z_send_multiple(pRouter, broadCastMsg);
if(ret < 0) {
spdlog::error("evmgr {} failed to broadcast msg from {} because {}. msg meta: {}", devSn, selfId, zmq_strerror(zmq_errno()), meta);
spdlog::error("{} failed to broadcast msg from {} because {}. msg meta: {}", devSn, selfId, zmq_strerror(zmq_errno()), meta);
}
else {
spdlog::info("evmgr {} successfully broadcast msg from {} to {}. msg meta: {}", devSn, selfId, k, meta);
spdlog::info("{} successfully broadcast msg from {} to {}. msg meta: {}", devSn, selfId, k, meta);
}
}
}
......@@ -326,13 +328,13 @@ error_exit:
}
catch(exception &e) {
bProcessed = false;
spdlog::error("evmgr {} exception process msg from {} with meta {}: {}", devSn, selfId, meta, e.what());
spdlog::error("{} exception process msg from {} with meta {}: {}", devSn, selfId, meta, e.what());
}
}
}
if(!bProcessed) {
spdlog::warn("evmgr {} failed process msg from {}: {}", devSn, selfId, meta);
spdlog::warn("{} failed process msg from {}: {}", devSn, selfId, meta);
}
return ret;
......@@ -349,13 +351,13 @@ protected:
}
// if(1 == getppid()) {
// spdlog::error("evmgr {} exit since evdaemon is dead", devSn);
// spdlog::error("{} exit since evdaemon is dead", devSn);
// exit(1);
// }
auto body = z_recv_multiple(pRouter,false);
if(body.size() == 0) {
spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
......@@ -381,31 +383,36 @@ public:
ident = strEnv;
auto v = strutils::split(ident, ':');
if(v.size() != 3||v[1] != "evmgr" || v[2] != "0") {
spdlog::error("evmgr received invalid gid: {}", ident);
spdlog::error("received invalid gid: {}", ident);
exit(1);
}
devSn = v[0];
}
else {
spdlog::error("evmgr failed to start. no SN set");
spdlog::error("failed to start. no SN set");
exit(1);
}
spdlog::info("evmgr {} boot", devSn);
spdlog::info("{} boot", devSn);
SingletonProcess self(selfName, 0);
if(!self()){
spdlog::error("{} already running. ignore this instance", selfName);
exit(0);
}
//
string addr = string("tcp://127.0.0.1:") + drport;;
ident = devSn + ":evmgr:0";
int ret = zmqhelper::setupDealer(&pCtxDealer, &pDealer, addr, ident);
if(ret != 0) {
spdlog::error("evmgr {} failed to setup dealer {}", devSn, addr);
spdlog::error("{} failed to setup dealer {}", devSn, addr);
exit(1);
}
ret = zmqhelper::recvConfigMsg(pDealer, config, addr, ident);
if(ret != 0) {
spdlog::error("evmgr {} failed to receive configration message {}", devSn, addr);
spdlog::error("{} failed to receive configration message {}", devSn, addr);
}
init();
......
......@@ -23,6 +23,7 @@ update: 2019/09/10
#include "common.hpp"
#include "avcvhelpers.hpp"
#include "database.h"
#include "singprocess.hpp"
using namespace std;
using namespace zmqhelper;
......@@ -59,6 +60,7 @@ enum EventState {
class EvMLMotion: public TinyThread {
private:
string selfName = "evmlmotion";
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid, slicerGid;
......@@ -108,19 +110,19 @@ private:
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evmlmotion {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
spdlog::info("{} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evmlmotion {} received msg having no implementation from peer: {}", selfId, msg);
spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
......@@ -162,8 +164,8 @@ private:
cvMsg.notify_one();
}
else {
spdlog::warn("evmlmotion {} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("evmlmotion {} restart since reinit", selfId);
spdlog::warn("{} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("{} restart since reinit", selfId);
exit(0);
}
bProcessed = true;
......@@ -171,12 +173,12 @@ private:
}
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evmlmotion {} received msg having no implementation from peer: {}", selfId, msg);
spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
......@@ -187,7 +189,7 @@ private:
int ret = 0;
bool found = false;
try {
spdlog::info("evmlmotion boot config: {} -> {}", selfId, config.dump());
spdlog::info("{} boot config: {}", selfId, config.dump());
json evmlmotion;
json &evmgr = this->config;
json ipc;
......@@ -212,7 +214,7 @@ private:
}
if(!found) {
spdlog::error("evmlmotion {}: no valid config found. retrying load config...", devSn);
spdlog::error("{}: no valid config found. retrying load config...", devSn);
exit(1);
}
// TODO: currently just take the first puller, but should test connectivity
......@@ -242,11 +244,11 @@ private:
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(portPub);
urlRouter = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(portRouter);
spdlog::info("evmlmotion {} will connect to {} for sub, {} for router", selfId, urlPub, urlRouter);
spdlog::info("{} will connect to {} for sub, {} for router", selfId, urlPub, urlRouter);
// TODO: multiple protocols support
if(evmlmotion.count("path") == 0) {
spdlog::info("evmlmotion {} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
spdlog::info("{} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
urlOut = URLOUT_DEFAULT;
}
else {
......@@ -261,49 +263,49 @@ private:
// detection params
if(evmlmotion.count("thresh") == 0||evmlmotion["thresh"] < 10 ||evmlmotion["thresh"] >= 255) {
spdlog::info("evmlmotion {} invalid thresh value. should be in (10,255), default to {}", selfId, detPara.thre);
spdlog::info("{} invalid thresh value. should be in (10,255), default to {}", selfId, detPara.thre);
}
else {
detPara.thre = evmlmotion["thresh"];
}
if(evmlmotion.count("area") == 0||evmlmotion["area"] < 10 ||evmlmotion["area"] >= int(FRAME_SIZE*FRAME_SIZE)*9/10) {
spdlog::info("evmlmotion {} invalid area value. should be in (10, 500*500*9/10), default to {}", selfId, detPara.area);
spdlog::info("{} invalid area value. should be in (10, 500*500*9/10), default to {}", selfId, detPara.area);
}
else {
detPara.area = evmlmotion["area"];
}
if(evmlmotion.count("pre") == 0||evmlmotion["pre"] < 1 ||evmlmotion["pre"] >= 120) {
spdlog::info("evmlmotion {} invalid pre value. should be in (1, 120), default to {}", selfId, detPara.pre);
spdlog::info("{} invalid pre value. should be in (1, 120), default to {}", selfId, detPara.pre);
}
else {
detPara.pre = evmlmotion["pre"];
}
if(evmlmotion.count("post") == 0||evmlmotion["post"] < 6 ||evmlmotion["post"] >= 120) {
spdlog::info("evmlmotion {} invalid post value. should be in (6, 120), default to {}", selfId, detPara.post);
spdlog::info("{} invalid post value. should be in (6, 120), default to {}", selfId, detPara.post);
}
else {
detPara.post = evmlmotion["post"];
}
if(evmlmotion.count("entropy") == 0||evmlmotion["entropy"] < 0 || evmlmotion["entropy"] >= 10) {
spdlog::info("evmlmotion {} invalid entropy value. should be in (0, 10), default to {}", selfId, detPara.entropy);
spdlog::info("{} invalid entropy value. should be in (0, 10), default to {}", selfId, detPara.entropy);
}
else {
detPara.entropy = evmlmotion["entropy"];
}
if(evmlmotion.count("fpsProc") == 0|| !evmlmotion["fpsProc"].is_number_integer() ||evmlmotion["fpsProc"] < 0 || evmlmotion["fpsProc"] >= 40) {
spdlog::info("evmlmotion {} invalid fpsProc value. should be in (0, 40) as int, default to {}", selfId, detPara.fpsProc);
spdlog::info("{} invalid fpsProc value. should be in (0, 40) as int, default to {}", selfId, detPara.fpsProc);
}
else {
detPara.fpsProc = evmlmotion["fpsProc"];
}
if(evmlmotion.count("maxDuration") == 0|| !evmlmotion["maxDuration"].is_number_integer() ||evmlmotion["maxDuration"] <= 1 || evmlmotion["maxDuration"] >= 100) {
spdlog::info("evmlmotion {} invalid maxDuration value. should be in (0, 100) as int(minutes), default to {}", selfId, detPara.maxDuration);
spdlog::info("{} invalid maxDuration value. should be in (0, 100) as int(minutes), default to {}", selfId, detPara.maxDuration);
}
else {
detPara.maxDuration = evmlmotion["maxDuration"];
......@@ -315,7 +317,7 @@ private:
region.count("minY") == 0||!region["minY"].is_number() ||
region.count("maxX") == 0||!region["maxX"].is_number() ||
region.count("maxY") == 0||!region["maxY"].is_number()){
spdlog::error("evmlmotion {} invalid region config: format. ignored", selfId);
spdlog::error("{} invalid region config: format. ignored", selfId);
}else{
cv::Point2f p1, p2;
const float EVML_MARGIN_F = 0.0001;
......@@ -327,21 +329,21 @@ private:
if(p1.x < 0 || p1.x>=1 || p1.y <=0 || p1.y > 1||
p2.x <=0 || p2.x > 1 || p2.y <=0 || p2.y >1 || p1.x >= p2.x || p1.y >= p2.y) {
spdlog::error("evmlmotion {} invalid region config: invalid value range. ignored", selfId);
spdlog::error("{} invalid region config: invalid value range. ignored", selfId);
}else{
detPara.region[0] = p1;
detPara.region[1] = p2;
spdlog::info("evmlmotion {} region: {} {}, {} {}", selfId, p1.x, p1.y, p2.x, p2.y);
spdlog::info("{} region: {} {}, {} {}", selfId, p1.x, p1.y, p2.x, p2.y);
}
}catch(exception &e) {
spdlog::error("evmlmotion {} failed to parse regoin config: {}. ignored", selfId, e.what());
spdlog::error("{} failed to parse regoin config: {}. ignored", selfId, e.what());
}
}
}else{
spdlog::error("evmlmotion {} no/invalid region config. ignored", selfId);
spdlog::error("{} no/invalid region config. ignored", selfId);
}
spdlog::info("evmlmotion {} detection params: entropy {}, area {}, thresh {}, fpsProc {}", selfId, detPara.entropy, detPara.area, detPara.thre, detPara.fpsProc);
spdlog::info("{} detection params: entropy {}, area {}, thresh {}, fpsProc {}", selfId, detPara.entropy, detPara.area, detPara.thre, detPara.fpsProc);
// setup sub
pSubCtx = zmq_ctx_new();
......@@ -356,19 +358,19 @@ private:
zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) {
spdlog::error("evmlmotion {} failed set setsockopt: {}", selfId, urlPub);
spdlog::error("{} failed set setsockopt: {}", selfId, urlPub);
exit(1);
}
ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) {
spdlog::error("evmlmotion {} failed connect pub: {}", selfId, urlPub);
spdlog::error("{} failed connect pub: {}", selfId, urlPub);
exit(1);
}
// setup dealer
pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evmlmotion {} connect to router {}", selfId, urlRouter);
spdlog::info("{} connect to router {}", selfId, urlRouter);
ret = 1;
zmq_setsockopt(pDealer, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
ret = 5;
......@@ -377,23 +379,23 @@ private:
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evmlmotion {} {} failed setsockopts router: {}", selfId, urlRouter);
spdlog::error("{} {} failed setsockopts router: {}", selfId, urlRouter);
exit(1);
}
if(ret < 0) {
spdlog::error("evmlmotion {} failed setsockopts router: {}", selfId, urlRouter);
spdlog::error("{} failed setsockopts router: {}", selfId, urlRouter);
exit(1);
}
ret = zmq_connect(pDealer, urlRouter.c_str());
if(ret != 0) {
spdlog::error("evmlmotion {} failed connect dealer: {}", selfId, urlRouter);
spdlog::error("{} failed connect dealer: {}", selfId, urlRouter);
exit(1);
}
//ping
ret = ping();
}
catch(exception &e) {
spdlog::error("evmlmotion {} exception in EvPuller.init {:s} retrying", selfId, e.what());
spdlog::error("{} exception in EvPuller.init {:s} retrying", selfId, e.what());
exit(1);
}
return 0;
......@@ -411,10 +413,10 @@ private:
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evmlmotion {} sent hello to router: {}", selfId, mgrSn);
spdlog::info("{} sent hello to router: {}", selfId, mgrSn);
}
return ret;
......@@ -425,7 +427,7 @@ private:
int ret = 0;
// req avformatcontext packet
// send hello to puller
spdlog::info("evmlmotion {} send hello to puller: {}", selfId, pullerGid);
spdlog::info("{} send hello to puller: {}", selfId, pullerGid);
vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid));
json meta;
......@@ -446,7 +448,7 @@ private:
}
else {
// restart
spdlog::error("evmlmotion {} failed wait for avformatctx for {}s, restart", devSn, 30);
spdlog::error("{} failed wait for avformatctx for {}s, restart", devSn, 30);
exit(1);
}
return ret;
......@@ -468,7 +470,7 @@ private:
}
if(streamIdx == -1) {
spdlog::error("evmlmotion {} no video stream found.", selfId);
spdlog::error("{} no video stream found.", selfId);
return -1;
}
......@@ -476,22 +478,22 @@ private:
detPara.fpsIn = (int)(pStream->r_frame_rate.num/pStream->r_frame_rate.den);
AVCodec *pCodec = avcodec_find_decoder(pStream->codecpar->codec_id);
if (pCodec==NULL) {
spdlog::error("evmlmotion {} ERROR unsupported codec!", selfId);
spdlog::error("{} ERROR unsupported codec!", selfId);
return -1;
}
pCodecCtx = avcodec_alloc_context3(pCodec);
if (!pCodecCtx) {
spdlog::error("evmlmotion {} failed to allocated memory for AVCodecContext", selfId);
spdlog::error("{} failed to allocated memory for AVCodecContext", selfId);
return -1;
}
if (avcodec_parameters_to_context(pCodecCtx, pStream->codecpar) < 0) {
spdlog::error("evmlmotion {} failed to copy codec params to codec context", selfId);
spdlog::error("{} failed to copy codec params to codec context", selfId);
return -1;
}
if (avcodec_open2(pCodecCtx, pCodec, NULL) < 0) {
spdlog::error("evmlmotion {} failed to open codec through avcodec_open2", selfId);
spdlog::error("{} failed to open codec through avcodec_open2", selfId);
return -1;
}
......@@ -512,7 +514,7 @@ private:
{
int response = avcodec_send_packet(pCodecContext, pPacket);
if (response < 0) {
spdlog::error("evmlmotion {} Error while sending a packet to the decoder: {}", selfId, av_err2str(response));
spdlog::error("{} Error while sending a packet to the decoder: {}", selfId, av_err2str(response));
return response;
}
......@@ -523,7 +525,7 @@ private:
}
if (response < 0) {
spdlog::error("evmlmotion {} Error while receiving a frame from the decoder: {}", selfId, av_err2str(response));
spdlog::error("{} Error while receiving a frame from the decoder: {}", selfId, av_err2str(response));
return response;
}
else {
......@@ -561,13 +563,13 @@ private:
if(!proc) {
if(this->pps != 0 && (called %180) == 0) {
spdlog::info("evmlmotion {} pps {}, fpsFactor {}, called {}, lag {}, skip processing", this->selfId, this->pps, factor, called, this->pktLag);
spdlog::info("{} pps {}, fpsFactor {}, called {}, lag {}, skip processing", this->selfId, this->pps, factor, called, this->pktLag);
}
// detectMotion(pCodecContext->pix_fmt, pFrame, false);
}
else {
if((called % (180*4)) == 0) {
spdlog::info("evmlmotion {} pps {}, fpsFactor {}, called {}, lag {}", this->selfId, this->pps, factor, called, this->pktLag);
spdlog::info("{} pps {}, fpsFactor {}, called {}, lag {}", this->selfId, this->pps, factor, called, this->pktLag);
}
detectMotion(pCodecContext->pix_fmt, pFrame, detect);
factor = 0; // refresh to latest value
......@@ -586,7 +588,7 @@ private:
p["gid"] = selfId;
p["event"] = evtType; //EV_MSG_EVENT_MOTION_END;
p["ts"] = ts;
spdlog::info("evmlmotion {} packet ts delta: {}", selfId, packetTsDelta);
spdlog::info("{} packet ts delta: {}", selfId, packetTsDelta);
evtQueue->push(p.dump());
if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE*2) {
evtQueue->pop();
......@@ -672,7 +674,7 @@ private:
}
} //end for
spdlog::debug("evmlmotion {} contours {} area {}, thresh {} hasEvent {}", selfId, cnts.size(), hasEvent? cv::contourArea(cnts[i]):0, detPara.area, hasEvent);
spdlog::debug("{} contours {} area {}, thresh {} hasEvent {}", selfId, cnts.size(), hasEvent? cv::contourArea(cnts[i]):0, detPara.area, hasEvent);
// business logic for event
// auto dura = chrono::duration_cast<chrono::seconds>(packetTm - evtStartTmLast).count();
......@@ -737,7 +739,7 @@ private:
makeEvent(EV_MSG_EVENT_MOTION_END, packetTs);
evtCnt = 0;
makeEvent(EV_MSG_EVENT_MOTION_START, packetTs);
spdlog::warn("evmlmotion {} event video continued over {} minutes, force segmenting and continue", devSn, detPara.maxDuration);
spdlog::warn("{} event video continued over {} minutes, force segmenting and continue", devSn, detPara.maxDuration);
}
spdlog::debug("{} state: IN->IN ({}, {})",selfId, dura, evtCnt);
evtCnt = 0;
......@@ -806,25 +808,25 @@ protected:
v[2] = str2body(eventToSlicer.dump());
ret = z_send_multiple(this->pDealer, v);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, eventToSlicer.dump(), this->slicerGid, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send event {} to {}: {}", this->selfId, eventToSlicer.dump(), this->slicerGid, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evmlmotion {} sent event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump());
spdlog::info("{} sent event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump());
}
eventToSlicer.clear();
}
else {
spdlog::error("evmlmotion {} unknown event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump());
spdlog::error("{} unknown event to {}: {}", this->selfId, this->slicerGid, eventToSlicer.dump());
}
// send to evdaemon
v1[2] = str2body(evt);
ret = z_send_multiple(this->pDaemon, v1);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, daemonId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send event {} to {}: {}", this->selfId, evt, daemonId, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evmlmotion {} sent event to {}: {}", this->selfId, daemonId, evt);
spdlog::info("{} sent event to {}: {}", this->selfId, daemonId, evt);
}
}
......@@ -838,7 +840,7 @@ protected:
AVFrame *pFrame = av_frame_alloc();
if (!pFrame) {
spdlog::error("evmlmotion {} failed to allocated memory for AVFrame", selfId);
spdlog::error("{} failed to allocated memory for AVFrame", selfId);
// TODO: message report to cloud
exit(1);
}
......@@ -856,19 +858,19 @@ protected:
int ret =zmq_msg_init(&msg);
ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) {
spdlog::error("evmlmotion {} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
spdlog::error("{} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
continue;
}
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet, &packetTs);
{
if (ret < 0) {
spdlog::error("evmlmotion {} packet decode failed: {}", selfId, ret);
spdlog::error("{} packet decode failed: {}", selfId, ret);
continue;
}
}
zmq_msg_close(&msg);
if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("evmlmotion {} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
spdlog::info("{} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
}
pktCnt++;
......@@ -886,7 +888,7 @@ protected:
av_packet_unref(&packet);
if (ret < 0) {
// TODO: report message to cloud
string msg = fmt::format("evmlmotion {} failed to decode packet", selfId);
string msg = fmt::format("{} failed to decode packet", selfId);
json meta;
json data;
data["msg"] = msg;
......@@ -904,7 +906,7 @@ protected:
else {
if(!bStatsSent) {
bStatsSent = true;
string msg = fmt::format("evmlmotion {} successfully decode packet", selfId);
string msg = fmt::format("{} successfully decode packet", selfId);
json meta;
json data;
data["msg"] = msg;
......@@ -927,7 +929,7 @@ protected:
pktLag = chrono::duration_cast<chrono::seconds>(now.time_since_epoch()).count() - this->packetTs;
this->pps = 18.0 * 1000/delta;
// if(pktCnt % (180 * 5) == 0) {
// spdlog::info("evmlmotion {} metering: 18 packet in {}ms, pps: {}, lag:{}", selfId, delta, pps, pktLag);
// spdlog::info("{} metering: 18 packet in {}ms, pps: {}, lag:{}", selfId, delta, pps, pktLag);
// }
pktCntLast = pktCnt;
......@@ -952,30 +954,35 @@ public:
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evmlmotion") {
spdlog::error("evmlmotion received invalid gid: {}", selfId);
spdlog::error("received invalid gid: {}", selfId);
exit(1);
}
devSn = v[0];
iid = stoi(v[2]);
}
else {
spdlog::error("evmlmotion failed to start. no SN set");
spdlog::error("failed to start. no SN set");
exit(1);
}
spdlog::info("evmlmotio {} boot", selfId);
SingletonProcess self(selfName, iid);
if(!self()){
spdlog::error("{} already running. ignore this instance", selfId);
exit(0);
}
//
string addr = string("tcp://127.0.0.1:") + drport;
int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId);
if(ret != 0) {
spdlog::error("evmlmotion {} failed to setup dealer {}", devSn, addr);
spdlog::error("{} failed to setup dealer {}", devSn, addr);
exit(1);
}
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) {
spdlog::error("evmlmotion {} failed to receive configration message {}", devSn, addr);
spdlog::error("{} failed to receive configration message {}", devSn, addr);
}
init();
......
......@@ -18,12 +18,14 @@ update: 2019/09/10
#include "inc/tinythread.hpp"
#include "inc/common.hpp"
#include "inc/database.h"
#include "singprocess.hpp"
using namespace std;
using namespace zmqhelper;
class EvPuller: public TinyThread {
private:
string selfName = "evpuller";
void *pPubCtx = nullptr; // for packets publishing
void *pPub = nullptr;
void *pDealerCtx = nullptr;
......@@ -65,19 +67,19 @@ private:
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evpuller {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
spdlog::info("{} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evpuller {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evpuller {} received msg having no implementation from peer: {}", selfId, msg);
spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
......@@ -99,10 +101,10 @@ private:
vector<vector<uint8_t> > rep = {str2body(peerId), str2body(meta.dump()), msgBody};
int ret = z_send_multiple(pDealer, rep);
if(ret < 0) {
spdlog::error("evpuller {} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evpuller {} success to send avformatctx data to requester {}", selfId, peerId);
spdlog::info("{} success to send avformatctx data to requester {}", selfId, peerId);
}
}
......@@ -114,7 +116,7 @@ private:
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
}
spdlog::info("evpuller {} got inputformat", selfId);
spdlog::info("{} got inputformat", selfId);
try {
// rep framectx
// TODO: verify sender id?
......@@ -126,14 +128,14 @@ private:
}
else if(meta["type"].get<string>() == EV_MSG_META_EVENT) {
// event msg
spdlog::info("evpuller {} received event: {}", selfId, body2str(v[2]));
spdlog::info("{} received event: {}", selfId, body2str(v[2]));
}
else {
spdlog::error("evpuller {} unknown meta from {}: {}", selfId, body2str(v[0]), body2str(v[1]));
spdlog::error("{} unknown meta from {}: {}", selfId, body2str(v[0]), body2str(v[1]));
}
}
catch(exception &e) {
spdlog::error("evpuller {} excpetion parse request from {}: {}", selfId, body2str(v[0]), body2str(v[1]));
spdlog::error("{} excpetion parse request from {}: {}", selfId, body2str(v[0]), body2str(v[1]));
}
return ret;
......@@ -146,7 +148,7 @@ private:
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpuller {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
}
return ret;
}
......@@ -158,7 +160,7 @@ private:
bool found = false;
string user, passwd, addr;
try {
spdlog::info("evpuller boot config: {} -> {}", selfId, config.dump());
spdlog::info("{} boot config: {}", selfId, config.dump());
json evpuller;
json &evmgr = this->config;
json ipc;
......@@ -183,7 +185,7 @@ private:
if(!found) {
spdlog::error("evpuller {} no valid config found", devSn);
spdlog::error("{} no valid config found", devSn);
exit(1);
}
......@@ -221,9 +223,9 @@ private:
}
addr = evpuller["addr"].get<string>();
spdlog::info("evpuller {} connecting to IPC {}", selfId, urlIn);
spdlog::info("{} connecting to IPC {}", selfId, urlIn);
if(addr == "*" || addr == "0.0.0.0") {
spdlog::error("evpuller {} invalid addr {} for pub", selfId, evpuller.dump());
spdlog::error("{} invalid addr {} for pub", selfId, evpuller.dump());
exit(1);
}
......@@ -245,13 +247,13 @@ private:
urlPub = string("tcp://*:") + to_string(portPub);
urlDealer = "tcp://" + evmgr["addr"].get<string>() + string(":") + to_string(portRouter);
spdlog::info("evpuller {} bind on {} for pub, connect to {} for dealer", selfId, urlPub, urlDealer);
spdlog::info("{} bind on {} for pub, connect to {} for dealer", selfId, urlPub, urlDealer);
pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB);
ret = zmq_bind(pPub, urlPub.c_str());
if(ret < 0) {
spdlog::error("evpuller {} failed to bind to {}", selfId, urlPub);
spdlog::error("{} failed to bind to {}", selfId, urlPub);
exit(1);
}
pDealerCtx = zmq_ctx_new();
......@@ -266,17 +268,17 @@ private:
zmq_setsockopt(pDealer, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evpuller {} failed to set identity", selfId);
spdlog::error("{} failed to set identity", selfId);
exit(1);
}
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evpuller {} {} failed setsockopts router: {}", selfId, urlDealer);
spdlog::error("{} {} failed setsockopts router: {}", selfId, urlDealer);
exit(1);
}
ret = zmq_connect(pDealer, urlDealer.c_str());
if(ret < 0) {
spdlog::error("evpuller {} failed to connect to router {}", selfId, urlDealer);
spdlog::error("{} failed to connect to router {}", selfId, urlDealer);
exit(1);
}
......@@ -284,12 +286,12 @@ private:
}
catch(exception &e) {
this_thread::sleep_for(chrono::seconds(3));
spdlog::error("evpuller {} exception in EvPuller.init {:s}, retrying... ", selfId, e.what());
spdlog::error("{} exception in EvPuller.init {:s}, retrying... ", selfId, e.what());
exit(1);
}
inited = true;
spdlog::info("evpuller successfully load config");
spdlog::info("successfully load config");
return 0;
}
......@@ -313,9 +315,9 @@ protected:
// av_dict_set(&optsIn, "user-agent", userAgent, 0);
spdlog::info("evpuller {} openning stream: {}", selfId, urlIn);
spdlog::info("{} openning stream: {}", selfId, urlIn);
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) {
string msg = fmt::format("evpuller {} Could not open input stream {}: {}", selfId, urlIn, av_err2str(ret));
string msg = fmt::format("{} Could not open input stream {}: {}", selfId, urlIn, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
......@@ -333,7 +335,7 @@ protected:
exit(1);
}
else {
string msg = fmt::format("evpuller {} successfully openned input stream {}", selfId, urlIn);
string msg = fmt::format("{} successfully openned input stream {}", selfId, urlIn);
json meta;
json data;
data["msg"] = msg;
......@@ -349,9 +351,9 @@ protected:
spdlog::info(msg);
}
spdlog::info("evpuller {} finding stream info: {}", selfId, urlIn);
spdlog::info("{} finding stream info: {}", selfId, urlIn);
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
spdlog::error("evpuller {} Failed to retrieve input stream information", selfId);
spdlog::error("{} Failed to retrieve input stream information", selfId);
// TODO: message report to cloud
exit(1);
}
......@@ -364,7 +366,7 @@ protected:
if (!streamList) {
ret = AVERROR(ENOMEM);
spdlog::error("evpuller {} failed create avformatcontext for output: {}", selfId, av_err2str(AVERROR(ENOMEM)));
spdlog::error("{} failed create avformatcontext for output: {}", selfId, av_err2str(AVERROR(ENOMEM)));
}
// find all video & audio streams for remuxing
......@@ -393,7 +395,7 @@ protected:
}
// if(1 == getppid()) {
// spdlog::error("evpuller {} exit since evdaemon is dead", selfId);
// spdlog::error("{} exit since evdaemon is dead", selfId);
// exit(1);
// }
......@@ -403,7 +405,7 @@ protected:
ret = av_read_frame(pAVFormatInput, &packet);
if (ret < 0) {
spdlog::error("evpuller {} failed read packet: {}", selfId, av_err2str(ret));
spdlog::error("{} failed read packet: {}", selfId, av_err2str(ret));
// TODO: message report to cloud
exit(1);
}
......@@ -413,7 +415,7 @@ protected:
continue;
}
if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("evpuller {} pktCnt: {:d}", selfId, pktCnt);
spdlog::info("{} pktCnt: {:d}", selfId, pktCnt);
}
pktCnt++;
......@@ -427,7 +429,7 @@ protected:
lock_guard<mutex> lock(this->mutMsg);
lenAVFmtCtxBytes = AVFormatCtxSerializer::encode(pAVFormatInput, &pAVFmtCtxBytes, ids);
if(lenAVFmtCtxBytes <= 0 || pAVFmtCtxBytes == nullptr) {
spdlog::error("evpuller {} failed to pull packet from {}. exiting...", selfId, urlIn);
spdlog::error("{} failed to pull packet from {}. exiting...", selfId, urlIn);
// TODO: message report to cloud
exit(1);
}
......@@ -480,7 +482,7 @@ public:
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evpuller") {
spdlog::error("evpuller {} received invalid gid: {}", selfId);
spdlog::error("{} received invalid gid: {}", selfId);
// TODO: message report to cloud
exit(1);
}
......@@ -488,29 +490,34 @@ public:
iid = stoi(v[2]);
}
else {
spdlog::error("evpuller {} failed to start. no SN set", selfId);
spdlog::error("{} failed to start. no SN set", selfId);
exit(1);
}
spdlog::info("evpuller {} boot", selfId);
spdlog::info("{} boot", selfId);
SingletonProcess self(selfName, iid);
if(!self()){
spdlog::error("{} already running. ignore this instance", selfId);
exit(0);
}
//
string addr = string("tcp://127.0.0.1:") + drport;
int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId);
if(ret != 0) {
spdlog::error("evpuller {} failed to setup dealer {}", selfId, addr);
spdlog::error("{} failed to setup dealer {}", selfId, addr);
exit(1);
}
spdlog::info("evpuller {} setup dealer to daemon OK", selfId);
spdlog::info("{} setup dealer to daemon OK", selfId);
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) {
spdlog::error("evpuller {} failed to receive configration message {}", selfId, addr);
spdlog::error("{} failed to receive configration message {}", selfId, addr);
}
spdlog::info("evpuller {} receive config from daemon OK", selfId);
spdlog::info("{} receive config from daemon OK", selfId);
init();
spdlog::info("evpuller {} init OK", selfId);
spdlog::info("{} init OK", selfId);
thEdgeMsgHandler = thread([this] {
while(true)
......@@ -527,7 +534,7 @@ public:
}
msg = msg.substr(0, msg.size()> EV_MSG_DEBUG_LEN? EV_MSG_DEBUG_LEN:msg.size());
spdlog::info("evpuller {} received edge msg: {}", selfId, msg);
spdlog::info("{} received edge msg: {}", selfId, msg);
this->handleEdgeMsg(body);
}
......
......@@ -19,6 +19,7 @@ update: 2019/09/10
#include "inc/common.hpp"
#include "inc/database.h"
#include "inc/spdlog/spdlog.h"
#include "singprocess.hpp"
using namespace std;
using namespace zmqhelper;
......@@ -26,6 +27,7 @@ using namespace std::chrono_literals;
class EvPusher: public TinyThread {
private:
string selfName = "evpusher";
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn, selfId;
......@@ -45,7 +47,7 @@ private:
int ret = 0;
bool found = false;
try {
spdlog::info("evpusher boot config: {} -> {}", selfId, config.dump());
spdlog::info("{} boot config: {}", selfId, config.dump());
json evpusher;
json &evmgr = this->config;
json ipc;
......@@ -65,15 +67,15 @@ private:
}
}
spdlog::info("evpusher {} {}, evpusher: {}",devSn, iid, evpusher.dump());
spdlog::info("evpusher {} {}, ipc: {}",devSn, iid, ipc.dump());
spdlog::info("{} {}, evpusher: {}",devSn, iid, evpusher.dump());
spdlog::info("{} {}, ipc: {}",devSn, iid, ipc.dump());
if(ipc.size()!=0 && evpusher.size()!=0) {
found = true;
}
if(!found) {
spdlog::error("evpusher {} : no valid config found: {}", selfId, config.dump());
spdlog::error("{} : no valid config found: {}", selfId, config.dump());
exit(1);
}
......@@ -101,7 +103,7 @@ private:
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(portPub);
urlDealer = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(portRouter);
spdlog::info("evpusher {} connect to {} for sub, {} for router", selfId, urlPub, urlDealer);
spdlog::info("{} connect to {} for sub, {} for router", selfId, urlPub, urlDealer);
// TODO: multiple protocols support
urlOut = evpusher["urlDest"].get<string>();
// setup sub
......@@ -117,12 +119,12 @@ private:
zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) {
spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub);
spdlog::error("{} {} failed set setsockopt: {}", devSn, iid, urlPub);
}
ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) {
spdlog::error("evpusher {} {} failed connect pub: {}", devSn, iid, urlPub);
spdlog::error("{} {} failed connect pub: {}", devSn, iid, urlPub);
exit(1);
}
......@@ -140,23 +142,23 @@ private:
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evpusher {} failed setsockopts router {}: {}", selfId, urlDealer, zmq_strerror(zmq_errno()));
spdlog::error("{} failed setsockopts router {}: {}", selfId, urlDealer, zmq_strerror(zmq_errno()));
exit(1);
}
ret = zmq_connect(pDealer, urlDealer.c_str());
if(ret != 0) {
spdlog::error("evpusher {} {} failed connect dealer: {}", devSn, iid, urlDealer);
spdlog::error("{} {} failed connect dealer: {}", devSn, iid, urlDealer);
exit(1);
}
if(ret < 0) {
spdlog::error("evpusher {} failed to set config: {}", selfId, config.dump());
spdlog::error("{} failed to set config: {}", selfId, config.dump());
}
spdlog::info("evpusher new config: {}", config.dump());
spdlog::info("new config: {}", config.dump());
ping();
}
catch(exception &e) {
spdlog::error("evpusher {} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what());
spdlog::error("{} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what());
this_thread::sleep_for(chrono::seconds(3));
exit(1);
}
......@@ -171,10 +173,10 @@ private:
vector<vector<uint8_t> >body = {str2body(mgrSn+":evmgr:0"), str2body(EV_MSG_META_PING),str2body(MSG_HELLO)};
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
spdlog::error("{} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evpusher {} sent hello to router: {}", selfId, mgrSn);
spdlog::info("{} sent hello to router: {}", selfId, mgrSn);
}
return ret;
......@@ -203,19 +205,19 @@ private:
string daemonId = this->devSn + ":evdaemon:0";
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evpusher {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
spdlog::info("{} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
}
}
catch(exception &e) {
spdlog::error("evpusher {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evpusher {} received msg having no implementation from peer: {}", selfId, msg);
spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
......@@ -257,8 +259,8 @@ private:
cvMsg.notify_one();
}
else {
spdlog::warn("evpusher {} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("evpusher {} restart since reinit", selfId);
spdlog::warn("{} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("{} restart since reinit", selfId);
exit(0);
}
bProcessed = true;
......@@ -266,12 +268,12 @@ private:
}
}
catch(exception &e) {
spdlog::error("evpusher {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evpusher {} received msg having no implementation from peer: {}", selfId, msg);
spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
......@@ -282,7 +284,7 @@ private:
int ret = 0;
// req avformatcontext packet
// send hello to puller
spdlog::info("evpusher {} send hello to puller: {}", selfId, pullerGid);
spdlog::info("{} send hello to puller: {}", selfId, pullerGid);
vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid));
json meta;
......@@ -293,7 +295,7 @@ private:
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno()));
spdlog::error("{} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno()));
// TODO: message report to cloud
exit(1);
}
......@@ -303,7 +305,7 @@ private:
}else{
// restart
spdlog::error("evpusher {} failed wait for avformatctx for {}s, restart", devSn, 30);
spdlog::error("{} failed wait for avformatctx for {}s, restart", devSn, 30);
exit(1);
}
......@@ -318,7 +320,7 @@ private:
int cnt = 0;
while(ret < 0) {
if(cnt > 3) {
string msg = fmt::format("evpusher {} failed to write output header {}: {}, {}", selfId, urlOut, ret, av_err2str(ret));
string msg = fmt::format("{} failed to write output header {}: {}, {}", selfId, urlOut, ret, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
......@@ -336,7 +338,7 @@ private:
if(proto == "rtsp") {
// rtsp tcp
if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) {
spdlog::error("evpusher {} {} failed set output pOptsRemux", devSn, iid);
spdlog::error("{} {} failed set output pOptsRemux", devSn, iid);
ret = AVERROR_UNKNOWN;
}
av_dict_set_int(&pOptsRemux, "stimeout", (int64_t)(1000* 1000 * 1), 0);
......@@ -350,16 +352,16 @@ private:
}
if (ret < 0) {
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
spdlog::error("{} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
// TODO: message report to cloud
exit(1);
}
streamList = (int *)av_mallocz_array(pAVFormatInput->nb_streams, sizeof(*streamList));
spdlog::info("evpusher {} {} numStreams: {:d}", devSn, iid, pAVFormatInput->nb_streams);
spdlog::info("{} {} numStreams: {:d}", devSn, iid, pAVFormatInput->nb_streams);
if (!streamList) {
ret = AVERROR(ENOMEM);
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(AVERROR(ENOMEM)));
spdlog::error("{} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(AVERROR(ENOMEM)));
exit(1);
}
......@@ -377,29 +379,29 @@ private:
streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) {
spdlog::error("evpusher {} {} failed allocating output stream", devSn, iid);
spdlog::error("{} {} failed allocating output stream", devSn, iid);
ret = AVERROR_UNKNOWN;
}
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
spdlog::info("evpusher {} {} copied codepar", devSn, iid);
spdlog::info("{} {} copied codepar", devSn, iid);
if (ret < 0) {
spdlog::error("evpusher {} {} failed to copy codec parameters", devSn, iid);
spdlog::error("{} {} failed to copy codec parameters", devSn, iid);
}
}
for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) {
spdlog::info("evpusher {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
spdlog::info("{} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
}
av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1);
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
spdlog::error("evpusher {} {} failed allocating output stream", devSn,iid);
spdlog::error("{} {} failed allocating output stream", devSn,iid);
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) {
// TODO: message report to cloud
string msg = fmt::format("evpusher {} failed to open output stream {}: {}, {}", selfId, urlOut, ret, av_err2str(ret));
string msg = fmt::format("{} failed to open output stream {}: {}, {}", selfId, urlOut, ret, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
......@@ -416,7 +418,7 @@ private:
exit(1);
}
else {
string msg = fmt::format("evpusher {} successfully open output {}", selfId, urlOut);
string msg = fmt::format("{} successfully open output {}", selfId, urlOut);
json meta;
json data;
data["msg"] = msg;
......@@ -436,7 +438,7 @@ private:
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) {
// TODO: report message to cloud
string msg = fmt::format("evpusher {} failed to write stream {}: {}, {}", selfId, urlOut, ret, av_err2str(ret));
string msg = fmt::format("{} failed to write stream {}: {}, {}", selfId, urlOut, ret, av_err2str(ret));
spdlog::error(msg);
}
......@@ -444,7 +446,7 @@ private:
}
string msg = fmt::format("evpusher {} successfully write output header {}", selfId, urlOut);
string msg = fmt::format("{} successfully write output header {}", selfId, urlOut);
json meta;
json data;
data["msg"] = msg;
......@@ -491,18 +493,18 @@ protected:
while (true) {
ret =zmq_msg_init(&msg);
if(ret != 0) {
spdlog::error("evpusher {} failed to init zmq msg", selfId);
spdlog::error("{} failed to init zmq msg", selfId);
continue;
}
// receive packet
ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) {
spdlog::error("evpusher {} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
spdlog::error("{} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
continue;
}
if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("evpusher {} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
spdlog::info("{} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
}
pktCnt++;
......@@ -510,7 +512,7 @@ protected:
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet);
{
if (ret < 0) {
spdlog::error("evpusher {} packet decode failed: {:d}", selfId, ret);
spdlog::error("{} packet decode failed: {:d}", selfId, ret);
continue;
}
}
......@@ -528,7 +530,7 @@ protected:
out_stream = pAVFormatRemux->streams[packet.stream_index];
/* copy packet */
// spdlog::info("evpusher {} packet pts: {} dts: {}", selfId, packet.pts, packet.dts);
// spdlog::info("{} packet pts: {} dts: {}", selfId, packet.pts, packet.dts);
// if(pktCnt == 0) {
// packet.pts = AV_NOPTS_VALUE;
// packet.dts = AV_NOPTS_VALUE;
......@@ -541,14 +543,14 @@ protected:
// packet.pos = -1;
// lastPts = packet.dts;
// }
// spdlog::info("evpusher {} packet new pts: {} dts: {}", selfId, packet.pts, packet.dts);
// spdlog::info("{} packet new pts: {} dts: {}", selfId, packet.pts, packet.dts);
ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet);
if (ret < 0) {
// TODO: report message to cloud
bStatsSent = false;
string msg = fmt::format("evpusher {} error write stream, restreaming: {} ,{}", selfId, ret, av_err2str(ret));
string msg = fmt::format("{} error write stream, restreaming: {} ,{}", selfId, ret, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
......@@ -576,7 +578,7 @@ protected:
else {
if(!bStatsSent) {
bStatsSent = true;
string msg = fmt::format("evpusher {} start pushing {}", selfId, urlOut);
string msg = fmt::format("{} start pushing {}", selfId, urlOut);
json meta;
json data;
data["msg"] = msg;
......@@ -608,30 +610,35 @@ public:
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evpusher") {
spdlog::error("evpusher received invalid gid: {}", selfId);
spdlog::error("received invalid gid: {}", selfId);
exit(1);
}
devSn = v[0];
iid = stoi(v[2]);
}
else {
spdlog::error("evpusher failed to start. no SN set");
spdlog::error("failed to start. no SN set");
exit(1);
}
spdlog::info("evpusher {} boot", selfId);
spdlog::info("{} boot", selfId);
SingletonProcess self(selfName, iid);
if(!self()){
spdlog::error("{} already running. ignore this instance", selfId);
exit(0);
}
//
string addr = string("tcp://127.0.0.1:") + drport;
int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId);
if(ret != 0) {
spdlog::error("evpusher {} failed to setup dealer {}", devSn, addr);
spdlog::error("{} failed to setup dealer {}", devSn, addr);
exit(1);
}
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) {
spdlog::error("evpusher {} failed to receive configration message {}", devSn, addr);
spdlog::error("{} failed to receive configration message {}", devSn, addr);
}
init();
......
......@@ -28,6 +28,7 @@ update: 2019/09/10
#include "postfile.h"
#include "dirmon.h"
#include "inc/fs.h"
#include "singprocess.hpp"
using namespace std;
using namespace zmqhelper;
......@@ -38,6 +39,7 @@ private:
#define NUM_HOURS_DEFAULT 2
#define SECONDS_PER_SLICE_DEFAULT 30
// 2 hours, 30 seconds per record
string selfName = "evslicer";
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid, ipcSn;
......@@ -100,11 +102,11 @@ private:
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
cvMsg.notify_one();
spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
spdlog::info("{} got avformat from {}", selfId, peerId);
}
else {
spdlog::warn("evslicer {} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("evslicer {} restart since reinit", selfId);
spdlog::warn("{} received avformatctx msg from {}, but already proceessed before, restarting", selfId, peerId);
spdlog::error("{} restart since reinit", selfId);
exit(0);
}
}
......@@ -112,38 +114,38 @@ private:
else if(metaType == EV_MSG_META_EVENT) {
data = json::parse(body2str(v[2]));
/// evslicer has two msg interfaces to subsystems on edge side
/// has two msg interfaces to subsystems on edge side
/// 1. type = "event"; start: timestamp; end: timestamp
/// 2. type = "media"; duration: seconds
if(!validMsg(data)) {
spdlog::info("evslicer {} received invalid msg from {}: {}", selfId, peerId, msg);
spdlog::info("{} received invalid msg from {}: {}", selfId, peerId, msg);
}
else {
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, metaType, data.dump());
spdlog::info("{} received msg from {}, type = {}, data = {}", selfId, peerId, metaType, data.dump());
if(data["type"] == "event") {
lock_guard<mutex> lock(this->mutEvent);
eventQueue.push(data.dump());
spdlog::info("evslicer {} event num: {}", selfId, eventQueue.size());
spdlog::info("{} event num: {}", selfId, eventQueue.size());
if(eventQueue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQueue.pop();
}
cvEvent.notify_one();
}
else {
spdlog::error("evslicer {} msg not supported from {}: {}", selfId, peerId, msg);
spdlog::error("{} msg not supported from {}: {}", selfId, peerId, msg);
}
}
}
else {
spdlog::info("evslicer {} received unkown msg from {}: {}", selfId, peerId, msg);
spdlog::info("{} received unkown msg from {}: {}", selfId, peerId, msg);
}
}
catch(exception &e) {
spdlog::error("evslicer {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
else {
spdlog::error("evslicer {} get invalid msg with size {}: {}", selfId, v.size(), msg);
spdlog::error("{} get invalid msg with size {}: {}", selfId, v.size(), msg);
}
return ret;
......@@ -172,7 +174,7 @@ private:
// msg from cluster mgr
if(peerId == daemonId) {
if(metaValue == EV_MSG_META_VALUE_CMD_STOP || metaValue == EV_MSG_META_VALUE_CMD_RESTART) {
spdlog::info("evslicer {} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
spdlog::info("{} received {} cmd from cluster mgr {}", selfId, metaValue, daemonId);
bProcessed = true;
exit(0);
}
......@@ -193,7 +195,7 @@ private:
}
}
catch(exception &e) {
spdlog::error("evslicer {} exception in handleCloudMsg: {}", selfId, e.what());
spdlog::error("{} exception in handleCloudMsg: {}", selfId, e.what());
}
}
else if(metaValue == "debug:xxx") {
......@@ -221,12 +223,12 @@ private:
}
}
catch(exception &e) {
spdlog::error("evslicer {} exception to process msg {}: {}", selfId, msg, e.what());
spdlog::error("{} exception to process msg {}: {}", selfId, msg, e.what());
}
}
if(!bProcessed) {
spdlog::error("evslicer {} received msg having no implementation from peer: {}", selfId, msg);
spdlog::error("{} received msg having no implementation from peer: {}", selfId, msg);
}
return ret;
......@@ -237,7 +239,7 @@ private:
int ret = 0;
bool found = false;
try {
spdlog::info("evslicer boot config: {} -> {}", selfId, config.dump());
spdlog::info("{} boot config: {}", selfId, config.dump());
json evslicer;
json &evmgr = this->config;
json ipc;
......@@ -262,7 +264,7 @@ private:
}
if(!found) {
spdlog::error("evslicer {}: no valid config found. retrying load config...", devSn);
spdlog::error("{}: no valid config found. retrying load config...", devSn);
exit(1);
}
......@@ -290,7 +292,7 @@ private:
mgrSn = evmgr["sn"];
if(evslicer.count("path") == 0) {
spdlog::info("evslicer {} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
spdlog::info("{} no params for path, using default: {}", selfId, URLOUT_DEFAULT);
urlOut = URLOUT_DEFAULT;
}
else {
......@@ -301,7 +303,7 @@ private:
urlOut += string("/") + ipcSn + "_" + to_string(iid);
if(evslicer.count("hours") == 0) {
spdlog::info("evslicer {} no params for hours, using default: {}", selfId, NUM_HOURS_DEFAULT);
spdlog::info("{} no params for hours, using default: {}", selfId, NUM_HOURS_DEFAULT);
hours = NUM_HOURS_DEFAULT;
}
else {
......@@ -309,7 +311,7 @@ private:
}
if(evslicer.count("seconds") == 0) {
spdlog::info("evslicer {} no params for seconds, using default: {}", selfId, SECONDS_PER_SLICE_DEFAULT);
spdlog::info("{} no params for seconds, using default: {}", selfId, SECONDS_PER_SLICE_DEFAULT);
seconds = SECONDS_PER_SLICE_DEFAULT;
}
else {
......@@ -318,7 +320,7 @@ private:
numSlices = hours * 60 * 60 /seconds;
spdlog::info("evslicer mkdir -p {}", selfId, urlOut);
spdlog::info("mkdir -p {}", selfId, urlOut);
ret = system((string("mkdir -p ") + urlOut).c_str());
......@@ -340,7 +342,7 @@ private:
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(portPub);
urlRouter = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(portRouter);
spdlog::info("evslicer {} will connect to {} for sub, {} for router", selfId, urlPub, urlRouter);
spdlog::info("{} will connect to {} for sub, {} for router", selfId, urlPub, urlRouter);
// setup sub
pSubCtx = zmq_ctx_new();
......@@ -355,20 +357,20 @@ private:
zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) {
spdlog::error("evslicer {} failed set setsockopt: {}", selfId, urlPub);
spdlog::error("{} failed set setsockopt: {}", selfId, urlPub);
exit(1);
}
ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) {
spdlog::error("evslicer {} failed connect pub: {}", selfId, urlPub);
spdlog::error("{} failed connect pub: {}", selfId, urlPub);
exit(1);
}
// setup dealer
pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evslicer {} try create req to {}", selfId, urlRouter);
spdlog::info("{} try create req to {}", selfId, urlRouter);
//ZMQ_TCP_KEEPALIVE
//ZMQ_TCP_KEEPALIVE_IDLE
//ZMQ_TCP_KEEPALIVE_INTVL
......@@ -380,23 +382,23 @@ private:
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evslicer {} {} failed setsockopts router: {}", selfId, urlRouter);
spdlog::error("{} {} failed setsockopts router: {}", selfId, urlRouter);
exit(1);
}
if(ret < 0) {
spdlog::error("evslicer {} failed setsockopts router: {}", selfId, urlRouter);
spdlog::error("{} failed setsockopts router: {}", selfId, urlRouter);
exit(1);
}
ret = zmq_connect(pDealer, urlRouter.c_str());
if(ret != 0) {
spdlog::error("evslicer {} failed connect dealer: {}", selfId, urlRouter);
spdlog::error("{} failed connect dealer: {}", selfId, urlRouter);
exit(1);
}
//ping
ret = ping();
}
catch(exception &e) {
spdlog::error("evslicer {} exception in init {:s} retrying", selfId, e.what());
spdlog::error("{} exception in init {:s} retrying", selfId, e.what());
exit(1);
}
......@@ -411,10 +413,10 @@ private:
vector<vector<uint8_t> >body = {str2body(mgrSn+":evmgr:0"), str2body(EV_MSG_META_PING), str2body(MSG_HELLO)};
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evslicer {} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to send multiple: {}", selfId, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evslicer {} sent hello to router: {}", selfId, mgrSn);
spdlog::info("{} sent hello to router: {}", selfId, mgrSn);
}
return ret;
......@@ -425,7 +427,7 @@ private:
int ret = 0;
// req avformatcontext packet
// send hello to puller
spdlog::info("evslicer {} send hello to puller: {}", selfId, pullerGid);
spdlog::info("{} send hello to puller: {}", selfId, pullerGid);
vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid));
json meta;
......@@ -437,7 +439,7 @@ private:
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evslicer {}, failed to send hello to puller: {}. exiting ...", selfId, zmq_strerror(zmq_errno()));
spdlog::error("{}, failed to send hello to puller: {}. exiting ...", selfId, zmq_strerror(zmq_errno()));
// TODO: message report to cloud
exit(1);
}
......@@ -447,7 +449,7 @@ private:
}else{
// restart
spdlog::error("evslicer {} failed wait for avformatctx for {}s, restart", devSn, 30);
spdlog::error("{} failed wait for avformatctx for {}s, restart", devSn, 30);
exit(1);
}
......@@ -472,7 +474,7 @@ private:
}
for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) {
spdlog::info("evslicer {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
spdlog::info("{} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
}
//av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0);
......@@ -523,7 +525,7 @@ protected:
name = urlOut + "/" + "%Y%m%d_%H%M%S.mp4";
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "segment", name.c_str());
if (ret < 0) {
spdlog::error("evslicer {} failed create avformatcontext for output: %s", selfId, av_err2str(ret));
spdlog::error("{} failed create avformatcontext for output: %s", selfId, av_err2str(ret));
// TODO: message report to cloud
exit(1);
}
......@@ -533,12 +535,12 @@ protected:
if(streamList[i] != -1) {
out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) {
spdlog::error("evslicer {} failed allocating output stream {}", selfId, i);
spdlog::error("{} failed allocating output stream {}", selfId, i);
ret = AVERROR_UNKNOWN;
}
ret = avcodec_parameters_copy(out_stream->codecpar, pAVFormatInput->streams[i]->codecpar);
if (ret < 0) {
spdlog::error("evslicer {} failed to copy codec parameters", selfId);
spdlog::error("{} failed to copy codec parameters", selfId);
}
}
}
......@@ -546,30 +548,30 @@ protected:
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
ret = avio_open2(&pAVFormatRemux->pb, name.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) {
spdlog::error("evslicer {} could not open output file {}", selfId, name);
spdlog::error("{} could not open output file {}", selfId, name);
}
}
// TODO
av_dict_set(&pOptsRemux, "segment_start_number", to_string(this->sTsList.size()).data(), 0);
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) {
spdlog::error("evslicer {} error occurred when opening output file", selfId);
spdlog::error("{} error occurred when opening output file", selfId);
}
bootTime = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
spdlog::info("evslicer {} start writing new slices", selfId);
spdlog::info("{} start writing new slices", selfId);
int pktIgnore = 0;
while(true) {
int ret =zmq_msg_init(&msg);
ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) {
spdlog::error("evslicer {} failed to recv zmq msg: {}",selfId, zmq_strerror(ret));
spdlog::error("{} failed to recv zmq msg: {}",selfId, zmq_strerror(ret));
continue;
}
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet);
{
if (ret < 0) {
spdlog::error("evslicer {} packet decode failed: {}", selfId, ret);
spdlog::error("{} packet decode failed: {}", selfId, ret);
continue;
}
}
......@@ -592,7 +594,7 @@ protected:
//calc pts
if(pktCnt % EV_LOG_PACKET_CNT == 0) {
spdlog::info("evslicer {} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
spdlog::info("{} seq: {}, pts: {}, dts: {}, idx: {}", selfId, pktCnt, packet.pts, packet.dts, packet.stream_index);
}
/* copy packet */
if(pktCnt == 0) {
......@@ -613,7 +615,7 @@ protected:
av_packet_unref(&packet);
if (ret < 0) {
// TODO: report message to cloud
string msg = fmt::format("evslicer {} error write file, resetting:{}", selfId, av_err2str(ret));
string msg = fmt::format("{} error write file, resetting:{}", selfId, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
......@@ -627,7 +629,7 @@ protected:
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error("evslicer {} error muxing packet: {}, {}, {}, {}, reloading...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
spdlog::error("{} error muxing packet: {}, {}, {}, {}, reloading...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
if(pktCnt != 0 && packet.pts == AV_NOPTS_VALUE) {
// reset
av_write_trailer(pAVFormatRemux);
......@@ -642,7 +644,7 @@ protected:
else {
if(!bStatsSent) {
bStatsSent = true;
string msg = fmt::format("evslicer {} starting write file", selfId);
string msg = fmt::format("{} starting write file", selfId);
json meta;
json data;
data["msg"] = msg;
......@@ -689,7 +691,7 @@ protected:
posE = posE -1;
}
if(posE < posS) {
spdlog::error("evslicer getBaseName invalid filename");
spdlog::error("getBaseName invalid filename");
return ret;
}
......@@ -721,7 +723,7 @@ protected:
void printVideoFiles(set<long> &list)
{
spdlog::info("evslicer {} debug files ring. size: {} max: {}",this->selfId, list.size(), this->numSlices);
spdlog::info("{} debug files ring. size: {} max: {}",this->selfId, list.size(), this->numSlices);
// lock_guard<mutex> lg(mutTsList);
for(auto i: list) {
spdlog::info("\tevslicer {} file ts: {}, baseName: {}", selfId, i, videoFileTs2Name(i));
......@@ -736,17 +738,17 @@ protected:
for (const auto & entry : fs::directory_iterator(path)) {
fname = entry.path().c_str();
if(entry.file_size() == 0 || !entry.is_regular_file()||entry.path().extension() != ".mp4") {
spdlog::debug("evslicer {} loadVideoFiles skipped {} (empty/directory/!mp4)", selfId, entry.path().c_str());
spdlog::debug("{} loadVideoFiles skipped {} (empty/directory/!mp4)", selfId, entry.path().c_str());
continue;
}
baseName = getBaseName(fname);
auto ts = videoFileName2Ts(baseName);
spdlog::debug("evslicer {} loadVideoFiles basename: {}, ts: {}", selfId, baseName, ts);
spdlog::debug("{} loadVideoFiles basename: {}, ts: {}", selfId, baseName, ts);
// check old files
if(ts - now > hours * 60 * 60) {
spdlog::info("evslicer {} file {} old than {} hours: {}, {}", selfId, entry.path().c_str(), hours, ts, now);
spdlog::info("{} file {} old than {} hours: {}, {}", selfId, entry.path().c_str(), hours, ts, now);
fs::path fname(this->urlOut + "/" + baseName + ".mp4");
fs::remove(fname);
}
......@@ -756,7 +758,7 @@ protected:
}
}
catch(exception &e) {
spdlog::error("evslicer {} {}:{} loadVideoFiles exception : {}",selfId, __FILE__, __LINE__, e.what());
spdlog::error("{} {}:{} loadVideoFiles exception : {}",selfId, __FILE__, __LINE__, e.what());
}
}
......@@ -804,28 +806,28 @@ protected:
string fullPath = i.get_path();
size_t pos = fullPath.find(ext, 0);
if(fullPath.size() < ext.size() || pos == string::npos || pos != (fullPath.size() - ext.size())) {
spdlog::debug("evslicer {} invalid file: {}, last: {}", self->selfId, fullPath, lastFile);
spdlog::debug("{} invalid file: {}, last: {}", self->selfId, fullPath, lastFile);
continue;
}
if(lastFile == i.get_path()) {
spdlog::debug("evslicer {} skip file : {}, last: {}", self->selfId, fullPath, lastFile);
spdlog::debug("{} skip file : {}, last: {}", self->selfId, fullPath, lastFile);
continue;
}
else if(!lastFile.empty()) {
spdlog::debug("evslicer {} filemon file: {}, ts: {}, last: {}", self->selfId, i.get_path().c_str(), i.get_time(), lastFile);
spdlog::debug("{} filemon file: {}, ts: {}, last: {}", self->selfId, i.get_path().c_str(), i.get_time(), lastFile);
try {
auto baseName = self->getBaseName(lastFile);
auto ts = self->videoFileName2Ts(baseName);
if(ts == -1) {
spdlog::error("evslicer {} fileMonHandler failed to process file: {}", self->selfId, lastFile);
spdlog::error("{} fileMonHandler failed to process file: {}", self->selfId, lastFile);
}
else {
self->insertTsList(self->sTsList, ts, self->numSlices);
}
}
catch(exception &e) {
spdlog::error("evslicer {} fileMonHandler exception: {}", self->selfId, e.what());
spdlog::error("{} fileMonHandler exception: {}", self->selfId, e.what());
}
}
else {
......@@ -850,7 +852,7 @@ protected:
long end = *(--_it);
if(tse < first||tss > end) {
spdlog::info("evslicer {} event range ({}, {}) is not in range ({}, {}).", selfId, tss, tse, first, end);
spdlog::info("{} event range ({}, {}) is not in range ({}, {}).", selfId, tss, tse, first, end);
return ret;
}
......@@ -884,7 +886,7 @@ protected:
sf += "\n\t" + this->urlOut + "/" + fname + ".mp4, " + to_string(*itr);
ret.push_back(fname);
}
spdlog::info("evslicer {} event {} - {} files to upload: {}", selfId, videoFileTs2Name(tss), videoFileTs2Name(tse), sf);
spdlog::info("{} event {} - {} files to upload: {}", selfId, videoFileTs2Name(tss), videoFileTs2Name(tse), sf);
}
return ret;
......@@ -894,7 +896,7 @@ protected:
{
string modifier = fail?"failed": "successfully";
string status = fail?"active":"recover";
string msg = fmt::format("evslicer {} {} upload videos: {}", selfId, modifier, reason);
string msg = fmt::format("{} {} upload videos: {}", selfId, modifier, reason);
json meta;
json data;
data["msg"] = msg;
......@@ -922,30 +924,35 @@ public:
selfId = strEnv;
auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evslicer") {
spdlog::error("evslicer received invalid gid: {}", selfId);
spdlog::error("received invalid gid: {}", selfId);
exit(1);
}
devSn = v[0];
iid = stoi(v[2]);
}
else {
spdlog::error("evslicer failed to start. no SN set");
spdlog::error("failed to start. no SN set");
exit(1);
}
spdlog::info("evslicer {} boot", selfId);
spdlog::info("{} boot", selfId);
SingletonProcess self(selfName, iid);
if(!self()){
spdlog::error("{} already running. ignore this instance", selfId);
exit(0);
}
//
string addr = string("tcp://127.0.0.1:") + drport;
int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId);
if(ret != 0) {
spdlog::error("evslicer {} failed to setup dealer {}", devSn, addr);
spdlog::error("{} failed to setup dealer {}", devSn, addr);
exit(1);
}
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) {
spdlog::error("evslicer {} failed to receive configration message {}", devSn, addr);
spdlog::error("{} failed to receive configration message {}", devSn, addr);
}
init();
......@@ -955,7 +962,7 @@ public:
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to receive multiple edge msg: {}", selfId, zmq_strerror(zmq_errno()));
}
else {
// full proto msg received.
......@@ -971,7 +978,7 @@ public:
{
auto body = z_recv_multiple(pDaemon,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno()));
spdlog::error("{} failed to receive multiple cloud msg: {}", selfId, zmq_strerror(zmq_errno()));
}
else {
// full proto msg received.
......@@ -1021,7 +1028,7 @@ public:
long offsetE = 0;
if(tss < this->bootTime) {
spdlog::warn("evslicer {} should we discard old msg? {} < bootTime {}", selfId, evt, this->bootTime);
spdlog::warn("{} should we discard old msg? {} < bootTime {}", selfId, evt, this->bootTime);
}
long first = 0, end = 0;
......@@ -1032,11 +1039,11 @@ public:
}
if(tse < first) {
spdlog::info("evslicer {} thEventHandler event range ({}, {}) is not in range ({}, {}).", selfId, tss, tse, first, end);
spdlog::info("{} thEventHandler event range ({}, {}) is not in range ({}, {}).", selfId, tss, tse, first, end);
continue;
}
else if(first == 0||tse > end) {
spdlog::info("evslicer {} thEventHandler event range ({}, {}) is not in range ({}, {}), resched to run in {}s.", selfId, tss, tse, first, end, this->seconds + 5);
spdlog::info("{} thEventHandler event range ({}, {}) is not in range ({}, {}), resched to run in {}s.", selfId, tss, tse, first, end, this->seconds + 5);
auto th = thread([evt, this]() {
this_thread::sleep_for(chrono::seconds(this->seconds + 5));
lock_guard<mutex> lock(this->mutEvent);
......@@ -1053,7 +1060,7 @@ public:
auto v = findSlicesByRange(tss, tse, offsetS, offsetE);
if(v.size() == 0) {
spdlog::error("evslicer {} thEventHandler event ({}, {}) = ({}, {}) not in range: ({}, {}), ({}, {})", this->selfId, tss, tse, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), first, end, this->videoFileTs2Name(first), this->videoFileTs2Name(end));
spdlog::error("{} thEventHandler event ({}, {}) = ({}, {}) not in range: ({}, {}), ({}, {})", this->selfId, tss, tse, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), first, end, this->videoFileTs2Name(first), this->videoFileTs2Name(end));
}
else {
json params;
......@@ -1068,7 +1075,7 @@ public:
for(auto &i: v) {
string fname = this->urlOut + "/" + i + ".mp4";
if(fs::file_size(fname) == 0) {
spdlog::error("evslicer {} video size is 0: {}. ignore this event: {}", this->selfId, fname, evt);
spdlog::error("{} video size is 0: {}. ignore this event: {}", this->selfId, fname, evt);
hasError = true;
break;
}
......@@ -1080,12 +1087,12 @@ public:
continue;
}
spdlog::info("evslicer {} file upload range:({},{}) = ({}, {}), url: {}", selfId, tss, tse, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), this->videoFileServerApi);
spdlog::info("{} file upload range:({},{}) = ({}, {}), url: {}", selfId, tss, tse, this->videoFileTs2Name(tss), this->videoFileTs2Name(tse), this->videoFileServerApi);
string strResp;
ret = netutils::postFiles(this->videoFileServerApi, params, fileNames, strResp);
if( ret != 0 ) {
bUploadFailed = true;
spdlog::error("evslicer {} failed uploaded ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
spdlog::error("{} failed uploaded ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
reportUploadFailure(selfId, true, strResp);
if(ret > 0) {
if(jEvt.count("cnt") == 0) {
......@@ -1094,7 +1101,7 @@ public:
if(jEvt["cnt"].get<int>() <= 0) {
// TODO: report message to cloud
string msg = fmt::format("evslicer {} failed to upload videos over N times: {}", selfId, strResp);
string msg = fmt::format("{} failed to upload videos over N times: {}", selfId, strResp);
spdlog::error(msg);
// TODO: move to failed folder
string dirDest = "/var/data/evsuits/failed_events/";
......@@ -1121,7 +1128,7 @@ public:
}
}
else {
spdlog::info("evslicer {} retrying upload", selfId);
spdlog::info("{} retrying upload", selfId);
jEvt["cnt"] = jEvt["cnt"].get<int>() - 1;
lock_guard<mutex> lock(this->mutEvent);
this->eventQueue.push(jEvt.dump());
......@@ -1133,7 +1140,7 @@ public:
}
}
else { // ret == 0
spdlog::info("evslicer {} upload ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
spdlog::info("{} upload ({}, {}). local({}, {}). resp: {} files:\n{}", selfId, tss, tse, first, end, strResp, sf);
try {
auto resp = json::parse(strResp);
//TODO: open this swith when video server has implemented this functionality
......@@ -1150,7 +1157,7 @@ public:
}
else {
if(jEvt["cnt"].get<int>() <= 0) {
string msg = fmt::format("evslicer {} failed to upload videos over N times. reason: {}", selfId, strResp);
string msg = fmt::format("{} failed to upload videos over N times. reason: {}", selfId, strResp);
spdlog::error(msg);
}
else {
......@@ -1166,10 +1173,10 @@ public:
}
else if(resp["code"] == 6) {
// TODO: cloud storage issue. need stratigy policy
spdlog::warn("evslicer {} TODO: handle cloud storage", this->selfId);
spdlog::warn("{} TODO: handle cloud storage", this->selfId);
}
else {
spdlog::error("evslicer {} failed to upload videos. abort retry.", this->selfId);
spdlog::error("{} failed to upload videos. abort retry.", this->selfId);
}
}
......@@ -1181,13 +1188,13 @@ public:
}
}
catch(exception &e) {
spdlog::error("evslicer {} {}:{} exception: {}", this->selfId, __FILE__, __LINE__, e.what());
spdlog::error("{} {}:{} exception: {}", this->selfId, __FILE__, __LINE__, e.what());
}
}
}
}
else {
spdlog::error("evslicer {} unkown event :{}", this->selfId, evt);
spdlog::error("{} unkown event :{}", this->selfId, evt);
}
}
});
......
#include <netinet/in.h>
class SingletonProcess {
private:
uint16_t name2port(string name)
{
uint16_t ret = 41070;
if(name == "evdaemon") {
//
}
else if(name == "evmgr") {
ret +=100;
}
else if(name == "evpuller") {
ret +=200;
}
else if(name == "evpusher") {
ret +=300;
}
else if(name == "evslicer") {
ret +=400;
}
else if(name == "evmlmotion") {
ret +=500;
}
else if(name == "evwifi") {
ret +=600;
}
else {
ret += 900;
}
return ret;
}
public:
SingletonProcess(uint16_t port0)
: socket_fd(-1)
, rc(1)
, port(port0)
{
}
SingletonProcess(string moduName, uint16_t iid)
: socket_fd(-1)
, rc(1)
{
port = name2port(moduName) + iid;
}
~SingletonProcess()
{
if (socket_fd != -1) {
close(socket_fd);
}
}
bool operator()()
{
if (socket_fd == -1 || rc) {
socket_fd = -1;
rc = 1;
if ((socket_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
throw std::runtime_error(std::string("Could not create socket: ") + strerror(errno));
}
else {
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons (port);
name.sin_addr.s_addr = htonl (INADDR_ANY);
rc = ::bind(socket_fd, (struct sockaddr *) &name, sizeof (name));
}
}
return (socket_fd != -1 && rc == 0);
}
std::string GetLockFileName()
{
return std::to_string(port);
}
private:
int socket_fd = -1;
int rc;
uint16_t port;
};
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论