提交 0bd1a31b authored 作者: blu's avatar blu

init

上级 882cf044
...@@ -21,19 +21,19 @@ libzmq: ...@@ -21,19 +21,19 @@ libzmq:
sqlite3.o: vendor/sqlite/sqlite3.c sqlite3.o: vendor/sqlite/sqlite3.c
gcc -D SQLITE_THREADSAFE=1 -c vendor/sqlite/sqlite3.c gcc -D SQLITE_THREADSAFE=1 -c vendor/sqlite/sqlite3.c
evmgr: evmgr.cpp database.cpp inc/common.hpp inc/tinythread.hpp sqlite3.o evmgr: evmgr.cpp database.cpp inc/common.hpp inc/zmqhelper.hpp inc/tinythread.hpp sqlite3.o
$(CPP) $(CPPFLAGS) -o evmgr evmgr.cpp sqlite3.o database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) -o evmgr evmgr.cpp sqlite3.o database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evpuller: evpuller.cpp database.cpp inc/common.hpp inc/tinythread.hpp sqlite3.o evpuller: evpuller.cpp database.cpp inc/common.hpp inc/zmqhelper.hpp inc/tinythread.hpp sqlite3.o
$(CPP) $(CPPFLAGS) -o evpuller evpuller.cpp sqlite3.o database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) -o evpuller evpuller.cpp sqlite3.o database.cpp $(HEADERS) $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evpusher: evpusher.cpp inc/common.hpp inc/tinythread.hpp database.cpp sqlite3.o evpusher: evpusher.cpp inc/common.hpp inc/tinythread.hpp inc/zmqhelper.hpp database.cpp sqlite3.o
$(CPP) $(CPPFLAGS) -o evpusher evpusher.cpp database.cpp sqlite3.o $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) -o evpusher evpusher.cpp database.cpp sqlite3.o $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evslicer: evslicer.cpp inc/common.hpp inc/tinythread.hpp database.cpp sqlite3.o evslicer: evslicer.cpp inc/common.hpp inc/tinythread.hpp inc/zmqhelper.hpp database.cpp sqlite3.o
$(CPP) $(CPPFLAGS) -o evslicer evslicer.cpp database.cpp sqlite3.o $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) -o evslicer evslicer.cpp database.cpp sqlite3.o $(LIBFFMPEG) $(HEADERS) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
evmlmotion: evmlmotion.cpp inc/common.hpp inc/avcvhelpers.hpp inc/tinythread.hpp database.cpp sqlite3.o evmlmotion: evmlmotion.cpp inc/common.hpp inc/avcvhelpers.hpp inc/zmqhelper.hpp inc/tinythread.hpp database.cpp sqlite3.o
$(CPP) $(CPPFLAGS) -o evmlmotion evmlmotion.cpp database.cpp sqlite3.o $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS) $(CPP) $(CPPFLAGS) -o evmlmotion evmlmotion.cpp database.cpp sqlite3.o $(LIBFFMPEG) $(HEADERS) $(LIBOPENCV) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc` $(LIBS)
rtspr: rtsp-relay.cpp rtspr: rtsp-relay.cpp
......
...@@ -91,12 +91,22 @@ private: ...@@ -91,12 +91,22 @@ private:
// ID_SENDER, ID_TARGET, MSG // ID_SENDER, ID_TARGET, MSG
if(body.size() != 3) { if(body.size() != 3) {
spdlog::error("evmgr {} illegal message received, frame num: {}", devSn, body.size()); spdlog::error("evmgr {} illegal message received, frame num: {}", devSn, body.size());
cout<<endl<<endl;
for(auto &j:body) {
cout<<body2str(j) << "; ";
}
cout <<endl;
return -1; return -1;
} }
// if need forward // if need forward
if(memcmp((void*)(body[1].data()), devSn.data(), body[1].size()) != 0) { if(memcmp((void*)(body[1].data()), devSn.data(), body[1].size()) != 0) {
ret = z_send_multiple(pRouter, body); vector<vector<uint8_t> >v;
v.push_back(body[1]);
v.push_back(body[0]);
v.push_back(body[2]);
ret = z_send_multiple(pRouter, v);
spdlog::info("evmgr {} route msg from {} to {}", devSn, body2str(body[0]), body2str(body[1]));
if(ret < 0) { if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
} }
......
...@@ -44,15 +44,6 @@ protected: ...@@ -44,15 +44,6 @@ protected:
{ {
int ret = 0; int ret = 0;
bool bStopSig = false; bool bStopSig = false;
zmq_msg_t msg;
zmq_msg_t msg1;
ret = zmq_msg_init(&msg);
ret += zmq_msg_init_data(&msg, (void*)bytes, len, NULL, NULL);
if(ret < 0) {
spdlog::error("evpuller {} {} failed to init msg: {}", devSn, iid, zmq_strerror(zmq_errno()));
return;
}
// declare ready to router // declare ready to router
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
...@@ -66,21 +57,42 @@ protected: ...@@ -66,21 +57,42 @@ protected:
return; return;
} }
// init response msg
vector<uint8_t> msgBody;
msgBody.insert(msgBody.end(), (uint8_t *)bytes, (uint8_t *)bytes+len);
while (true) { while (true) {
if(checkStop() == true) { if(checkStop() == true) {
bStopSig = true; bStopSig = true;
break; break;
} }
spdlog::info("evpuller reqSrv {} {} waiting for req", devSn, iid); vector<vector<uint8_t> > v;
ret =zmq_msg_init(&msg1); spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid);
ret = zmq_recvmsg(pDealer, &msg1, 0); // proto: [sender] [body]
if(ret < 0) { ret = z_recv_multiple(pDealer, v);
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret)); cout << endl<<endl;
for(auto&j:v) {
cout <<body2str(j) << "; ";
}
cout << endl;
if(ret < 0|| v.size() !=2) {
spdlog::error("evpuller {} {}, repSrv failed to recv msg: {}, {}", devSn, iid, v.size(), zmq_strerror(zmq_errno()));
continue; continue;
}else{
} }
zmq_msg_close(&msg1);
spdlog::info("evpuller {} {} reveived req", devSn, iid);
zmq_send_const(pDealer, zmq_msg_data(&msg), len, 0); // vector<uint8_t> v;
// ret =zmq_msg_init(&msg1);
// ret = zmq_recvmsg(pDealer, &msg1, 0);
// if(ret < 0) {
// spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
// continue;
// }
// zmq_msg_close(&msg1);
// spdlog::info("evpuller {} {} reveived req", devSn, iid);
// zmq_send_const(pDealer, zmq_msg_data(&data), len, 0);
} }
} }
public: public:
......
...@@ -22,12 +22,13 @@ namespace fs = std::filesystem; ...@@ -22,12 +22,13 @@ namespace fs = std::filesystem;
#define MAX_ZMQ_MSG_SIZE 1204 * 1024 * 2 #define MAX_ZMQ_MSG_SIZE 1204 * 1024 * 2
using namespace std; using namespace std;
using namespace zmqhelper;
class EvPusher: public TinyThread { class EvPusher: public TinyThread {
private: private:
void *pSubCtx = NULL, *pDealerCtx = NULL; // for packets relay void *pSubCtx = NULL, *pDealerCtx = NULL; // for packets relay
void *pSub = NULL, *pDealer = NULL; void *pSub = NULL, *pDealer = NULL;
string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn; string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn, pusherGid;
int iid; int iid;
bool enablePush = false; bool enablePush = false;
int *streamList = NULL; int *streamList = NULL;
...@@ -41,6 +42,7 @@ private: ...@@ -41,6 +42,7 @@ private:
// TODO: read db to get devSn // TODO: read db to get devSn
devSn = "ILSEVPUSHER1"; devSn = "ILSEVPUSHER1";
iid = 1; iid = 1;
pusherGid = devSn + ":evpusher:" + to_string(iid);
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
...@@ -59,7 +61,7 @@ private: ...@@ -59,7 +61,7 @@ private:
for(auto &j: ipcs) { for(auto &j: ipcs) {
json pullers = j["modules"]["evpusher"]; json pullers = j["modules"]["evpusher"];
for(auto &p:pullers) { for(auto &p:pullers) {
if(p["devSn"] == devSn && p["iid"] == iid) { if(p["sn"] == devSn && p["iid"] == iid) {
evpusher = p; evpusher = p;
break; break;
} }
...@@ -77,7 +79,7 @@ private: ...@@ -77,7 +79,7 @@ private:
} }
if(!found) { if(!found) {
spdlog::error("evpusher {} {} no valid config found. retrying load config...", devSn, iid); spdlog::error("evpusher {} {}: no valid config found. retrying load config...", devSn, iid);
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
continue; continue;
} }
...@@ -114,24 +116,28 @@ private: ...@@ -114,24 +116,28 @@ private:
pSub = zmq_socket(pSubCtx, ZMQ_SUB); pSub = zmq_socket(pSubCtx, ZMQ_SUB);
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher failed connect to pub: {}, {}", devSn, iid); spdlog::error("evpusher {} {} failed set setsockopt: {}", devSn, iid, urlPub);
return -1; return -1;
} }
ret = zmq_connect(pSub, urlPub.c_str()); ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed create sub", devSn, iid); spdlog::error("evpusher {} {} failed connect pub: {}", devSn, iid, urlPub);
return -2; return -2;
} }
// setup req // setup req
pDealerCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_REQ); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evpusher {} {} try create req to {}", devSn, iid, urlDealer); spdlog::info("evpusher {} {} try create req to {}", devSn, iid, urlDealer);
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, pusherGid.c_str(), pusherGid.size());
if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", devSn, iid, urlDealer);
return -3;
}
ret = zmq_connect(pDealer, urlDealer.c_str()); ret = zmq_connect(pDealer, urlDealer.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed create req to {}", devSn, iid, urlDealer); spdlog::error("evpusher {} {} failed connect dealer: {}", devSn, iid, urlDealer);
return -3; return -4;
} }
spdlog::info("evpusher {} {} success setupMq", devSn, iid); spdlog::info("evpusher {} {} success setupMq", devSn, iid);
...@@ -168,38 +174,31 @@ private: ...@@ -168,38 +174,31 @@ private:
// req avformatcontext packet // req avformatcontext packet
// send first packet to init connection // send first packet to init connection
zmq_msg_t msg; // zmq_send(pDealer, "hello", 5, 0);
zmq_send(pDealer, "hello", 5, 0); vector<vector<uint8_t> > body;
spdlog::info("evpusher {} {} success send hello", devSn, iid); // json msgBody;
ret =zmq_msg_init(&msg); body.push_back(str2body(pullerGid));
if(ret != 0) { cout << "\n\npullGid: " << pullerGid << endl;
spdlog::error("failed to init zmq msg"); // msgBody["cmd"] = EVMGRCMD_REQ_FORMAT;
exit(1); body.push_back(str2body("hello-puller"));
} ret = z_send_multiple(pDealer, body);
// receive packet
ret = zmq_recvmsg(pDealer, &msg, 0);
spdlog::info("evpusher {} {} recv", devSn, iid);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed to recv zmq msg: {}", devSn, iid, zmq_strerror(ret)); spdlog::error("evpusher {} {}, failed to send EVMGRCMD_REQ_FORMAT: {}", devSn, iid, zmq_strerror(zmq_errno()));
exit(1); return ret;
} }
spdlog::info("evpusher {} {} success send hello", devSn, iid);
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); // expect response with avformatctx
AVFormatCtxSerializer::decode((char *)zmq_msg_data(&msg), ret, pAVFormatInput); body.clear();
ret = z_recv_multiple(pDealer, body);
// close req if(ret < 0||body.size() != 3) {
{ spdlog::error("evpusher {} {}, failed to receive avformatctx: {},{}", devSn, iid, body.size(), zmq_strerror(zmq_errno()));
zmq_msg_close(&msg); return ret;
if(pDealer != NULL) {
zmq_close(pDealer);
pDealer = NULL;
}
if(pDealerCtx != NULL) {
zmq_ctx_destroy(pDealerCtx);
pDealerCtx = NULL;
}
} }
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)body.back().data(), ret, pAVFormatInput);
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str());
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret)); spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论