提交 ec001352 authored 作者: blu's avatar blu

init

上级 978e8480
#pragma GCC diagnostic ignored "-Wunused-private-field" #pragma GCC diagnostic ignored "-Wunused-private-field"
#pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wunused-variable"
#include <stdlib.h> #include <stdlib.h>
#include <string> #include <string>
#include <thread> #include <thread>
...@@ -47,13 +46,13 @@ private: ...@@ -47,13 +46,13 @@ private:
while(!inited) { while(!inited) {
// TODO: req config // TODO: req config
bool found = false; bool found = false;
try{ try {
config = json::parse(cloudutils::config); config = json::parse(cloudutils::config);
spdlog::info("config: {:s}", config.dump()); spdlog::info("config: {:s}", config.dump());
json evpusher; json evpusher;
json evmgr; json evmgr;
json ipc; json ipc;
json data = config["data"]; json data = config["data"];
for (auto& [key, value] : data.items()) { for (auto& [key, value] : data.items()) {
//std::cout << key << " : " << dynamic_cast<json&>(value).dump() << "\n"; //std::cout << key << " : " << dynamic_cast<json&>(value).dump() << "\n";
...@@ -98,18 +97,19 @@ private: ...@@ -98,18 +97,19 @@ private:
} }
catch(exception &e) { catch(exception &e) {
spdlog::error("evpusher {} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what()); spdlog::error("evpusher {} {} exception in EvPuller.init {:s} retrying", devSn, iid, e.what());
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(3));
continue; continue;
} }
inited = true; inited = true;
} }
return 0; return 0;
} }
int ping(){ int ping()
// send hello to router {
// send hello to router
int ret = 0; int ret = 0;
vector<vector<uint8_t> >body; vector<vector<uint8_t> >body;
// since identity is auto set // since identity is auto set
...@@ -121,10 +121,11 @@ private: ...@@ -121,10 +121,11 @@ private:
if(ret < 0) { if(ret < 0) {
spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO: //TODO:
}else{ }
else {
spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn); spdlog::info("evpusher {} {} sent hello to router: {}", devSn, iid, mgrSn);
} }
return ret; return ret;
} }
...@@ -162,7 +163,7 @@ private: ...@@ -162,7 +163,7 @@ private:
} }
//ping //ping
ret = ping(); ret = ping();
thPing = thread([&,this](){ thPing = thread([&,this]() {
while(true) { while(true) {
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2)); this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
ping(); ping();
...@@ -174,7 +175,8 @@ private: ...@@ -174,7 +175,8 @@ private:
return ret; return ret;
} }
int getInputFormat(){ int getInputFormat()
{
int ret = 0; int ret = 0;
// req avformatcontext packet // req avformatcontext packet
// send hello to puller // send hello to puller
...@@ -193,7 +195,7 @@ private: ...@@ -193,7 +195,7 @@ private:
spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno())); spdlog::error("evpusher {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
continue; continue;
} }
// expect response with avformatctx // expect response with avformatctx
auto v = z_recv_multiple(pDealer); auto v = z_recv_multiple(pDealer);
if(v.size() != 3) { if(v.size() != 3) {
...@@ -201,24 +203,28 @@ private: ...@@ -201,24 +203,28 @@ private:
if(ret != 0) { if(ret != 0) {
if(failedCnt % 100 == 0) { if(failedCnt % 100 == 0) {
spdlog::error("evpusher {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret)); spdlog::error("evpusher {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret));
spdlog::info("evpusher {} {} retry connect to peers", devSn, iid); spdlog::info("evpusher {} {} retry connect to peers", devSn, iid);
} }
this_thread::sleep_for(chrono::seconds(5)); this_thread::sleep_for(chrono::seconds(5));
failedCnt++; failedCnt++;
}else{ }
else {
spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size()); spdlog::error("evpusher {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
} }
}else if(body2str(v[0]) != pullerGid) { }
else if(body2str(v[0]) != pullerGid) {
spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid); spdlog::error("evpusher {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
}else{ }
try{ else {
try {
auto cmd = json::parse(body2str(v[1])); auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX){ if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext)); pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput); AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true; gotFormat = true;
} }
}catch(exception &e) { }
catch(exception &e) {
spdlog::error("evpusher {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what()); spdlog::error("evpusher {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what());
} }
} }
...@@ -277,7 +283,7 @@ private: ...@@ -277,7 +283,7 @@ private:
av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1); av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1);
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) { if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
spdlog::error("evpusher {} {} failed allocating output stream", devSn ,iid); spdlog::error("evpusher {} {} failed allocating output stream", devSn,iid);
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux); ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
spdlog::error("evpusher {} {} could not open output file '%s'", devSn, iid, urlOut); spdlog::error("evpusher {} {} could not open output file '%s'", devSn, iid, urlOut);
...@@ -299,14 +305,14 @@ private: ...@@ -299,14 +305,14 @@ private:
return ret; return ret;
} }
void freeStream(){ void freeStream()
// close output context {
if(pAVFormatRemux) // close output context
{ if(pAVFormatRemux) {
if(pAVFormatRemux->pb) { if(pAVFormatRemux->pb) {
avio_closep(&pAVFormatRemux->pb); avio_closep(&pAVFormatRemux->pb);
} }
avformat_free_context(pAVFormatRemux); avformat_free_context(pAVFormatRemux);
} }
pAVFormatRemux = NULL; pAVFormatRemux = NULL;
...@@ -362,22 +368,21 @@ protected: ...@@ -362,22 +368,21 @@ protected:
out_stream = pAVFormatRemux->streams[packet.stream_index]; out_stream = pAVFormatRemux->streams[packet.stream_index];
//calc pts //calc pts
{ if(pktCnt % (18*60*5) == 0) {
if(pktCnt % (18*60*5) == 0) { spdlog::info("seq: {:lld}, pts: {:lld}, dts: {:lld}, dur: {:lld}, idx: {:d}", pktCnt, packet.pts, packet.dts, packet.duration, packet.stream_index);
spdlog::info("seq: {:lld}, pts: {:lld}, dts: {:lld}, dur: {:lld}, idx: {:d}", pktCnt, packet.pts, packet.dts, packet.duration, packet.stream_index); }
} /* copy packet */
/* copy packet */ if(pktCnt == 0) {
if(pktCnt == 0) { packet.pts = 0;
packet.pts = 0; packet.dts = 0;
packet.dts = 0; packet.duration = 0;
packet.duration = 0; packet.pos = -1;
packet.pos = -1; }
}else{ else {
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base); packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
packet.pos = -1; packet.pos = -1;
}
} }
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
...@@ -392,7 +397,7 @@ protected: ...@@ -392,7 +397,7 @@ protected:
getInputFormat(); getInputFormat();
setupStream(); setupStream();
pktCnt = 0; pktCnt = 0;
continue; continue;
} }
} }
} }
......
...@@ -14,12 +14,13 @@ ...@@ -14,12 +14,13 @@
namespace fs = std::filesystem; namespace fs = std::filesystem;
#endif #endif
#include <cstdlib> #include <cstdlib>
#include "vendor/include/zmq.h" #include "zmqhelper.hpp"
#include "tinythread.hpp" #include "tinythread.hpp"
#include "common.hpp" #include "common.hpp"
#include "database.h" #include "database.h"
using namespace std; using namespace std;
using namespace zmqhelper;
class EvSlicer: public TinyThread { class EvSlicer: public TinyThread {
private: private:
...@@ -28,9 +29,9 @@ private: ...@@ -28,9 +29,9 @@ private:
#define MINUTES_PER_SLICE_DEFAULT 2 #define MINUTES_PER_SLICE_DEFAULT 2
// 2 days, 10 minutes per record // 2 days, 10 minutes per record
#define NUM_SLICES_DEFAULT (24 * NUM_DAYS_DEFAULT * 60 / MINUTES_PER_SLICE_DEFAULT) #define NUM_SLICES_DEFAULT (24 * NUM_DAYS_DEFAULT * 60 / MINUTES_PER_SLICE_DEFAULT)
void *pSubCtx = NULL, *pReqCtx = NULL; // for packets relay void *pSubCtx = NULL, *pDealerCtx = NULL; // for packets relay
void *pSub = NULL, *pReq = NULL; void *pSub = NULL, *pDealer = NULL;
string urlOut, urlPub, urlRep, sn; string urlOut, urlPub, urlRouter, devSn, mgrSn, selfId, pullerGid;
int iid, days, minutes, numSlices, lastSliceId; int iid, days, minutes, numSlices, lastSliceId;
bool enablePush = false; bool enablePush = false;
AVFormatContext *pAVFormatRemux = NULL; AVFormatContext *pAVFormatRemux = NULL;
...@@ -39,75 +40,103 @@ private: ...@@ -39,75 +40,103 @@ private:
// load from db // load from db
vector<int> *sliceIdxToName = NULL; vector<int> *sliceIdxToName = NULL;
int *streamList = NULL; int *streamList = NULL;
json config;
thread thPing;
int init() int init()
{ {
int ret = 0; int ret = 0;
bool inited = false; bool inited = false;
// TODO: read db to get sn // TODO: read db to get devSn
sn = "ILS-3"; devSn = "ILSEVSLICER1";
iid = 3; iid = 1;
selfId = devSn + ":evslicer:" + to_string(iid);
while(!inited) { while(!inited) {
// req config // TODO: req config
json jr = cloudutils::registry(sn.c_str(), "evslicer", iid); bool found = false;
bool bcnt = false;
try { try {
spdlog::info("registry: {:s}", jr.dump()); config = json::parse(cloudutils::config);
json data = jr["data"]["services"]["evslicer"]; spdlog::info("config: {:s}", config.dump());
string addr = data["addr"].get<string>(); json evslicer;
if(addr == "0.0.0.0") { json evmgr;
addr = "localhost"; json ipc;
}
urlPub = string("tcp://") + addr + ":" + to_string(data["port-pub"]); json data = config["data"];
urlRep = string("tcp://") + addr + ":" + to_string(data["port-rep"]); for (auto& [key, value] : data.items()) {
spdlog::info("evslicer {} {} will connect to {} for sub, {} for req", sn, iid, urlPub, urlRep); //std::cout << key << " : " << dynamic_cast<json&>(value).dump() << "\n";
evmgr = value;
data = jr["data"]["services"]["evslicer"]; json ipcs = evmgr["ipcs"];
for(auto &j: data) { for(auto &j: ipcs) {
if(j["sn"] == sn && iid == j["iid"] && j["enabled"] != 0) { json pullers = j["modules"]["evslicer"];
try{ for(auto &p:pullers) {
j.at("path").get_to(urlOut); if(p["sn"] == devSn && p["iid"] == iid) {
}catch(exception &e) { evslicer = p;
spdlog::warn("evslicer {} {} exception get params for storing slices: {}, using default: {}", sn, iid, e.what(), URLOUT_DEFAULT); break;
urlOut = URLOUT_DEFAULT; }
}
try{
j.at("days").get_to(days);
}catch(exception &e) {
spdlog::warn("evslicer {} {} exception get params for storing slices: {}, using default: {}", sn, iid, e.what(), NUM_DAYS_DEFAULT);
days = NUM_DAYS_DEFAULT;
} }
try{ if(evslicer.size() != 0) {
j.at("minutes").get_to(minutes); ipc = j;
}catch(exception &e) { break;
spdlog::warn("evslicer {} {} exception get params for storing slices: {}, using default: {}", sn, iid, e.what(),MINUTES_PER_SLICE_DEFAULT);
minutes = MINUTES_PER_SLICE_DEFAULT;
} }
}
numSlices = 24 * days * 60 /minutes; if(ipc.size()!=0 && evslicer.size()!=0) {
// alloc memory found = true;
sliceIdxToName = new vector<int>(numSlices);
// load db
// DB::exec(NULL, "select id, ts, last from slices;", DB::get_slices, sliceIdxToName);
spdlog::info("mkdir -p {}", urlOut);
ret = system((string("mkdir -p ") + urlOut).c_str());
if(ret == -1) {
spdlog::error("failed to create {} dir", urlOut);
return -1;
}
// TODO: check sn
break; break;
} }
} }
if(!found) {
spdlog::error("evslicer {} {}: no valid config found. retrying load config...", devSn, iid);
this_thread::sleep_for(chrono::seconds(3));
continue;
}
json evpuller = ipc["modules"]["evpuller"][0];
pullerGid = evpuller["sn"].get<string>() + ":evpuller:" + to_string(evpuller["iid"]);
mgrSn = evmgr["sn"];
if(evslicer.count("path") == 0) {
spdlog::warn("evslicer {} {} no params for path, using default: {}", devSn, iid, URLOUT_DEFAULT);
urlOut = URLOUT_DEFAULT;
}
else {
urlOut = evslicer["path"];
}
if(evslicer.count("days") == 0) {
spdlog::warn("evslicer {} {} no params for days, using default: {}", devSn, iid, NUM_DAYS_DEFAULT);
days = NUM_DAYS_DEFAULT;
}
else {
days = evslicer["days"].get<int>();
}
if(evslicer.count("minutes") == 0) {
spdlog::warn("evslicer {} {} no params for minutes, using default: {}", devSn, iid, MINUTES_PER_SLICE_DEFAULT);
minutes = MINUTES_PER_SLICE_DEFAULT;
}
else {
minutes = evslicer["minutes"].get<int>();
}
numSlices = 24 * days * 60 /minutes;
// alloc memory
sliceIdxToName = new vector<int>(numSlices);
// load db
// DB::exec(NULL, "select id, ts, last from slices;", DB::get_slices, sliceIdxToName);
spdlog::info("mkdir -p {}", urlOut);
ret = system((string("mkdir -p ") + urlOut).c_str());
if(ret == -1) {
spdlog::error("failed to create {} dir", urlOut);
return -1;
}
urlPub = string("tcp://") + evpuller["addr"].get<string>() + ":" + to_string(evpuller["port-pub"]);
urlRouter = string("tcp://") + evmgr["addr"].get<string>() + ":" + to_string(evmgr["port-router"]);
spdlog::info("evslicer {} {} will connect to {} for sub, {} for router", devSn, iid, urlPub, urlRouter);
} }
catch(exception &e) { catch(exception &e) {
bcnt = true; spdlog::error("evslicer {} {} exception in init {:s} retrying", devSn, iid, e.what());
spdlog::error("evslicer {} {} exception in evslicer.init {:s}, retrying...", sn, iid, e.what()); this_thread::sleep_for(chrono::seconds(3));
}
if(bcnt || urlOut.empty()) {
// TODO: waiting for command
spdlog::warn("evslicer {} {} waiting for command & retrying", sn, iid);
this_thread::sleep_for(chrono::milliseconds(1000*20));
continue; continue;
} }
...@@ -116,9 +145,31 @@ private: ...@@ -116,9 +145,31 @@ private:
return 0; return 0;
} }
int ping()
{
// send hello to router
int ret = 0;
vector<vector<uint8_t> >body;
// since identity is auto set
body.push_back(str2body(mgrSn+":0:0"));
body.push_back(str2body(EV_MSG_META_PING)); // blank meta
body.push_back(str2body(MSG_HELLO));
ret = z_send_multiple(pDealer, body);
if(ret < 0) {
spdlog::error("evslicer {} {} failed to send multiple: {}", devSn, iid, zmq_strerror(zmq_errno()));
//TODO:
}
else {
spdlog::info("evslicer {} {} sent hello to router: {}", devSn, iid, mgrSn);
}
return ret;
}
int setupMq() int setupMq()
{ {
teardownMq();
int ret = 0; int ret = 0;
// setup sub // setup sub
...@@ -126,91 +177,103 @@ private: ...@@ -126,91 +177,103 @@ private:
pSub = zmq_socket(pSubCtx, ZMQ_SUB); pSub = zmq_socket(pSubCtx, ZMQ_SUB);
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
if(ret != 0) { if(ret != 0) {
spdlog::error("evslicer failed connect to pub: {}, {}", sn, iid); spdlog::error("evslicer {} {} failed set setsockopt: {}", devSn, iid, urlPub);
return -1; return -1;
} }
ret = zmq_connect(pSub, urlPub.c_str()); ret = zmq_connect(pSub, urlPub.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("evslicer {} {} failed create sub", sn, iid); spdlog::error("evslicer {} {} failed connect pub: {}", devSn, iid, urlPub);
return -2; return -2;
} }
// setup req // setup dealer
pReqCtx = zmq_ctx_new(); pDealerCtx = zmq_ctx_new();
pReq = zmq_socket(pReqCtx, ZMQ_REQ); pDealer = zmq_socket(pDealerCtx, ZMQ_DEALER);
spdlog::info("evslicer {} {} try create req to {}", sn, iid, urlRep); spdlog::info("evslicer {} {} try create req to {}", devSn, iid, urlRouter);
ret = zmq_connect(pReq, urlRep.c_str()); ret = zmq_setsockopt(pDealer, ZMQ_IDENTITY, selfId.c_str(), selfId.size());
if(ret < 0) {
if(ret != 0) { spdlog::error("evslicer {} {} failed setsockopts router: {}", devSn, iid, urlRouter);
spdlog::error("evslicer {} {} failed create req to {}", sn, iid, urlRep);
return -3; return -3;
} }
ret = zmq_connect(pDealer, urlRouter.c_str());
spdlog::info("evslicer {} {} success setupMq", sn, iid); if(ret != 0) {
spdlog::error("evslicer {} {} failed connect dealer: {}", devSn, iid, urlRouter);
return 0; return -4;
}
int teardownMq()
{
if(pSub != NULL) {
zmq_close(pSub);
pSub = NULL;
}
if(pSubCtx != NULL) {
zmq_ctx_destroy(pSubCtx);
pSubCtx = NULL;
}
if(pReq != NULL) {
zmq_close(pSub);
pReq = NULL;
}
if(pReqCtx != NULL) {
zmq_ctx_destroy(pSub);
pReqCtx = NULL;
} }
//ping
ret = ping();
thPing = thread([&,this]() {
while(true) {
this_thread::sleep_for(chrono::seconds(EV_HEARTBEAT_SECONDS-2));
ping();
}
});
return 0; thPing.detach();
return ret;
} }
int setupStream() int getInputFormat()
{ {
int ret = 0; int ret = 0;
// req avformatcontext packet // req avformatcontext packet
// send first packet to init connection // send hello to puller
zmq_msg_t msg; spdlog::info("evslicer {} {} send hello to puller: {}", devSn, iid, pullerGid);
zmq_send(pReq, "hello", 5, 0); vector<vector<uint8_t> > body;
spdlog::info("evslicer {} {} success send hello", sn, iid); body.push_back(str2body(pullerGid));
ret =zmq_msg_init(&msg); json meta;
if(ret != 0) { meta["type"] = EV_MSG_META_AVFORMATCTX;
spdlog::error("failed to init zmq msg"); body.push_back(str2body(meta.dump()));
exit(1); body.push_back(str2body(MSG_HELLO));
} bool gotFormat = false;
// receive packet uint64_t failedCnt = 0;
ret = zmq_recvmsg(pReq, &msg, 0); while(!gotFormat) {
spdlog::info("evslicer {} {} recv", sn, iid); ret = z_send_multiple(pDealer, body);
if(ret < 0) { if(ret < 0) {
spdlog::error("evslicer {} {} failed to recv zmq msg: {}", sn, iid, zmq_strerror(ret)); spdlog::error("evslicer {} {}, failed to send hello to puller: {}", devSn, iid, zmq_strerror(zmq_errno()));
exit(1); continue;
} }
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)zmq_msg_data(&msg), ret, pAVFormatInput);
// close req // expect response with avformatctx
{ auto v = z_recv_multiple(pDealer);
zmq_msg_close(&msg); if(v.size() != 3) {
if(pReq != NULL) { ret = zmq_errno();
zmq_close(pReq); if(ret != 0) {
pReq = NULL; if(failedCnt % 100 == 0) {
spdlog::error("evslicer {} {}, error receive avformatctx: {}, {}", devSn, iid, v.size(), zmq_strerror(ret));
spdlog::info("evslicer {} {} retry connect to peers", devSn, iid);
}
this_thread::sleep_for(chrono::seconds(5));
failedCnt++;
}
else {
spdlog::error("evslicer {} {}, received bad size zmq msg for avformatctx: {}", devSn, iid, v.size());
}
}
else if(body2str(v[0]) != pullerGid) {
spdlog::error("evslicer {} {}, invalid sender for avformatctx: {}, should be: {}", devSn, iid, body2str(v[0]), pullerGid);
} }
if(pReqCtx != NULL) { else {
zmq_ctx_destroy(pReqCtx); try {
pReqCtx = NULL; auto cmd = json::parse(body2str(v[1]));
if(cmd["type"].get<string>() == EV_MSG_META_AVFORMATCTX) {
pAVFormatInput = (AVFormatContext *)malloc(sizeof(AVFormatContext));
AVFormatCtxSerializer::decode((char *)(v[2].data()), v[2].size(), pAVFormatInput);
gotFormat = true;
}
}
catch(exception &e) {
spdlog::error("evslicer {} {}, exception in parsing avformatctx packet: {}", devSn, iid, e.what());
}
} }
} }
return ret;
}
int setupStream()
{
int ret = 0;
int streamIdx = 0; int streamIdx = 0;
// find all video & audio streams for remuxing // find all video & audio streams for remuxing
streamList = (int *)av_mallocz_array(pAVFormatInput->nb_streams, sizeof(*streamList)); streamList = (int *)av_mallocz_array(pAVFormatInput->nb_streams, sizeof(*streamList));
...@@ -233,6 +296,26 @@ private: ...@@ -233,6 +296,26 @@ private:
//av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0); //av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0);
return ret; return ret;
} }
void freeStream()
{
// close output context
if(pAVFormatRemux) {
if(pAVFormatRemux->pb) {
avio_closep(&pAVFormatRemux->pb);
}
avformat_free_context(pAVFormatRemux);
}
pAVFormatRemux = NULL;
// free avformatcontex
if(pAVFormatInput != NULL) {
AVFormatCtxSerializer::freeCtx(pAVFormatInput);
pAVFormatInput = NULL;
}
pAVFormatInput = NULL;
}
protected: protected:
void run() void run()
{ {
...@@ -242,8 +325,7 @@ protected: ...@@ -242,8 +325,7 @@ protected:
int pktCnt = 0; int pktCnt = 0;
AVStream * out_stream = NULL; AVStream * out_stream = NULL;
zmq_msg_t msg; zmq_msg_t msg;
AVPacket packet, keyPacket; AVPacket packet;
av_init_packet(&keyPacket);
while (true) { while (true) {
auto start = chrono::system_clock::now(); auto start = chrono::system_clock::now();
auto end = start; auto end = start;
...@@ -251,7 +333,7 @@ protected: ...@@ -251,7 +333,7 @@ protected:
name = urlOut + "/" + name; name = urlOut + "/" + name;
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "mp4", name.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "mp4", name.c_str());
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} {} failed create avformatcontext for output: %s", sn, iid, av_err2str(ret)); spdlog::error("evslicer {} {} failed create avformatcontext for output: %s", devSn, iid, av_err2str(ret));
exit(1); exit(1);
} }
...@@ -260,12 +342,12 @@ protected: ...@@ -260,12 +342,12 @@ protected:
if(streamList[i] != -1) { if(streamList[i] != -1) {
out_stream = avformat_new_stream(pAVFormatRemux, NULL); out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) { if (!out_stream) {
spdlog::error("evslicer {} {} failed allocating output stream 1", sn, iid); spdlog::error("evslicer {} {} failed allocating output stream 1", devSn, iid);
ret = AVERROR_UNKNOWN; ret = AVERROR_UNKNOWN;
} }
ret = avcodec_parameters_copy(out_stream->codecpar, pAVFormatInput->streams[i]->codecpar); ret = avcodec_parameters_copy(out_stream->codecpar, pAVFormatInput->streams[i]->codecpar);
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} {} failed to copy codec parameters", sn, iid); spdlog::error("evslicer {} {} failed to copy codec parameters", devSn, iid);
} }
} }
} }
...@@ -274,22 +356,16 @@ protected: ...@@ -274,22 +356,16 @@ protected:
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) { if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
ret = avio_open2(&pAVFormatRemux->pb, name.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux); ret = avio_open2(&pAVFormatRemux->pb, name.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} {} could not open output file {}", sn, iid, name); spdlog::error("evslicer {} {} could not open output file {}", devSn, iid, name);
} }
} }
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux); ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
spdlog::error("evslicer {} {} error occurred when opening output file", sn, iid); spdlog::error("evslicer {} {} error occurred when opening output file", devSn, iid);
} }
// TODO: // TODO:
if(keyPacket.buf != NULL) {
ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
if (ret < 0) {
spdlog::error("evslicer {} {} failed write last key packet", sn, iid);
}
}
spdlog::info("writing new slice {}", name.c_str()); spdlog::info("writing new slice {}", name.c_str());
while(chrono::duration_cast<chrono::seconds>(end-start).count() < minutes * 60) { while(chrono::duration_cast<chrono::seconds>(end-start).count() < minutes * 60) {
...@@ -319,49 +395,82 @@ protected: ...@@ -319,49 +395,82 @@ protected:
packet.stream_index = streamList[packet.stream_index]; packet.stream_index = streamList[packet.stream_index];
out_stream = pAVFormatRemux->streams[packet.stream_index]; out_stream = pAVFormatRemux->streams[packet.stream_index];
//calc pts //calc pts
{
if(pktCnt % 1024 == 0) { if(pktCnt % 1024 == 0) {
spdlog::info("seq: {}, pts: {}, dts: {}, dur: {}, idx: {}", pktCnt, packet.pts, packet.dts, packet.duration, packet.stream_index); spdlog::info("seq: {}, pts: {}, dts: {}, dur: {}, idx: {}", pktCnt, packet.pts, packet.dts, packet.duration, packet.stream_index);
} }
pktCnt++; /* copy packet */
if(pktCnt == 0) {
packet.pts = 0;
packet.dts = 0;
packet.duration = 0;
packet.pos = -1;
}
else {
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base); packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
packet.pos = -1; packet.pos = -1;
} }
// TODO: pktCnt++;
if(packet.data[5] == 0x65 ) {
spdlog::info("pktCnt: {}, keyframe: {:0x}", pktCnt, packet.data[5]);
if(keyPacket.buf != NULL) {
av_packet_unref(&keyPacket);
av_packet_ref(&keyPacket, &packet);
}
}
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_packet_unref(&packet); av_packet_unref(&packet);
if (ret < 0) { if (ret < 0) {
spdlog::error("error muxing packet"); spdlog::error("error muxing packet: {}, {}, {}, {}, restreaming...", av_err2str(ret), packet.dts, packet.pts, packet.dts==AV_NOPTS_VALUE);
if(pktCnt != 0 && packet.pts == AV_NOPTS_VALUE) {
// reset
av_write_trailer(pAVFormatRemux);
this_thread::sleep_for(chrono::seconds(5));
freeStream();
getInputFormat();
setupStream();
pktCnt = 0;
break;
}
} }
end = chrono::system_clock::now(); end = chrono::system_clock::now();
}// while in slice }// while in slice
// write tail // write tail
av_write_trailer(pAVFormatRemux);
// close output context // close output context
if (pAVFormatRemux && !(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) if (pAVFormatRemux != NULL){
avio_closep(&pAVFormatRemux->pb); if(pAVFormatRemux->pb != NULL){
avformat_free_context(pAVFormatRemux); avio_closep(&pAVFormatRemux->pb);
} }
avformat_free_context(pAVFormatRemux);
}
}// outer while
} }
public: public:
EvSlicer() { EvSlicer()
{
init(); init();
setupMq(); setupMq();
getInputFormat();
setupStream(); setupStream();
}; };
~EvSlicer() {}; ~EvSlicer()
{
if(pSub != NULL) {
zmq_close(pSub);
pSub = NULL;
}
if(pSubCtx != NULL) {
zmq_ctx_destroy(pSubCtx);
pSubCtx = NULL;
}
if(pDealer != NULL) {
zmq_close(pSub);
pDealer = NULL;
}
if(pDealerCtx != NULL) {
zmq_ctx_destroy(pSub);
pDealerCtx = NULL;
}
freeStream();
};
}; };
int main(int argc, const char *argv[]) int main(int argc, const char *argv[])
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论