提交 e631e90c authored 作者: blu's avatar blu

big refacting of communitation architect

上级 576d5a09
......@@ -639,7 +639,7 @@ protected:
av_packet_unref(&packet);
if (ret < 0) {
spdlog::error("error muxing packet");
spdlog::error("evmlmotion error muxing packet");
}
}
......
......@@ -188,7 +188,7 @@ private:
ipcPort = to_string(ipc["port"]);
}
urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + ":" + ipcPort + "/h264/ch1/sub/av_stream";
urlIn = "rtsp://" + user + ":" + passwd + "@" + ipc["addr"].get<string>() + ":" + ipcPort + "/h264/ch1/main/av_stream";
addr = evpuller["addr"].get<string>();
spdlog::info("evpuller {} connecting to IPC {}", selfId, urlIn);
if(addr == "*" || addr == "0.0.0.0") {
......@@ -401,7 +401,7 @@ public:
int main(int argc, char **argv)
{
av_log_set_level(AV_LOG_ERROR);
av_log_set_level(AV_LOG_INFO);
spdlog::set_level(spdlog::level::info);
//DB::exec(NULL, NULL, NULL,NULL);
auto evp = EvPuller();
......
......@@ -316,12 +316,8 @@ protected:
zmq_msg_t msg;
AVPacket packet;
uint64_t pktCnt = 0;
int pktIgnore = 0;
while (true) {
if(checkStop() == true) {
bStopSig = true;
break;
}
// if(1 == getppid()) {
// spdlog::error("evpusher {} exit since evdaemon is dead", selfId);
// exit(1);
......@@ -354,6 +350,12 @@ protected:
}
zmq_msg_close(&msg);
if(pktCnt == 0 && pktIgnore < 18*7) {
pktIgnore++;
av_packet_unref(&packet);
continue;
}
spdlog::debug("packet stream indx: {:d}", packet.stream_index);
// relay
AVStream *in_stream =NULL, *out_stream = nullptr;
......@@ -378,11 +380,10 @@ protected:
ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet);
if (ret < 0) {
spdlog::error("error muxing packet: {}, {}, {}, {}, restreaming...", av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
spdlog::error("evpusher {} error muxing packet: {}, {}, {}, {}, restreaming...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
if(pktCnt != 0 && packet.pts == AV_NOPTS_VALUE) {
// reset
av_write_trailer(pAVFormatRemux);
this_thread::sleep_for(chrono::seconds(5));
freeStream();
getInputFormat();
setupStream();
......@@ -467,7 +468,7 @@ public:
int main(int argc, char *argv[])
{
av_log_set_level(AV_LOG_ERROR);
av_log_set_level(AV_LOG_INFO);
spdlog::set_level(spdlog::level::info);
EvPusher pusher;
pusher.join();
......
......@@ -41,13 +41,13 @@ class EvSlicer: public TinyThread {
private:
#define URLOUT_DEFAULT "slices"
#define NUM_DAYS_DEFAULT 2
#define MINUTES_PER_SLICE_DEFAULT 5
#define MINUTES_PER_SLICE_DEFAULT 1
// 2 days, 5 minutes per record
void *pSubCtx = nullptr, *pDealerCtx = nullptr; // for packets relay
void *pSub = nullptr, *pDealer = nullptr, *pDaemonCtx = nullptr, *pDaemon = nullptr;
string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid;
int iid, days, minutes, numSlices, lastSliceId;
int iid, days, minutes, numSlices, segHead = 0, segTail = 0;
bool enablePush = false;
AVFormatContext *pAVFormatRemux = nullptr;
AVFormatContext *pAVFormatInput = nullptr;
......@@ -58,7 +58,7 @@ private:
thread thMsgProcessor;
string drport = "5549";
json slices;
int sliceHead = -1, sliceTail = 0;
bool gotFormat = false;
int handleMsg(vector<vector<uint8_t> > v){
int ret = 0;
......@@ -73,8 +73,14 @@ private:
try{
peerId = body2str(v[0]);
meta = json::parse(body2str(v[1]))["type"];
data = json::parse(body2str(v[2]));
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump());
if(meta == EV_MSG_META_AVFORMATCTX) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
spdlog::info("evslicer {} got avformat from {}", selfId, peerId);
}else{
spdlog::info("evslicer {} received msg from {}, type = {}, data = {}", selfId, peerId, meta, data.dump());
}
}catch(exception &e){
spdlog::error("evslicer {} failed to process msg:{}", selfId, msg);
}
......@@ -235,7 +241,7 @@ private:
meta["type"] = EV_MSG_META_AVFORMATCTX;
body.push_back(str2body(meta.dump()));
body.push_back(str2body(MSG_HELLO));
bool gotFormat = false;
uint64_t failedCnt = 0;
while(!gotFormat) {
ret = z_send_multiple(pDealer, body);
......@@ -243,40 +249,7 @@ private:
spdlog::error("evslicer {}, failed to send hello to puller: {}", selfId, zmq_strerror(zmq_errno()));
continue;
}
// expect response with avformatctx
auto v = z_recv_multiple(pDealer);
if(v.size() != 3) {
ret = zmq_errno();
if(ret != 0) {
if(failedCnt % 100 == 0) {
spdlog::error("evslicer {}, error receive avformatctx: {}, {}", selfId, v.size(), zmq_strerror(ret));
spdlog::info("evslicer {} retry connect to peers", selfId);
}
this_thread::sleep_for(chrono::seconds(5));
failedCnt++;
}
else {
spdlog::error("evslicer {}, received bad size zmq msg for avformatctx: {}", selfId, v.size());
}
}
else if(body2str(v[0]) != pullerGid) {
spdlog::error("evslicer {}, invalid sender for avformatctx: {}, should be: {}", selfId, body2str(v[0]), pullerGid);
}
else {
try {
auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
break;
}
}
catch(exception &e) {
spdlog::error("evslicer {}, exception in parsing avformatctx packet: {}", selfId, e.what());
}
}
this_thread::sleep_for(chrono::seconds(7));
}
return ret;
}
......@@ -303,10 +276,15 @@ private:
spdlog::info("evslicer {} streamList[{:d}]: {:d}", selfId, i, streamList[i]);
}
av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0);
//av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0);
av_dict_set(&pOptsRemux, "c:v", "libx264", 0);
//av_dict_set(&pOptsRemux, "brand", "mp42", 0);
//av_dict_set(&pOptsRemux, "movflags", "faststart", 0);
//av_dict_set(&pOptsRemux, "-f", "mp4", 0);
av_dict_set(&pOptsRemux, "segment_format", "mp4", 0);
av_dict_set(&pOptsRemux, "f", "segment", 0);
av_dict_set(&pOptsRemux, "segment_time", "20", 0);
av_dict_set(&pOptsRemux, "segment_wrap", to_string(numSlices).data(), 0);
return ret;
}
void freeStream()
......@@ -344,8 +322,8 @@ protected:
auto end = start;
int ts = chrono::duration_cast<chrono::seconds>(start.time_since_epoch()).count();
string name = to_string(ts) + ".mp4";
name = urlOut + "/" + name;
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "mp4", name.c_str());
name = urlOut + "/" + "capture-%03d.mp4";
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "segment", name.c_str());
if (ret < 0) {
spdlog::error("evslicer {} failed create avformatcontext for output: %s", selfId, av_err2str(ret));
exit(1);
......@@ -366,41 +344,21 @@ protected:
}
}
//av_dump_format(pAVFormatRemux, 0, name.c_str(), 1);
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
ret = avio_open2(&pAVFormatRemux->pb, name.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) {
spdlog::error("evslicer {} could not open output file {}", selfId, name);
}
}
av_dict_set(&pOptsRemux, "segment_start_number", to_string(segTail).data(), 0);
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) {
spdlog::error("evslicer {} error occurred when opening output file", selfId);
}
spdlog::info("evslicer {} writing new slice {}", selfId, name.c_str());
// slices[sliceTail] = name;
// sliceTail++;
// if(sliceTail >= numSlices) {
// sliceTail = 0;
// sliceHead = 0;
// }
// slices["sliceTail"] = sliceTail;
// slices["sliceHead"] = sliceHead;
while(chrono::duration_cast<chrono::seconds>(end-start).count() < minutes * 60) {
if(checkStop() == true) {
bStopSig = true;
break;
}
// if(1 == getppid()) {
// spdlog::error("evmgr {} exit since evdaemon is dead", selfId);
// exit(1);
// }
// business logic
int pktIgnore = 0;
while(true) {
int ret =zmq_msg_init(&msg);
ret = zmq_recvmsg(pSub, &msg, 0);
if(ret < 0) {
......@@ -414,9 +372,14 @@ protected:
continue;
}
}
zmq_msg_close(&msg);
if(pktCnt == 0 && pktIgnore < 18*7) {
pktIgnore++;
av_packet_unref(&packet);
continue;
}
AVStream *in_stream =NULL, *out_stream = nullptr;
in_stream = pAVFormatInput->streams[packet.stream_index];
packet.stream_index = streamList[packet.stream_index];
......@@ -428,8 +391,8 @@ protected:
}
/* copy packet */
if(pktCnt == 0) {
packet.pts = 0;
packet.dts = 0;
packet.pts = AV_NOPTS_VALUE;
packet.dts = AV_NOPTS_VALUE;
packet.duration = 0;
packet.pos = -1;
}
......@@ -441,11 +404,10 @@ protected:
}
pktCnt++;
ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet);
if (ret < 0) {
spdlog::error("error muxing packet: {}, {}, {}, {}, restreaming...", av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
spdlog::error("evslicer {} error muxing packet: {}, {}, {}, {}, reloading...", selfId, av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
if(pktCnt != 0 && packet.pts == AV_NOPTS_VALUE) {
// reset
av_write_trailer(pAVFormatRemux);
......@@ -459,16 +421,16 @@ protected:
}
end = chrono::system_clock::now();
}// while in slice
// write tail
// close output context
}
if (pAVFormatRemux != nullptr) {
if(pAVFormatRemux->pb != nullptr) {
avio_closep(&pAVFormatRemux->pb);
}
avformat_free_context(pAVFormatRemux);
}
}// outer while
}
public:
EvSlicer()
......@@ -507,8 +469,6 @@ public:
}
init();
getInputFormat();
setupStream();
// thread for msg
thMsgProcessor = thread([this](){
......@@ -524,6 +484,8 @@ public:
});
thMsgProcessor.detach();
getInputFormat();
setupStream();
};
~EvSlicer()
{
......@@ -549,7 +511,7 @@ public:
int main(int argc, const char *argv[])
{
av_log_set_level(AV_LOG_ERROR);
av_log_set_level(AV_LOG_INFO);
spdlog::set_level(spdlog::level::info);
EvSlicer es;
es.join();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论