提交 4e0da1fa authored 作者: blu's avatar blu

feature: report msg

上级 23744ba8
cmake_minimum_required (VERSION 3.0) cmake_minimum_required (VERSION 3.0)
project (evsuits) project (evsuits)
if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
......
...@@ -431,7 +431,7 @@ private: ...@@ -431,7 +431,7 @@ private:
return 0; return 0;
} }
else if(body.size() != 4) { else if(body.size() != 4) {
spdlog::warn("evcloudsvc {} dropped an invalid message, size: {}", devSn, body.size()); spdlog::warn("evcloudsvc dropped an invalid message, size: {}", devSn, body.size());
return 0; return 0;
} }
...@@ -447,15 +447,15 @@ private: ...@@ -447,15 +447,15 @@ private:
// check peer status // check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) { if(peerData["status"].count(peerId)!= 0 && peerData["status"][peerId] != 0) {
spdlog::info("evcloudsvc {} route msg from {} to {}", devSn, selfId, peerId); spdlog::info("{} route msg from {} to {}", devSn, selfId, peerId);
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
spdlog::error("evcloudsvc {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("{} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
} }
} }
else { else {
// cache // cache
spdlog::warn("evcloudsvc {} cached msg from {} to {}", devSn, selfId, peerId); spdlog::warn("{} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock); lock_guard<mutex> lock(cacheLock);
cachedMsg[peerId].push(v); cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) { if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
...@@ -474,7 +474,7 @@ private: ...@@ -474,7 +474,7 @@ private:
} }
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evcloudsvc {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what()); spdlog::error("{} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
} }
} }
else { else {
...@@ -484,7 +484,7 @@ private: ...@@ -484,7 +484,7 @@ private:
// handleConnection(selfId); // handleConnection(selfId);
if(meta=="ping") { if(meta=="ping") {
auto ips = body2str(body[3]); auto ips = body2str(body[3]);
spdlog::info("evcloudsvc {}, ping msg from {}: {}", devSn, selfId, ips); spdlog::info("{}, ping msg from {}: {}", devSn, selfId, ips);
this->peerData["info"]["ips"][selfId] = ips; this->peerData["info"]["ips"][selfId] = ips;
...@@ -495,7 +495,7 @@ private: ...@@ -495,7 +495,7 @@ private:
cachedMsg[selfId].pop(); cachedMsg[selfId].pop();
ret = z_send_multiple(pRouter, v); ret = z_send_multiple(pRouter, v);
if(ret < 0) { if(ret < 0) {
spdlog::error("evcloudsvc {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno())); spdlog::error("{} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
} }
} }
} }
...@@ -506,9 +506,12 @@ private: ...@@ -506,9 +506,12 @@ private:
sendEdgeMsg(resp); sendEdgeMsg(resp);
} }
} }
}else if(meta == EV_MSG_META_TYPE_REPORT) {
// TODO: handle report msg
spdlog::warn("{} received report msg from {}: {}", devSn, selfId, body2str(body[3]));
} }
else { else {
spdlog::warn("evcloudsvc {} received unknown meta {} from {}", devSn, meta, selfId); spdlog::warn("{} received unknown meta {} from {}", devSn, meta, selfId);
} }
} }
......
...@@ -370,6 +370,7 @@ private: ...@@ -370,6 +370,7 @@ private:
int ret = 0; int ret = 0;
// ID_SENDER, ID_TARGET, meta ,MSG // ID_SENDER, ID_TARGET, meta ,MSG
string selfId, peerId, meta; string selfId, peerId, meta;
// connection message
if(body.size() == 2 && body[1].size() == 0) { if(body.size() == 2 && body[1].size() == 0) {
selfId = body2str(body[0]); selfId = body2str(body[0]);
bool eventConn = false; bool eventConn = false;
...@@ -467,26 +468,32 @@ private: ...@@ -467,26 +468,32 @@ private:
// update status; // update status;
this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count(); this->peerData["status"][selfId] = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
// msg to peer // msg to other peer
int minLen = std::min(body[1].size(), this->daemonId.size()); int minLen = std::min(body[1].size(), this->daemonId.size());
if(memcmp((void*)(body[1].data()), this->daemonId.data(), minLen) != 0) { if(memcmp((void*)(body[1].data()), this->daemonId.data(), minLen) != 0) {
// message to other peer // message to other peer
// check peer status
vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]}; // check if target is evcloudsvc
if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0 && this->peerData["status"][peerId][peerId] != -1) { if(peerId == "evcloudsvc") {
spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId); zmqhelper::z_send(pDealer, peerId, body[2], body[3]);
ret = z_send_multiple(pRouter, v);
if(ret < 0) {
spdlog::error("evdaemon {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
} }
else { else{
// cache vector<vector<uint8_t> >v = {body[1], body[0], body[2], body[3]};
spdlog::warn("evdaemon {} cached msg from {} to {}", devSn, selfId, peerId); if(peerData["status"].count(peerId) != 0 && peerData["status"][peerId] != 0 && this->peerData["status"][peerId][peerId] != -1) {
lock_guard<mutex> lock(cacheLock); spdlog::info("evdaemon {} route msg from {} to {}", devSn, selfId, peerId);
cachedMsg[peerId].push(v); ret = z_send_multiple(pRouter, v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) { if(ret < 0) {
cachedMsg[peerId].pop(); spdlog::error("evdaemon {} failed to send multiple: {}", devSn, zmq_strerror(zmq_errno()));
}
}
else {
// cache
spdlog::warn("evdaemon {} cached msg from {} to {}", devSn, selfId, peerId);
lock_guard<mutex> lock(cacheLock);
cachedMsg[peerId].push(v);
if(cachedMsg[peerId].size() > EV_NUM_CACHE_PERPEER) {
cachedMsg[peerId].pop();
}
} }
} }
...@@ -500,7 +507,6 @@ private: ...@@ -500,7 +507,6 @@ private:
} }
} }
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evdaemon {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what()); spdlog::error("evdaemon {} exception parse event msg from {} to {}: ", devSn, selfId, peerId, e.what());
} }
......
...@@ -392,6 +392,7 @@ private: ...@@ -392,6 +392,7 @@ private:
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno()));
// TODO: message report to cloud
exit(1); exit(1);
} }
unique_lock<mutex> lk(this->mutMsg); unique_lock<mutex> lk(this->mutMsg);
...@@ -734,7 +735,7 @@ protected: ...@@ -734,7 +735,7 @@ protected:
// send to evdaemon // send to evdaemon
v1[2] = str2body(evt); v1[2] = str2body(evt);
ret = z_send_multiple(this->pDealer, v1); ret = z_send_multiple(this->pDaemon, v1);
if(ret < 0) { if(ret < 0) {
spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, daemonId, zmq_strerror(zmq_errno())); spdlog::error("evmlmotion {} failed to send event {} to {}: {}", this->selfId, evt, daemonId, zmq_strerror(zmq_errno()));
} }
...@@ -754,6 +755,7 @@ protected: ...@@ -754,6 +755,7 @@ protected:
AVFrame *pFrame = av_frame_alloc(); AVFrame *pFrame = av_frame_alloc();
if (!pFrame) { if (!pFrame) {
spdlog::error("evmlmotion {} failed to allocated memory for AVFrame", selfId); spdlog::error("evmlmotion {} failed to allocated memory for AVFrame", selfId);
// TODO: message report to cloud
exit(1); exit(1);
} }
...@@ -798,7 +800,18 @@ protected: ...@@ -798,7 +800,18 @@ protected:
av_packet_unref(&packet); av_packet_unref(&packet);
if (ret < 0) { if (ret < 0) {
spdlog::error("evmlmotion error muxing packet"); // TODO: report message to cloud
string msg = fmt::format("evmlmotion {} failed to decode packet", selfId);
json meta;
json data;
data["msg"] = msg;
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error(msg);
} }
if((pktCnt - pktCntLast ) == 18) { if((pktCnt - pktCntLast ) == 18) {
......
...@@ -312,12 +312,14 @@ protected: ...@@ -312,12 +312,14 @@ protected:
spdlog::info("evpuller {} openning stream: {}", selfId, urlIn); spdlog::info("evpuller {} openning stream: {}", selfId, urlIn);
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) { if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, &optsIn)) < 0) {
spdlog::error("evpuller {} Could not open input stream {}", selfId, urlIn); spdlog::error("evpuller {} Could not open input stream {}", selfId, urlIn);
// TODO: message report to cloud
exit(1); exit(1);
} }
spdlog::info("evpuller {} finding stream info: {}", selfId, urlIn); spdlog::info("evpuller {} finding stream info: {}", selfId, urlIn);
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) { if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
spdlog::error("evpuller {} Failed to retrieve input stream information", selfId); spdlog::error("evpuller {} Failed to retrieve input stream information", selfId);
// TODO: message report to cloud
exit(1); exit(1);
} }
...@@ -338,6 +340,7 @@ protected: ...@@ -338,6 +340,7 @@ protected:
lenAVFmtCtxBytes = AVFormatCtxSerializer::encode(pAVFormatInput, &pAVFmtCtxBytes); lenAVFmtCtxBytes = AVFormatCtxSerializer::encode(pAVFormatInput, &pAVFmtCtxBytes);
if(lenAVFmtCtxBytes <= 0 || pAVFmtCtxBytes == nullptr) { if(lenAVFmtCtxBytes <= 0 || pAVFmtCtxBytes == nullptr) {
spdlog::error("evpuller {} failed to pull packet from {}. exiting...", selfId, urlIn); spdlog::error("evpuller {} failed to pull packet from {}. exiting...", selfId, urlIn);
// TODO: message report to cloud
exit(1); exit(1);
} }
// broadcast // broadcast
...@@ -381,6 +384,7 @@ protected: ...@@ -381,6 +384,7 @@ protected:
ret = av_read_frame(pAVFormatInput, &packet); ret = av_read_frame(pAVFormatInput, &packet);
if (ret < 0) { if (ret < 0) {
spdlog::error("evpuller {} failed read packet: {}", selfId, av_err2str(ret)); spdlog::error("evpuller {} failed read packet: {}", selfId, av_err2str(ret));
// TODO: message report to cloud
exit(1); exit(1);
} }
in_stream = pAVFormatInput->streams[packet.stream_index]; in_stream = pAVFormatInput->streams[packet.stream_index];
...@@ -431,6 +435,7 @@ public: ...@@ -431,6 +435,7 @@ public:
auto v = strutils::split(selfId, ':'); auto v = strutils::split(selfId, ':');
if(v.size() != 3||v[1] != "evpuller") { if(v.size() != 3||v[1] != "evpuller") {
spdlog::error("evpuller {} received invalid gid: {}", selfId); spdlog::error("evpuller {} received invalid gid: {}", selfId);
// TODO: message report to cloud
exit(1); exit(1);
} }
devSn = v[0]; devSn = v[0];
......
...@@ -291,6 +291,7 @@ private: ...@@ -291,6 +291,7 @@ private:
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {}, failed to send hello to puller: {}. exiting...", devSn, iid, zmq_strerror(zmq_errno()));
// TODO: message report to cloud
exit(1); exit(1);
} }
unique_lock<mutex> lk(this->mutMsg); unique_lock<mutex> lk(this->mutMsg);
...@@ -322,6 +323,7 @@ private: ...@@ -322,6 +323,7 @@ private:
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret)); spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
// TODO: message report to cloud
exit(1); exit(1);
} }
...@@ -369,13 +371,25 @@ private: ...@@ -369,13 +371,25 @@ private:
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux); ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} could not open output file '%s'", devSn, iid, urlOut); spdlog::error("evpusher {} {} could not open output file '%s'", devSn, iid, urlOut);
// TODO: message report to cloud
exit(1); exit(1);
} }
} }
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux); ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} error occurred when opening output file", devSn, iid); // TODO: report message to cloud
string msg = fmt::format("evpusher {} failed to write stream \"{}\": {}", selfId, urlOut, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error(msg);
exit(1); exit(1);
} }
...@@ -462,6 +476,18 @@ protected: ...@@ -462,6 +476,18 @@ protected:
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet); av_packet_unref(&packet);
if (ret < 0) { if (ret < 0) {
// TODO: report message to cloud
string msg = fmt::format("evpusher {} error write stream, trying restreaming:{}", selfId, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error("evpusher {} error muxing packet: {}, {}, {}, {}, restreaming...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE); spdlog::error("evpusher {} error muxing packet: {}, {}, {}, {}, restreaming...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
if(packet.pts == AV_NOPTS_VALUE) { if(packet.pts == AV_NOPTS_VALUE) {
// reset // reset
......
...@@ -424,6 +424,7 @@ private: ...@@ -424,6 +424,7 @@ private:
ret = z_send_multiple(pDealer, body); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evslicer {}, failed to send hello to puller: {}. exiting ...", selfId, zmq_strerror(zmq_errno())); spdlog::error("evslicer {}, failed to send hello to puller: {}. exiting ...", selfId, zmq_strerror(zmq_errno()));
// TODO: message report to cloud
exit(1); exit(1);
} }
unique_lock<mutex> lk(this->mutMsg); unique_lock<mutex> lk(this->mutMsg);
...@@ -501,6 +502,7 @@ protected: ...@@ -501,6 +502,7 @@ protected:
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "segment", name.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "segment", name.c_str());
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} failed create avformatcontext for output: %s", selfId, av_err2str(ret)); spdlog::error("evslicer {} failed create avformatcontext for output: %s", selfId, av_err2str(ret));
// TODO: message report to cloud
exit(1); exit(1);
} }
...@@ -584,6 +586,18 @@ protected: ...@@ -584,6 +586,18 @@ protected:
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet); av_packet_unref(&packet);
if (ret < 0) { if (ret < 0) {
// TODO: report message to cloud
string msg = fmt::format("evslicer {} error write stream, resetting:{}", selfId, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error("evslicer {} error muxing packet: {}, {}, {}, {}, reloading...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE); spdlog::error("evslicer {} error muxing packet: {}, {}, {}, {}, reloading...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
if(pktCnt != 0 && packet.pts == AV_NOPTS_VALUE) { if(pktCnt != 0 && packet.pts == AV_NOPTS_VALUE) {
// reset // reset
......
...@@ -91,6 +91,24 @@ int z_send(void *s, string peerId, string selfId, const json &meta, string body) ...@@ -91,6 +91,24 @@ int z_send(void *s, string peerId, string selfId, const json &meta, string body)
return z_send_multiple(s, v); return z_send_multiple(s, v);
} }
int z_send(void *s, string peerId, string selfId,vector<uint8_t> meta, vector<uint8_t> body)
{
vector<vector<uint8_t> > v{str2body(peerId), str2body(selfId), meta, body};
return z_send_multiple(s, v);
}
int z_send(void *s, string peerId, string meta, string body)
{
vector<vector<uint8_t> > v{str2body(peerId), str2body(meta), str2body(body)};
return z_send_multiple(s, v);
}
int z_send(void *s, string peerId, vector<uint8_t> meta, vector<uint8_t> body)
{
vector<vector<uint8_t> > v{str2body(peerId), meta, body};
return z_send_multiple(s, v);
}
/// setup router /// setup router
int setupRouter(void **ctx, void **s, string addr){ int setupRouter(void **ctx, void **s, string addr){
int ret = 0; int ret = 0;
......
...@@ -35,6 +35,14 @@ namespace zmqhelper { ...@@ -35,6 +35,14 @@ namespace zmqhelper {
#define EV_MSG_META_VALUE_CMD_UPDATE "update" #define EV_MSG_META_VALUE_CMD_UPDATE "update"
#define EV_MSG_META_VALUE_CMD_STOP "stop" #define EV_MSG_META_VALUE_CMD_STOP "stop"
#define EV_MSG_META_TYPE_REPORT "report"
#define EV_MSG_META_VALUE_REPORT_LEVEL_INFO "info"
#define EV_MSG_META_VALUE_REPORT_LEVEL_DEBUG "debug"
#define EV_MSG_META_VALUE_REPORT_LEVEL_WARN "warn"
#define EV_MSG_META_VALUE_REPORT_LEVEL_ERROR "error"
#define EV_MSG_META_VALUE_REPORT_LEVEL_FATAL "fatal"
#define EV_MSG_META_TYPE_BROADCAST "broadcast" #define EV_MSG_META_TYPE_BROADCAST "broadcast"
#define EV_MSG_META_CONFIG "config" #define EV_MSG_META_CONFIG "config"
...@@ -70,7 +78,10 @@ int setupDealer(void **ctx, void **s, string addr, string ident, int timeout = 0 ...@@ -70,7 +78,10 @@ int setupDealer(void **ctx, void **s, string addr, string ident, int timeout = 0
/// @return 0 success, otherwise failed. /// @return 0 success, otherwise failed.
int recvConfigMsg(void *s, json &config, string addr, string ident); int recvConfigMsg(void *s, json &config, string addr, string ident);
int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid); int forkSubsystem(string devSn, string peerId, int drPort, pid_t &pid);
int z_send(void *s, string peerId, string selfId, vector<uint8_t> meta, vector<uint8_t> body);
int z_send(void *s, string peerId, string selfId, const json &meta, string body); int z_send(void *s, string peerId, string selfId, const json &meta, string body);
int z_send(void *s, string peerId, vector<uint8_t> meta, vector<uint8_t> body);
int z_send(void *s, string peerId, string meta, string body);
} }
#endif #endif
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论