提交 66b3f4dc authored 作者: blu's avatar blu

add broadcast mesage for all systems

上级 b6e797c3
......@@ -176,6 +176,9 @@ error_exit:
zmq_msg_t msg;
// ID_SENDER, ID_TARGET, meta ,MSG
string selfId, peerId, meta;
bool bProcessed = false;
// connection message
if(body.size() == 2 && body[1].size() == 0) {
selfId = body2str(body[0]);
bool eventConn = false;
......@@ -256,6 +259,7 @@ error_exit:
cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[peerId].pop();
spdlog::info("evmgr {} max msg queue size {} reached for {}, dropped the oldest one.", this->devSn, MAX_EVENT_QUEUE_SIZE, peerId);
}
}
......@@ -266,11 +270,13 @@ error_exit:
eventQue.push(body2str(body[3]));
if(eventQue.size() > MAX_EVENT_QUEUE_SIZE) {
eventQue.pop();
spdlog::info("evmgr {} max event queue size {} reached, dropped the oldest one.", this->devSn, MAX_EVENT_QUEUE_SIZE);
}
}
}
catch(exception &e) {
bProcessed = true;
} catch(exception &e) {
bProcessed = false;
spdlog::error("evmgr {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
}
}
......@@ -278,7 +284,6 @@ error_exit:
// message to mgr
// spdlog::info("evmgr {} subsystem report msg received: {}; {}; {}", devSn, zmqhelper::body2str(body[0]), zmqhelper::body2str(body[1]), zmqhelper::body2str(body[2]));
if(meta == "pong"||meta == "ping") {
// update status
spdlog::info("evmgr {}, ping msg from {}", devSn, selfId);
if(meta=="ping") {
if(cachedMsg.find(selfId) != cachedMsg.end()) {
......@@ -293,13 +298,42 @@ error_exit:
}
}
}
bProcessed = true;
}
else {
// TODO:
spdlog::warn("evmgr {} received unknown meta {} from {}", devSn, meta, selfId);
try{
json jmeta = json::parse(meta);
if(jmeta["type"] == EV_MSG_META_TYPE_BROADCAST) {
if(jmeta.count("value") != 0) {
json newMeta;
newMeta["type"] = jmeta["value"];
vector<vector<uint8_t> > broadCastMsg = {str2body(""), body[0], str2body(newMeta.dump()), body[3]};
for(auto &[k,v]: peerData["status"].items()) {
if(k != selfId && v != 0) {
broadCastMsg[0] = str2body(k);
ret = z_send_multiple(pRouter, broadCastMsg);
if(ret < 0) {
spdlog::error("evmgr {} failed to broadcast msg from {} because {}. msg meta: {}", devSn, selfId, zmq_strerror(zmq_errno()), meta);
}else{
spdlog::error("evmgr {} successfully broadcast msg from {} to {}. msg meta: {}", devSn, selfId, k, meta);
}
}
}
}
bProcessed = true;
}
}catch(exception &e) {
bProcessed = false;
spdlog::error("evmgr {} exception process msg from {} with meta {}: {}", devSn, selfId, meta, e.what());
}
}
}
if(!bProcessed) {
spdlog::warn("evmgr {} failed process msg from {}: {}", devSn, selfId, meta);
}
return ret;
}
......
......@@ -91,16 +91,37 @@ private:
return ret;
}
void sendAVInputCtxMsg(string peerId){
json meta;
auto msgBody = data2body(const_cast<char*>(pAVFmtCtxBytes), lenAVFmtCtxBytes);
if(peerId.empty()){
meta["type"] = EV_MSG_META_TYPE_BROADCAST;
meta["value"] = EV_MSG_META_AVFORMATCTX;
peerId = this->mgrSn + ":evmgr:0";
}else{
meta["type"] = EV_MSG_META_AVFORMATCTX;
}
vector<vector<uint8_t> > rep = {str2body(peerId), str2body(meta.dump()), msgBody};
int ret = z_send_multiple(pDealer, rep);
if(ret < 0) {
spdlog::error("evpuller {} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evpuller {} success to send avformatctx data to requester {}", selfId, peerId);
}
}
int handleEdgeMsg(vector<vector<uint8_t> > v)
{
int ret = 0;
{
// TODO: disable message response before openned input stream
unique_lock<mutex> lk(this->mutMsg);
this->cvMsg.wait(lk, [this] {return this->gotFormat;});
}
spdlog::info("evpuller {} got inputformat", selfId);
auto msgBody = data2body(const_cast<char*>(pAVFmtCtxBytes), lenAVFmtCtxBytes);
try {
// rep framectx
// TODO: verify sender id
......@@ -108,14 +129,7 @@ private:
string peerId = body2str(v[0]);
auto meta = json::parse(sMeta);
if(meta["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
vector<vector<uint8_t> > rep = {v[0], v[1], msgBody};
ret = z_send_multiple(pDealer, rep);
if(ret < 0) {
spdlog::error("evpuller {} failed to send avformatctx data to requester {}: {}", selfId, peerId, zmq_strerror(zmq_errno()));
}
else {
spdlog::info("evpuller {} success to send avformatctx data to requester {}", selfId, peerId);
}
sendAVInputCtxMsg(peerId);
}
else if(meta["type"].get<string>() == EV_MSG_META_EVENT) {
// event msg
......@@ -321,6 +335,9 @@ protected:
spdlog::error("evpuller {} failed create avformatcontext for output: {}", selfId, av_err2str(AVERROR(ENOMEM)));
}
// broadcast
sendAVInputCtxMsg("");
// serialize formatctx to bytes
// be attention to the scope of lock guard!
{
......
......@@ -33,6 +33,8 @@ namespace zmqhelper {
#define EV_MSG_META_VALUE_CMD_UPDATE "update"
#define EV_MSG_META_VALUE_CMD_STOP "stop"
#define EV_MSG_META_TYPE_BROADCAST "broadcast"
#define EV_MSG_META_CONFIG "config"
#define EV_MSG_META_AVFORMATCTX "afctx"
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论