提交 d4952ba1 authored 作者: blu's avatar blu

init

上级 f6a306e0
{
"configurations": [
{
"name": "Mac",
"includePath": [
"${workspaceFolder}/**",
"/usr/local/Cellar/opencv/4.1.0_2/include/opencv4",
"/usr/local/Cellar/ffmpeg/4.1.4_1/include"
],
"defines": [],
"macFrameworkPath": [
"/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.14.sdk/System/Library/Frameworks"
],
"compilerPath": "/usr/bin/clang",
"cStandard": "c11",
"cppStandard": "c++17",
"intelliSenseMode": "clang-x64",
"browse": {
"path": [
"/usr/local/Cellar/"
],
"limitSymbolsToIncludedHeaders": true
}
}
],
"version": 4
}
\ No newline at end of file
{
"C_Cpp.intelliSenseEngineFallback": "Enabled",
"python.pythonPath": "/opt/apps/conda/anaconda3/bin/python"
}
\ No newline at end of file
{
"tasks": [
{
"type": "shell",
"label": "clang++ build active file",
"command": "/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/clang++",
"args": [
"-g",
"${file}",
"-o",
"${fileDirname}/${fileBasenameNoExtension}"
],
"options": {
"cwd": "/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin"
}
}
],
"version": "2.0.0"
}
\ No newline at end of file
#pragma GCC diagnostic ignored "-Wunused-private-field" #pragma GCC diagnostic ignored "-Wunused-private-field"
#pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wunused-variable"
extern "C" {
#include <libavformat/avformat.h>
}
#include <libavutil/timestamp.h>
#include <stdlib.h> #include <stdlib.h>
#include <string> #include <string>
#include <thread> #include <thread>
...@@ -17,102 +14,143 @@ extern "C" { ...@@ -17,102 +14,143 @@ extern "C" {
namespace fs = std::filesystem; namespace fs = std::filesystem;
#endif #endif
#include "vendor/include/zmq.h"
#include "inc/json.hpp" #include "inc/json.hpp"
#include "inc/blockingconcurrentqueue.hpp" #include "inc/blockingconcurrentqueue.hpp"
#include "vendor/include/zmq.h" #include "inc/tinythread.hpp"
#include "inc/common.hpp"
using namespace std; using namespace std;
using json = nlohmann::json; using json = nlohmann::json;
using namespace moodycamel; using namespace moodycamel;
class PacketPusher: public TinyThread {
private:
void *pPubContext = NULL; // for packets publishing
void *pPublisher = NULL;
AVFormatContext *pAVFormatInput = NULL, *pAVFormatRemux = NULL;
string urlIn;
int *streamList = NULL, numStreams = 0;
class Stoppable
{
std::promise<void> exitSignal;
std::future<void> futureObj;
int state = 0;
thread th;
protected:
// Task need to provide defination for this function
// It will be called by thread function
virtual void run() = 0;
public: public:
Stoppable() : PacketPusher(string urlIn):urlIn(urlIn){
futureObj(exitSignal.get_future())
{
setupMq();
} }
Stoppable(Stoppable && obj) : exitSignal(std::move(obj.exitSignal)), futureObj(std::move(obj.futureObj))
{ ~PacketPusher(){
std::cout << "Move Constructor is called" << std::endl;
}
Stoppable & operator=(Stoppable && obj)
{
std::cout << "Move Assignment is called" << std::endl;
exitSignal = std::move(obj.exitSignal);
futureObj = std::move(obj.futureObj);
return *this;
} }
// Thread function to be executed by thread protected:
private: // Function to be executed by thread function
void _run() void run()
{ {
if(state == 0) { int ret = 0;
th =thread([&](){ if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) {
this->run(); logThrow(NULL, AV_LOG_FATAL, "Could not open input file '%s'", urlIn.c_str());
});
state = 1;
} }
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
logThrow(NULL, AV_LOG_FATAL, "Failed to retrieve input stream information");
} }
public: pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS;
//Checks if thread is requested to stop
bool checkStop() numStreams = pAVFormatInput->nb_streams;
{ int *streamList = (int *)av_mallocz_array(numStreams, sizeof(*streamList));
// checks if value in future object is available
if (futureObj.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) if (!streamList) {
return false; ret = AVERROR(ENOMEM);
return true; logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(AVERROR(ENOMEM)));
} }
// Request the thread to stop by setting value in promise object
void stop() // find all video & audio streams for remuxing
{ int i = 0, streamIdx = 0;
exitSignal.set_value(); for (; i < pAVFormatInput->nb_streams; i++) {
AVStream *out_stream;
AVStream *in_stream = pAVFormatInput->streams[i];
AVCodecParameters *in_codecpar = in_stream->codecpar;
if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO &&
in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO &&
in_codecpar->codec_type != AVMEDIA_TYPE_SUBTITLE) {
streamList[i] = -1;
continue;
} }
streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) {
logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = AVERROR_UNKNOWN;
void join() { }
_run(); ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
if(th.joinable()){ if (ret < 0) {
th.join(); logThrow(NULL, AV_LOG_FATAL, "Failed to copy codec parameters\n");
} }
} }
void detach() { while (checkStop() == false) {
_run(); AVStream *in_stream, *out_stream;
if(th.joinable()){ AVPacket packet;
th.detach(); ret = av_read_frame(pAVFormatInput, &packet);
} if (ret < 0)
break;
in_stream = pAVFormatInput->streams[packet.stream_index];
if (packet.stream_index >= numStreams || streamList[packet.stream_index] < 0) {
av_packet_unref(&packet);
continue;
} }
}; packet.stream_index = streamList[packet.stream_index];
out_stream = pAVFormatRemux->streams[packet.stream_index];
/* copy packet */
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
packet.pos = -1;
class MyTask: public Stoppable ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
{ if (ret < 0) {
protected: logThrow(NULL, AV_LOG_FATAL, "Error muxing packet\n");
// Function to be executed by thread function break;
void run() }
{ av_packet_unref(&packet);
std::cout << "Task Start" << std::endl; }
av_write_trailer(pAVFormatRemux);
std::cout << "Pusher Start" << std::endl;
// Check if thread is requested to stop ? // Check if thread is requested to stop ?
while (checkStop() == false) while (checkStop() == false) {
{
std::cout << "Doing Some Work" << std::endl; std::cout << "Doing Some Work" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} }
std::cout << "Task End" << std::endl; std::cout << "Task End" << std::endl;
} }
int setupMq()
{
teardownMq();
pPubContext = zmq_ctx_new();
pPublisher = zmq_socket(pPubContext, ZMQ_PUB);
int rc = zmq_bind(pPublisher, "tcp://*:5556");
if(rc != 0) {
logThrow(NULL, AV_LOG_FATAL, "failed create pub");
}
return 0;
}
int teardownMq()
{
if(pPublisher != NULL) {
zmq_close(pPublisher);
}
if(pPubContext != NULL) {
zmq_ctx_destroy(pPubContext);
}
return 0;
}
}; };
...@@ -120,7 +158,7 @@ protected: ...@@ -120,7 +158,7 @@ protected:
class VideoProcessor { class VideoProcessor {
private: private:
#define SECS_SLICE (60*5/2) #define SECS_SLICE (60*5/2)
AVFormatContext *pAVFormatInput = NULL, *pAVFormatRemux = NULL, *pAVFormatSlice = NULL; AVFormatContext *pAVFormatInput = NULL, *pAVFormatRemux = NULL;
AVCodec *pCodec = NULL; AVCodec *pCodec = NULL;
AVDictionary *pOptsRemux = NULL, *pOptsInput = NULL, *pOptsOutput = NULL; AVDictionary *pOptsRemux = NULL, *pOptsInput = NULL, *pOptsOutput = NULL;
int idxVideo = -1, idxAudio = -1, numStreams = 0, numSlices = 6, secsSlice = SECS_SLICE; int idxVideo = -1, idxAudio = -1, numStreams = 0, numSlices = 6, secsSlice = SECS_SLICE;
...@@ -129,21 +167,9 @@ private: ...@@ -129,21 +167,9 @@ private:
string urlIn, urlOut, pathSlice; string urlIn, urlOut, pathSlice;
unordered_map<string, string> envParams = unordered_map<string, string>(); unordered_map<string, string> envParams = unordered_map<string, string>();
// mq // mq
void *pPubContext = NULL; // for packets publishing
void *pPublisher = NULL;
void *pRepContext = NULL; // for msg from edge gateway void *pRepContext = NULL; // for msg from edge gateway
void *pReqContext = NULL; // for msg to edge gateway void *pReqContext = NULL; // for msg to edge gateway
private: private:
void logThrow(void * avcl, int lvl, const char *fmt, ...) {
(void) avcl;
(void) lvl;
va_list args;
va_start( args, fmt );
av_log(NULL, AV_LOG_FATAL, fmt, args);
va_end( args );
throw fmt;
}
void setupParams() void setupParams()
{ {
char *tmp = getenv("URL_IN"); char *tmp = getenv("URL_IN");
...@@ -162,7 +188,7 @@ private: ...@@ -162,7 +188,7 @@ private:
pathSlice = (tmp == NULL?string("slices"):string(tmp)); pathSlice = (tmp == NULL?string("slices"):string(tmp));
// OSX XCode doesn't ship with the filesystem header as of version 10.x // OSX XCode doesn't ship with the filesystem header as of version 10.x
#ifdef __LINUX___ #ifdef __LINUX___
if (!fs::exists(pathSlice.c_str())) { if (!fs::exists(pathSlice.c_str())) {
if (!fs::create_directory(pathSlice.c_str())) { if (!fs::create_directory(pathSlice.c_str())) {
logThrow(NULL, AV_LOG_FATAL, "can't create directory: %s", pathSlice.c_str()); logThrow(NULL, AV_LOG_FATAL, "can't create directory: %s", pathSlice.c_str());
...@@ -170,7 +196,7 @@ private: ...@@ -170,7 +196,7 @@ private:
} }
fs::permissions(pathSlice.c_str(), fs::perms::all); fs::permissions(pathSlice.c_str(), fs::perms::all);
} }
#endif #endif
tmp = getenv("PUSH"); tmp = getenv("PUSH");
bPush = (tmp == NULL?false: (string(tmp) == string("false")?false:true)); bPush = (tmp == NULL?false: (string(tmp) == string("false")?false:true));
...@@ -187,138 +213,114 @@ private: ...@@ -187,138 +213,114 @@ private:
} }
} }
int teardownMq() {
if(pPublisher != NULL) {
zmq_close(pPublisher);
}
if(pPubContext != NULL) {
zmq_ctx_destroy(pPubContext);
}
return 0;
}
int setupMq(){
teardownMq();
pPubContext = zmq_ctx_new();
pPublisher = zmq_socket(pPubContext, ZMQ_PUB);
int rc = zmq_bind(pPublisher, "tcp://*:5556");
if(rc != 0) {
logThrow(NULL, AV_LOG_FATAL, "failed create pub");
}
MyTask task;
task.detach();
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
task.stop();
std::this_thread::sleep_for(std::chrono::milliseconds(9993000));
return 0;
}
int setupStreams() int setupStreams()
{ {
PacketPusher pusher(urlIn);
pusher.detach();
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
pusher.stop();
std::this_thread::sleep_for(std::chrono::milliseconds(9993000));
int ret = 0, i, streamIdx = 0; int ret = 0, i, streamIdx = 0;
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) { // if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) {
logThrow(NULL, AV_LOG_FATAL, "Could not open input file '%s'", urlIn.c_str()); // logThrow(NULL, AV_LOG_FATAL, "Could not open input file '%s'", urlIn.c_str());
} // }
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) { // if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
logThrow(NULL, AV_LOG_FATAL, "Failed to retrieve input stream information"); // logThrow(NULL, AV_LOG_FATAL, "Failed to retrieve input stream information");
} // }
pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS; // pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS;
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str()); // ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str());
if (ret < 0) { // if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(ret)); // logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(ret));
} // }
numStreams = pAVFormatInput->nb_streams; // numStreams = pAVFormatInput->nb_streams;
streamList = (int *)av_mallocz_array(numStreams, sizeof(*streamList)); // streamList = (int *)av_mallocz_array(numStreams, sizeof(*streamList));
if (!streamList) { // if (!streamList) {
ret = AVERROR(ENOMEM); // ret = AVERROR(ENOMEM);
logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(AVERROR(ENOMEM))); // logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(AVERROR(ENOMEM)));
} // }
// find all video & audio streams for remuxing // // find all video & audio streams for remuxing
for (i = 0; i < pAVFormatInput->nb_streams; i++) { // for (i = 0; i < pAVFormatInput->nb_streams; i++) {
AVStream *out_stream; // AVStream *out_stream;
AVStream *in_stream = pAVFormatInput->streams[i]; // AVStream *in_stream = pAVFormatInput->streams[i];
AVCodecParameters *in_codecpar = in_stream->codecpar; // AVCodecParameters *in_codecpar = in_stream->codecpar;
if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO && // if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO &&
in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO && // in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO &&
in_codecpar->codec_type != AVMEDIA_TYPE_SUBTITLE) { // in_codecpar->codec_type != AVMEDIA_TYPE_SUBTITLE) {
streamList[i] = -1; // streamList[i] = -1;
continue; // continue;
} // }
streamList[i] = streamIdx++; // streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL); // out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) { // if (!out_stream) {
logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n"); // logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = AVERROR_UNKNOWN; // ret = AVERROR_UNKNOWN;
} // }
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar); // ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
if (ret < 0) { // if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Failed to copy codec parameters\n"); // logThrow(NULL, AV_LOG_FATAL, "Failed to copy codec parameters\n");
} // }
} // }
av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1); // av_dump_format(pAVFormatRemux, 0, urlOut.c_str(), 1);
// find best video stream // // find best video stream
idxVideo = av_find_best_stream(pAVFormatInput, AVMEDIA_TYPE_VIDEO, -1, -1, &pCodec, 0); // idxVideo = av_find_best_stream(pAVFormatInput, AVMEDIA_TYPE_VIDEO, -1, -1, &pCodec, 0);
if(idxVideo < 0) { // if(idxVideo < 0) {
logThrow(NULL, AV_LOG_FATAL, "failed find best video stream"); // logThrow(NULL, AV_LOG_FATAL, "failed find best video stream");
} // }
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) { // if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n"); // logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux); // ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) { // if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Could not open output file '%s'", urlOut.c_str()); // logThrow(NULL, AV_LOG_FATAL, "Could not open output file '%s'", urlOut.c_str());
} // }
} // }
// rtsp tcp // // rtsp tcp
if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) { // if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) {
logThrow(NULL, AV_LOG_FATAL, "failed set output pOptsRemux"); // logThrow(NULL, AV_LOG_FATAL, "failed set output pOptsRemux");
ret = AVERROR_UNKNOWN; // ret = AVERROR_UNKNOWN;
} // }
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux); // ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) { // if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Error occurred when opening output file\n"); // logThrow(NULL, AV_LOG_FATAL, "Error occurred when opening output file\n");
} // }
while (1) { // while (1) {
AVStream *in_stream, *out_stream; // AVStream *in_stream, *out_stream;
AVPacket packet; // AVPacket packet;
ret = av_read_frame(pAVFormatInput, &packet); // ret = av_read_frame(pAVFormatInput, &packet);
if (ret < 0) // if (ret < 0)
break; // break;
in_stream = pAVFormatInput->streams[packet.stream_index]; // in_stream = pAVFormatInput->streams[packet.stream_index];
if (packet.stream_index >= numStreams || streamList[packet.stream_index] < 0) { // if (packet.stream_index >= numStreams || streamList[packet.stream_index] < 0) {
av_packet_unref(&packet); // av_packet_unref(&packet);
continue; // continue;
} // }
packet.stream_index = streamList[packet.stream_index]; // packet.stream_index = streamList[packet.stream_index];
out_stream = pAVFormatRemux->streams[packet.stream_index]; // out_stream = pAVFormatRemux->streams[packet.stream_index];
/* copy packet */ // /* copy packet */
packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX)); // packet.pts = av_rescale_q_rnd(packet.pts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX)); // packet.dts = av_rescale_q_rnd(packet.dts, in_stream->time_base, out_stream->time_base, AVRounding(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX));
packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base); // packet.duration = av_rescale_q(packet.duration, in_stream->time_base, out_stream->time_base);
packet.pos = -1; // packet.pos = -1;
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); // ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
if (ret < 0) { // if (ret < 0) {
logThrow(NULL, AV_LOG_FATAL, "Error muxing packet\n"); // logThrow(NULL, AV_LOG_FATAL, "Error muxing packet\n");
break; // break;
} // }
av_packet_unref(&packet); // av_packet_unref(&packet);
} // }
av_write_trailer(pAVFormatRemux); // av_write_trailer(pAVFormatRemux);
return ret; return ret;
} }
...@@ -327,11 +329,11 @@ public: ...@@ -327,11 +329,11 @@ public:
VideoProcessor() VideoProcessor()
{ {
setupParams(); setupParams();
setupMq();
setupStreams(); setupStreams();
} }
// dtor // dtor
~VideoProcessor() { ~VideoProcessor()
{
avformat_close_input(&pAVFormatInput); avformat_close_input(&pAVFormatInput);
/* close output */ /* close output */
if (pAVFormatRemux && !(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) if (pAVFormatRemux && !(pAVFormatRemux->oformat->flags & AVFMT_NOFILE))
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论