提交 45d16c24 authored 作者: blu's avatar blu

init

上级 a6c2ee88
...@@ -22,11 +22,14 @@ using namespace std; ...@@ -22,11 +22,14 @@ using namespace std;
class RepSrv: public TinyThread { class RepSrv: public TinyThread {
private: private:
string sn;
int iid;
string urlRep; string urlRep;
void *pRepCtx = NULL; // for packets REP
void *pRep = NULL;
const char * bytes; const char * bytes;
int len; int len;
void *pRepCtx = NULL; // for packets REP
void *pRep = NULL;
int teardownMq() int teardownMq()
{ {
if(pRep != NULL) { if(pRep != NULL) {
...@@ -55,7 +58,7 @@ class RepSrv: public TinyThread { ...@@ -55,7 +58,7 @@ class RepSrv: public TinyThread {
RepSrv() = delete; RepSrv() = delete;
RepSrv(RepSrv &) = delete; RepSrv(RepSrv &) = delete;
RepSrv(RepSrv&&) = delete; RepSrv(RepSrv&&) = delete;
RepSrv(string urlRep, const char* formatBytes, int len):urlRep(urlRep), bytes(formatBytes), len(len){}; RepSrv(string sn, int iid, string urlRep, const char* formatBytes, int len):sn(sn), iid(iid),urlRep(urlRep), bytes(formatBytes), len(len){};
~RepSrv(){}; ~RepSrv(){};
protected: protected:
void run(){ void run(){
...@@ -72,6 +75,7 @@ class RepSrv: public TinyThread { ...@@ -72,6 +75,7 @@ class RepSrv: public TinyThread {
bStopSig = true; bStopSig = true;
break; break;
} }
spdlog::info("evpuller reqSrv {} {} waiting for req", sn, iid);
int ret =zmq_msg_init(&msg1); int ret =zmq_msg_init(&msg1);
ret = zmq_recvmsg(pRep, &msg1, 0); ret = zmq_recvmsg(pRep, &msg1, 0);
if(ret < 0) { if(ret < 0) {
...@@ -79,6 +83,7 @@ class RepSrv: public TinyThread { ...@@ -79,6 +83,7 @@ class RepSrv: public TinyThread {
continue; continue;
} }
zmq_msg_close(&msg1); zmq_msg_close(&msg1);
spdlog::info("evpuller {} {} reveived req", sn, iid);
zmq_send_const(pRep, zmq_msg_data(&msg), len, 0); zmq_send_const(pRep, zmq_msg_data(&msg), len, 0);
} }
} }
...@@ -89,8 +94,8 @@ private: ...@@ -89,8 +94,8 @@ private:
void *pPubCtx = NULL; // for packets publishing void *pPubCtx = NULL; // for packets publishing
void *pPub = NULL; void *pPub = NULL;
AVFormatContext *pAVFormatInput = NULL; AVFormatContext *pAVFormatInput = NULL;
string urlIn, urlPub, urlRep; string urlIn, urlPub, urlRep, sn;
int *streamList = NULL, numStreams = 0; int *streamList = NULL, numStreams = 0, iid;
public: public:
EvPuller() EvPuller()
...@@ -133,7 +138,7 @@ protected: ...@@ -133,7 +138,7 @@ protected:
// serialize formatctx to bytes // serialize formatctx to bytes
char *pBytes = NULL; char *pBytes = NULL;
ret = AVFormatCtxSerializer::encode(pAVFormatInput, &pBytes); ret = AVFormatCtxSerializer::encode(pAVFormatInput, &pBytes);
auto repSrv = RepSrv(urlRep, pBytes, ret); auto repSrv = RepSrv(sn, iid, urlRep, pBytes, ret);
repSrv.detach(); repSrv.detach();
// find all video & audio streams for remuxing // find all video & audio streams for remuxing
...@@ -207,12 +212,13 @@ private: ...@@ -207,12 +212,13 @@ private:
int init() int init()
{ {
bool inited = false; bool inited = false;
sn = "ILS-2";
iid = 2;
while(!inited) { while(!inited) {
// TODO: read db to get sn // TODO: read db to get sn
const char* sn = "ILS-2";
// req config // req config
json jr = cloudutils::registry(sn, "evpuller", 0); json jr = cloudutils::registry(sn.c_str(), "evpuller", iid);
bool bcnt = false; bool bcnt = false;
try { try {
spdlog::info("registry: {:s}", jr.dump()); spdlog::info("registry: {:s}", jr.dump());
...@@ -223,6 +229,7 @@ private: ...@@ -223,6 +229,7 @@ private:
urlIn = "rtsp://" + user + ":" + passwd + "@"+ ipc + "/h264/ch1/sub/av_stream"; urlIn = "rtsp://" + user + ":" + passwd + "@"+ ipc + "/h264/ch1/sub/av_stream";
urlPub = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-pub"]); urlPub = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-pub"]);
urlRep = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-rep"]); urlRep = string("tcp://") +data["addr"].get<string>() + ":" + to_string(data["port-rep"]);
spdlog::info("evpuller {} {} bind on {} for pub, {} for rep", sn, iid, urlPub, urlRep);
} }
catch(exception &e) { catch(exception &e) {
bcnt = true; bcnt = true;
......
...@@ -162,6 +162,12 @@ void mqPacketFree(void *data, void *hint) ...@@ -162,6 +162,12 @@ void mqPacketFree(void *data, void *hint)
// AVFormatCtxSerializer // AVFormatCtxSerializer
namespace AVFormatCtxSerializer namespace AVFormatCtxSerializer
{ {
/**
* memory layerout
* PS_MARK_S | NUM_STREAMS | AVSTREAM+AVCODEPAR | WHOLESIZE | PS_MARK_E
* */
int encode(AVFormatContext *ctx, char **bytes) int encode(AVFormatContext *ctx, char **bytes)
{ {
int ret = 0; int ret = 0;
...@@ -171,11 +177,16 @@ int encode(AVFormatContext *ctx, char **bytes) ...@@ -171,11 +177,16 @@ int encode(AVFormatContext *ctx, char **bytes)
wholeSize += strlen(PS_MARK_S); wholeSize += strlen(PS_MARK_S);
// num streams // num streams
wholeSize += sizeof(ctx->nb_streams); wholeSize += sizeof(ctx->nb_streams);
wholeSize += sizeof(ctx->nb_streams); spdlog::info("encode sizeof streams: {:d}, {:d}", sizeof(ctx->nb_streams), ctx->nb_streams);
for (int i = 0; i < ctx->nb_streams; i++) for (int i = 0; i < ctx->nb_streams; i++)
{ {
wholeSize += sizeof(AVStream); wholeSize += sizeof(AVStream);
wholeSize += sizeof(AVCodecParameters); wholeSize += sizeof(AVCodecParameters);
//extradata
wholeSize += sizeof(ctx->streams[i]->codecpar->extradata_size);
if(ctx->streams[i]->codecpar->extradata_size!=0){
wholeSize += ctx->streams[i]->codecpar->extradata_size;
}
} }
wholeSize += sizeof(wholeSize); wholeSize += sizeof(wholeSize);
wholeSize += strlen(PS_MARK_E); wholeSize += strlen(PS_MARK_E);
...@@ -195,11 +206,19 @@ int encode(AVFormatContext *ctx, char **bytes) ...@@ -195,11 +206,19 @@ int encode(AVFormatContext *ctx, char **bytes)
// //
memcpy((*bytes) + got, ctx->streams[i]->codecpar, sizeof(AVCodecParameters)); memcpy((*bytes) + got, ctx->streams[i]->codecpar, sizeof(AVCodecParameters));
got += sizeof(AVCodecParameters); got += sizeof(AVCodecParameters);
//extra
memcpy((*bytes) + got, &(ctx->streams[i]->codecpar->extradata_size), sizeof(int));
got += sizeof(int);
memcpy((*bytes) + got,ctx->streams[i]->codecpar->extradata, ctx->streams[i]->codecpar->extradata_size);
got += ctx->streams[i]->codecpar->extradata_size;
} }
memcpy((*bytes) + got, &wholeSize, sizeof(wholeSize)); memcpy((*bytes) + got, &wholeSize, sizeof(wholeSize));
got += sizeof(wholeSize); got += sizeof(wholeSize);
memcpy((*bytes) + got, PS_MARK_E, strlen(PS_MARK_E)); memcpy((*bytes) + got, PS_MARK_E, strlen(PS_MARK_E));
got += strlen(PS_MARK_E);
assert(wholeSize == got);
spdlog::info("encode wholesize: {}", got);
return wholeSize; return wholeSize;
} }
...@@ -207,11 +226,13 @@ int decode(char *bytes, int len, AVFormatContext *pCtx) ...@@ -207,11 +226,13 @@ int decode(char *bytes, int len, AVFormatContext *pCtx)
{ {
int ret = 0; int ret = 0;
int got = 0; int got = 0;
if (memcmp(PS_MARK_S, bytes + got, strlen(PS_MARK_S)) != 0 && memcmp(PS_MARK_E, bytes + len - strlen(PS_MARK_E), strlen(PS_MARK_E)) != 0) memcpy(&ret, bytes +len -strlen(PS_MARK_E) - sizeof(ret), sizeof(ret));
if ((memcmp(PS_MARK_S, bytes + got, strlen(PS_MARK_S)) != 0 && memcmp(PS_MARK_E, bytes + len - strlen(PS_MARK_E), strlen(PS_MARK_E)) != 0)||ret != len)
{ {
spdlog::error("invalid packet"); spdlog::error("invalid packet: {} {}", ret, len);
return -1; return -1;
} }
spdlog::info("decode len: {}", ret);
got += strlen(PS_MARK_S); got += strlen(PS_MARK_S);
memcpy(&ret, bytes + got, sizeof(ret)); memcpy(&ret, bytes + got, sizeof(ret));
got += sizeof(ret); got += sizeof(ret);
...@@ -225,9 +246,20 @@ int decode(char *bytes, int len, AVFormatContext *pCtx) ...@@ -225,9 +246,20 @@ int decode(char *bytes, int len, AVFormatContext *pCtx)
pCtx->streams[i]->codecpar = (AVCodecParameters *)malloc(sizeof(AVCodecParameters)); pCtx->streams[i]->codecpar = (AVCodecParameters *)malloc(sizeof(AVCodecParameters));
memcpy(pCtx->streams[i]->codecpar, bytes + got, sizeof(AVCodecParameters)); memcpy(pCtx->streams[i]->codecpar, bytes + got, sizeof(AVCodecParameters));
got += sizeof(AVCodecParameters); got += sizeof(AVCodecParameters);
// extra
memcpy(&ret, bytes + got, sizeof(int));
got += sizeof(int);
if(ret != 0) {
pCtx->streams[i]->codecpar->extradata_size = ret;
pCtx->streams[i]->codecpar->extradata = (uint8_t *)malloc(ret);
memcpy(pCtx->streams[i]->codecpar->extradata, bytes + got, ret);
got += ret;
}
} }
memcpy(&ret, bytes + got, sizeof(ret)); memcpy(&ret, bytes + got, sizeof(ret));
got += sizeof(ret);
got += strlen(PS_MARK_E);
spdlog::debug("avformatctx decode: {:d} {:d} {:d}", ret, len, got);
assert(ret == len); assert(ret == len);
return ret; return ret;
} }
...@@ -237,6 +269,9 @@ void freeCtx(AVFormatContext *pCtx) ...@@ -237,6 +269,9 @@ void freeCtx(AVFormatContext *pCtx)
for (int i = 0; i < pCtx->nb_streams; i++) for (int i = 0; i < pCtx->nb_streams; i++)
{ {
free(pCtx->streams[i]->codecpar); free(pCtx->streams[i]->codecpar);
if(pCtx->streams[i]->codecpar->extradata_size != 0) {
free(pCtx->streams[i]->codecpar->extradata);
}
free(pCtx->streams[i]); free(pCtx->streams[i]);
} }
free(pCtx->streams); free(pCtx->streams);
...@@ -274,6 +309,15 @@ namespace cloudutils ...@@ -274,6 +309,15 @@ namespace cloudutils
"port-rep":5557, "port-rep":5557,
"iid":2 "iid":2
}, },
"evpusher":[
{
"sn":"ILS-2",
"addr":"localhost",
"iid":2,
"enabled":1,
"urlDest":"rtsp://40.73.41.176:554/test1"
}
],
"evslicer":[ "evslicer":[
{ {
"sn":"ILS-3", "sn":"ILS-3",
...@@ -293,7 +337,7 @@ namespace cloudutils ...@@ -293,7 +337,7 @@ namespace cloudutils
} }
} }
*/ */
const char *config = "{\"code\":0,\"time\":0,\"data\":{\"ipc\":\"172.31.0.51\",\"username\":\"admin\",\"password\":\"FWBWTU\",\"services\":{\"evmgr\":{\"sn\":\"ILS-1\",\"addr\":\"0.0.0.0\",\"port-pub\":5556,\"port-rep\":5557,\"iid\":1},\"evpuller\":{\"sn\":\"ILS-2\",\"addr\":\"0.0.0.0\",\"port-pub\":5556,\"port-rep\":5557,\"iid\":2},\"evslicer\":[{\"sn\":\"ILS-3\",\"addr\":\"192.168.0.25\",\"iid\":3}],\"evml\":[{\"feature\":\"motion\",\"sn\":\"ILS-4\",\"addr\":\"192.168.0.26\",\"iid\":4}]}}}"; const char *config = "{\"code\":0,\"time\":0,\"data\":{\"ipc\":\"172.31.0.51\",\"username\":\"admin\",\"password\":\"FWBWTU\",\"services\":{\"evmgr\":{\"sn\":\"ILS-1\",\"addr\":\"0.0.0.0\",\"port-pub\":5556,\"port-rep\":5557,\"iid\":1},\"evpuller\":{\"sn\":\"ILS-2\",\"addr\":\"0.0.0.0\",\"port-pub\":5556,\"port-rep\":5557,\"iid\":2},\"evpusher\":[{\"sn\":\"ILS-2\",\"addr\":\"localhost\",\"iid\":2,\"enabled\":1,\"urlDest\":\"rtsp://40.73.41.176:554/test1\"}],\"evslicer\":[{\"sn\":\"ILS-3\",\"addr\":\"192.168.0.25\",\"iid\":3}],\"evml\":[{\"feature\":\"motion\",\"sn\":\"ILS-4\",\"addr\":\"192.168.0.26\",\"iid\":4}]}}}";
json registry(const char *sn, const char *scn, int iid) json registry(const char *sn, const char *scn, int iid)
{ {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论