提交 d7ad5c00 authored 作者: blu's avatar blu

big refacting of communitation architect

上级 e631e90c
...@@ -69,7 +69,7 @@ private: ...@@ -69,7 +69,7 @@ private:
// //
if(config["addr"].get<string>() == "*" || config["addr"].get<string>() == "0.0.0.0") { if(config["addr"].get<string>() == "*" || config["addr"].get<string>() == "0.0.0.0") {
spdlog::error("invalid mgr address: {} in config:\n{}", config["addr"].get<string>(), config.dump()); spdlog::error("evmgr invalid mgr address: {} in config:\n{}", config["addr"].get<string>(), config.dump());
goto error_exit; goto error_exit;
} }
......
...@@ -207,7 +207,7 @@ private: ...@@ -207,7 +207,7 @@ private:
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size()); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", selfId, urlRouter); spdlog::error("evmlmotion {} {} failed setsockopts router: {}", selfId, urlRouter);
exit(1); exit(1);
} }
if(ret < 0) { if(ret < 0) {
...@@ -325,7 +325,7 @@ private: ...@@ -325,7 +325,7 @@ private:
} }
if(streamIdx == -1) { if(streamIdx == -1) {
spdlog::error("no video stream found."); spdlog::error("evmlmotion {} no video stream found.", selfId);
return -1; return -1;
} }
...@@ -333,22 +333,22 @@ private: ...@@ -333,22 +333,22 @@ private:
detPara.fpsIn = (int)(pStream->r_frame_rate.num/pStream->r_frame_rate.den); detPara.fpsIn = (int)(pStream->r_frame_rate.num/pStream->r_frame_rate.den);
AVCodec *pCodec = avcodec_find_decoder(pStream->codecpar->codec_id); AVCodec *pCodec = avcodec_find_decoder(pStream->codecpar->codec_id);
if (pCodec==NULL) { if (pCodec==NULL) {
spdlog::error("ERROR unsupported codec!"); spdlog::error("evmlmotion {} ERROR unsupported codec!", selfId);
return -1; return -1;
} }
pCodecCtx = avcodec_alloc_context3(pCodec); pCodecCtx = avcodec_alloc_context3(pCodec);
if (!pCodecCtx) { if (!pCodecCtx) {
spdlog::error("failed to allocated memory for AVCodecContext"); spdlog::error("evmlmotion {} failed to allocated memory for AVCodecContext", selfId);
return -1; return -1;
} }
if (avcodec_parameters_to_context(pCodecCtx, pStream->codecpar) < 0) { if (avcodec_parameters_to_context(pCodecCtx, pStream->codecpar) < 0) {
spdlog::error("failed to copy codec params to codec context"); spdlog::error("evmlmotion {} failed to copy codec params to codec context", selfId);
return -1; return -1;
} }
if (avcodec_open2(pCodecCtx, pCodec, NULL) < 0) { if (avcodec_open2(pCodecCtx, pCodec, NULL) < 0) {
spdlog::error("failed to open codec through avcodec_open2"); spdlog::error("evmlmotion {} failed to open codec through avcodec_open2", selfId);
return -1; return -1;
} }
...@@ -369,7 +369,7 @@ private: ...@@ -369,7 +369,7 @@ private:
{ {
int response = avcodec_send_packet(pCodecContext, pPacket); int response = avcodec_send_packet(pCodecContext, pPacket);
if (response < 0) { if (response < 0) {
spdlog::error("Error while sending a packet to the decoder: {}", av_err2str(response)); spdlog::error("evmlmotion {} Error while sending a packet to the decoder: {}", selfId, av_err2str(response));
return response; return response;
} }
...@@ -380,7 +380,7 @@ private: ...@@ -380,7 +380,7 @@ private:
} }
if (response < 0) { if (response < 0) {
spdlog::error("Error while receiving a frame from the decoder: {}", av_err2str(response)); spdlog::error("evmlmotion {} Error while receiving a frame from the decoder: {}", selfId, av_err2str(response));
return response; return response;
} }
else { else {
...@@ -592,7 +592,7 @@ protected: ...@@ -592,7 +592,7 @@ protected:
AVFrame *pFrame = av_frame_alloc(); AVFrame *pFrame = av_frame_alloc();
if (!pFrame) { if (!pFrame) {
spdlog::error("failed to allocated memory for AVFrame"); spdlog::error("evmlmotion {} failed to allocated memory for AVFrame", selfId);
exit(1); exit(1);
} }
while(true) { while(true) {
...@@ -610,13 +610,13 @@ protected: ...@@ -610,13 +610,13 @@ protected:
int ret =zmq_msg_init(&msg); int ret =zmq_msg_init(&msg);
ret = zmq_recvmsg(pSub, &msg, 0); ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret)); spdlog::error("evmlmotion {} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
continue; continue;
} }
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet); ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet);
{ {
if (ret < 0) { if (ret < 0) {
spdlog::error("packet decode failed: {:d}", ret); spdlog::error("evmlmotion {} packet decode failed: {}", selfId, ret);
continue; continue;
} }
} }
......
...@@ -216,7 +216,7 @@ private: ...@@ -216,7 +216,7 @@ private:
} }
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size()); ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", selfId, urlDealer); spdlog::error("evpuller {} {} failed setsockopts router: {}", selfId, urlDealer);
exit(1); exit(1);
} }
ret = zmq_connect(pDealer, urlDealer.c_str()); ret = zmq_connect(pDealer, urlDealer.c_str());
...@@ -248,12 +248,12 @@ protected: ...@@ -248,12 +248,12 @@ protected:
av_dict_set(&optsIn, "rtsp_transport", "tcp", 0); av_dict_set(&optsIn, "rtsp_transport", "tcp", 0);
spdlog::info("evpuller {} openning stream: {}", selfId, urlIn); spdlog::info("evpuller {} openning stream: {}", selfId, urlIn);
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) { if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) {
spdlog::error("Could not open input stream {}", urlIn); spdlog::error("evpuller {} Could not open input stream {}", selfId, urlIn);
} }
spdlog::info("evpuller {} finding stream info: {}", selfId, urlIn); spdlog::info("evpuller {} finding stream info: {}", selfId, urlIn);
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) { if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
spdlog::error("Failed to retrieve input stream information"); spdlog::error("evpuller {} Failed to retrieve input stream information", selfId);
} }
//pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS; //pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS;
...@@ -263,7 +263,7 @@ protected: ...@@ -263,7 +263,7 @@ protected:
if (!streamList) { if (!streamList) {
ret = AVERROR(ENOMEM); ret = AVERROR(ENOMEM);
spdlog::error("failed create avformatcontext for output: {}", av_err2str(AVERROR(ENOMEM))); spdlog::error("evpuller {} failed create avformatcontext for output: {}", selfId, av_err2str(AVERROR(ENOMEM)));
} }
// serialize formatctx to bytes // serialize formatctx to bytes
...@@ -353,13 +353,13 @@ public: ...@@ -353,13 +353,13 @@ public:
selfId = strEnv; selfId = strEnv;
auto v = strutils::split(selfId, ':'); auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evpuller") { if(v.size() != 3||v[1] != "evpuller") {
spdlog::error("evpusher received invalid gid: {}", selfId); spdlog::error("evpuller {} received invalid gid: {}", selfId);
exit(1); exit(1);
} }
devSn = v[0]; devSn = v[0];
iid = stoi(v[2]); iid = stoi(v[2]);
}else{ }else{
spdlog::error("evpusher failed to start. no SN set"); spdlog::error("evpuller {} failed to start. no SN set", selfId);
exit(1); exit(1);
} }
...@@ -367,13 +367,13 @@ public: ...@@ -367,13 +367,13 @@ public:
string addr = string("tcp://127.0.0.1:") + drport; string addr = string("tcp://127.0.0.1:") + drport;
int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId); int ret = zmqhelper::setupDealer(&pDaemonCtx, &pDaemon, addr, selfId);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} failed to setup dealer {}", devSn, addr); spdlog::error("evpuller {} failed to setup dealer {}", selfId, addr);
exit(1); exit(1);
} }
ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId); ret = zmqhelper::recvConfigMsg(pDaemon, config, addr, selfId);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} failed to receive configration message {}", devSn , addr); spdlog::error("evpusher {} failed to receive configration message {}", selfId , addr);
} }
init(); init();
} }
......
...@@ -318,20 +318,15 @@ protected: ...@@ -318,20 +318,15 @@ protected:
uint64_t pktCnt = 0; uint64_t pktCnt = 0;
int pktIgnore = 0; int pktIgnore = 0;
while (true) { while (true) {
// if(1 == getppid()) {
// spdlog::error("evpusher {} exit since evdaemon is dead", selfId);
// exit(1);
// }
ret =zmq_msg_init(&msg); ret =zmq_msg_init(&msg);
if(ret != 0) { if(ret != 0) {
spdlog::error("failed to init zmq msg"); spdlog::error("evpusher {} failed to init zmq msg", selfId);
continue; continue;
} }
// receive packet // receive packet
ret = zmq_recvmsg(pSub, &msg, 0); ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret)); spdlog::error("evpusher {} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
continue; continue;
} }
...@@ -344,7 +339,7 @@ protected: ...@@ -344,7 +339,7 @@ protected:
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet); ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet);
{ {
if (ret < 0) { if (ret < 0) {
spdlog::error("packet decode failed: {:d}", ret); spdlog::error("evpusher {} packet decode failed: {:d}", selfId, ret);
continue; continue;
} }
} }
...@@ -393,13 +388,6 @@ protected: ...@@ -393,13 +388,6 @@ protected:
} }
} }
av_write_trailer(pAVFormatRemux); av_write_trailer(pAVFormatRemux);
if(!bStopSig && ret < 0) {
//TOOD: reconnect
spdlog::error("TODO: failed, reconnecting");
}
else {
spdlog::error("exit on command");
}
} }
public: public:
......
...@@ -362,13 +362,13 @@ protected: ...@@ -362,13 +362,13 @@ protected:
int ret =zmq_msg_init(&msg); int ret =zmq_msg_init(&msg);
ret = zmq_recvmsg(pSub, &msg, 0); ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret)); spdlog::error("evslicer {} failed to recv zmq msg: {}",selfId, zmq_strerror(ret));
continue; continue;
} }
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet); ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet);
{ {
if (ret < 0) { if (ret < 0) {
spdlog::error("packet decode failed: {:d}", ret); spdlog::error("evslicer {} packet decode failed: {}", selfId, ret);
continue; continue;
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论