提交 8f5849ed authored 作者: blu's avatar blu

refactor of delta config

上级 809746d4
...@@ -98,6 +98,8 @@ private: ...@@ -98,6 +98,8 @@ private:
unique_lock<mutex> lk(this->mutMsg); unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;}); this->cvMsg.wait(lk, [this] {return this->gotFormat;});
} }
spdlog::info("evpuller {} got inputformat", selfId);
auto msgBody = data2body(const_cast<char*>(pAVFmtCtxBytes), lenAVFmtCtxBytes); auto msgBody = data2body(const_cast<char*>(pAVFmtCtxBytes), lenAVFmtCtxBytes);
try { try {
// rep framectx // rep framectx
...@@ -298,32 +300,16 @@ protected: ...@@ -298,32 +300,16 @@ protected:
} }
// serialize formatctx to bytes // serialize formatctx to bytes
lock_guard<mutex> lock(this->mutMsg); {
lenAVFmtCtxBytes = AVFormatCtxSerializer::encode(pAVFormatInput, &pAVFmtCtxBytes); lock_guard<mutex> lock(this->mutMsg);
if(lenAVFmtCtxBytes <= 0 || pAVFmtCtxBytes == nullptr) { lenAVFmtCtxBytes = AVFormatCtxSerializer::encode(pAVFormatInput, &pAVFmtCtxBytes);
spdlog::error("evpuller {} failed to pull packet from {}. exiting...", selfId, urlIn); if(lenAVFmtCtxBytes <= 0 || pAVFmtCtxBytes == nullptr) {
exit(1); spdlog::error("evpuller {} failed to pull packet from {}. exiting...", selfId, urlIn);
exit(1);
}
gotFormat = true;
cvMsg.notify_one();
} }
gotFormat = true;
cvMsg.notify_one();
thEdgeMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
string msg;
for(auto &v: body) {
msg += body2str(v);
}
spdlog::info("evpuller {} received edge msg: {}", selfId, msg);
this->handleEdgeMsg(body);
}
});
thEdgeMsgHandler.detach();
// find all video & audio streams for remuxing // find all video & audio streams for remuxing
int i = 0, streamIdx = 0; int i = 0, streamIdx = 0;
...@@ -436,6 +422,25 @@ public: ...@@ -436,6 +422,25 @@ public:
} }
init(); init();
thEdgeMsgHandler = thread([this]{
while(true) {
auto body = z_recv_multiple(pDealer,false);
if(body.size() == 0) {
spdlog::error("evslicer {} failed to receive multiple msg: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// full proto msg received.
string msg;
for(auto &v: body) {
msg += body2str(v) + ",";
}
spdlog::info("evpuller {} received edge msg: {}", selfId, msg);
this->handleEdgeMsg(body);
}
});
thEdgeMsgHandler.detach();
thCloudMsgHandler = thread([this]{ thCloudMsgHandler = thread([this]{
while(true) { while(true) {
auto body = z_recv_multiple(pDaemon,false); auto body = z_recv_multiple(pDaemon,false);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论