提交 c63dad3b authored 作者: blu's avatar blu

init

上级 b18a8572
......@@ -20,16 +20,29 @@ namespace fs = std::filesystem;
#include "inc/tinythread.hpp"
#include "inc/common.hpp"
#define MAX_ZMQ_MSG_SIZE 1204 * 1024 * 2
using namespace std;
using json = nlohmann::json;
using namespace moodycamel;
class PacketPusher: public TinyThread {
private:
// 2M
#define MAX_ZMQ_MSG_SIZE 1204 * 1024 * 2
void *pSubContext = NULL; // for packets relay
void *pSubscriber = NULL;
char *urlOut = NULL;
char *urlIn = NULL;
bool enablePush = false;
int *streamList = NULL;
AVFormatContext *pAVFormatRemux = NULL;
AVFormatContext *pAVFormatInput = NULL;
int getEnv(){
// TODO:
//urlOut = getenv("URL_OUT");
//urlIn=
urlOut = (char*)"rtsp://40.73.41.176:554/test1";
}
int setupMq()
{
teardownMq();
......@@ -58,6 +71,78 @@ private:
}
return 0;
}
int setupStream(){
int ret = 0, numStreams = 0, streamIdx = 0;
AVDictionary *pOptsRemux = NULL, *pOptsInput = NULL, *pOptsOutput = NULL;
urlIn = (char*)"rtsp://admin:FWBWTU@172.31.0.51/h264/ch1/sub/av_stream";
if ((ret = avformat_open_input(&pAVFormatInput, urlIn, NULL, NULL)) < 0) {
logThrow(NULL, AV_LOG_FATAL, "Could not open input file '%s'", urlIn);
}
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
logThrow(NULL, AV_LOG_FATAL, "Failed to retrieve input stream information");
}
pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS;
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut);
if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(ret));
}
numStreams = pAVFormatInput->nb_streams;
streamList = (int *)av_mallocz_array(numStreams, sizeof(*streamList));
if (!streamList) {
ret = AVERROR(ENOMEM);
logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(AVERROR(ENOMEM)));
}
// find all video & audio streams for remuxing
for (int i = 0; i < pAVFormatInput->nb_streams; i++) {
AVStream *out_stream;
AVStream *in_stream = pAVFormatInput->streams[i];
AVCodecParameters *in_codecpar = in_stream->codecpar;
if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO &&
in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO &&
in_codecpar->codec_type != AVMEDIA_TYPE_SUBTITLE) {
streamList[i] = -1;
continue;
}
streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) {
logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = AVERROR_UNKNOWN;
}
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Failed to copy codec parameters\n");
}
}
av_dump_format(pAVFormatRemux, 0, urlOut, 1);
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = avio_open2(&pAVFormatRemux->pb, urlOut, AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Could not open output file '%s'", urlOut);
}
}
// rtsp tcp
if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) {
logThrow(NULL, AV_LOG_FATAL, "failed set output pOptsRemux");
ret = AVERROR_UNKNOWN;
}
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Error occurred when opening output file\n");
}
}
protected:
void run()
{
......@@ -65,6 +150,7 @@ protected:
bool bStopSig = false;
zmq_msg_t msg;
av_log_set_level(AV_LOG_DEBUG);
int pktCnt = 0;
while (true) {
if(checkStop() == true) {
bStopSig = true;
......@@ -75,15 +161,46 @@ protected:
av_log(NULL, AV_LOG_ERROR, "failed to init zmq msg");
continue;
}
// receive packet
ret = zmq_recvmsg(pSubscriber, &msg, 0);
if(ret < 0) {
av_log(NULL, AV_LOG_ERROR, "failed to recv zmq msg");
continue;
}
av_log(NULL, AV_LOG_DEBUG, "msg size: %d, %d", ret, zmq_msg_size(&msg));
av_log(NULL, AV_LOG_DEBUG, "msg size: %d, %d\n", ret, zmq_msg_size(&msg));
// deserialize the packet
pktCnt++;
AVPacket packet;
av_log(NULL, AV_LOG_WARNING, "chkpt1: %d\n", pktCnt);
PacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet);
av_log(NULL, AV_LOG_WARNING, "chkpt2: %d\n", pktCnt);
zmq_msg_close(&msg);
// relay
AVStream *in_stream =NULL, *out_stream = NULL;
in_stream = pAVFormatInput->streams[packet.stream_index];
packet.stream_index = streamList[packet.stream_index];
out_stream = pAVFormatRemux->streams[packet.stream_index];
av_log(NULL, AV_LOG_WARNING, "chkpt3: %d, sidx: %d\n", pktCnt, packet.stream_index);
/* copy packet */
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
av_log(NULL, AV_LOG_WARNING, "chkpt3.1: %d, sidx: %d\n", pktCnt, packet.stream_index);
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
av_log(NULL, AV_LOG_WARNING, "chkpt3.2: %d, sidx: %d\n", pktCnt, packet.stream_index);
packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
packet.pos = -1;
av_log(NULL, AV_LOG_WARNING, "chkpt4: %d\n", pktCnt);
ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
av_log(NULL, AV_LOG_WARNING, "chkpt5: %d\n", pktCnt);
av_packet_unref(&packet);
av_log(NULL, AV_LOG_WARNING, "chkpt6: %d\n", pktCnt);
if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Error muxing packet\n");
break;
}
}
av_write_trailer(pAVFormatRemux);
if(!bStopSig && ret < 0) {
//TOOD: reconnect
av_log(NULL, AV_LOG_ERROR, "TODO: failed, reconnecting");
......@@ -95,8 +212,11 @@ protected:
public:
PacketPusher()
{
getEnv();
setupMq();
setupStream();
}
~PacketPusher()
{
teardownMq();
......
......@@ -20,48 +20,72 @@ void logThrow(void * avcl, int lvl, const char *fmt, ...)
namespace PacketSerializer {
int encode(AVPacket &pkt, char **bytes) {
int cnt = 0;
//data
int wholeSize = 4 + pkt.size;
//side data
wholeSize +=4;
if(pkt.side_data_elems != 0) {
for(int i = 0; i < pkt.side_data_elems; i++) {
wholeSize += pkt.side_data[i].size + sizeof(AVPacketSideData);
}
}else{
wholeSize +=4;
}
wholeSize += 8 * 5 + 4;
// 4 + 8: wholeSize + DEADBEAF
wholeSize += 8 * 5 + 4 + 4 + 8;
*bytes = (char*)malloc(wholeSize);
// data
memcpy(*bytes, &(pkt.size), 4);
memcpy(*bytes, pkt.data, pkt.size);
memcpy((*bytes)+cnt, &(pkt.size), 4);
cnt +=4;
memcpy((*bytes )+cnt, pkt.data, pkt.size);
cnt += pkt.size;
//side data
memcpy(*bytes, &(pkt.side_data_elems), 4);
memcpy((*bytes )+cnt, &(pkt.side_data_elems), 4);
cnt +=4;
if(pkt.side_data_elems != 0) {
for(int i = 0; i < pkt.side_data_elems; i++) {
memcpy(*bytes, &(pkt.side_data[i].size), 4);
memcpy(*bytes, pkt.side_data[i].data, pkt.side_data[i].size);
memcpy(*bytes, &(pkt.side_data[i].type), 4);
memcpy((*bytes )+cnt, &(pkt.side_data[i].size), 4);
cnt+=4;
memcpy((*bytes )+cnt, pkt.side_data[i].data, pkt.side_data[i].size);
cnt+=pkt.side_data[i].size;
memcpy((*bytes )+cnt, &(pkt.side_data[i].type), 4);
cnt+=4;
}
}else{
wholeSize +=4;
}
// other properties
memcpy(*bytes, &(pkt.pts), 8);
memcpy(*bytes, &(pkt.dts), 8);
memcpy(*bytes, &(pkt.pos), 8);
memcpy(*bytes, &(pkt.duration), 8);
memcpy(*bytes, &(pkt.convergence_duration), 8);
memcpy(*bytes, &(pkt.flags), 4);
memcpy((*bytes )+cnt, &(pkt.pts), 8);
cnt+=8;
memcpy((*bytes )+cnt, &(pkt.dts), 8);
cnt+=8;
memcpy((*bytes )+cnt, &(pkt.pos), 8);
cnt+=8;
memcpy((*bytes )+cnt, &(pkt.duration), 8);
cnt+=8;
memcpy((*bytes )+cnt, &(pkt.convergence_duration), 8);
cnt+=8;
memcpy((*bytes )+cnt, &(pkt.flags), 4);
cnt+=4;
memcpy((*bytes )+cnt,&wholeSize, 4);
cnt+=4;
memcpy((*bytes )+cnt, (char*)"DEADBEEF", 8);
cnt+=8;
av_log_set_level(AV_LOG_DEBUG);
assert(cnt == wholeSize);
av_log(NULL, AV_LOG_DEBUG, "\n\n\npkt origin size %d, serialized size: %d, elems:%d\n\n\n", pkt.size, wholeSize, pkt.side_data_elems);
return wholeSize;
}
AVPacket *decode(char * bytes) {
int decode(char * bytes, int len, AVPacket *pkt) {
// allocate packet mem on heap
AVPacket *pkt = (AVPacket*)malloc(sizeof(AVPacket));
//AVPacket *pkt = (AVPacket*)malloc(sizeof(AVPacket));
int ret = 0;
int got = 0;
if(strncmp("DEADBEEF", bytes + len - 8, 8) != 0) {
av_log(NULL, AV_LOG_ERROR, "invalid packet");
return -1;
}
memcpy(&(pkt->size), bytes, 4);
got += 4;
av_new_packet(pkt, pkt->size);
......@@ -92,7 +116,11 @@ namespace PacketSerializer {
memcpy(&(pkt->flags), bytes + got, 4);
got += 4;
return pkt;
int wholeSize = 0;
memcpy(&wholeSize, bytes + got, 4);
av_log(NULL, AV_LOG_WARNING, "wholeSize: %d, %d", wholeSize, got);
return ret;
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论