提交 f6a306e0 authored 作者: blu's avatar blu

init

上级 30634e96
CC = gcc CC = gcc
CPP = g++ CPP = g++
CPPFLAGS = -g -Wall -std=g++1z CPPFLAGS = -g -Wall -std=gnu++1z
CFLAGS = -g -Wall CFLAGS = -g -Wall
LIBOPENCV = `pkg-config opencv --cflags --libs` LIBOPENCV = `pkg-config opencv --cflags --libs`
...@@ -14,7 +14,7 @@ libzmq: ...@@ -14,7 +14,7 @@ libzmq:
cd vendor/libzmq && [ -f $(CURDIR)/vendor/lib/pkgconfig/libzmq.pc ] || ./autogen.sh && ./configure --prefix=$(CURDIR)/vendor cd vendor/libzmq && [ -f $(CURDIR)/vendor/lib/pkgconfig/libzmq.pc ] || ./autogen.sh && ./configure --prefix=$(CURDIR)/vendor
cd vendor/libzmq && make -j 4 && make install cd vendor/libzmq && make -j 4 && make install
rtspr: rtsp-relay.cpp rtspr: rtsp-relay.cpp
$(CPP) $(CPPFLAGS) -o rtspr rtsp-relay.cpp $(LIBFFMPEG) $(pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc) $(CPP) $(CPPFLAGS) -o rtspr rtsp-relay.cpp $(LIBFFMPEG) `pkg-config --cflags --libs vendor/lib/pkgconfig/libzmq.pc`
rtsp: rtsp.cpp rtsp: rtsp.cpp
$(CPP) $(CFLAGS) -o rtsp rtsp.cpp $(LIBFFMPEG) $(CPP) $(CFLAGS) -o rtsp rtsp.cpp $(LIBFFMPEG)
......
...@@ -7,6 +7,10 @@ extern "C" { ...@@ -7,6 +7,10 @@ extern "C" {
#include <libavutil/timestamp.h> #include <libavutil/timestamp.h>
#include <stdlib.h> #include <stdlib.h>
#include <string> #include <string>
#include <thread>
#include <iostream>
#include <chrono>
#include <future>
#ifdef OS_LINUX #ifdef OS_LINUX
#include <filesystem> #include <filesystem>
...@@ -21,6 +25,98 @@ using namespace std; ...@@ -21,6 +25,98 @@ using namespace std;
using json = nlohmann::json; using json = nlohmann::json;
using namespace moodycamel; using namespace moodycamel;
class Stoppable
{
std::promise<void> exitSignal;
std::future<void> futureObj;
int state = 0;
thread th;
protected:
// Task need to provide defination for this function
// It will be called by thread function
virtual void run() = 0;
public:
Stoppable() :
futureObj(exitSignal.get_future())
{
}
Stoppable(Stoppable && obj) : exitSignal(std::move(obj.exitSignal)), futureObj(std::move(obj.futureObj))
{
std::cout << "Move Constructor is called" << std::endl;
}
Stoppable & operator=(Stoppable && obj)
{
std::cout << "Move Assignment is called" << std::endl;
exitSignal = std::move(obj.exitSignal);
futureObj = std::move(obj.futureObj);
return *this;
}
// Thread function to be executed by thread
private:
void _run()
{
if(state == 0) {
th =thread([&](){
this->run();
});
state = 1;
}
}
public:
//Checks if thread is requested to stop
bool checkStop()
{
// checks if value in future object is available
if (futureObj.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout)
return false;
return true;
}
// Request the thread to stop by setting value in promise object
void stop()
{
exitSignal.set_value();
}
void join() {
_run();
if(th.joinable()){
th.join();
}
}
void detach() {
_run();
if(th.joinable()){
th.detach();
}
}
};
class MyTask: public Stoppable
{
protected:
// Function to be executed by thread function
void run()
{
std::cout << "Task Start" << std::endl;
// Check if thread is requested to stop ?
while (checkStop() == false)
{
std::cout << "Doing Some Work" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
std::cout << "Task End" << std::endl;
}
};
class VideoProcessor { class VideoProcessor {
private: private:
#define SECS_SLICE (60*5/2) #define SECS_SLICE (60*5/2)
...@@ -32,16 +128,18 @@ private: ...@@ -32,16 +128,18 @@ private:
bool bPush = true, bRecord = false; bool bPush = true, bRecord = false;
string urlIn, urlOut, pathSlice; string urlIn, urlOut, pathSlice;
unordered_map<string, string> envParams = unordered_map<string, string>(); unordered_map<string, string> envParams = unordered_map<string, string>();
BlockingConcurrentQueue<AVPacket*> remuxPktQueue; // mq
BlockingConcurrentQueue<AVPacket*> slicePktQueue; void *pPubContext = NULL; // for packets publishing
BlockingConcurrentQueue<AVPacket*> framePktQueue; void *pPublisher = NULL;
void *pRepContext = NULL; // for msg from edge gateway
void *pReqContext = NULL; // for msg to edge gateway
private: private:
void logThrow(void * avcl, int lvl, const char *fmt, ...) { void logThrow(void * avcl, int lvl, const char *fmt, ...) {
(void) avcl; (void) avcl;
(void) lvl; (void) lvl;
va_list args; va_list args;
va_start( args, fmt ); va_start( args, fmt );
av_log(NULL, AV_LOG_ERROR, fmt, args); av_log(NULL, AV_LOG_FATAL, fmt, args);
va_end( args ); va_end( args );
throw fmt; throw fmt;
} }
...@@ -67,14 +165,13 @@ private: ...@@ -67,14 +165,13 @@ private:
#ifdef __LINUX___ #ifdef __LINUX___
if (!fs::exists(pathSlice.c_str())) { if (!fs::exists(pathSlice.c_str())) {
if (!fs::create_directory(pathSlice.c_str())) { if (!fs::create_directory(pathSlice.c_str())) {
logThrow(NULL, AV_LOG_ERROR, "can't create directory: %s", pathSlice.c_str()); logThrow(NULL, AV_LOG_FATAL, "can't create directory: %s", pathSlice.c_str());
exit(1); exit(1);
} }
fs::permissions(pathSlice.c_str(), fs::perms::all); fs::permissions(pathSlice.c_str(), fs::perms::all);
} }
#endif #endif
tmp = getenv("PUSH"); tmp = getenv("PUSH");
bPush = (tmp == NULL?false: (string(tmp) == string("false")?false:true)); bPush = (tmp == NULL?false: (string(tmp) == string("false")?false:true));
...@@ -85,26 +182,55 @@ private: ...@@ -85,26 +182,55 @@ private:
} }
if(urlIn == "" or urlOut == "") { if(urlIn == "" or urlOut == "") {
logThrow(NULL, AV_LOG_ERROR, "no input/output url"); logThrow(NULL, AV_LOG_FATAL, "no input/output url");
exit(1); exit(1);
} }
} }
int teardownMq() {
if(pPublisher != NULL) {
zmq_close(pPublisher);
}
if(pPubContext != NULL) {
zmq_ctx_destroy(pPubContext);
}
return 0;
}
int setupMq(){
teardownMq();
pPubContext = zmq_ctx_new();
pPublisher = zmq_socket(pPubContext, ZMQ_PUB);
int rc = zmq_bind(pPublisher, "tcp://*:5556");
if(rc != 0) {
logThrow(NULL, AV_LOG_FATAL, "failed create pub");
}
MyTask task;
task.detach();
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
task.stop();
std::this_thread::sleep_for(std::chrono::milliseconds(9993000));
return 0;
}
int setupStreams() int setupStreams()
{ {
int ret = 0, i, streamIdx = 0; int ret = 0, i, streamIdx = 0;
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) { if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) {
logThrow(NULL, AV_LOG_ERROR, "Could not open input file '%s'", urlIn.c_str()); logThrow(NULL, AV_LOG_FATAL, "Could not open input file '%s'", urlIn.c_str());
} }
if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) { if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
logThrow(NULL, AV_LOG_ERROR, "Failed to retrieve input stream information"); logThrow(NULL, AV_LOG_FATAL, "Failed to retrieve input stream information");
} }
pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS; pAVFormatInput->flags = AVFMT_FLAG_NOBUFFER | AVFMT_FLAG_FLUSH_PACKETS;
ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str()); ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOut.c_str());
if (ret < 0) { if (ret < 0) {
logThrow(NULL, AV_LOG_ERROR, "failed create avformatcontext for output: %s", av_err2str(ret)); logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(ret));
} }
numStreams = pAVFormatInput->nb_streams; numStreams = pAVFormatInput->nb_streams;
...@@ -112,7 +238,7 @@ private: ...@@ -112,7 +238,7 @@ private:
if (!streamList) { if (!streamList) {
ret = AVERROR(ENOMEM); ret = AVERROR(ENOMEM);
logThrow(NULL, AV_LOG_ERROR, "failed create avformatcontext for output: %s", av_err2str(AVERROR(ENOMEM))); logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(AVERROR(ENOMEM)));
} }
// find all video & audio streams for remuxing // find all video & audio streams for remuxing
...@@ -129,13 +255,13 @@ private: ...@@ -129,13 +255,13 @@ private:
streamList[i] = streamIdx++; streamList[i] = streamIdx++;
out_stream = avformat_new_stream(pAVFormatRemux, NULL); out_stream = avformat_new_stream(pAVFormatRemux, NULL);
if (!out_stream) { if (!out_stream) {
logThrow(NULL, AV_LOG_ERROR, "Failed allocating output stream\n"); logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = AVERROR_UNKNOWN; ret = AVERROR_UNKNOWN;
} }
ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar); ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
if (ret < 0) { if (ret < 0) {
logThrow(NULL, AV_LOG_ERROR, "Failed to copy codec parameters\n"); logThrow(NULL, AV_LOG_FATAL, "Failed to copy codec parameters\n");
} }
} }
...@@ -144,26 +270,26 @@ private: ...@@ -144,26 +270,26 @@ private:
// find best video stream // find best video stream
idxVideo = av_find_best_stream(pAVFormatInput, AVMEDIA_TYPE_VIDEO, -1, -1, &pCodec, 0); idxVideo = av_find_best_stream(pAVFormatInput, AVMEDIA_TYPE_VIDEO, -1, -1, &pCodec, 0);
if(idxVideo < 0) { if(idxVideo < 0) {
logThrow(NULL, AV_LOG_ERROR, "failed find best video stream"); logThrow(NULL, AV_LOG_FATAL, "failed find best video stream");
} }
if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) { if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
logThrow(NULL, AV_LOG_ERROR, "Failed allocating output stream\n"); logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux); ret = avio_open2(&pAVFormatRemux->pb, urlOut.c_str(), AVIO_FLAG_WRITE, NULL, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
logThrow(NULL, AV_LOG_ERROR, "Could not open output file '%s'", urlOut.c_str()); logThrow(NULL, AV_LOG_FATAL, "Could not open output file '%s'", urlOut.c_str());
} }
} }
// rtsp tcp // rtsp tcp
if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) { if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) {
logThrow(NULL, AV_LOG_ERROR, "failed set output pOptsRemux"); logThrow(NULL, AV_LOG_FATAL, "failed set output pOptsRemux");
ret = AVERROR_UNKNOWN; ret = AVERROR_UNKNOWN;
} }
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux); ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if (ret < 0) { if (ret < 0) {
logThrow(NULL, AV_LOG_ERROR, "Error occurred when opening output file\n"); logThrow(NULL, AV_LOG_FATAL, "Error occurred when opening output file\n");
} }
while (1) { while (1) {
AVStream *in_stream, *out_stream; AVStream *in_stream, *out_stream;
...@@ -186,7 +312,7 @@ private: ...@@ -186,7 +312,7 @@ private:
ret = av_interleaved_write_frame(pAVFormatRemux, &packet); ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
if (ret < 0) { if (ret < 0) {
logThrow(NULL, AV_LOG_ERROR, "Error muxing packet\n"); logThrow(NULL, AV_LOG_FATAL, "Error muxing packet\n");
break; break;
} }
av_packet_unref(&packet); av_packet_unref(&packet);
...@@ -201,6 +327,7 @@ public: ...@@ -201,6 +327,7 @@ public:
VideoProcessor() VideoProcessor()
{ {
setupParams(); setupParams();
setupMq();
setupStreams(); setupStreams();
} }
// dtor // dtor
...@@ -241,17 +368,17 @@ int main(int argc, char **argv) ...@@ -241,17 +368,17 @@ int main(int argc, char **argv)
// urlOutput = argv[2]; // urlOutput = argv[2];
// if ((ret = avformat_open_input(&pAVFormatInput, urlInput, NULL, NULL)) < 0) { // if ((ret = avformat_open_input(&pAVFormatInput, urlInput, NULL, NULL)) < 0) {
// logThrow(NULL, AV_LOG_ERROR, "Could not open input file '%s'", urlInput); // logThrow(NULL, AV_LOG_FATAL, "Could not open input file '%s'", urlInput);
// goto end; // goto end;
// } // }
// if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) { // if ((ret = avformat_find_stream_info(pAVFormatInput, NULL)) < 0) {
// logThrow(NULL, AV_LOG_ERROR, "Failed to retrieve input stream information"); // logThrow(NULL, AV_LOG_FATAL, "Failed to retrieve input stream information");
// goto end; // goto end;
// } // }
// ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOutput); // ret = avformat_alloc_output_context2(&pAVFormatRemux, NULL, "rtsp", urlOutput);
// if (ret < 0) { // if (ret < 0) {
// logThrow(NULL, AV_LOG_ERROR, "failed create avformatcontext for output: %s", av_err2str(ret)); // logThrow(NULL, AV_LOG_FATAL, "failed create avformatcontext for output: %s", av_err2str(ret));
// goto end; // goto end;
// } // }
...@@ -277,13 +404,13 @@ int main(int argc, char **argv) ...@@ -277,13 +404,13 @@ int main(int argc, char **argv)
// streamList[i] = streamIdx++; // streamList[i] = streamIdx++;
// out_stream = avformat_new_stream(pAVFormatRemux, NULL); // out_stream = avformat_new_stream(pAVFormatRemux, NULL);
// if (!out_stream) { // if (!out_stream) {
// logThrow(NULL, AV_LOG_ERROR, "Failed allocating output stream\n"); // logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
// ret = AVERROR_UNKNOWN; // ret = AVERROR_UNKNOWN;
// goto end; // goto end;
// } // }
// ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar); // ret = avcodec_parameters_copy(out_stream->codecpar, in_codecpar);
// if (ret < 0) { // if (ret < 0) {
// logThrow(NULL, AV_LOG_ERROR, "Failed to copy codec parameters\n"); // logThrow(NULL, AV_LOG_FATAL, "Failed to copy codec parameters\n");
// goto end; // goto end;
// } // }
// } // }
...@@ -293,7 +420,7 @@ int main(int argc, char **argv) ...@@ -293,7 +420,7 @@ int main(int argc, char **argv)
// // find best video stream // // find best video stream
// idxVideo = av_find_best_stream(pAVFormatInput, AVMEDIA_TYPE_VIDEO, -1, -1, &pCodec, 0); // idxVideo = av_find_best_stream(pAVFormatInput, AVMEDIA_TYPE_VIDEO, -1, -1, &pCodec, 0);
// if(idxVideo < 0) { // if(idxVideo < 0) {
// logThrow(NULL, AV_LOG_ERROR, "failed find best video stream"); // logThrow(NULL, AV_LOG_FATAL, "failed find best video stream");
// goto end; // goto end;
// } // }
...@@ -303,10 +430,10 @@ int main(int argc, char **argv) ...@@ -303,10 +430,10 @@ int main(int argc, char **argv)
// // wherever you want. // // wherever you want.
// if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) { // if (!(pAVFormatRemux->oformat->flags & AVFMT_NOFILE)) {
// logThrow(NULL, AV_LOG_ERROR, "Failed allocating output stream\n"); // logThrow(NULL, AV_LOG_FATAL, "Failed allocating output stream\n");
// ret = avio_open2(&pAVFormatRemux->pb, urlOutput, AVIO_FLAG_WRITE, NULL, &pOptsRemux); // ret = avio_open2(&pAVFormatRemux->pb, urlOutput, AVIO_FLAG_WRITE, NULL, &pOptsRemux);
// if (ret < 0) { // if (ret < 0) {
// logThrow(NULL, AV_LOG_ERROR, "Could not open output file '%s'", urlOutput); // logThrow(NULL, AV_LOG_FATAL, "Could not open output file '%s'", urlOutput);
// goto end; // goto end;
// } // }
// } // }
...@@ -317,14 +444,14 @@ int main(int argc, char **argv) ...@@ -317,14 +444,14 @@ int main(int argc, char **argv)
// // rtsp tcp // // rtsp tcp
// if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) { // if(av_dict_set(&pOptsRemux, "rtsp_transport", "tcp", 0) < 0) {
// logThrow(NULL, AV_LOG_ERROR, "failed set output pOptsRemux"); // logThrow(NULL, AV_LOG_FATAL, "failed set output pOptsRemux");
// ret = AVERROR_UNKNOWN; // ret = AVERROR_UNKNOWN;
// goto end; // goto end;
// } // }
// ret = avformat_write_header(pAVFormatRemux, &pOptsRemux); // ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
// if (ret < 0) { // if (ret < 0) {
// logThrow(NULL, AV_LOG_ERROR, "Error occurred when opening output file\n"); // logThrow(NULL, AV_LOG_FATAL, "Error occurred when opening output file\n");
// goto end; // goto end;
// } // }
// while (1) { // while (1) {
...@@ -347,7 +474,7 @@ int main(int argc, char **argv) ...@@ -347,7 +474,7 @@ int main(int argc, char **argv)
// ret = av_interleaved_write_frame(pAVFormatRemux, &packet); // ret = av_interleaved_write_frame(pAVFormatRemux, &packet);
// if (ret < 0) { // if (ret < 0) {
// logThrow(NULL, AV_LOG_ERROR, "Error muxing packet\n"); // logThrow(NULL, AV_LOG_FATAL, "Error muxing packet\n");
// break; // break;
// } // }
// av_packet_unref(&packet); // av_packet_unref(&packet);
...@@ -362,7 +489,7 @@ int main(int argc, char **argv) ...@@ -362,7 +489,7 @@ int main(int argc, char **argv)
// avformat_free_context(pAVFormatRemux); // avformat_free_context(pAVFormatRemux);
// av_freep(&streamList); // av_freep(&streamList);
// if (ret < 0 && ret != AVERROR_EOF) { // if (ret < 0 && ret != AVERROR_EOF) {
// logThrow(NULL, AV_LOG_ERROR, "Error occurred: %s\n", av_err2str(ret)); // logThrow(NULL, AV_LOG_FATAL, "Error occurred: %s\n", av_err2str(ret));
// return 1; // return 1;
// } // }
// return 0; // return 0;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论