提交 ed3f32a9 authored 作者: blu's avatar blu

init

上级 eee69bf3
......@@ -20,28 +20,35 @@ namespace fs = std::filesystem;
using namespace std;
class PacketProducer: public TinyThread {
class EvPuller: public TinyThread {
private:
void *pPubContext = NULL; // for packets publishing
void *pPublisher = NULL;
void *pPubCtx = NULL; // for packets publishing
void *pPub = NULL;
void *pRepCtx = NULL; // for packets REP
void *pRep = NULL;
AVFormatContext *pAVFormatInput = NULL;
string urlIn;
string urlIn, urlPub;
int *streamList = NULL, numStreams = 0;
public:
PacketProducer(string urlIn):urlIn(urlIn){
setupMq();
}
EvPuller()
{
int ret = 0;
do {
init();
ret = setupMq();
}while(ret < 0);
}
~PacketProducer(){
}
~EvPuller()
{
}
protected:
// Function to be executed by thread function
void run()
{
int ret = 0;
setupMq();
if ((ret = avformat_open_input(&pAVFormatInput, urlIn.c_str(), NULL, NULL)) < 0) {
spdlog::error("Could not open input file {}", urlIn);
}
......@@ -83,7 +90,7 @@ protected:
AVStream *in_stream;
AVPacket packet;
zmq_msg_t msg;
ret = av_read_frame(pAVFormatInput, &packet);
if (ret < 0) {
spdlog::error("failed read packet: {}", av_err2str(ret));
......@@ -109,29 +116,66 @@ protected:
char * data = NULL;
int size = AVPacketSerializer::encode(packet, &data);
zmq_msg_init_data(&msg, (void*)data, size, mqPacketFree, NULL);
zmq_send_const(pPublisher, zmq_msg_data(&msg), size, 0);
zmq_send_const(pPub, zmq_msg_data(&msg), size, 0);
av_packet_unref(&packet);
}
// TODO:
if(ret < 0 && !bStopSig) {
// reconnect
}else {
}
else {
std::cout << "Task End" << std::endl;
}
}
private:
private:
int init()
{
bool inited = false;
while(!inited) {
// TODO: read db to get sn
const char* sn = "ILS-2";
// req config
json jr = cloudutils::registry(sn, "evpuller", 0);
bool bcnt = false;
try {
spdlog::info("registry: {:s}", jr.dump());
string ipc = jr["data"]["ipc"];
string user = jr["data"]["username"];
string passwd = jr["data"]["password"];
json data = jr["data"]["services"]["evpuller"];
urlIn = "rtsp://" + user + ":" + passwd + "@"+ ipc + "/h264/ch1/sub/av_stream";
urlPub = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-pub"]);
}
catch(exception &e) {
bcnt = true;
spdlog::error(e.what());
}
if(bcnt) {
this_thread::sleep_for(chrono::milliseconds(1000*10));
continue;
}
inited = true;
}
return 0;
}
int setupMq()
{
teardownMq();
pPubContext = zmq_ctx_new();
pPublisher = zmq_socket(pPubContext, ZMQ_PUB);
pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB);
int rc = zmq_bind(pPublisher, "tcp://0.0.0.0:5556");
int rc = zmq_bind(pPub, urlPub.c_str());
if(rc != 0) {
spdlog::error("failed create pub");
spdlog::error("failed create pub: {}, {}", zmq_strerror(rc), urlPub.c_str());
this_thread::sleep_for(chrono::milliseconds(100*10));
return -1;
}
return 0;
......@@ -139,11 +183,11 @@ protected:
int teardownMq()
{
if(pPublisher != NULL) {
zmq_close(pPublisher);
if(pPub != NULL) {
zmq_close(pPub);
}
if(pPubContext != NULL) {
zmq_ctx_destroy(pPubContext);
if(pPubCtx != NULL) {
zmq_ctx_destroy(pPubCtx);
}
return 0;
}
......@@ -151,103 +195,13 @@ protected:
class EdgeVideoMgr {
private:
#define SECS_SLICE (60*5/2)
AVFormatContext *pAVFormatInput = NULL, *pAVFormatRemux = NULL;
AVCodec *pCodec = NULL;
AVDictionary *pOptsRemux = NULL, *pOptsInput = NULL, *pOptsOutput = NULL;
int idxVideo = -1, idxAudio = -1, numStreams = 0, numSlices = 6, secsSlice = SECS_SLICE;
int *streamList = NULL;
bool bPush = true, bRecord = false;
string urlIn, urlOut, pathSlice;
unordered_map<string, string> envParams = unordered_map<string, string>();
// mq
void *pRepContext = NULL; // for msg from edge gateway
void *pReqContext = NULL; // for msg to edge gateway
private:
void setupParams()
{
char *tmp = getenv("URL_IN");
urlIn = (tmp == NULL?string(""): string(tmp));
tmp= getenv("URL_OUT");
urlOut = (tmp == NULL?string(""): string(tmp));
tmp = getenv("SLICE_NUM");
numSlices = (tmp == NULL?6:atoi(tmp));
if(numSlices <=2) {
numSlices = 6;
}
spdlog::info("in: {}", urlIn);
tmp = getenv("SLICE_PATH");
pathSlice = (tmp == NULL?string("slices"):string(tmp));
// OSX XCode doesn't ship with the filesystem header as of version 10.x
#ifdef __LINUX___
if (!fs::exists(pathSlice.c_str())) {
if (!fs::create_directory(pathSlice.c_str())) {
spdlog::error("can't create directory: {}", pathSlice.c_str());
exit(1);
}
fs::permissions(pathSlice.c_str(), fs::perms::all);
}
#endif
tmp = getenv("PUSH");
bPush = (tmp == NULL?false: (string(tmp) == string("false")?false:true));
tmp = getenv("SLICE_SECS");
secsSlice = (tmp == NULL?SECS_SLICE:atoi(tmp));
if(secsSlice < SECS_SLICE) {
secsSlice = SECS_SLICE;
}
if(urlIn == "" or urlOut == "") {
spdlog::error("no input/output url");
exit(1);
}
}
int setupStreams()
{
int ret = 0;
PacketProducer packetProducer(urlIn);
packetProducer.join();
// std::this_thread::sleep_for(std::chrono::milliseconds(30000));
// packetProducer.stop();
return ret;
}
public:
// ctor
EdgeVideoMgr()
{
setupParams();
setupStreams();
}
// dtor
~EdgeVideoMgr()
{
avformat_close_input(&pAVFormatInput);
/* close output */
if (pAVFormatRemux && !(pAVFormatRemux->oformat->flags & AVFMT_NOFILE))
avio_closep(&pAVFormatRemux->pb);
avformat_free_context(pAVFormatRemux);
av_freep(&streamList);
}
};
int main(int argc, char **argv)
{
av_log_set_level(AV_LOG_INFO);
spdlog::set_level(spdlog::level::info);
DB::exec(NULL, NULL, NULL ,NULL);
spdlog::info("hello");
auto vp = EdgeVideoMgr();
DB::exec(NULL, NULL, NULL,NULL);
auto evp = EvPuller();
evp.join();
return 0;
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论