提交 c8c148d7 authored 作者: blu's avatar blu

init

上级 dc647c7b
...@@ -20,12 +20,74 @@ namespace fs = std::filesystem; ...@@ -20,12 +20,74 @@ namespace fs = std::filesystem;
using namespace std; using namespace std;
class RepSrv: public TinyThread {
private:
string urlRep;
void *pRepCtx = NULL; // for packets REP
void *pRep = NULL;
const char * bytes;
int len;
int teardownMq()
{
if(pRep != NULL) {
zmq_close(pRep);
}
if(pRepCtx != NULL) {
zmq_ctx_destroy(pRepCtx);
}
return 0;
}
int setupMq(){
int ret = 0;
pRepCtx = zmq_ctx_new();
pRep = zmq_socket(pRepCtx, ZMQ_REP);
ret = zmq_bind(pRep, urlRep.c_str());
if(ret < 0) {
spdlog::error("failed to bind rep: {}, {}", zmq_strerror(ret), urlRep.c_str());
this_thread::sleep_for(chrono::seconds(20));
return -1;
}
return 0;
}
public:
RepSrv() = delete;
RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete;
RepSrv(string urlRep, const char* formatBytes, int len):urlRep(urlRep), bytes(formatBytes), len(len){};
~RepSrv(){};
protected:
void run(){
bool bStopSig = false;
if(setupMq() != 0) {
exit(1);
}
zmq_msg_t msg;
zmq_msg_t msg1;
int ret =zmq_msg_init(&msg);
zmq_msg_init_data(&msg, (void*)bytes, len, NULL, NULL);
while (true) {
if(checkStop() == true) {
bStopSig = true;
break;
}
int ret =zmq_msg_init(&msg1);
ret = zmq_recvmsg(pRep, &msg1, 0);
if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
continue;
}
zmq_msg_close(&msg1);
zmq_send_const(pRep, zmq_msg_data(&msg), len, 0);
}
}
};
class EvPuller: public TinyThread { class EvPuller: public TinyThread {
private: private:
void *pPubCtx = NULL; // for packets publishing void *pPubCtx = NULL; // for packets publishing
void *pPub = NULL; void *pPub = NULL;
void *pRepCtx = NULL; // for packets REP
void *pRep = NULL;
AVFormatContext *pAVFormatInput = NULL; AVFormatContext *pAVFormatInput = NULL;
string urlIn, urlPub; string urlIn, urlPub;
int *streamList = NULL, numStreams = 0; int *streamList = NULL, numStreams = 0;
...@@ -34,14 +96,16 @@ public: ...@@ -34,14 +96,16 @@ public:
EvPuller() EvPuller()
{ {
int ret = 0; int ret = 0;
do { init();
init(); ret = setupMq();
ret = setupMq(); if(ret != 0) {
}while(ret < 0); exit(1);
}
} }
~EvPuller() ~EvPuller()
{ {
teardownMq();
} }
protected: protected:
...@@ -80,6 +144,8 @@ protected: ...@@ -80,6 +144,8 @@ protected:
streamList[i] = streamIdx++; streamList[i] = streamIdx++;
} }
bool bStopSig = false; bool bStopSig = false;
int pktCnt = 0; int pktCnt = 0;
while (true) { while (true) {
......
...@@ -168,7 +168,7 @@ protected: ...@@ -168,7 +168,7 @@ protected:
// receive packet // receive packet
ret = zmq_recvmsg(pSubscriber, &msg, 0); ret = zmq_recvmsg(pSubscriber, &msg, 0);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to recv zmq msg"); spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
continue; continue;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论