提交 a8c826da authored 作者: blu's avatar blu

init

上级 b4f34e71
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
namespace fs = std::filesystem; namespace fs = std::filesystem;
#endif #endif
#include <zmqhelper.hpp> #include "zmqhelper.hpp"
#include "tinythread.hpp" #include "tinythread.hpp"
#include "common.hpp" #include "common.hpp"
#include "database.h" #include "database.h"
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
namespace fs = std::filesystem; namespace fs = std::filesystem;
#endif #endif
#include "vendor/include/zmq.h" #include "zmqhelper.hpp"
#include "tinythread.hpp" #include "tinythread.hpp"
#include "common.hpp" #include "common.hpp"
#include "database.h" #include "database.h"
...@@ -25,14 +25,15 @@ using namespace std; ...@@ -25,14 +25,15 @@ using namespace std;
class EvPusher: public TinyThread { class EvPusher: public TinyThread {
private: private:
void *pSubCtx = NULL, *pReqCtx = NULL; // for packets relay void *pSubCtx = NULL, *pDealerCtx = NULL; // for packets relay
void *pSub = NULL, *pReq = NULL; void *pSub = NULL, *pDealer = NULL;
string urlOut, urlPub, urlRep, sn; string urlOut, urlPub, urlRep, sn;
int iid; int iid;
bool enablePush = false; bool enablePush = false;
int *streamList = NULL; int *streamList = NULL;
AVFormatContext *pAVFormatRemux = NULL; AVFormatContext *pAVFormatRemux = NULL;
AVFormatContext *pAVFormatInput = NULL; AVFormatContext *pAVFormatInput = NULL;
json config;
int init() int init()
{ {
...@@ -42,11 +43,11 @@ private: ...@@ -42,11 +43,11 @@ private:
iid = 2; iid = 2;
while(!inited) { while(!inited) {
// req config // req config
json jr = cloudutils::registry(sn.c_str(), "evpusher", iid); onfig = cloudutils::registry(sn.c_str(), "evpusher", iid);
bool bcnt = false; bool bcnt = false;
try { try {
spdlog::info("registry: {:s}", jr.dump()); spdlog::info("registry: {:s}", config.dump());
json data = jr["data"]["services"]["evpuller"]; json data = config["data"]["services"]["evpuller"];
string addr = data["addr"].get<string>(); string addr = data["addr"].get<string>();
if(addr == "0.0.0.0") { if(addr == "0.0.0.0") {
addr = "localhost"; addr = "localhost";
...@@ -55,7 +56,7 @@ private: ...@@ -55,7 +56,7 @@ private:
urlRep = string("tcp://") + addr + ":" + to_string(data["port-rep"]); urlRep = string("tcp://") + addr + ":" + to_string(data["port-rep"]);
spdlog::info("evpusher {} {} will connect to {} for sub, {} for req", sn, iid, urlPub, urlRep); spdlog::info("evpusher {} {} will connect to {} for sub, {} for req", sn, iid, urlPub, urlRep);
data = jr["data"]["services"]["evpusher"]; data = config["data"]["services"]["evpusher"];
for(auto &j: data) { for(auto &j: data) {
if(j["sn"] == sn && iid == j["iid"] && j["enabled"] != 0) { if(j["sn"] == sn && iid == j["iid"] && j["enabled"] != 0) {
urlOut = j["urlDest"]; urlOut = j["urlDest"];
...@@ -99,10 +100,10 @@ private: ...@@ -99,10 +100,10 @@ private:
} }
// setup req // setup req
pReqCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pReq = zmq_socket(pReqCtx, ZMQ_REQ); pDealer = zmq_socket(pDealerCtx, ZMQ_REQ);
spdlog::info("evpusher {} {} try create req to {}", sn, iid, urlRep); spdlog::info("evpusher {} {} try create req to {}", sn, iid, urlRep);
ret = zmq_connect(pReq, urlRep.c_str()); ret = zmq_connect(pDealer, urlRep.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evpusher {} {} failed create req to {}", sn, iid, urlRep); spdlog::error("evpusher {} {} failed create req to {}", sn, iid, urlRep);
...@@ -124,13 +125,13 @@ private: ...@@ -124,13 +125,13 @@ private:
zmq_ctx_destroy(pSubCtx); zmq_ctx_destroy(pSubCtx);
pSubCtx = NULL; pSubCtx = NULL;
} }
if(pReq != NULL) { if(pDealer != NULL) {
zmq_close(pSub); zmq_close(pSub);
pReq = NULL; pDealer = NULL;
} }
if(pReqCtx != NULL) { if(pDealerCtx != NULL) {
zmq_ctx_destroy(pSub); zmq_ctx_destroy(pSub);
pReqCtx = NULL; pDealerCtx = NULL;
} }
return 0; return 0;
...@@ -144,7 +145,7 @@ private: ...@@ -144,7 +145,7 @@ private:
// req avformatcontext packet // req avformatcontext packet
// send first packet to init connection // send first packet to init connection
zmq_msg_t msg; zmq_msg_t msg;
zmq_send(pReq, "hello", 5, 0); zmq_send(pDealer, "hello", 5, 0);
spdlog::info("evpusher {} {} success send hello", sn, iid); spdlog::info("evpusher {} {} success send hello", sn, iid);
ret =zmq_msg_init(&msg); ret =zmq_msg_init(&msg);
if(ret != 0) { if(ret != 0) {
...@@ -152,7 +153,7 @@ private: ...@@ -152,7 +153,7 @@ private:
exit(1); exit(1);
} }
// receive packet // receive packet
ret = zmq_recvmsg(pReq, &msg, 0); ret = zmq_recvmsg(pDealer, &msg, 0);
spdlog::info("evpusher {} {} recv", sn, iid); spdlog::info("evpusher {} {} recv", sn, iid);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed to recv zmq msg: {}", sn, iid, zmq_strerror(ret)); spdlog::error("evpusher {} {} failed to recv zmq msg: {}", sn, iid, zmq_strerror(ret));
...@@ -165,13 +166,13 @@ private: ...@@ -165,13 +166,13 @@ private:
// close req // close req
{ {
zmq_msg_close(&msg); zmq_msg_close(&msg);
if(pReq != NULL) { if(pDealer != NULL) {
zmq_close(pReq); zmq_close(pDealer);
pReq = NULL; pDealer = NULL;
} }
if(pReqCtx != NULL) { if(pDealerCtx != NULL) {
zmq_ctx_destroy(pReqCtx); zmq_ctx_destroy(pDealerCtx);
pReqCtx = NULL; pDealerCtx = NULL;
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论