提交 fd21ee33 authored 作者: blu's avatar blu

add new raw format compilation option: stable network streaming, but with cpu…

add new raw format compilation option: stable network streaming, but with cpu useage increased a little bit
上级 69852f39
No preview for this file type
...@@ -244,6 +244,12 @@ void frame_free(void *data, void*hint) ...@@ -244,6 +244,12 @@ void frame_free(void *data, void*hint)
MaQue_Demo_Mem_release(frame->handleMem); MaQue_Demo_Mem_release(frame->handleMem);
} }
//
void my_free(void *data, void* hint)
{
free(data);
}
// //
XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame) XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame)
{ {
...@@ -255,7 +261,11 @@ XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame) ...@@ -255,7 +261,11 @@ XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame)
zmq_msg_t msg; zmq_msg_t msg;
void *hint = NULL; void *hint = NULL;
if(frame->eEncodeType == MAQUE_ENCODE_TYPE_H264 && frame->eFrameType == MAQUE_FRAME_TYPE_VIDEO && frame->nDataLen > 0) { if(frame->eEncodeType == MAQUE_ENCODE_TYPE_H264 && frame->eFrameType == MAQUE_FRAME_TYPE_VIDEO && frame->nDataLen > 0) {
#ifdef RAW_FORMAT
evpacket_t* pkt = (evpacket_t*)malloc(sizeof(evpacket_t) + frame->nDataLen);
#else
evpacket_t* pkt = (evpacket_t*)((char*)frame->pData - sizeof(evpacket_t)); evpacket_t* pkt = (evpacket_t*)((char*)frame->pData - sizeof(evpacket_t));
#endif
pkt->meta.magic[0] = 0xBE; pkt->meta.magic[0] = 0xBE;
pkt->meta.magic[1] = 0xEF; pkt->meta.magic[1] = 0xEF;
memcpy(pkt->meta.sn, dev_sn, sizeof(dev_sn)); memcpy(pkt->meta.sn, dev_sn, sizeof(dev_sn));
...@@ -277,7 +287,10 @@ XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame) ...@@ -277,7 +287,10 @@ XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame)
else if(frame->eSubType == MAQUE_FRAME_SUBTYPE_P) { else if(frame->eSubType == MAQUE_FRAME_SUBTYPE_P) {
frameCntPframe++; frameCntPframe++;
} }
zmq_msg_init_data(&msg, (char*)pkt, frame->nDataLen + sizeof(evpacket_t), nullptr, nullptr); #ifdef RAW_FORMAT
memcpy((char*)pkt + sizeof(evpacket_t), frame->pData, frame->nDataLen);
#endif
zmq_msg_init_data(&msg, (char*)pkt, frame->nDataLen + sizeof(evpacket_t), my_free, nullptr);
zmq_msg_send(&msg, pPub, 0); zmq_msg_send(&msg, pPub, 0);
packetId++; packetId++;
} }
...@@ -422,7 +435,7 @@ string apply_config(json &data) ...@@ -422,7 +435,7 @@ string apply_config(json &data)
gConfigSystem.module.sys.video_quality = MAQUE_IMG_QUALITY_BETTER; gConfigSystem.module.sys.video_quality = MAQUE_IMG_QUALITY_BETTER;
gConfigSystem.module.record.interval = 60 * 2; // 2 minutes interval gConfigSystem.module.record.interval = 60 * 2; // 2 minutes interval
gConfigSystem.module.sys.bitrate_kb = 1024; // 1Mbps gConfigSystem.module.sys.bitrate_kb = 1024; // 1Mbps
gConfigSystem.module.sys.bitrate_type = MAQUE_BITRATE_CTRL_VBR; gConfigSystem.module.sys.bitrate_type = MAQUE_BITRATE_CTRL_CBR;
gConfigSystem.module.sys.push = 0; gConfigSystem.module.sys.push = 0;
// motion // motion
gConfigSystem.module.motion.enabled = 1; gConfigSystem.module.motion.enabled = 1;
......
...@@ -23,7 +23,7 @@ using namespace std; ...@@ -23,7 +23,7 @@ using namespace std;
int _raw_connect(std::string host, std::string port, int *socket_, int recv_timeout, int send_timeout) int _raw_connect(std::string host, std::string port, int *socket_, int recv_timeout, int send_timeout)
{ {
int rv = 0; int rv = -1;
struct addrinfo hints {}; struct addrinfo hints {};
memset(&hints, 0, sizeof(struct addrinfo)); memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET; // IPv4 hints.ai_family = AF_INET; // IPv4
...@@ -36,7 +36,7 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time ...@@ -36,7 +36,7 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time
addrinfo *addrinfo_result; addrinfo *addrinfo_result;
rv = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &addrinfo_result); rv = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &addrinfo_result);
if (rv != 0) { if (rv != 0) {
spdlog::error("::getaddrinfo failed: {}", gai_strerror(rv)); spdlog::error("raw_connect ::getaddrinfo failed: {}", gai_strerror(rv));
return rv; return rv;
} }
...@@ -49,31 +49,34 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time ...@@ -49,31 +49,34 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time
} }
struct timeval timeout; struct timeval timeout;
if(recv_timeout != 0){ if(recv_timeout != 0) {
timeout.tv_sec = recv_timeout; timeout.tv_sec = recv_timeout;
timeout.tv_usec = 0; timeout.tv_usec = 0;
if (setsockopt (*socket_, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) { if (setsockopt (*socket_, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_RCVTIMEO failed"); spdlog::warn("raw_connect setsockopt SO_RCVTIMEO failed");
return -1; continue;
} }
} }
if(send_timeout != 0){ if(send_timeout != 0) {
timeout.tv_sec = send_timeout; timeout.tv_sec = send_timeout;
if (setsockopt (*socket_, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) { if (setsockopt (*socket_, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_SNDTIMEO failed"); spdlog::warn("raw_connect setsockopt SO_SNDTIMEO failed");
return -1; continue;
} }
} }
rv = ::connect(*socket_, rp->ai_addr, rp->ai_addrlen); rv = ::connect(*socket_, rp->ai_addr, rp->ai_addrlen);
// auto addr = (struct sockaddr_in *)rp->ai_addr;
// auto hostIp = string(inet_ntoa((struct in_addr)addr->sin_addr));
if (rv == 0) { if (rv == 0) {
auto addr = (struct sockaddr_in *)rp->ai_addr; // auto addr = (struct sockaddr_in *)rp->ai_addr;
// //auto hostIp = string(inet_ntoa((struct in_addr)addr->sin_addr)); // //auto hostIp = string(inet_ntoa((struct in_addr)addr->sin_addr));
// rv = fcntl(*socket_, F_SETFL, fcntl(*socket_, F_GETFL, 0) | O_NONBLOCK); // rv = fcntl(*socket_, F_SETFL, fcntl(*socket_, F_GETFL, 0) | O_NONBLOCK);
// if (rv == -1){ // if (rv == -1){
// spdlog::error("error calling fcntl"); // spdlog::error("error calling fcntl");
// } // }
spdlog::info("raw_connect success to {}:{}", host, port);
break; break;
} }
else { else {
...@@ -81,6 +84,7 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time ...@@ -81,6 +84,7 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time
::close(*socket_); ::close(*socket_);
*socket_ = -1; *socket_ = -1;
rv = -1; rv = -1;
spdlog::warn("raw_connect failed to {}:{}", host, port);
} }
} }
...@@ -89,10 +93,12 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time ...@@ -89,10 +93,12 @@ int _raw_connect(std::string host, std::string port, int *socket_, int recv_time
} }
int raw_connect(std::string host, std::string port, int *socket_){ int raw_connect(std::string host, std::string port, int *socket_)
{
return _raw_connect(host, port, socket_, 0, 0); return _raw_connect(host, port, socket_, 0, 0);
} }
int raw_connect_nonblock(std::string host, std::string port, int *socket_, int recv_timeout, int send_timeout){ int raw_connect_nonblock(std::string host, std::string port, int *socket_, int recv_timeout, int send_timeout)
{
return _raw_connect(host, port, socket_, recv_timeout, send_timeout); return _raw_connect(host, port, socket_, recv_timeout, send_timeout);
} }
...@@ -104,7 +110,7 @@ string raw_send(int s, const void *buf, size_t len) ...@@ -104,7 +110,7 @@ string raw_send(int s, const void *buf, size_t len)
ssize_t ret = 0; ssize_t ret = 0;
int retries = 0; int retries = 0;
while(len > 0) { while(len > 0) {
ret = ::send(s, (char*)buf+ sent, len, MSG_NOSIGNAL); ret = ::send(s, (char*)buf + sent, len, MSG_NOSIGNAL);
if(ret < 0) { if(ret < 0) {
break; break;
} }
......
...@@ -58,8 +58,13 @@ XM_S32 MaQue_Demo_Mem_alloc(XM_HANDLE *pHandle, MaQueMemAllocParam_s *pstAllocPa ...@@ -58,8 +58,13 @@ XM_S32 MaQue_Demo_Mem_alloc(XM_HANDLE *pHandle, MaQueMemAllocParam_s *pstAllocPa
case MAQUE_MEM_TYPE_YUV_GET: case MAQUE_MEM_TYPE_YUV_GET:
{ {
/// NOTE by Bruce: ATTENTION! we place the extr evpacket_t ahead for saving memory ops later /// NOTE by Bruce: ATTENTION! we place the extr evpacket_t ahead for saving memory ops later
#ifdef RAW_FORMAT
XM_U8 * raw = (XM_U8 *)malloc(pstAllocParam->nBufSize);
pstMem->pBuffer = raw;
#else
XM_U8 * raw = (XM_U8 *)malloc(pstAllocParam->nBufSize + sizeof(evpacket_t)); XM_U8 * raw = (XM_U8 *)malloc(pstAllocParam->nBufSize + sizeof(evpacket_t));
pstMem->pBuffer = raw + sizeof(evpacket_t); pstMem->pBuffer = raw + sizeof(evpacket_t);
#endif
//printf("malloc. raw: %08X, shifted: %08X\n", (uint32_t)raw, (uint32_t)pstMem->pBuffer); //printf("malloc. raw: %08X, shifted: %08X\n", (uint32_t)raw, (uint32_t)pstMem->pBuffer);
if (raw) { if (raw) {
pstMem->index = 0xff; pstMem->index = 0xff;
...@@ -102,6 +107,28 @@ XM_S32 MaQue_Demo_Mem_release(XM_HANDLE handle) ...@@ -102,6 +107,28 @@ XM_S32 MaQue_Demo_Mem_release(XM_HANDLE handle)
return 0; return 0;
} }
#ifdef RAW_FORMAT
if(pstMem == nullptr || pstMem->pBuffer == nullptr ){
pthread_mutex_unlock(&g_mutexMem);
return 0;
}
XM_U8* raw = pstMem->pBuffer;
//printf("release. h:%08X, raw: %08X, shifted: %08X, %uref\n", (int)handle, (int)raw, (int)shifted, pstMem->nRefCount);
if (pstMem->nRefCount > 1) {
pstMem->nRefCount--;
}
else if (raw) {
/// NOTE by Bruce: ATTENTION!
free(raw);
free(pstMem);
}else if(pstMem->nRefCount < 0){
spdlog::error("shouldn't be here. refcnt:{}", pstMem->nRefCount);
}
else{
spdlog::error("shouldn't be here");
}
#else
if(pstMem == nullptr || pstMem->pBuffer == nullptr || pstMem->pBuffer - sizeof(evpacket_t) == nullptr || pstMem->nRefCount == 0){ if(pstMem == nullptr || pstMem->pBuffer == nullptr || pstMem->pBuffer - sizeof(evpacket_t) == nullptr || pstMem->nRefCount == 0){
//printf("invalid free: %08X, %08X, %uref\n", (int)pstMem, (int)pstMem->pBuffer, pstMem->nRefCount); //printf("invalid free: %08X, %08X, %uref\n", (int)pstMem, (int)pstMem->pBuffer, pstMem->nRefCount);
pthread_mutex_unlock(&g_mutexMem); pthread_mutex_unlock(&g_mutexMem);
...@@ -126,6 +153,7 @@ XM_S32 MaQue_Demo_Mem_release(XM_HANDLE handle) ...@@ -126,6 +153,7 @@ XM_S32 MaQue_Demo_Mem_release(XM_HANDLE handle)
}else{ }else{
spdlog::error("shouldn't be here"); spdlog::error("shouldn't be here");
} }
#endif
pthread_mutex_unlock(&g_mutexMem); pthread_mutex_unlock(&g_mutexMem);
return 0; return 0;
......
...@@ -18,6 +18,7 @@ extern "C" ...@@ -18,6 +18,7 @@ extern "C"
#include <maque_type.h> #include <maque_type.h>
#include <maque_smart.h> #include <maque_smart.h>
#include <maque_osd.h> #include <maque_osd.h>
#include <maque_watchdog.h>
} }
#include "mqtt_helper.hpp" #include "mqtt_helper.hpp"
...@@ -192,6 +193,13 @@ void maq_smart_task_entry(ev_module_config_t *pArg) ...@@ -192,6 +193,13 @@ void maq_smart_task_entry(ev_module_config_t *pArg)
sml::sm<md::fsm> fsm{md::fsm{}}; sml::sm<md::fsm> fsm{md::fsm{}};
bool hasHuman = false; bool hasHuman = false;
bool hasMotion = false; bool hasMotion = false;
/// setup watchdog
LibXmMaQue_Watchdog_enable(1);
// 20s timeout
LibXmMaQue_Watchdog_setTimeOutSecs(20);
while(1) { while(1) {
// assuming it's 1s // assuming it's 1s
this_thread::sleep_for(1s); this_thread::sleep_for(1s);
...@@ -222,9 +230,14 @@ void maq_smart_task_entry(ev_module_config_t *pArg) ...@@ -222,9 +230,14 @@ void maq_smart_task_entry(ev_module_config_t *pArg)
motionCntLast = motionCnt_; motionCntLast = motionCnt_;
if(deltaTimeMs == 0) { if(deltaTimeMs == 0) {
/// abnormal conditions, should restart and investigation
spdlog::warn("no video frames in last ~5 seconds"); spdlog::warn("no video frames in last ~5 seconds");
frameCntTotal.store(0, std::memory_order_relaxed);
frameCntLast = 0;
} }
else { else {
// feed the dog
LibXmMaQue_Watchdog_keepAlive();
if(deltaMotionCnt >= motionCntThresh) { if(deltaMotionCnt >= motionCntThresh) {
hasMotion = true; hasMotion = true;
} }
......
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
#define __EVPACKET_H__ #define __EVPACKET_H__
#include <mutex> #include <mutex>
/// TODO: documentation
#define RAW_FORMAT
#define TERMINAL_SN_SIZE 14 #define TERMINAL_SN_SIZE 14
typedef enum evpacket_type_t { typedef enum evpacket_type_t {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论