提交 dfecb069 authored 作者: blu's avatar blu

init

上级 26e473c5
......@@ -83,19 +83,30 @@ private:
return ret;
}
void handleMsg(string body[]) {
void handleMsg(vector<string> body) {
zmq_msg_t msg;
// dump
string dump;
for(auto &i: body) {
dump += i + ";";
}
cout <<endl;
if(body.size() != 3) {
spdlog::error("evmgr {} illegal message received: {}", devSn, dump);
}
if(body[0] != devSn) {
for(int i =0; i < 3; i++) {
spdlog::info("evmgr {}, msg idRcv is {}, forwarding...", devSn, body[0]);
spdlog::info("evmgr {}, msg body is {}; {}; {}; forwarding...", devSn, body[0], body[1], body[2]);
zmq_msg_init(&msg);
zmq_msg_init_data(&msg, (void*)body[0].c_str(), body[0].size(), NULL, NULL);
mqErrorMsg("evmgr", devSn, "failed to send zmq msg", zmq_send_const(pRouter, zmq_msg_data(&msg), body[0].size(), i ==2?0:ZMQ_SNDMORE));
zmq_msg_init_data(&msg, (void*)body[i].c_str(), body[i].size(), NULL, NULL);
mqErrorMsg("evmgr", devSn, "failed to send zmq msg", zmq_send_const(pRouter, zmq_msg_data(&msg), body[i].size(), i ==2?0:ZMQ_SNDMORE));
zmq_msg_close(&msg);
}
}else{
// TODO: report msg
spdlog::info("evmgr {} subsystem report msg received: {} {} {}", devSn, body[0], body[1], body[2]);
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, body[0], body[1], body[2]);
}
}
......@@ -109,29 +120,28 @@ protected:
bStopSig = true;
break;
}
string msgBody[3];
int64_t more = 0;
vector<string>body;
int64_t more = 1;
// business logic
int i = 0;
for(; i < 3; i++) {
int cnt = 0;
char *tmp;
while(more > 0) {
cnt++;
mqErrorMsg("evmgr", devSn, "failed to init zmq msg", zmq_msg_init(&msg));
mqErrorMsg("evmgr", devSn, "failed to recv zmq msg", zmq_recvmsg(pRouter, &msg, 0));
msgBody[i] = string((char *)zmq_msg_data(&msg));
ret = mqErrorMsg("evmgr", devSn, "failed to recv zmq msg", zmq_recvmsg(pRouter, &msg, 0));
tmp = new char[ret+1];
memcpy(tmp, zmq_msg_data(&msg), ret);
tmp[ret] = 0;
body.push_back(string(tmp));
delete tmp;
zmq_msg_close(&msg);
spdlog::debug("evmgr {} received[{}]: {} ", devSn, i, msgBody[i]);
spdlog::debug("evmgr {} received[{}]: {} ", devSn, cnt, body.back());
size_t more_size = sizeof (more);
mqErrorMsg("evmgr", devSn, "failed to get zmq sockopt", zmq_getsockopt(pRouter, ZMQ_RCVMORE, &more, &more_size));
if(!more) {
break;
}
}
if(i >= 3 ) {
// full proto msg received.
handleMsg(msgBody);
}else{
spdlog::warn("partial msg recved, maybe hello msg: {}, {}, {}", msgBody[0], msgBody[1], msgBody[2]);
}
handleMsg(body);
}
}
public:
......
......@@ -20,8 +20,18 @@ namespace fs = std::filesystem;
using namespace std;
int mqErrorMsg(string cls, string devSn, int iid, string extraInfo, int ret)
{
if(ret < 0) {
spdlog::error("{} {} {}, {}: {} ", cls, devSn, iid, extraInfo, zmq_strerror(zmq_errno()));
}
return ret;
}
class RepSrv: public TinyThread {
private:
string mgrSn;
string devSn;
int iid;
string urlRep;
......@@ -44,23 +54,6 @@ private:
// }
return 0;
}
public:
RepSrv() = delete;
RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete;
RepSrv(string devSn, int iid, const char* formatBytes, int len, void *pDealer):devSn(devSn), iid(iid), bytes(formatBytes), len(len), pDealer(pDealer)
{
init();
};
~RepSrv()
{
// if(pRep != NULL) {
// zmq_close(pRep);
// }
// if(pRepCtx != NULL) {
// zmq_ctx_destroy(pRepCtx);
// }
};
protected:
void run()
{
......@@ -70,6 +63,19 @@ protected:
zmq_msg_t msg1;
int ret =zmq_msg_init(&msg);
zmq_msg_init_data(&msg, (void*)bytes, len, NULL, NULL);
// declare ready to router
vector<string>body;
body.push_back(mgrSn);
body.push_back("hello");
int cnt = 0;
for(auto &i:body) {
zmq_msg_init(&msg1);
zmq_msg_init_data(&msg1, (void*)i.c_str(), i.size(), NULL, NULL);
mqErrorMsg("evpuller", devSn,iid, "failed to send zmq msg", zmq_send_const(pDealer, zmq_msg_data(&msg1), i.size(), cnt==(body.size()-1)?0:ZMQ_SNDMORE));
zmq_msg_close(&msg1);
cnt++;
}
while (true) {
if(checkStop() == true) {
bStopSig = true;
......@@ -87,6 +93,23 @@ protected:
zmq_send_const(pDealer, zmq_msg_data(&msg), len, 0);
}
}
public:
RepSrv() = delete;
RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete;
RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes, int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes), len(len), pDealer(pDealer)
{
init();
};
~RepSrv()
{
// if(pRep != NULL) {
// zmq_close(pRep);
// }
// if(pRepCtx != NULL) {
// zmq_ctx_destroy(pRepCtx);
// }
};
};
class EvPuller: public TinyThread {
......@@ -96,19 +119,10 @@ private:
void *pDealerCtx = NULL;
void *pDealer = NULL;
AVFormatContext *pAVFormatInput = NULL;
string urlIn, urlPub, urlDealer, devSn;
string urlIn, urlPub, urlDealer, mgrSn, devSn;
int *streamList = NULL, numStreams = 0, iid;
json config;
int mqErrorMsg(string cls, string devSn, int iid, string extraInfo, int ret)
{
if(ret < 0) {
spdlog::error("{} {} {}, {}: {} ", cls, devSn, iid, extraInfo, zmq_strerror(zmq_errno()));
}
return ret;
}
int init()
{
bool inited = false;
......@@ -157,6 +171,7 @@ private:
continue;
}
mgrSn = evmgr["sn"];
string user = ipc["user"];
string passwd = ipc["password"];
urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + "/h264/ch1/sub/av_stream";
......@@ -217,7 +232,7 @@ protected:
// serialize formatctx to bytes
char *pBytes = NULL;
ret = AVFormatCtxSerializer::encode(pAVFormatInput, &pBytes);
auto repSrv = RepSrv(devSn, iid, pBytes, ret, pDealer);
auto repSrv = RepSrv(mgrSn, devSn, iid, pBytes, ret, pDealer);
repSrv.detach();
// find all video & audio streams for remuxing
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论