提交 e89d7e46 authored 作者: blu's avatar blu

init

上级 194d7abd
CC = gcc
CPP = g++
CPPFLAGS = -Wall -std=gnu++1z
CFLAGS = -Wall
CPPFLAGS = -g -Wall -std=gnu++1z
CFLAGS = -g -Wall
LIBOPENCV = `pkg-config opencv --cflags --libs`
LIBFFMPEG = `pkg-config libavformat libavutil libavcodec libswscale --cflags --libs`
......@@ -14,7 +14,7 @@ all: evmgr evpuller evpusher evslicer evmlmotion
.PHONY: libzmq
libzmq:
cd vendor/libzmq && [ ! -f $(CURDIR)/vendor/lib/pkgconfig/libzmq.pc ] || ./autogen.sh && ./configure --prefix=$(CURDIR)/vendor
cd vendor/libzmq && [ ! -f $(CURDIR)/vendor/lib/pkgconfig/libzmq.pc ] || ./autogen.sh && ./configure --prefix=$(CURDIR)/vendor --enable-drafts
cd vendor/libzmq && make -j 4 && make install
# sqlite C object
......@@ -47,4 +47,9 @@ mux: demuxing_decoding.c
.PHONY: clean
clean:
rm -fr evmgr evpuller evpusher evslicer evmlmotion *.dSYM *.out *.o
\ No newline at end of file
rm -fr evmgr evpuller evpusher evslicer evmlmotion *.dSYM *.out *.o
.PHONY: rebuildzmq
rebuildzmq:
cd vendor/libzmq && make clean && ./autogen.sh && ./configure --prefix=$(CURDIR)/vendor --enable-drafts
cd vendor/libzmq && make -j 4 && make install
\ No newline at end of file
......@@ -73,6 +73,8 @@ private:
// router service
pRouterCtx = zmq_ctx_new();
pRouter = zmq_socket(pRouterCtx, ZMQ_ROUTER);
int opt_notify = ZMQ_NOTIFY_DISCONNECT;
zmq_setsockopt (pRouter, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify));
ret = zmq_bind(pRouter, addr.c_str());
if(ret < 0) {
spdlog::error("evmgr {} failed to bind zmq at {} for reason: {}, retrying load configuration...", devSn, addr, zmq_strerror(zmq_errno()));
......@@ -101,14 +103,20 @@ private:
int ret = 0;
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG
if(body.size() != 4) {
spdlog::warn("evmgr {} dropped a message, since its size is incorrect: {}", devSn, body.size());
string peerId;
if(body.size() == 2 && body[1].size() == 0) {
peerId = body2str(body[0]);
spdlog::warn("evmgr {} peer disconnected: {}", devSn, peerId);
peerStatus[peerId] = 0;
return 0;
}else if(body.size() != 4) {
spdlog::warn("evmgr {} dropped an invalid message, size: {}", devSn, body.size());
return 0;
}
string meta = body2str(body[2]);
string selfId = body2str(body[0]);
string peerId = body2str(body[1]);
peerId = body2str(body[1]);
// update status;
this->peerStatus[selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
......@@ -183,6 +191,35 @@ private:
return ret;
}
int get_monitor_event (void *monitor, int *value, char **address)
{
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
// Second frame in message contains event address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (!zmq_msg_more (&msg));
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
(*address)[size] = 0;
}
return event;
}
protected:
void run(){
......@@ -190,6 +227,33 @@ protected:
int ret = 0;
zmq_msg_t msg;
// disabled because:
// 1. it can't determine which peer disconnected, but only the underline socket FD.
// 2. used the draft feature of ZMQ_ROUTER_NOTIFY instead to capture peer module disconnections such as evpuser, evmlmotion.
// thread thMon = thread([&,this](){
// int ret = 0;
// string addr = string("inproc://monitor-") + this->devSn;
// ret = zmq_socket_monitor(this->pRouter, addr.c_str(), ZMQ_EVENT_ALL );//ZMQ_EVENT_DISCONNECTED
// if(ret != 0) {
// spdlog::error("evmgr {} failed mon1: {}, {}", this->devSn, addr, zmq_strerror(zmq_errno()));
// }
// void *mon = zmq_socket (this->pRouterCtx, ZMQ_PAIR);
// ret = zmq_connect(mon, addr.c_str());
// if(ret != 0) {
// spdlog::error("evmgr {} failed mon2: {}", this->devSn, zmq_strerror(zmq_errno()));
// }
// spdlog::info("evmgr {} monitoring setup", this->devSn);
// while(true){
// int fd = 0;
// char *pConn = NULL;
// int event = get_monitor_event(mon, &fd, &pConn);
// cout <<"event: " << event << ", fd: "<< fd << ", conn: "<<pConn <<endl;
// }
// });
// thMon.detach();
//
while (true) {
if(checkStop() == true) {
bStopSig = true;
......
......@@ -518,17 +518,24 @@ protected:
string metaType = meta.dump();
int ret = 0;
vector<vector<uint8_t> > v = {str2body(this->pullerGid), str2body(metaType), str2body("")};
while(!this->evtQueue->empty()){
string evt = this->evtQueue->front();
v[2] = str2body(evt);
this->evtQueue->pop();
ret = z_send_multiple(this->pDealer, v);
if(ret < 0) {
spdlog::error("evmlmotion {} {} failed to send event: {}, {}", this->devSn, this->iid, evt, zmq_strerror(zmq_errno()));
while(true){
if(!this->evtQueue->empty()){
string evt = this->evtQueue->front();
v[2] = str2body(evt);
this->evtQueue->pop();
ret = z_send_multiple(this->pDealer, v);
spdlog::info("evmlmotion {} {} send event: {}", this->devSn, this->iid, evt);
if(ret < 0) {
spdlog::error("evmlmotion {} {} failed to send event: {}, {}", this->devSn, this->iid, evt, zmq_strerror(zmq_errno()));
}
}else{
this_thread::sleep_for(chrono::seconds(3));
}
}
});
thEvent.detach();
AVFrame *pFrame = av_frame_alloc();
if (!pFrame) {
spdlog::error("failed to allocated memory for AVFrame");
......
......@@ -41,7 +41,8 @@ private:
void *pDealer=NULL;
thread thPing;
int ping(){
int ping()
{
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
......@@ -56,6 +57,36 @@ private:
}
return ret;
}
int handleMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
auto msgBody = data2body(const_cast<char*>(bytes), len);
try {
// rep framectx
// TODO: verify sender id
auto meta = json::parse(body2str(v[1]));
if(meta["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
vector<vector<uint8_t> > rep = {v[0], v[1], msgBody};
ret = z_send_multiple(pDealer, rep);
if(ret < 0) {
spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno()));
}
}
else if(meta["type"].get<string>() == EV_MSG_META_EVENT) {
// event msg
spdlog::info("evpuller {} {} received event: {}", devSn, iid, body2str(v[2]));
}
else {
spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
}
}
catch(exception &e) {
spdlog::error("evpuller {} {} excpetion parse request from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
}
return ret;
}
protected:
void run()
{
......@@ -64,16 +95,15 @@ protected:
// declare ready to router
ping();
thPing = thread([&,this](){
thPing = thread([&,this]() {
while(true) {
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
ping();
}
});
thPing.detach();
// init response msg
auto msgBody = data2body(const_cast<char*>(bytes), len);
while (true) {
if(checkStop() == true) {
bStopSig = true;
......@@ -88,26 +118,7 @@ protected:
spdlog::error("evpuller {} {}, repSrv received invalid message: {}", devSn, iid, v.size());
continue;
}
try{
// rep framectx
// TODO: verify sender id
auto meta = json::parse(body2str(v[1]));
if(meta["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
vector<vector<uint8_t> > rep = {v[0], v[1], msgBody};
ret = z_send_multiple(pDealer, rep);
if(ret < 0) {
spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno()));
}
}else if(meta["type"].get<string>() == EV_MSG_META_PING){
ping();
}
else{
spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
}
}catch(exception &e) {
spdlog::error("evpuller {} {} excpetion parse request from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
}
handleMsg(v);
}
}
public:
......
......@@ -152,6 +152,7 @@ private:
pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evpusher {} {} try create req to {}", devSn, iid, urlDealer);
ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
ret += zmq_setsockopt (pDealer, ZMQ_ROUTING_ID, selfId.c_str(), selfId.size());
if(ret < 0) {
spdlog::error("evpusher {} {} failed setsockopts router: {}", devSn, iid, urlDealer);
return -3;
......
#ifndef __ZMQ_HELPER_H__
#define __ZMQ_HELPER_H__
#undef ZMQ_BUILD_DRAFT_API
#define ZMQ_BUILD_DRAFT_API 1
#include "zmq.h"
#include <vector>
#include <spdlog/spdlog.h>
......@@ -7,93 +11,99 @@
using namespace std;
namespace zmqhelper {
#define EV_HEARTBEAT_SECONDS 30
#define MSG_HELLO "hello"
#define EV_MSG_META_PING "ping"
#define EV_MSG_META_PONG "pong"
#define EV_MSG_META_PEEROFFLINE "offline"
#define EV_MSG_META_RESTART "restart"
#define EV_MSG_META_UPDATE "update"
#define EV_MSG_META_EVENT "event"
#define EV_MSG_META_AVFORMATCTX "afctx"
#define EV_NUM_CACHE_PERPEER 100
#define MAX_EVENT_QUEUE_SIZE 20
#define EV_HEARTBEAT_SECONDS 30
#define MSG_HELLO "hello"
#define EV_MSG_META_PING "ping"
#define EV_MSG_META_PONG "pong"
#define EV_MSG_META_PEEROFFLINE "offline"
#define EV_MSG_META_RESTART "restart"
#define EV_MSG_META_UPDATE "update"
#define EV_MSG_META_EVENT "event"
#define EV_MSG_META_AVFORMATCTX "afctx"
#define EV_NUM_CACHE_PERPEER 100
#define MAX_EVENT_QUEUE_SIZE 20
//
string body2str(vector<uint8_t> body){
return string((char *)(body.data()), body.size());
}
//
string body2str(vector<uint8_t> body)
{
return string((char *)(body.data()), body.size());
}
vector<uint8_t> data2body(char* data, int len){
vector<uint8_t> v;
v.insert(v.end(), (uint8_t *)data, (uint8_t *)data+len);
return v;
}
vector<uint8_t> data2body(char* data, int len)
{
vector<uint8_t> v;
v.insert(v.end(), (uint8_t *)data, (uint8_t *)data+len);
return v;
}
vector<uint8_t> str2body(string const &str){
vector<uint8_t> v;
v.insert(v.end(), (uint8_t*)(str.data()), (uint8_t *)(str.data()) + str.size());
return v;
}
vector<uint8_t> str2body(string const &str)
{
vector<uint8_t> v;
v.insert(v.end(), (uint8_t*)(str.data()), (uint8_t *)(str.data()) + str.size());
return v;
}
// proto: 1. on router [sender_id] [target_id] [body]
// 2. on dealer [sender_id] [body]
vector<vector<uint8_t> > z_recv_multiple(void *s, bool nowait=false) {
int64_t more = 1;
vector<vector<uint8_t> > body;
int cnt = 0;
int ret = 0;
while(more > 0) {
cnt++;
zmq_msg_t msg;
ret = zmq_msg_init(&msg);
if(ret < 0) {
spdlog::debug("failed to receive multiple msg on zmq_msg_init: {}", zmq_strerror(zmq_errno()));
break;
}
ret = zmq_recvmsg(s, &msg, nowait?ZMQ_DONTWAIT:0);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
vector<uint8_t> v;
v.insert(v.end(), (uint8_t*)zmq_msg_data(&msg), (uint8_t*)zmq_msg_data(&msg)+ret);
body.push_back(v);
spdlog::debug("z_rcv_multiple: {}", body2str(v).substr(0, v.size()> 100? 15:v.size()));
zmq_msg_close(&msg);
size_t more_size = sizeof(more);
ret = zmq_getsockopt(s, ZMQ_RCVMORE, &more, &more_size);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
// proto: 1. on router [sender_id] [target_id] [body]
// 2. on dealer [sender_id] [body]
vector<vector<uint8_t> > z_recv_multiple(void *s, bool nowait=false)
{
int64_t more = 1;
vector<vector<uint8_t> > body;
int cnt = 0;
int ret = 0;
while(more > 0) {
cnt++;
zmq_msg_t msg;
ret = zmq_msg_init(&msg);
if(ret < 0) {
spdlog::debug("failed to receive multiple msg on zmq_msg_init: {}", zmq_strerror(zmq_errno()));
break;
}
ret = zmq_recvmsg(s, &msg, nowait?ZMQ_DONTWAIT:0);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
vector<uint8_t> v;
v.insert(v.end(), (uint8_t*)zmq_msg_data(&msg), (uint8_t*)zmq_msg_data(&msg)+ret);
body.push_back(v);
spdlog::debug("z_rcv_multiple: {}", body2str(v).substr(0, v.size()> 100? 15:v.size()));
zmq_msg_close(&msg);
size_t more_size = sizeof(more);
ret = zmq_getsockopt(s, ZMQ_RCVMORE, &more, &more_size);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
return body;
}
// proto [sender_id(only when no identifier set in setsockopts)] [target_id] [body]
int z_send_multiple(void *s, vector<vector<uint8_t> >&body) {
int cnt = 0, ret = 0;
zmq_msg_t msg;
for(auto &i:body) {
ret = zmq_msg_init_size(&msg, i.size());
memcpy(zmq_msg_data(&msg), (void*)(i.data()), i.size());
spdlog::debug("z_send_multiple: {}", body2str(i).substr(0, i.size()>100?15:i.size()));
if(ret < 0) {
spdlog::debug("z_send_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
ret = zmq_msg_send(&msg, s, cnt==(body.size()-1)?0:(ZMQ_SNDMORE));
zmq_msg_close(&msg);
if(ret < 0) {
spdlog::debug("z_send_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
cnt++;
return body;
}
// proto [sender_id(only when no identifier set in setsockopts)] [target_id] [body]
int z_send_multiple(void *s, vector<vector<uint8_t> >&body)
{
int cnt = 0, ret = 0;
zmq_msg_t msg;
for(auto &i:body) {
ret = zmq_msg_init_size(&msg, i.size());
memcpy(zmq_msg_data(&msg), (void*)(i.data()), i.size());
spdlog::debug("z_send_multiple: {}", body2str(i).substr(0, i.size()>100?15:i.size()));
if(ret < 0) {
spdlog::debug("z_send_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
return ret;
ret = zmq_msg_send(&msg, s, cnt==(body.size()-1)?0:(ZMQ_SNDMORE));
zmq_msg_close(&msg);
if(ret < 0) {
spdlog::debug("z_send_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
cnt++;
}
return ret;
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论