提交 0ca29392 authored 作者: blu's avatar blu

init

上级 1afbaa51
...@@ -94,8 +94,8 @@ private: ...@@ -94,8 +94,8 @@ private:
} }
cout <<endl; cout <<endl;
// ID_SENDER, ID_TARGET, MSG // ID_SENDER, ID_TARGET, meta ,MSG
if(body.size() != 3) { if(body.size() != 4) {
spdlog::warn("evmgr {} dropped a message, since its size is incorrect: {}", devSn, body.size()); spdlog::warn("evmgr {} dropped a message, since its size is incorrect: {}", devSn, body.size());
return 0; return 0;
} }
...@@ -107,6 +107,7 @@ private: ...@@ -107,6 +107,7 @@ private:
v.push_back(body[1]); v.push_back(body[1]);
v.push_back(body[0]); v.push_back(body[0]);
v.push_back(body[2]); v.push_back(body[2]);
v.push_back(body[3]);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
......
...@@ -48,6 +48,7 @@ protected: ...@@ -48,6 +48,7 @@ protected:
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
body.push_back(str2body(mgrSn + ":0:0")); body.push_back(str2body(mgrSn + ":0:0"));
body.push_back(str2body("")); // blank meta
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
...@@ -58,8 +59,7 @@ protected: ...@@ -58,8 +59,7 @@ protected:
} }
// init response msg // init response msg
vector<uint8_t> msgBody; auto msgBody = data2body(const_cast<char*>(bytes), len);
msgBody.insert(msgBody.end(), (uint8_t *)bytes, (uint8_t *)bytes+len);
while (true) { while (true) {
if(checkStop() == true) { if(checkStop() == true) {
bStopSig = true; bStopSig = true;
...@@ -67,38 +67,37 @@ protected: ...@@ -67,38 +67,37 @@ protected:
} }
spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid); spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid);
// proto: [sender_id] [body] // proto: [sender_id] [meta] [body]
auto v = z_recv_multiple(pDealer); auto v = z_recv_multiple(pDealer);
if(v.size() == 0) { if(v.size() != 3) {
//TODO: //TODO:
spdlog::error("evpuller {} {}, repSrv received invalid message: {}", devSn, iid, v.size());
continue;
} }
cout << endl<<endl; cout << endl<<endl;
for(auto&j:v) { for(auto&j:v) {
cout <<body2str(j) << "; "; cout <<body2str(j) << "; ";
} }
cout << endl; cout << endl;
if(ret < 0) { try{
spdlog::error("evpuller {} {}, repSrv failed to recv msg: {}, {}", devSn, iid, v.size(), zmq_strerror(zmq_errno())); // rep framectx
continue; // TODO: verify sender id
}else if(v.size() != 2) { auto meta = json::parse(body2str(v[1]));
spdlog::error("evpuller {} {}, repSrv received invalid msg, size: {}", devSn, iid, v.size()); if(meta["type"].get<string>() == EV_PACKET_TYPE_AVFORMATCTX) {
continue; vector<vector<uint8_t> > rep;
}else{ rep.push_back(v[0]);
// rep.push_back(v[1]);
rep.push_back(msgBody);
ret = z_send_multiple(pDealer, rep);
if(ret < 0) {
spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno()));
}
}else{
spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
}
}catch(exception &e) {
spdlog::error("evpuller {} {} excpetion parse request from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
} }
// vector<uint8_t> v;
// ret =zmq_msg_init(&msg1);
// ret = zmq_recvmsg(pDealer, &msg1, 0);
// if(ret < 0) {
// spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
// continue;
// }
// zmq_msg_close(&msg1);
// spdlog::info("evpuller {} {} reveived req", devSn, iid);
// zmq_send_const(pDealer, zmq_msg_data(&data), len, 0);
} }
} }
public: public:
......
...@@ -144,6 +144,7 @@ private: ...@@ -144,6 +144,7 @@ private:
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
body.push_back(str2body(mgrSn+":0:0")); body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body("")); // blank meta
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
...@@ -169,24 +170,40 @@ private: ...@@ -169,24 +170,40 @@ private:
spdlog::info("evpusher {} {} send hello to puller: {}", devSn, iid, pullerGid); spdlog::info("evpusher {} {} send hello to puller: {}", devSn, iid, pullerGid);
vector<vector<uint8_t> > body; vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid)); body.push_back(str2body(pullerGid));
json meta;
meta["type"] = EV_PACKET_TYPE_AVFORMATCTX;
body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); bool gotFormat = false;
if(ret < 0) { while(!gotFormat) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno())); ret = z_send_multiple(pDealer, body);
return ret; if(ret < 0) {
} spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
spdlog::info("evpusher {} {} success send hello", devSn, iid); continue;
}
// expect response with avformatctx spdlog::info("evpusher {} {} success send hello", devSn, iid);
auto v = z_recv_multiple(pDealer);
if(v.size() != 3) { // expect response with avformatctx
spdlog::error("evpusher {} {}, failed to receive avformatctx: {},{}", devSn, iid, v.size(), zmq_strerror(zmq_errno())); auto v = z_recv_multiple(pDealer);
return ret; if(v.size() != 3) {
spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
}else if(body2str(v[0]) != pullerGid) {
spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
}else{
try{
auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_PACKET_TYPE_AVFORMATCTX){
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
}
}catch(exception &e) {
spdlog::error("evpusher {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what());
}
}
} }
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)v.back().data(), ret, pAVFormatInput);
//
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str());
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret)); spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
......
...@@ -230,7 +230,7 @@ int decode(char *bytes, int len, AVFormatContext *pCtx) ...@@ -230,7 +230,7 @@ int decode(char *bytes, int len, AVFormatContext *pCtx)
memcpy(&ret, bytes +len -strlen(PS_MARK_E) - sizeof(ret), sizeof(ret)); memcpy(&ret, bytes +len -strlen(PS_MARK_E) - sizeof(ret), sizeof(ret));
if ((memcmp(PS_MARK_S, bytes + got, strlen(PS_MARK_S)) != 0 && memcmp(PS_MARK_E, bytes + len - strlen(PS_MARK_E), strlen(PS_MARK_E)) != 0)||ret != len) if ((memcmp(PS_MARK_S, bytes + got, strlen(PS_MARK_S)) != 0 && memcmp(PS_MARK_E, bytes + len - strlen(PS_MARK_E), strlen(PS_MARK_E)) != 0)||ret != len)
{ {
spdlog::error("invalid packet: {} {}", ret, len); spdlog::error("invalid avformatctx: {} {}", ret, len);
return -1; return -1;
} }
spdlog::info("decode len: {}", ret); spdlog::info("decode len: {}", ret);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论