提交 de4bc4ab authored 作者: blu's avatar blu

init

上级 0ca29392
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <iostream> #include <iostream>
#include <chrono> #include <chrono>
#include <future> #include <future>
#include <queue>
#ifdef OS_LINUX #ifdef OS_LINUX
#include <filesystem> #include <filesystem>
...@@ -34,7 +35,10 @@ private: ...@@ -34,7 +35,10 @@ private:
void *pRouter = NULL; void *pRouter = NULL;
json config; json config;
string devSn; string devSn;
json peerStatus;
json jmgr;
unordered_map<string, queue<vector<vector<uint8_t> >> > cachedMsg;
mutex cacheLock;
void init() void init()
{ {
int ret; int ret;
...@@ -46,7 +50,12 @@ private: ...@@ -46,7 +50,12 @@ private:
config = json::parse(cloudutils::config); config = json::parse(cloudutils::config);
spdlog::info("config dumps: \n{}", config.dump()); spdlog::info("config dumps: \n{}", config.dump());
// TODO: verify sn // TODO: verify sn
json jmgr = config["data"][devSn]; if(!config.count("data")||!config["data"].count(devSn)||!config["data"][devSn].count("ipcs")){
spdlog::error("evmgr {} invalid config. reload now...", devSn);
this_thread::sleep_for(chrono::seconds(3));
continue;
}
jmgr = config["data"][devSn];
string proto = jmgr["proto"]; string proto = jmgr["proto"];
string addr; string addr;
...@@ -87,34 +96,69 @@ private: ...@@ -87,34 +96,69 @@ private:
int handleMsg(vector<vector<uint8_t> > &body) { int handleMsg(vector<vector<uint8_t> > &body) {
int ret = 0; int ret = 0;
zmq_msg_t msg; zmq_msg_t msg;
cout<<endl<<endl;
for(auto &j:body) {
cout<<body2str(j) << "; ";
}
cout <<endl;
// ID_SENDER, ID_TARGET, meta ,MSG // ID_SENDER, ID_TARGET, meta ,MSG
if(body.size() != 4) { if(body.size() != 4) {
spdlog::warn("evmgr {} dropped a message, since its size is incorrect: {}", devSn, body.size()); spdlog::warn("evmgr {} dropped a message, since its size is incorrect: {}", devSn, body.size());
return 0; return 0;
} }
// if need forward
if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) { if(memcmp((void*)(body[1].data()), (devSn +":0:0").data(), body[1].size()) != 0) {
// message to other peer
// check peer status
string gid = body2str(body[1]);
if(peerStatus.count(gid)!= 0) {
auto t = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count() - peerStatus[gid].get<long long>();
if(t > EV_HEARTBEAT_SECONDS*5/4){
peerStatus[gid] = 0;
// need cache
}else{
spdlog::info("evmgr {} route msg from {} to {}", devSn, body2str(body[0]), body2str(body[1])); spdlog::info("evmgr {} route msg from {} to {}", devSn, body2str(body[0]), body2str(body[1]));
vector<vector<uint8_t> >v; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
v.push_back(body[1]);
v.push_back(body[0]);
v.push_back(body[2]);
v.push_back(body[3]);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
} }
}
}else{ }else{
// TODO: report msg peerStatus[gid] = 0;
// need cache
}
if(peerStatus[gid] == 0) {
// cache
spdlog::warn("evmgr {} cached msg from {} to {}", devSn, body2str(body[0]), gid);
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
lock_guard<mutex> lock(cacheLock);
cachedMsg[gid].push(v);
if(cachedMsg[gid].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[gid].pop();
}
}
}else{
// message to mgr
spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2])); spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
string meta = body2str(body[2]);
string gid = body2str(body[0]);
if(meta == "pong"||meta == "ping") {
// update status
this->peerStatus[gid] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
if(meta=="ping") {
if(cachedMsg.find(gid) != cachedMsg.end()) {
while(!cachedMsg[gid].empty()){
lock_guard<mutex> lock(cacheLock);
auto v = cachedMsg[gid].front();
cachedMsg[gid].pop();
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
}
}
}else{
// TODO:
spdlog::warn("evmgr {} received unknown meta {} from {}", devSn, meta, gid);
}
} }
return ret; return ret;
...@@ -125,12 +169,46 @@ protected: ...@@ -125,12 +169,46 @@ protected:
bool bStopSig = false; bool bStopSig = false;
int ret = 0; int ret = 0;
zmq_msg_t msg; zmq_msg_t msg;
// health checking thread
auto thHealth = thread([&,this](){
auto ipcs = this->jmgr["ipcs"];
json jmeta; jmeta["type"] = "ping";
auto meta = str2body(jmeta.dump());
auto mgrId = str2body(this->devSn + ":0:0");
while(true) {
for(auto &j:ipcs) {
if(j.count("modules") != 0) {
for(auto &[k, v]: j["modules"].items()) {
// k = module name
for(auto &m: v) {
if(!m.count("sn") && !m.count("iid")) {
// construct gid for module
string gid = m["sn"].get<string>() + ":" + k + ":" + to_string(m["iid"]);
// build ping msg
vector<vector<uint8_t> > v = {str2body(gid), mgrId, meta, str2body("hello")};
ret = z_send_multiple(this->pRouter, v);
if(ret < 0) {
spdlog::error("evmgr {} failed to send ping to module {}", devSn, gid);
}else{
//
}
}
}
}
}
}
// TODO:
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS));
}
});
while (true) { while (true) {
if(checkStop() == true) { if(checkStop() == true) {
bStopSig = true; bStopSig = true;
break; break;
} }
auto body = z_recv_multiple(pRouter); auto body = z_recv_multiple(pRouter,false);
if(body.size() == 0) { if(body.size() == 0) {
spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("evmgr {} failed to receive multiple msg: {}", devSn, zmq_strerror(zmq_errno()));
continue; continue;
......
...@@ -39,25 +39,27 @@ private: ...@@ -39,25 +39,27 @@ private:
const char * bytes; const char * bytes;
int len; int len;
void *pDealer=NULL; void *pDealer=NULL;
protected: void sendPing(){
void run()
{
int ret = 0; int ret = 0;
bool bStopSig = false;
// declare ready to router
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
body.push_back(str2body(mgrSn + ":0:0")); body.push_back(str2body(mgrSn + ":0:0"));
body.push_back(str2body("")); // blank meta body.push_back(str2body(EV_MSG_META_PING));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpuller {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
return; return;
} }
}
protected:
void run()
{
int ret = 0;
bool bStopSig = false;
// declare ready to router
sendPing();
// init response msg // init response msg
auto msgBody = data2body(const_cast<char*>(bytes), len); auto msgBody = data2body(const_cast<char*>(bytes), len);
while (true) { while (true) {
...@@ -68,31 +70,27 @@ protected: ...@@ -68,31 +70,27 @@ protected:
spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid); spdlog::info("evpuller repSrv {} {} waiting for req", devSn, iid);
// proto: [sender_id] [meta] [body] // proto: [sender_id] [meta] [body]
auto v = z_recv_multiple(pDealer); auto v = z_recv_multiple(pDealer, false);
if(v.size() != 3) { if(v.size() != 3) {
//TODO: //TODO:
spdlog::error("evpuller {} {}, repSrv received invalid message: {}", devSn, iid, v.size()); spdlog::error("evpuller {} {}, repSrv received invalid message: {}", devSn, iid, v.size());
continue; continue;
} }
cout << endl<<endl;
for(auto&j:v) {
cout <<body2str(j) << "; ";
}
cout << endl;
try{ try{
// rep framectx // rep framectx
// TODO: verify sender id // TODO: verify sender id
auto meta = json::parse(body2str(v[1])); auto meta = json::parse(body2str(v[1]));
if(meta["type"].get<string>() == EV_PACKET_TYPE_AVFORMATCTX) { if(meta["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
vector<vector<uint8_t> > rep; vector<vector<uint8_t> > rep = {v[0], v[1], msgBody};
rep.push_back(v[0]);
rep.push_back(v[1]);
rep.push_back(msgBody);
ret = z_send_multiple(pDealer, rep); ret = z_send_multiple(pDealer, rep);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno())); spdlog::error("evpuller {} {} failed send rep to requester {}: {}", devSn, iid, body2str(v[0]), zmq_strerror(zmq_errno()));
} }
}else{ }else if(meta["type"].get<string>() == EV_MSG_META_PING){
sendPing();
}
else{
spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1])); spdlog::error("evpuller {} {} unknown meta from {}: {}", devSn, iid, body2str(v[0]), body2str(v[1]));
} }
}catch(exception &e) { }catch(exception &e) {
......
...@@ -140,11 +140,10 @@ private: ...@@ -140,11 +140,10 @@ private:
} }
// send hello to router // send hello to router
spdlog::info("evpusher {} {} send hello to router: {}", devSn, iid, mgrSn);
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
body.push_back(str2body(mgrSn+":0:0")); body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body("")); // blank meta body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
...@@ -154,7 +153,7 @@ private: ...@@ -154,7 +153,7 @@ private:
return -1; return -1;
} }
spdlog::info("evpusher {} {} success setupMq", devSn, iid); spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn);
return 0; return 0;
} }
...@@ -171,28 +170,38 @@ private: ...@@ -171,28 +170,38 @@ private:
vector<vector<uint8_t> > body; vector<vector<uint8_t> > body;
body.push_back(str2body(pullerGid)); body.push_back(str2body(pullerGid));
json meta; json meta;
meta["type"] = EV_PACKET_TYPE_AVFORMATCTX; meta["type"] = EV_MSG_META_AVFORMATCTX;
body.push_back(str2body(meta.dump())); body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO)); body.push_back(str2body(MSG_HELLO));
bool gotFormat = false; bool gotFormat = false;
uint64_t failedCnt = 0;
while(!gotFormat) { while(!gotFormat) {
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
continue; continue;
} }
spdlog::info("evpusher {} {} success send hello", devSn, iid);
// expect response with avformatctx // expect response with avformatctx
auto v = z_recv_multiple(pDealer); auto v = z_recv_multiple(pDealer);
if(v.size() != 3) { if(v.size() != 3) {
ret = zmq_errno();
if(ret != 0) {
if(failedCnt % 100 == 0) {
spdlog::error("evpusher {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret));
spdlog::info("evpusher {} {} retry connect to peers", devSn, iid);
}
this_thread::sleep_for(chrono::seconds(5));
failedCnt++;
}else{
spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size()); spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
}
}else if(body2str(v[0]) != pullerGid) { }else if(body2str(v[0]) != pullerGid) {
spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid); spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
}else{ }else{
try{ try{
auto cmd = json::parse(body2str(v[1])); auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_PACKET_TYPE_AVFORMATCTX){ if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX){
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput); AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true; gotFormat = true;
...@@ -316,7 +325,9 @@ protected: ...@@ -316,7 +325,9 @@ protected:
//calc pts //calc pts
{ {
spdlog::debug("seq: {:lld}, pts: {:lld}, dts: {:lld}, dur: {:lld}, idx: {:d}", pktCnt, packet.pts, packet.dts, packet.duration, packet.stream_index); if(pktCnt % (18*60*5) == 0) {
spdlog::info("seq: {:lld}, pts: {:lld}, dts: {:lld}, dur: {:lld}, idx: {:d}", pktCnt, packet.pts, packet.dts, packet.duration, packet.stream_index);
}
/* copy packet */ /* copy packet */
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
...@@ -327,7 +338,8 @@ protected: ...@@ -327,7 +338,8 @@ protected:
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet); av_packet_unref(&packet);
if (ret < 0) { if (ret < 0) {
spdlog::error("error muxing packet"); spdlog::error("error muxing packet: {}", av_err2str(ret));
exit(1);
} }
} }
av_write_trailer(pAVFormatRemux); av_write_trailer(pAVFormatRemux);
...@@ -377,7 +389,7 @@ public: ...@@ -377,7 +389,7 @@ public:
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
av_log_set_level(AV_LOG_INFO); av_log_set_level(AV_LOG_INFO);
spdlog::set_level(spdlog::level::debug); spdlog::set_level(spdlog::level::info);
EvPusher pusher; EvPusher pusher;
pusher.join(); pusher.join();
} }
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论