提交 89c4bdfe authored 作者: blu's avatar blu

evmlmotion: delta packet ts

上级 1957673d
...@@ -83,6 +83,8 @@ private: ...@@ -83,6 +83,8 @@ private:
condition_variable cvMsg; condition_variable cvMsg;
mutex mutMsg; mutex mutMsg;
bool gotFormat = false; bool gotFormat = false;
long long packetTs = 0;
long long packetTsDelta = 0;
// //
...@@ -579,7 +581,9 @@ private: ...@@ -579,7 +581,9 @@ private:
p["type"] = EV_MSG_TYPE_AI_MOTION; p["type"] = EV_MSG_TYPE_AI_MOTION;
p["gid"] = selfId; p["gid"] = selfId;
p["event"] = EV_MSG_EVENT_MOTION_START; p["event"] = EV_MSG_EVENT_MOTION_START;
p["ts"] = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count(); auto tmp = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count();
p["ts"] = packetTs;
packetTsDelta = tmp - packetTs;
//p["frame"] = origin.clone(); //p["frame"] = origin.clone();
evtQueue->push(p.dump()); evtQueue->push(p.dump());
if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE * 2) { if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE * 2) {
...@@ -621,7 +625,8 @@ private: ...@@ -621,7 +625,8 @@ private:
p["type"] = EV_MSG_TYPE_AI_MOTION; p["type"] = EV_MSG_TYPE_AI_MOTION;
p["gid"] = selfId; p["gid"] = selfId;
p["event"] = EV_MSG_EVENT_MOTION_END; p["event"] = EV_MSG_EVENT_MOTION_END;
p["ts"] = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count() + (int)(detPara.post/2); auto tmp = chrono::duration_cast<chrono::seconds>(evtStartTmLast.time_since_epoch()).count() + (int)(detPara.post/2);
p["ts"] = tmp - packetTsDelta;
evtQueue->push(p.dump()); evtQueue->push(p.dump());
if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE*2) { if(evtQueue->size() > MAX_EVENT_QUEUE_SIZE*2) {
evtQueue->pop(); evtQueue->pop();
...@@ -737,7 +742,7 @@ protected: ...@@ -737,7 +742,7 @@ protected:
spdlog::error("evmlmotion {} failed to recv zmq msg: {}", selfId, zmq_strerror(ret)); spdlog::error("evmlmotion {} failed to recv zmq msg: {}", selfId, zmq_strerror(ret));
continue; continue;
} }
ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet); ret = AVPacketSerializer::decode((char*)zmq_msg_data(&msg), ret, &packet, &packetTs);
{ {
if (ret < 0) { if (ret < 0) {
spdlog::error("evmlmotion {} packet decode failed: {}", selfId, ret); spdlog::error("evmlmotion {} packet decode failed: {}", selfId, ret);
......
...@@ -61,6 +61,9 @@ int encode(AVPacket &pkt, char **bytes) ...@@ -61,6 +61,9 @@ int encode(AVPacket &pkt, char **bytes)
// 4 + 8: wholeSize + DEADBEAF // 4 + 8: wholeSize + DEADBEAF
wholeSize += sizeof(pkt.pts) * 4 + sizeof(pkt.flags) + sizeof(pkt.stream_index) + sizeof(wholeSize) + strlen(PS_MARK_E); wholeSize += sizeof(pkt.pts) * 4 + sizeof(pkt.flags) + sizeof(pkt.stream_index) + sizeof(wholeSize) + strlen(PS_MARK_E);
// timestamp
wholeSize += 8;
auto now = chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count();
*bytes = (char *)malloc(wholeSize); *bytes = (char *)malloc(wholeSize);
memcpy((*bytes) + cnt, PS_MARK_S, strlen(PS_MARK_S)); memcpy((*bytes) + cnt, PS_MARK_S, strlen(PS_MARK_S));
...@@ -102,6 +105,10 @@ int encode(AVPacket &pkt, char **bytes) ...@@ -102,6 +105,10 @@ int encode(AVPacket &pkt, char **bytes)
cnt += sizeof(pkt.flags); cnt += sizeof(pkt.flags);
memcpy((*bytes) + cnt, &(pkt.stream_index), sizeof(pkt.stream_index)); memcpy((*bytes) + cnt, &(pkt.stream_index), sizeof(pkt.stream_index));
cnt += sizeof(pkt.stream_index); cnt += sizeof(pkt.stream_index);
// ts
memcpy((*bytes) + cnt, &now, sizeof(now));
cnt+= sizeof(now);
// size
memcpy((*bytes) + cnt, &wholeSize, sizeof(wholeSize)); memcpy((*bytes) + cnt, &wholeSize, sizeof(wholeSize));
cnt += sizeof(wholeSize); cnt += sizeof(wholeSize);
memcpy((*bytes) + cnt, PS_MARK_E, strlen(PS_MARK_E)); memcpy((*bytes) + cnt, PS_MARK_E, strlen(PS_MARK_E));
...@@ -111,7 +118,7 @@ int encode(AVPacket &pkt, char **bytes) ...@@ -111,7 +118,7 @@ int encode(AVPacket &pkt, char **bytes)
return wholeSize; return wholeSize;
} }
int decode(char *bytes, int len, AVPacket *pkt) int decode(char *bytes, int len, AVPacket *pkt, long long *ts = nullptr)
{ {
// allocate packet mem on heap // allocate packet mem on heap
//AVPacket *pkt = (AVPacket*)malloc(sizeof(AVPacket)); //AVPacket *pkt = (AVPacket*)malloc(sizeof(AVPacket));
...@@ -158,6 +165,12 @@ int decode(char *bytes, int len, AVPacket *pkt) ...@@ -158,6 +165,12 @@ int decode(char *bytes, int len, AVPacket *pkt)
memcpy(&(pkt->stream_index), bytes + got, sizeof(pkt->stream_index)); memcpy(&(pkt->stream_index), bytes + got, sizeof(pkt->stream_index));
got += sizeof(pkt->stream_index); got += sizeof(pkt->stream_index);
// ts
if(ts != nullptr) {
memcpy(ts, bytes + got, sizeof(long long));
}
got += sizeof(long long);
int wholeSize = 0; int wholeSize = 0;
memcpy(&wholeSize, bytes + got, sizeof(wholeSize)); memcpy(&wholeSize, bytes + got, sizeof(wholeSize));
got += sizeof(wholeSize); got += sizeof(wholeSize);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论