提交 16035989 authored 作者: blu's avatar blu

feature: report msg

上级 9b652a29
...@@ -30,7 +30,6 @@ private: ...@@ -30,7 +30,6 @@ private:
string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn, selfId; string urlOut, urlPub, urlDealer, devSn, pullerGid, mgrSn, selfId;
int iid; int iid;
int *streamList = nullptr; int *streamList = nullptr;
AVDictionary *pOptsRemux = nullptr;
AVFormatContext *pAVFormatRemux = nullptr; AVFormatContext *pAVFormatRemux = nullptr;
AVFormatContext *pAVFormatInput = nullptr; AVFormatContext *pAVFormatInput = nullptr;
json config; json config;
...@@ -303,78 +302,117 @@ private: ...@@ -303,78 +302,117 @@ private:
int setupStream() int setupStream()
{ {
int ret = 0; int ret = -1;
AVDictionary *pOptsRemux = nullptr;
string proto = urlOut.substr(0, 4); string proto = urlOut.substr(0, 4);
if(proto == "rtsp") { int cnt = 0;
// rtsp tcp while(ret < 0)
if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) { {
spdlog::error("evpusher {} {} failed set output pOptsRemux", devSn, iid); if(cnt > 3) {
ret = AVERROR_UNKNOWN; string msg = fmt::format("evpusher {} failed to write stream \"{}\": {}, {}", selfId, urlOut, ret, av_err2str(ret));
} json meta;
av_dict_set_int(&pOptsRemux, "stimeout", (int64_t)(1000* 1000 * 1), 0); json data;
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtsp", urlOut.c_str()); data["msg"] = msg;
} data["modId"] = selfId;
else if(proto == "rtmp") { data["type"] = EV_MSG_META_TYPE_REPORT;
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtmp", urlOut.c_str()); data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
} meta["type"] = EV_MSG_META_TYPE_REPORT;
else { meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, nullptr, urlOut.c_str()); z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
} exit(1);
}
if (ret < 0) { if(proto == "rtsp") {
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret)); // rtsp tcp
// TODO: message report to cloud if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) {
exit(1); spdlog::error("evpusher {} {} failed set output pOptsRemux", devSn, iid);
} ret = AVERROR_UNKNOWN;
}
av_dict_set_int(&pOptsRemux, "stimeout", (int64_t)(1000* 1000 * 1), 0);
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtsp", urlOut.c_str());
}
else if(proto == "rtmp") {
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, "rtmp", urlOut.c_str());
}
else {
ret = avformat_alloc_output_context2(&pAVFormatRemux, nullptr, nullptr, urlOut.c_str());
}
streamList = (int *)av_mallocz_array(pAVFormatInput->nb_streams, sizeof(*streamList)); if (ret < 0) {
spdlog::info("evpusher {} {} numStreams: {:d}", devSn, iid, pAVFormatInput->nb_streams); spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
if (!streamList) { // TODO: message report to cloud
ret = AVERROR(ENOMEM); exit(1);
spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(AVERROR(ENOMEM))); }
exit(1);
}
int streamIdx = 0; streamList = (int *)av_mallocz_array(pAVFormatInput->nb_streams, sizeof(*streamList));
// find all video & audio streams for remuxing spdlog::info("evpusher {} {} numStreams: {:d}", devSn, iid, pAVFormatInput->nb_streams);
for (int i = 0; i < pAVFormatInput->nb_streams; i++) { if (!streamList) {
AVStream *out_stream; ret = AVERROR(ENOMEM);
AVStream *in_stream = pAVFormatInput->streams[i]; spdlog::error("evpusher {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(AVERROR(ENOMEM)));
AVCodecParameters *in_codecpar = in_stream->codecpar; exit(1);
if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO &&
in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
streamList[i] = -1;
continue;
} }
streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) {
spdlog::error("evpusher {} {} failed allocating output stream", devSn, iid);
ret = AVERROR_UNKNOWN;
int streamIdx = 0;
// find all video & audio streams for remuxing
for (int i = 0; i < pAVFormatInput->nb_streams; i++) {
AVStream *out_stream;
AVStream *in_stream = pAVFormatInput->streams[i];
AVCodecParameters *in_codecpar = in_stream->codecpar;
if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO &&
in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
streamList[i] = -1;
continue;
}
streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) {
spdlog::error("evpusher {} {} failed allocating output stream", devSn, iid);
ret = AVERROR_UNKNOWN;
}
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
spdlog::info("evpusher {} {} copied codepar", devSn, iid);
if (ret < 0) {
spdlog::error("evpusher {} {} failed to copy codec parameters", devSn, iid);
}
} }
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
spdlog::info("evpusher {} {} copied codepar", devSn, iid); for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) {
if (ret < 0) { spdlog::info("evpusher {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
spdlog::error("evpusher {} {} failed to copy codec parameters", devSn, iid);
} }
}
for(int i = 0; i < pAVFormatInput->nb_streams; i++ ) { av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1);
spdlog::info("evpusher {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
}
av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1); if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
spdlog::error("evpusher {} {} failed allocating output stream", devSn,iid);
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) {
spdlog::error("evpusher {} {} could not open output file '%s'", devSn, iid, urlOut);
// TODO: message report to cloud
string msg = fmt::format("evpusher {} failed to open output stream \"{}\": {}, {}", selfId, urlOut, ret, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error(msg);
exit(1);
}
}
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) { ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
spdlog::error("evpusher {} {} failed allocating output stream", devSn,iid); if (ret < 0) {
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux); // TODO: report message to cloud
if (ret < 0) { string msg = fmt::format("evpusher {} failed to write stream \"{}\": {}, {}", selfId, urlOut, ret, av_err2str(ret));
spdlog::error("evpusher {} {} could not open output file '%s'", devSn, iid, urlOut); spdlog::error(msg);
// TODO: message report to cloud
exit(1);
} }
cnt++;
} }
return ret; return ret;
} }
...@@ -405,12 +443,6 @@ protected: ...@@ -405,12 +443,6 @@ protected:
zmq_msg_t msg; zmq_msg_t msg;
AVPacket packet; AVPacket packet;
uint64_t pktCnt = 0; uint64_t pktCnt = 0;
uint64_t failedCnt = 0, lastFailedCnt = 0;
// const uint64_t PKT_SKIP = 38;
bool bInited = false;
setupStream();
while (true) { while (true) {
ret =zmq_msg_init(&msg); ret =zmq_msg_init(&msg);
if(ret != 0) { if(ret != 0) {
...@@ -440,30 +472,6 @@ protected: ...@@ -440,30 +472,6 @@ protected:
zmq_msg_close(&msg); zmq_msg_close(&msg);
spdlog::debug("packet stream indx: {:d}", packet.stream_index); spdlog::debug("packet stream indx: {:d}", packet.stream_index);
if(!bInited) {
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
// -1482175736, Server returned 4XX Client Error, but not one of 40{0,1,3,4}
// ignore 406 error
if (ret < 0 && ret != -1482175736) {
// TODO: report message to cloud
string msg = fmt::format("evpusher {} failed to write stream \"{}\": {}, {}", selfId, urlOut, ret, av_err2str(ret));
json meta;
json data;
data["msg"] = msg;
data["modId"] = selfId;
data["type"] = EV_MSG_META_TYPE_REPORT;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_FATAL;
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
spdlog::error(msg);
exit(1);
}
bInited = true;
}
// relay // relay
AVStream *in_stream =NULL, *out_stream = nullptr; AVStream *in_stream =NULL, *out_stream = nullptr;
in_stream = pAVFormatInput->streams[packet.stream_index]; in_stream = pAVFormatInput->streams[packet.stream_index];
...@@ -485,39 +493,32 @@ protected: ...@@ -485,39 +493,32 @@ protected:
// lastPts = packet.dts; // lastPts = packet.dts;
// } // }
// spdlog::info("evpusher {} packet new pts: {} dts: {}", selfId, packet.pts, packet.dts); // spdlog::info("evpusher {} packet new pts: {} dts: {}", selfId, packet.pts, packet.dts);
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet); av_packet_unref(&packet);
// error write stream, restreaming: -22 ,Invalid argument
if (ret < 0) { if (ret < 0) {
// TODO: report message to cloud // TODO: report message to cloud
failedCnt++; string msg = fmt::format("evpusher {} error write stream, restreaming: {} ,{}", selfId, ret, av_err2str(ret));
if(ret != -22){ json meta;
string msg = fmt::format("evpusher {} error write stream, restreaming: {} ,{}", selfId, ret, av_err2str(ret)); json data;
//if(failedCnt % 2 == 0) { data["msg"] = msg;
json meta; data["modId"] = selfId;
json data; data["type"] = EV_MSG_META_TYPE_REPORT;
data["msg"] = msg; data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["modId"] = selfId; meta["type"] = EV_MSG_META_TYPE_REPORT;
data["type"] = EV_MSG_META_TYPE_REPORT; meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR;
data["level"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump());
meta["type"] = EV_MSG_META_TYPE_REPORT;
meta["value"] = EV_MSG_META_VALUE_REPORT_LEVEL_ERROR; spdlog::error(msg);
z_send(pDaemon, "evcloudsvc", meta.dump(), data.dump()); if(packet.pts == AV_NOPTS_VALUE) {
// } // reset
// av_write_trailer(pAVFormatRemux);
spdlog::error(msg); freeStream();
if(packet.pts == AV_NOPTS_VALUE) { getInputFormat();
// reset setupStream();
// av_write_trailer(pAVFormatRemux); pktCnt = 0;
freeStream(); continue;
getInputFormat(); }
setupStream();
pktCnt = 0;
bInited = false;
continue;
}
}
} }
} }
av_write_trailer(pAVFormatRemux); av_write_trailer(pAVFormatRemux);
...@@ -593,7 +594,9 @@ public: ...@@ -593,7 +594,9 @@ public:
thEdgeMsgHandler.detach(); thEdgeMsgHandler.detach();
getInputFormat(); getInputFormat();
//setupStream(); ret = 0;
setupStream();
} }
~EvPusher() ~EvPusher()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论