提交 26e473c5 authored 作者: blu's avatar blu

init

上级 db665d73
......@@ -2,10 +2,11 @@
"time":0,
"code":0,
"data":{
"evmgr":{
"ILSEVMGR1":{
"sn":"ILSEVMGR1",
"addr":"172.31.0.76",
"addr-cloud":"172.31.0.76",
"proto": "zmq",
"port-cloud":5556,
"port-router":5550,
"status":1,
......
......@@ -24,29 +24,137 @@ using namespace std;
* functions:
* app update
* control msg
*
*
**/
class EvMgr:public TinyThread {
private:
private:
void *pRouterCtx = NULL;
void *pRouter = NULL;
json config;
string devSn;
void init(){
void init()
{
int ret;
bool inited = false;
// TODO: load config from local db
devSn = "ILSEVMGR1";
while(!inited) {
try {
config = json::parse(cloudutils::config);
spdlog::info("config dumps: \n{}", config.dump());
// TODO: verify sn
json jmgr = config["data"][devSn];
string proto = jmgr["proto"];
string addr;
if(proto != "zmq"){
spdlog::error("evmgr {} unsupported protocol: {}, try fallback to zmq instead now...", devSn, proto);
}
addr = "tcp://" + jmgr["addr"].get<string>() + ":" + to_string(jmgr["port-router"]);
// setup zmq
// TODO: connect to cloud
// router service
pRouterCtx = zmq_ctx_new();
pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
ret = zmq_bind(pRouter, addr.c_str());
if(ret < 0) {
spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
this_thread::sleep_for(chrono::seconds(3));
continue;
}
inited = true;
}
catch(exception &e) {
spdlog::error("evmgr {} exception on init() for: {}, retrying load configuration...", devSn, e.what());
this_thread::sleep_for(chrono::seconds(3));
continue;
}
}
spdlog::info("evmgr {} successfuly inited", devSn);
}
protected:
public:
EvMgr() {
init();
int mqErrorMsg(string cls, string devSn, string extraInfo, int ret) {
if(ret < 0) {
spdlog::error("{} {} {}:{} ", cls, devSn, extraInfo, zmq_strerror(zmq_errno()));
}
return ret;
}
~EvMgr(){
void handleMsg(string body[]) {
zmq_msg_t msg;
if(body[0] != devSn) {
for(int i =0; i < 3; i++) {
spdlog::info("evmgr {}, msg idRcv is {}, forwarding...", devSn, body[0]);
zmq_msg_init(&msg);
zmq_msg_init_data(&msg, (void*)body[0].c_str(), body[0].size(), NULL, NULL);
mqErrorMsg("evmgr", devSn, "failed to send zmq msg", zmq_send_const(pRouter, zmq_msg_data(&msg), body[0].size(), i ==2?0:ZMQ_SNDMORE));
zmq_msg_close(&msg);
}
}else{
// TODO: report msg
spdlog::info("evmgr {} subsystem report msg received: {} {} {}", devSn, body[0], body[1], body[2]);
}
}
};
protected:
void run(){
bool bStopSig = false;
int ret = 0;
zmq_msg_t msg;
while (true) {
if(checkStop() == true) {
bStopSig = true;
break;
}
string msgBody[3];
int64_t more = 0;
// business logic
int i = 0;
for(; i < 3; i++) {
mqErrorMsg("evmgr", devSn, "failed to init zmq msg", zmq_msg_init(&msg));
mqErrorMsg("evmgr", devSn, "failed to recv zmq msg", zmq_recvmsg(pRouter, &msg, 0));
msgBody[i] = string((char *)zmq_msg_data(&msg));
zmq_msg_close(&msg);
spdlog::debug("evmgr {} received[{}]: {} ", devSn, i, msgBody[i]);
size_t more_size = sizeof (more);
mqErrorMsg("evmgr", devSn, "failed to get zmq sockopt", zmq_getsockopt(pRouter, ZMQ_RCVMORE, &more, &more_size));
if(!more) {
break;
}
}
if(i >= 3 ) {
// full proto msg received.
handleMsg(msgBody);
}else{
spdlog::warn("partial msg recved, maybe hello msg: {}, {}, {}", msgBody[0], msgBody[1], msgBody[2]);
}
}
}
public:
EvMgr()
{
init();
}
~EvMgr()
{
if(pRouter != NULL) {
zmq_close(pRouter);
pRouter = NULL;
}
if(pRouterCtx != NULL){
zmq_ctx_destroy(pRouterCtx);
pRouterCtx = NULL;
}
}
};
int main(int argc, const char *argv[]){
int main(int argc, const char *argv[])
{
EvMgr mgr;
mgr.join();
return 0;
}
\ No newline at end of file
......@@ -22,52 +22,50 @@ using namespace std;
class RepSrv: public TinyThread {
private:
string sn;
string devSn;
int iid;
string urlRep;
const char * bytes;
int len;
void *pRepCtx = NULL; // for packets REP
void *pRep = NULL;
int teardownMq()
void *pDealer=NULL;
// void *pRepCtx = NULL; // for packets REP
// void *pRep = NULL;
int init()
{
if(pRep != NULL) {
zmq_close(pRep);
}
if(pRepCtx != NULL) {
zmq_ctx_destroy(pRepCtx);
}
// int ret = 0;
// pRepCtx = zmq_ctx_new();
// pRep = zmq_socket(pRepCtx, ZMQ_REP);
// ret = zmq_bind(pRep, urlRep.c_str());
// if(ret < 0) {
// spdlog::error("failed to bind rep: {}, {}", zmq_strerror(ret), urlRep.c_str());
// this_thread::sleep_for(chrono::seconds(20));
// return -1;
// }
return 0;
}
int setupMq()
{
int ret = 0;
pRepCtx = zmq_ctx_new();
pRep = zmq_socket(pRepCtx, ZMQ_REP);
ret = zmq_bind(pRep, urlRep.c_str());
if(ret < 0) {
spdlog::error("failed to bind rep: {}, {}", zmq_strerror(ret), urlRep.c_str());
this_thread::sleep_for(chrono::seconds(20));
return -1;
}
return 0;
}
public:
RepSrv() = delete;
RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete;
RepSrv(string sn, int iid, string urlRep, const char* formatBytes, int len):sn(sn), iid(iid),urlRep(urlRep), bytes(formatBytes), len(len) {};
~RepSrv() {};
RepSrv(string devSn, int iid, const char* formatBytes, int len, void *pDealer):devSn(devSn), iid(iid), bytes(formatBytes), len(len), pDealer(pDealer)
{
init();
};
~RepSrv()
{
// if(pRep != NULL) {
// zmq_close(pRep);
// }
// if(pRepCtx != NULL) {
// zmq_ctx_destroy(pRepCtx);
// }
};
protected:
void run()
{
bool bStopSig = false;
if(setupMq() != 0) {
exit(1);
}
zmq_msg_t msg;
zmq_msg_t msg1;
int ret =zmq_msg_init(&msg);
......@@ -77,16 +75,16 @@ protected:
bStopSig = true;
break;
}
spdlog::info("evpuller reqSrv {} {} waiting for req", sn, iid);
spdlog::info("evpuller reqSrv {} {} waiting for req", devSn, iid);
int ret =zmq_msg_init(&msg1);
ret = zmq_recvmsg(pRep, &msg1, 0);
ret = zmq_recvmsg(pDealer, &msg1, 0);
if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
continue;
}
zmq_msg_close(&msg1);
spdlog::info("evpuller {} {} reveived req", sn, iid);
zmq_send_const(pRep, zmq_msg_data(&msg), len, 0);
spdlog::info("evpuller {} {} reveived req", devSn, iid);
zmq_send_const(pDealer, zmq_msg_data(&msg), len, 0);
}
}
};
......@@ -95,88 +93,105 @@ class EvPuller: public TinyThread {
private:
void *pPubCtx = NULL; // for packets publishing
void *pPub = NULL;
void *pDealerCtx = NULL;
void *pDealer = NULL;
AVFormatContext *pAVFormatInput = NULL;
string urlIn, urlPub, urlRep, sn;
string urlIn, urlPub, urlDealer, devSn;
int *streamList = NULL, numStreams = 0, iid;
json config;
int mqErrorMsg(string cls, string devSn, int iid, string extraInfo, int ret)
{
if(ret < 0) {
spdlog::error("{} {} {}, {}: {} ", cls, devSn, iid, extraInfo, zmq_strerror(zmq_errno()));
}
return ret;
}
int init()
{
bool inited = false;
sn = "ILS-2";
iid = 2;
// TODO: load devSn iid from database
devSn = "ILSEVPULLER1";
iid = 1;
int ret = 0;
while(!inited) {
// TODO: read db to get sn
// req config
json jr = cloudutils::registry(sn.c_str(), "evpuller", iid);
bool bcnt = false;
// TODO: req config
bool found = false;
try {
spdlog::info("registry: {:s}", jr.dump());
string ipc = jr["data"]["ipc"];
string user = jr["data"]["username"];
string passwd = jr["data"]["password"];
json data = jr["data"]["services"]["evpuller"];
urlIn = "rtsp://" + user + ":" + passwd + "@"+ ipc + "/h264/ch1/sub/av_stream";
urlPub = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-pub"]);
urlRep = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-rep"]);
spdlog::info("evpuller {} {} bind on {} for pub, {} for rep", sn, iid, urlPub, urlRep);
config = json::parse(cloudutils::config);
spdlog::info("config dump: {:s}", config.dump());
json data = config["data"];
// first try to check mgr with same sn
json evpuller;
json evmgr;
json ipc;
for (auto& [key, value] : data.items()) {
//std::cout << key << " : " << dynamic_cast<json&>(value).dump() << "\n";
evmgr = value;
json ipcs = evmgr["ipcs"];
for(auto &j: ipcs) {
json pullers = j["modules"]["evpuller"];
for(auto &p:pullers) {
if(p["sn"] == devSn && p["iid"] == iid) {
evpuller = p;
break;
}
}
if(evpuller.size() != 0) {
ipc = j;
break;
}
}
if(ipc.size()!=0 && evpuller.size()!=0) {
found = true;
break;
}
}
if(!found) {
this_thread::sleep_for(chrono::seconds(3));
spdlog::error("evpuller {} {} no valid config found. retrying load config...", devSn, iid);
continue;
}
string user = ipc["user"];
string passwd = ipc["password"];
urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + "/h264/ch1/sub/av_stream";
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(evpuller["port-pub"]);
// urlRep = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-rep"]);
urlDealer = "tcp://" + evmgr["addr"].get<string>() + string(":") + to_string(evmgr["port-router"]);
spdlog::info("evpuller {} {} bind on {} for pub, {} for dealer", devSn, iid, urlPub, urlDealer);
pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB);
ret = mqErrorMsg("evpuller", devSn, iid, "failed to bind zmq", zmq_bind(pPub, urlPub.c_str()));
pDealerCtx = zmq_ctx_new();
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
string ident = devSn+":evpuller:" + to_string(iid);
ret += mqErrorMsg("evpuller", devSn, iid, "failed to set socksopt", zmq_setsockopt(pDealer, ZMQ_IDENTITY, ident.c_str(), ident.size()));
ret += mqErrorMsg("evpuller", devSn, iid, "failed to connect to router " + urlDealer, zmq_connect(pDealer, urlDealer.c_str()));
if(ret < 0) {
this_thread::sleep_for(chrono::seconds(3));
spdlog::error("evpuller {} {} zmq setup failed. retrying load config...", devSn, iid);
continue;
}
}
catch(exception &e) {
bcnt = true;
spdlog::error("exception in EvPuller.init {:s}, retrying... ", e.what());
}
if(bcnt) {
this_thread::sleep_for(chrono::milliseconds(1000*20));
this_thread::sleep_for(chrono::seconds(3));
spdlog::error("evpuller {} {} exception in EvPuller.init {:s}, retrying... ",devSn, iid, e.what());
continue;
}
inited = true;
spdlog::info("successfully load config");
}
return 0;
}
int setupMq()
{
teardownMq();
pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB);
int rc = zmq_bind(pPub, urlPub.c_str());
if(rc != 0) {
spdlog::error("failed create pub: {}, {}", zmq_strerror(rc), urlPub.c_str());
this_thread::sleep_for(chrono::milliseconds(1000*20));
return -1;
}
return 0;
}
int teardownMq()
{
if(pPub != NULL) {
zmq_close(pPub);
}
if(pPubCtx != NULL) {
zmq_ctx_destroy(pPubCtx);
}
return 0;
}
public:
EvPuller()
{
int ret = 0;
init();
ret = setupMq();
if(ret != 0) {
exit(1);
}
}
~EvPuller()
{
teardownMq();
}
protected:
// Function to be executed by thread function
void run()
......@@ -202,7 +217,7 @@ protected:
// serialize formatctx to bytes
char *pBytes = NULL;
ret = AVFormatCtxSerializer::encode(pAVFormatInput, &pBytes);
auto repSrv = RepSrv(sn, iid, urlRep, pBytes, ret);
auto repSrv = RepSrv(devSn, iid, pBytes, ret, pDealer);
repSrv.detach();
// find all video & audio streams for remuxing
......@@ -247,12 +262,6 @@ protected:
pktCnt++;
packet.stream_index = streamList[packet.stream_index];
/* copy packet */
//packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
//packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
//packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
//packet.pos = -1;
// serialize packet to raw bytes
char * data = NULL;
int size = AVPacketSerializer::encode(packet, &data);
......@@ -271,6 +280,32 @@ protected:
std::cout << "Task End" << std::endl;
}
}
public:
EvPuller()
{
init();
}
~EvPuller()
{
if(pPub != NULL) {
zmq_close(pPub);
pPub = NULL;
}
if(pPubCtx != NULL) {
zmq_ctx_destroy(pPubCtx);
pPubCtx = NULL;
}
if(pDealer != NULL) {
zmq_close(pDealer);
pDealer= NULL;
}
if(pDealerCtx != NULL) {
zmq_ctx_destroy(pPubCtx);
pDealerCtx = NULL;
}
}
};
......
......@@ -292,10 +292,11 @@ namespace cloudutils
"time":0,
"code":0,
"data":{
"evmgr":{
"ILSEVMGR1":{
"sn":"ILSEVMGR1",
"addr":"172.31.0.76",
"addr-cloud":"172.31.0.76",
"proto":"zmq",
"port-cloud":5556,
"port-router":5550,
"status":1,
......@@ -312,7 +313,7 @@ namespace cloudutils
"sn":"ILSEVPULLER1",
"addr":"172.31.0.76",
"iid":1,
"port-pub":"5556",
"port-pub":5556,
"status":1
}
],
......@@ -355,7 +356,7 @@ namespace cloudutils
}
}
*/
const char *config = "{\"time\":0,\"code\":0,\"data\":{\"evmgr\":{\"sn\":\"ILSEVMGR1\",\"addr\":\"172.31.0.76\",\"addr-cloud\":\"172.31.0.76\",\"port-cloud\":5556,\"port-router\":5550,\"status\":1,\"ipcs\":[{\"addr\":\"172.31.0.51\",\"proto\":\"rtsp\",\"user\":\"admin\",\"password\":\"FWBWTU\",\"status\":1,\"modules\":{\"evpuller\":[{\"sn\":\"ILSEVPULLER1\",\"addr\":\"172.31.0.76\",\"iid\":1,\"port-pub\":\"5556\",\"status\":1}],\"evpusher\":[{\"sn\":\"ILSEVPUSHER1\",\"iid\":1,\"proto\":\"rtsp\",\"addrDest\":\"40.73.41.176\",\"portDest\":554,\"user\":\"\",\"password\":\"\",\"token\":\"\",\"enabled\":1,\"status\":1}],\"evslicer\":[{\"sn\":\"ILSEVSLICER1\",\"iid\":1,\"path\":\"slices\",\"enabled\":1,\"status\":1}],\"evml\":[{\"type\":\"motion\",\"sn\":\"ILSEVMLMOTION1\",\"iid\":1,\"enabled\":1,\"status\":1}]}}]}}}";
const char *config = "{\"time\":0,\"code\":0,\"data\":{\"ILSEVMGR1\":{\"sn\":\"ILSEVMGR1\",\"addr\":\"172.31.0.76\",\"addr-cloud\":\"172.31.0.76\",\"proto\":\"zmq\",\"port-cloud\":5556,\"port-router\":5550,\"status\":1,\"ipcs\":[{\"addr\":\"172.31.0.51\",\"proto\":\"rtsp\",\"user\":\"admin\",\"password\":\"FWBWTU\",\"status\":1,\"modules\":{\"evpuller\":[{\"sn\":\"ILSEVPULLER1\",\"addr\":\"172.31.0.76\",\"iid\":1,\"port-pub\":5556,\"status\":1}],\"evpusher\":[{\"sn\":\"ILSEVPUSHER1\",\"iid\":1,\"proto\":\"rtsp\",\"addrDest\":\"40.73.41.176\",\"portDest\":554,\"user\":\"\",\"password\":\"\",\"token\":\"\",\"enabled\":1,\"status\":1}],\"evslicer\":[{\"sn\":\"ILSEVSLICER1\",\"iid\":1,\"path\":\"slices\",\"enabled\":1,\"status\":1}],\"evml\":[{\"type\":\"motion\",\"sn\":\"ILSEVMLMOTION1\",\"iid\":1,\"enabled\":1,\"status\":1}]}}]}}}";
json registry(const char *sn, const char *scn, int iid)
{
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论