提交 b4f34e71 authored 作者: blu's avatar blu

init

上级 dfecb069
...@@ -7,7 +7,7 @@ LIBOPENCV = `pkg-config opencv --cflags --libs` ...@@ -7,7 +7,7 @@ LIBOPENCV = `pkg-config opencv --cflags --libs`
LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs` LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs`
LIBS=-lpthread LIBS=-lpthread
SQLITE=vendor/sqlite/sqlite3.c SQLITE=vendor/sqlite/sqlite3.c
HEADERS=-Iinc HEADERS=-Iinc -Ivendor/include
.PHONY: libzmq .PHONY: libzmq
all: evmgr evpuller evpusher evslicer evmlmotion all: evmgr evpuller evpusher evslicer evmlmotion
......
...@@ -13,12 +13,13 @@ ...@@ -13,12 +13,13 @@
namespace fs = std::filesystem; namespace fs = std::filesystem;
#endif #endif
#include "vendor/include/zmq.h" #include "zmqhelper.hpp"
#include "tinythread.hpp" #include "tinythread.hpp"
#include "common.hpp" #include "common.hpp"
#include "database.h" #include "database.h"
using namespace std; using namespace std;
using namespace zmqhelper;
/** /**
* functions: * functions:
...@@ -83,31 +84,28 @@ private: ...@@ -83,31 +84,28 @@ private:
return ret; return ret;
} }
void handleMsg(vector<string> body) { int handleMsg(vector<vector<uint8_t> > &body) {
int ret = 0;
zmq_msg_t msg; zmq_msg_t msg;
// dump
string dump;
for(auto &i: body) {
dump += i + ";";
}
cout <<endl;
// ID_SENDER, ID_TARGET, MSG
if(body.size() != 3) { if(body.size() != 3) {
spdlog::error("evmgr {} illegal message received: {}", devSn, dump); spdlog::error("evmgr {} illegal message received, frame num: {}", devSn, body.size());
return -1;
} }
if(body[0] != devSn) { // if need forward
for(int i =0; i < 3; i++) { if(memcmp((void*)(body[1].data()), devSn.data(), body[1].size()) != 0) {
spdlog::info("evmgr {}, msg body is {}; {}; {}; forwarding...", devSn, body[0], body[1], body[2]); ret = z_send_multiple(pRouter, body);
zmq_msg_init(&msg); if(ret < 0) {
zmq_msg_init_data(&msg, (void*)body[i].c_str(), body[i].size(), NULL, NULL); spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
mqErrorMsg("evmgr", devSn, "failed to send zmq msg", zmq_send_const(pRouter, zmq_msg_data(&msg), body[i].size(), i ==2?0:ZMQ_SNDMORE));
zmq_msg_close(&msg);
} }
}else{ }else{
// TODO: report msg // TODO: report msg
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, body[0], body[1], body[2]); spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
} }
return ret;
} }
protected: protected:
...@@ -120,25 +118,11 @@ protected: ...@@ -120,25 +118,11 @@ protected:
bStopSig = true; bStopSig = true;
break; break;
} }
vector<string>body; vector<vector<uint8_t> >body;
int64_t more = 1; ret = z_recv_multiple(pRouter, body);
// business logic if(ret < 0) {
int cnt = 0; spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno()));
char *tmp; continue;
while(more > 0) {
cnt++;
mqErrorMsg("evmgr", devSn, "failed to init zmq msg", zmq_msg_init(&msg));
ret = mqErrorMsg("evmgr", devSn, "failed to recv zmq msg", zmq_recvmsg(pRouter, &msg, 0));
tmp = new char[ret+1];
memcpy(tmp, zmq_msg_data(&msg), ret);
tmp[ret] = 0;
body.push_back(string(tmp));
delete tmp;
zmq_msg_close(&msg);
spdlog::debug("evmgr {} received[{}]: {} ", devSn, cnt, body.back());
size_t more_size = sizeof (more);
mqErrorMsg("evmgr", devSn, "failed to get zmq sockopt", zmq_getsockopt(pRouter, ZMQ_RCVMORE, &more, &more_size));
} }
// full proto msg received. // full proto msg received.
handleMsg(body); handleMsg(body);
......
...@@ -13,12 +13,13 @@ ...@@ -13,12 +13,13 @@
namespace fs = std::filesystem; namespace fs = std::filesystem;
#endif #endif
#include "vendor/include/zmq.h" #include <zmqhelper.hpp>
#include "tinythread.hpp" #include "tinythread.hpp"
#include "common.hpp" #include "common.hpp"
#include "database.h" #include "database.h"
using namespace std; using namespace std;
using namespace zmqhelper;
int mqErrorMsg(string cls, string devSn, int iid, string extraInfo, int ret) int mqErrorMsg(string cls, string devSn, int iid, string extraInfo, int ret)
{ {
...@@ -38,42 +39,31 @@ private: ...@@ -38,42 +39,31 @@ private:
const char * bytes; const char * bytes;
int len; int len;
void *pDealer=NULL; void *pDealer=NULL;
// void *pRepCtx = NULL; // for packets REP
// void *pRep = NULL;
int init()
{
// int ret = 0;
// pRepCtx = zmq_ctx_new();
// pRep = zmq_socket(pRepCtx, ZMQ_REP);
// ret = zmq_bind(pRep, urlRep.c_str());
// if(ret < 0) {
// spdlog::error("failed to bind rep: {}, {}", zmq_strerror(ret), urlRep.c_str());
// this_thread::sleep_for(chrono::seconds(20));
// return -1;
// }
return 0;
}
protected: protected:
void run() void run()
{ {
int ret = 0;
bool bStopSig = false; bool bStopSig = false;
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_t msg1; zmq_msg_t msg1;
int ret =zmq_msg_init(&msg); ret = zmq_msg_init(&msg);
zmq_msg_init_data(&msg, (void*)bytes, len, NULL, NULL); ret += zmq_msg_init_data(&msg, (void*)bytes, len, NULL, NULL);
if(ret < 0) {
spdlog::error("evpuller {} {} failed to init msg: {}", devSn, iid, zmq_strerror(zmq_errno()));
return;
}
// declare ready to router // declare ready to router
vector<string>body; vector<vector<uint8_t> >body;
body.push_back(mgrSn); // since identity is auto set
body.push_back("hello"); body.push_back(str2body(mgrSn));
int cnt = 0; body.push_back(str2body("hello"));
for(auto &i:body) {
zmq_msg_init(&msg1); ret = z_send_multiple(pDealer, body);
zmq_msg_init_data(&msg1, (void*)i.c_str(), i.size(), NULL, NULL); if(ret < 0) {
mqErrorMsg("evpuller", devSn,iid, "failed to send zmq msg", zmq_send_const(pDealer, zmq_msg_data(&msg1), i.size(), cnt==(body.size()-1)?0:ZMQ_SNDMORE)); spdlog::error("evpuller {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
zmq_msg_close(&msg1); //TODO:
cnt++; return;
} }
while (true) { while (true) {
...@@ -82,7 +72,7 @@ protected: ...@@ -82,7 +72,7 @@ protected:
break; break;
} }
spdlog::info("evpuller reqSrv {} {} waiting for req", devSn, iid); spdlog::info("evpuller reqSrv {} {} waiting for req", devSn, iid);
int ret =zmq_msg_init(&msg1); ret =zmq_msg_init(&msg1);
ret = zmq_recvmsg(pDealer, &msg1, 0); ret = zmq_recvmsg(pDealer, &msg1, 0);
if(ret < 0) { if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret)); spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
...@@ -97,19 +87,11 @@ public: ...@@ -97,19 +87,11 @@ public:
RepSrv() = delete; RepSrv() = delete;
RepSrv(RepSrv &) = delete; RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete; RepSrv(RepSrv&&) = delete;
RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes, int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes), len(len), pDealer(pDealer) RepSrv(string mgrSn, string devSn, int iid, const char* formatBytes,
{ int len, void *pDealer):mgrSn(mgrSn),devSn(devSn), iid(iid), bytes(formatBytes),
init(); len(len), pDealer(pDealer) {};
};
~RepSrv() ~RepSrv() {};
{
// if(pRep != NULL) {
// zmq_close(pRep);
// }
// if(pRepCtx != NULL) {
// zmq_ctx_destroy(pRepCtx);
// }
};
}; };
class EvPuller: public TinyThread { class EvPuller: public TinyThread {
......
...@@ -370,4 +370,5 @@ json registry(const char *sn, const char *scn, int iid) ...@@ -370,4 +370,5 @@ json registry(const char *sn, const char *scn, int iid)
} }
} // namespace cloudutils } // namespace cloudutils
#endif #endif
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论