提交 025ff250 authored 作者: blu's avatar blu

init

上级 0bd1a31b
...@@ -88,25 +88,26 @@ private: ...@@ -88,25 +88,26 @@ private:
int ret = 0; int ret = 0;
zmq_msg_t msg; zmq_msg_t msg;
// ID_SENDER, ID_TARGET, MSG
if(body.size() != 3) {
spdlog::error("evmgr {} illegal message received, frame num: {}", devSn, body.size());
cout<<endl<<endl; cout<<endl<<endl;
for(auto &j:body) { for(auto &j:body) {
cout<<body2str(j) << "; "; cout<<body2str(j) << "; ";
} }
cout <<endl; cout <<endl;
// ID_SENDER, ID_TARGET, MSG
if(body.size() != 3) {
spdlog::error("evmgr {} illegal message received, frame num: {}", devSn, body.size());
return -1; return -1;
} }
// if need forward // if need forward
if(memcmp((void*)(body[1].data()), devSn.data(), body[1].size()) != 0) { if(memcmp((void*)(body[1].data()), devSn.data(), body[1].size()) != 0) {
spdlog::info("evmgr {} route msg from {} to {}", devSn, body2str(body[0]), body2str(body[1]));
vector<vector<uint8_t> >v; vector<vector<uint8_t> >v;
v.push_back(body[1]); v.push_back(body[1]);
v.push_back(body[0]); v.push_back(body[0]);
v.push_back(body[2]); v.push_back(body[2]);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
spdlog::info("evmgr {} route msg from {} to {}", devSn, body2str(body[0]), body2str(body[1]));
if(ret < 0) { if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
} }
...@@ -128,9 +129,8 @@ protected: ...@@ -128,9 +129,8 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
vector<vector<uint8_t> >body; auto body = z_recv_multiple(pRouter);
ret = z_recv_multiple(pRouter, body); if(body.size() == 0) {
if(ret < 0) {
spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno()));
continue; continue;
} }
......
...@@ -48,7 +48,7 @@ protected: ...@@ -48,7 +48,7 @@ protected:
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
body.push_back(str2body(mgrSn)); body.push_back(str2body(mgrSn));
body.push_back(str2body("hello")); body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
...@@ -65,20 +65,26 @@ protected: ...@@ -65,20 +65,26 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
vector<vector<uint8_t> > v;
spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid); spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid);
// proto: [sender] [body] // proto: [sender_id] [body]
ret = z_recv_multiple(pDealer, v); auto v = z_recv_multiple(pDealer);
if(v.size() == 0) {
//TODO:
}
cout << endl<<endl; cout << endl<<endl;
for(auto&j:v) { for(auto&j:v) {
cout <<body2str(j) << "; "; cout <<body2str(j) << "; ";
} }
cout << endl; cout << endl;
if(ret < 0|| v.size() !=2) { if(ret < 0) {
spdlog::error("evpuller {} {}, repSrv failed to recv msg: {}, {}", devSn, iid, v.size(), zmq_strerror(zmq_errno())); spdlog::error("evpuller {} {}, repSrv failed to recv msg: {}, {}", devSn, iid, v.size(), zmq_strerror(zmq_errno()));
continue; continue;
}else if(v.size() != 2) {
spdlog::error("evpuller {} {}, repSrv received invalid msg, size: {}", devSn, iid, v.size());
continue;
}else{ }else{
//
} }
...@@ -113,7 +119,7 @@ private: ...@@ -113,7 +119,7 @@ private:
void *pDealerCtx = NULL; void *pDealerCtx = NULL;
void *pDealer = NULL; void *pDealer = NULL;
AVFormatContext *pAVFormatInput = NULL; AVFormatContext *pAVFormatInput = NULL;
string urlIn, urlPub, urlDealer, mgrSn, devSn; string urlIn, urlPub, urlDealer, mgrSn, devSn, pullerGid;
int *streamList = NULL, numStreams = 0, iid; int *streamList = NULL, numStreams = 0, iid;
json config; json config;
...@@ -179,8 +185,8 @@ private: ...@@ -179,8 +185,8 @@ private:
ret = mqErrorMsg("evpuller", devSn, iid, "failed to bind zmq", zmq_bind(pPub, urlPub.c_str())); ret = mqErrorMsg("evpuller", devSn, iid, "failed to bind zmq", zmq_bind(pPub, urlPub.c_str()));
pDealerCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
string ident = devSn+":evpuller:" + to_string(iid); pullerGid = devSn+":evpuller:" + to_string(iid);
ret += mqErrorMsg("evpuller", devSn, iid, "failed to set socksopt", zmq_setsockopt(pDealer, ZMQ_IDENTITY, ident.c_str(), ident.size())); ret += mqErrorMsg("evpuller", devSn, iid, "failed to set socksopt", zmq_setsockopt(pDealer, ZMQ_IDENTITY, pullerGid.c_str(), pullerGid.size()));
ret += mqErrorMsg("evpuller", devSn, iid, "failed to connect to router " + urlDealer, zmq_connect(pDealer, urlDealer.c_str())); ret += mqErrorMsg("evpuller", devSn, iid, "failed to connect to router " + urlDealer, zmq_connect(pDealer, urlDealer.c_str()));
if(ret < 0) { if(ret < 0) {
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
......
...@@ -140,6 +140,19 @@ private: ...@@ -140,6 +140,19 @@ private:
return -4; return -4;
} }
// send hello to router
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn));
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
return -1;
}
spdlog::info("evpusher {} {} success setupMq", devSn, iid); spdlog::info("evpusher {} {} success setupMq", devSn, iid);
return 0; return 0;
...@@ -173,31 +186,26 @@ private: ...@@ -173,31 +186,26 @@ private:
AVDictionary *pOptsRemux = NULL; AVDictionary *pOptsRemux = NULL;
// req avformatcontext packet // req avformatcontext packet
// send first packet to init connection // send hello to puller
// zmq_send(pDealer, "hello", 5, 0);
vector<vector<uint8_t> > body; vector<vector<uint8_t> > body;
// json msgBody;
body.push_back(str2body(pullerGid)); body.push_back(str2body(pullerGid));
cout << "\n\npullGid: " << pullerGid << endl; body.push_back(str2body(MSG_HELLO));
// msgBody["cmd"] = EVMGRCMD_REQ_FORMAT;
body.push_back(str2body("hello-puller"));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send EVMGRCMD_REQ_FORMAT: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {}, failed to send hello: {}", devSn, iid, zmq_strerror(zmq_errno()));
return ret; return ret;
} }
spdlog::info("evpusher {} {} success send hello", devSn, iid); spdlog::info("evpusher {} {} success send hello", devSn, iid);
// expect response with avformatctx // expect response with avformatctx
body.clear(); auto v = z_recv_multiple(pDealer);
ret = z_recv_multiple(pDealer, body); if(v.size() != 3) {
if(ret < 0||body.size() != 3) { spdlog::error("evpusher {} {}, failed to receive avformatctx: {},{}", devSn, iid, v.size(), zmq_strerror(zmq_errno()));
spdlog::error("evpusher {} {}, failed to receive avformatctx: {},{}", devSn, iid, body.size(), zmq_strerror(zmq_errno()));
return ret; return ret;
} }
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)body.back().data(), ret, pAVFormatInput); AVFormatCtxSerializer::decode((char *)v.back().data(), ret, pAVFormatInput);
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str());
if (ret < 0) { if (ret < 0) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论