提交 2c0181b7 authored 作者: blu's avatar blu

recording

上级 298fb4fe
......@@ -46,7 +46,7 @@ payload: JSON
```
topic: evcamera/v1.0/res_rep
qos:2
retian: true
retian: false
payload: JSON
```
......@@ -90,7 +90,12 @@ payload:
"features": {
"motion": {
"enable": 1,
"region": {"min": [0.5,0.5], "max": [0.75, 0.75]},
"region": {
"minX": 0.1,
"minY": 0.1,
"maxX": 0.9,
"maxY": 0.9
},
"level": 3
},
"record":{
......@@ -119,11 +124,13 @@ payload:
"rid": "<random_str>",
"sn": "A000000Z",
"data": {
{
"op": "replace",
"path": "/features/ai/face_thresh",
"value": 0.5
}
[
{
"op": "replace",
"path": "/features/ai/face_thresh",
"value": 0.5
}
]
}
```
......@@ -147,8 +154,13 @@ payload:
"features": {
"motion": {
"enable": 1,
"region": {"min": [0.5,0.5], "max": [0.75, 0.75]},
"level": 3 // 1 - 6
"level": 3, // 1 - 6
"region":{
"minX": 0.1,
"minY": 0.1,
"maxX": 0.9,
"maxY": 0.9
}
},
"record":{
"enable": 1,
......@@ -158,7 +170,13 @@ payload:
"ai":{
"enable": 1,
"face_thresh": 0.75, // 0 - 1
"human_thresh": 0.63 // 0 - 1
"human_thresh": 0.63, // 0 - 1
"region":{
"minX": 0.1,
"minY": 0.1,
"maxX": 0.9,
"maxY": 0.9
}
}
}
}
......
......@@ -8,12 +8,12 @@ if("${VENDOR}" STREQUAL "")
set(SHAREDINCS "${CMAKE_SOURCE_DIR}/../include")
endif()
#set_source_files_properties(main.cc PROPERTIES COMPILE_FLAGS -g)
set(COMM_INC_DIR ${VENDOR}/shared/include ${VENDOR}/hi3518/include ${SHAREDINCS})
set(COMMON_LIB_DIR ${VENDOR}/shared/lib ${VENDOR}/hi3518/lib)
list(APPEND COMMON_LIBS fmt m dl pthread XmMaQue securec XmSns_50H20AI uv)
list(APPEND COMMON_LIBS fmt m dl pthread)
list(APPEND XM_LIBS XmMaQue securec XmSns_50H20AI)
include_directories(${CMAKE_SOURCE_DIR} ${PROJECT_SOURCE_DIR} ${COMM_INC_DIR})
link_directories(${COMMON_LIB_DIR} /root/xiongmai/arm-himix100-linux/target/lib/)
......@@ -27,10 +27,8 @@ add_library(motion STATIC motion.cc)
add_executable(test_mqtt test_mqtt.cc)
target_link_libraries(test_mqtt PUBLIC paho-mqtt3a fmt m dl pthread)
add_executable(test_jsoncons test_jsoncons.cc)
add_executable(evcamera main.cc)
target_link_libraries(evcamera PUBLIC ntp paho-mqtt3a tcp_client smart motion ${COMMON_LIBS})
target_link_libraries(evcamera PUBLIC ntp paho-mqtt3a tcp_client smart motion zmq ${COMMON_LIBS} ${XM_LIBS})
add_executable(ntp_client ntp_main.cc)
target_link_libraries(ntp_client PUBLIC ntp)
#include <iostream>
#include <string>
#include <chrono>
#include <sys/stat.h>
#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include "ref-memory.hpp"
......@@ -17,10 +18,13 @@
#include <thread>
#include "mqttmgr.hpp"
#include "utils.hpp"
#include <ghc/filesystem.hpp>
using namespace std;
using namespace jsoncons;
using namespace evutils;
using namespace chrono_literals;
namespace fs = ghc::filesystem;
///
#define NUM_IFRAME_PICK 2
......@@ -29,13 +33,13 @@ using namespace evutils;
#define INTERVAL_LOG_QFULL 3 // its about 60/15 *3 secs
/// device info
static char dev_sn[13] = {0};
static char dev_sn[TERMINAL_SN_SIZE] = {0};
/// default video gateway info
static int videoQuality = MAQUE_IMG_QUALITY_BETTER;
static uint32_t frameCntTotal = 0;
static uint64_t frameCntTotal = 0;
static uint32_t frameCntIframe = 0;
static uint32_t frameCntPframe = 0;
static uint64_t packetId = 0;
......@@ -53,9 +57,58 @@ static mutex _mutFrame;
static condition_variable _condFrame;
static Notifier _notiFrame = {&_mutFrame, &_condFrame};
static CallBackArg args = {nullptr, &_notiFrame, &_frameQueue};
static int raw_socket_ = 0;
static bool bGotTime = false;
// zmq
static void *pPubCtx = nullptr;
static void *pPub = nullptr;
static const string strPubUrl = "inproc://frame";
static void *pSub = nullptr;
static void *pDealerCtx = nullptr;
static void *pDealer = nullptr;
static string vgwUrl = "tcp://192.168.55.104:7123";
//
static string configFilePath = "/etc/evcamera.json";
static json gJsonConfig;
//
static OrderedList<long long> *gRecFilesList = nullptr;
//
static string strMaQuePath = "/mnt/sd/Config/";
static string strSDCardReservedPaths[] = {recFilePath, strMaQuePath};
/// flags used to stop modules
/// 0: stopped, 1: requested to stop, 2: runningstatic int
typedef struct ev_region_t {
float minx;
float miny;
float maxx;
float maxy;
} ev_region_t;
typedef struct ev_module_config_t {
int stat_current;
ev_region_t region;
bool enabled;
union {
struct {
int interval; // in seconds
}record;
struct {
float face_thresh; //[0,1]
float human_thresh;
} ai;
struct motion{
int level; // 1 - 6
} motion;
} module;
} ev_module_config_t;
static ev_module_config_t gConfigRecord = {0};
static ev_module_config_t gConfigAI = {0};
static ev_module_config_t gConfigMotion = {0};
enum EV_MSG_ERROR_CODE {
EV_MSG_ERROR_NONE,
EV_MSG_ERROR_EXCEPTION,
......@@ -66,14 +119,18 @@ enum EV_MSG_ERROR_CODE {
void clean_up(int sig)
{
if(sig == SIGPIPE) {
// vgw connection error
bConnected = false;
bNeedRetry = true;
return;
}
spdlog::warn("clearn up called, with sig: {}", sig);
if (args.recFD) {
fclose(args.recFD);
}
if (raw_socket_) {
::close(raw_socket_);
}
// while (args.dataq->size() > 0) {
// DataItem elem = args.dataq->front();
......@@ -162,6 +219,13 @@ int32_t configure_stream(MaQueStreamChannel_e eStreamChn, MaQueVideoEncodeCfg_s
return res;
}
//
void frame_free(void *data, void*hint)
{
MaQueVideoEncFrameInfo_s *frame = (MaQueVideoEncFrameInfo_s *)hint;
MaQue_Demo_Mem_release(frame->handleMem);
}
//
XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame)
{
......@@ -169,219 +233,237 @@ XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame)
CallBackArg *args = (CallBackArg *)pUserArg;
struct timeval stTimeVal;
int ret;
frameCntTotal++;
if(logLevel <= spdlog::level::debug) {
printf("\n\nnew frame: %u, p0 h:%08X\n", frameCntTotal, (int)frame->handleMem);
zmq_msg_t msg;
void *hint = NULL;
if(frame->eEncodeType == MAQUE_ENCODE_TYPE_H264 && frame->eFrameType == MAQUE_FRAME_TYPE_VIDEO && frame->nDataLen > 0) {
evpacket_t* pkt = (evpacket_t*)((char*)frame->pData - sizeof(evpacket_t));
pkt->meta.magic[0] = 0xBE;
pkt->meta.magic[1] = 0xEF;
memcpy(pkt->meta.sn, dev_sn, sizeof(dev_sn));
/// NOTES: since both ends use little-endian, we don't use htonl, htons to save clocks
pkt->vpara.frame_type = frame->eSubType + 1;
pkt->vpara.packet_type = frame->eEncodeType;
pkt->vpara.res.width = frame->nWidth;
pkt->vpara.res.height = frame->nHeight;
/// TODO:
pkt->vpara.fps = 15;
//pkt->vpara.ts = frame->stTimeStamp;
pkt->length = frame->nDataLen;
pkt->meta.packet_id = packetId;
pkt->meta.crc = 0;
pkt->meta.crc = crc16((unsigned char*)pkt, sizeof(evpacket_t));
if(frame->eSubType == MAQUE_FRAME_SUBTYPE_I) {
frameCntIframe++;
}
else if(frame->eSubType == MAQUE_FRAME_SUBTYPE_P) {
frameCntPframe++;
}
zmq_msg_init_data(&msg, (char*)pkt, frame->nDataLen + sizeof(evpacket_t), nullptr, nullptr);
zmq_msg_send(&msg, pPub, 0);
packetId++;
}
if (bCanRecord) {
auto nWrite = fwrite(frame->pData, 1, frame->nDataLen, args->recFD);
if(frame->nDataLen != nWrite) {
spdlog::error("failed to write file");
if(args->recFD) {
fclose(args->recFD);
}
MaQue_Demo_Mem_release(frame->handleMem);
return XM_SUCCESS;
}
void frame_send_entry(void *args)
{
int ret = 0;
int flagFail = 3; // 1 - sub, 2 - dealer , 3 - both
// should never exit
while(1){
/// TODO: check enable status
this_thread::sleep_for(4s);
if(false){
continue;
}
}
if(enablePush && bConnected == false && frameCntTotal % (NUM_MAX_QUEQUE_SIZE * 3) == 0) {
bNeedRetry = true;
bConnected = false;
if(raw_socket_) {
::close(raw_socket_);
raw_socket_ = 0;
// setup dealer
if(flagFail & 2){
ret = setupDealer(&pDealerCtx, &pDealer, string(dev_sn), vgwUrl, 10);
if(ret < 0) {
spdlog::error("failed to connect cloud vgw: {}", vgwUrl);
continue;
}
spdlog::info("success connect to vgw");
}
}
//printf("\n==== prepare frame with enablePush:%d, dataQ: %08hhX, frametype: %d\n", enablePush, (int)args->dataq, frame->eEncodeType);
if (enablePush && args->dataq && frame->eEncodeType == MAQUE_ENCODE_TYPE_H264) {
unique_lock<mutex> lock(*args->noti->mut);
// check delayed packets for about 12s. sufficiently high
if (args->dataq->size() >= NUM_MAX_QUEQUE_SIZE) {
spdlog::warn("dataq full");
volatile MaQueVideoEncFrameInfo_s * p1 = (MaQueVideoEncFrameInfo_s *)args->dataq->front().ud;
if(logLevel == spdlog::level::debug) {
printf("%u, p1 h:%08X\n", frameCntTotal, (int)p1->handleMem);
// setup sub
if(flagFail &1){
pSub = zmq_socket(pPubCtx, ZMQ_SUB);
if(pSub == nullptr) {
spdlog::error("failed to create frame sending socket");
continue;
}
args->dataq->pop();
if(p1 == frame) {
spdlog::error("BUG!!! new frame address same as what in Queue");
ret = zmq_connect(pSub, strPubUrl.c_str());
if(ret != 0) {
spdlog::error("failed connect frame pub: {}", strPubUrl);
continue;
}
MaQue_Demo_Mem_release(p1->handleMem);
p1 = nullptr;
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
spdlog::info("zmq sub successed (for frame sending)");
}
if (frame->pData && frame->nDataLen > 0) {
timeval tv;
//::gettimeofday(&tv,NULL);
DataItem dt = {(char *)frame->pData - sizeof(evpacket_t), frame->nDataLen + sizeof(evpacket_t), (void *)frame};
evpacket_t *pkt = (evpacket_t *)dt.buf;
memset(pkt, 0, sizeof(*pkt));
pkt->meta.magic[0] = 0xBE;
pkt->meta.magic[1] = 0xEF;
memcpy(pkt->meta.sn, dev_sn, sizeof(dev_sn));
/// NOTES: since both ends use little-endian, we don't use htonl, htons to save clocks
pkt->vpara.frame_type = frame->eSubType + 1;
pkt->vpara.packet_type = frame->eEncodeType;
pkt->vpara.res.width = frame->nWidth;
pkt->vpara.res.height = frame->nHeight;
/// TODO:
pkt->vpara.fps = 15;
//pkt->vpara.ts = frame->stTimeStamp;
pkt->length = frame->nDataLen;
pkt->meta.packet_id = packetId;
pkt->meta.crc = 0;
pkt->meta.crc = crc16((unsigned char*)pkt, sizeof(evpacket_t));
if (frame->eSubType == MAQUE_FRAME_SUBTYPE_I) {
frameCntIframe++;
}
else if(frame->eSubType == MAQUE_FRAME_SUBTYPE_P) {
frameCntPframe++;
zmq_msg_t msg;
while (1) {
// TODO: reconnect
zmq_msg_init(&msg);
ret = zmq_msg_recv(&msg, pSub, 0);
if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(zmq_errno()));
flagFail = 1;
break;
}
MaQue_Demo_Mem_addRef(frame->handleMem);
args->dataq->push(dt);
bPFrameAvail = true;
spdlog::debug("frame meter ic:{}, pc:{}, tc:{}, len:{}, cid:{}", frameCntIframe, frameCntPframe,frameCntTotal, frame->nDataLen, packetId);
packetId++;
// ssize_t sz = zmq_msg_size(&msg);
// char *ptr = (char*)zmq_msg_data(&msg);
ret = zmq_msg_send(&msg, pDealer, 0);
zmq_msg_close(&msg);
if(ret < 0) {
spdlog::error("failed to send vgw: {}, {}", zmq_strerror(zmq_errno()));
flagFail = 2;
break;
}
}
else {
spdlog::error("invalid frame");
if(flagFail & 1) {
zmq_close(pSub);
pSub = nullptr;
}
if(flagFail &2){
zmq_close(pDealer);
zmq_ctx_destroy(pDealerCtx);
pDealer = nullptr;
pDealerCtx = nullptr;
}
lock.unlock();
args->noti->cond->notify_one();
}
else {
//spdlog::warn("pusher is disabled {}:{}", __FILE__, __LINE__);
}
if(logLevel <= spdlog::level::debug) {
printf("%u, p2 h:%08X\n", frameCntTotal, (int)frame->handleMem);
}
MaQue_Demo_Mem_release(frame->handleMem);
return XM_SUCCESS;
}
void frame_send_entry(void *args)
{
CallBackArg *pvArg = (CallBackArg *)args;
int freshConnect = 0;
DataItem elem = {0};
while (1) {
MaQueVideoEncFrameInfo_s *pMem;
{
unique_lock<mutex> lk(*pvArg->noti->mut);
pvArg->noti->cond->wait(lk, [pvArg] { return !pvArg->dataq->empty();});
if (pvArg->dataq->empty()) {
spdlog::warn("dataq empty");
void load_sd_files(OrderedList<long long> &_list){
auto now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
try {
string fname, baseName;
for (const auto & entry : fs::directory_iterator(recFilePath)) {
fname = entry.path().c_str();
if(entry.file_size() == 0 || !entry.is_regular_file()||entry.path().extension() != ".h264") {
spdlog::warn("{} loadVideoFiles skipped {} (empty/directory/!h264)", entry.path().c_str());
continue;
}
// /// remove old frames
// lk.unlock();
// while(pvArg->dataq->size() > 2) {
// lk.lock();
// elem = pvArg->dataq->front();
// pvArg->dataq->pop();
// lk.unlock();
// pMem = (MaQueVideoEncFrameInfo_s *)elem.ud;
// MaQue_Demo_Mem_release(pMem->handleMem);
// }
// lk.lock();
elem = pvArg->dataq->front();
pvArg->dataq->pop();
lk.unlock();
pMem = (MaQueVideoEncFrameInfo_s *)elem.ud;
}
if(bConnected == false) {
if(bNeedRetry == true) {
// try reconect
bNeedRetry = false;
if(raw_socket_ != 0) {
::close(raw_socket_);
raw_socket_ = 0;
}
spdlog::info("try reconnecting to video gateway {}:{}", host, port);
auto ret = raw_connect(host, port, &raw_socket_);
if (ret < 0 || raw_socket_ <= 0) {
spdlog::error("failed to reconnect {}:{}", host, port);
bConnected = false;
}
else {
spdlog::info("successfully connected to {}:{}", host, port);
bConnected = true;
//LibXmMaQue_OSD_showTime(0, MAQUE_STREAM_CHN_MAIN, &stOsdTimeModParamMain.stOsdParam);
//freshConnect = pvArg->dataq->size();
/// pop all frames
}
auto ts = stoi(entry.path().stem().c_str());
if(ts < TS_2020){
/// TODO: files records when offline with no valid time got
fs::remove(entry.path());
spdlog::warn("remove records having invalid timestamp: {}", ts);
continue;
}
_list.insert(ts);
}
}
catch(exception &e) {
spdlog::error("{}:{} loadVideoFiles exception : {}", __FILE__, __LINE__, e.what());
}
}
if (bConnected == false || !pMem||elem.size <= 0 || elem.buf == nullptr || (elem.buf + sizeof(evpacket_t)) == nullptr) {
//printf("invalid frame. addr: %08X, len:%u\n", (uint32_t)elem.buf, elem.size);
string msg;
if(elem.size <=0) {
msg = fmt::format("invalid frame size: {}", elem.size);
spdlog::warn(msg);
}
else if(elem.buf == nullptr||elem.buf + sizeof(evpacket_t) == nullptr) {
msg = fmt::format("invalid buffer addr: {0:p}", elem.buf);
spdlog::warn(msg);
}
else {
/// bConnected is false. needn't to log again
//msg = fmt::format("no connection to video gateway server {}:{}", host, port);
//spdlog::warn(msg);
}
void remove_ts_file(long long ts){
string fname = to_string(ts) + ".h264";
fs::remove(fname);
}
/// release msg
if(pMem && pMem->handleMem) {
if(logLevel <= spdlog::level::debug) {
printf("%u, p3 h:%08X\n", frameCntTotal, (int)pMem->handleMem);
}
MaQue_Demo_Mem_release(pMem->handleMem);
pMem = nullptr;
}
void record_video_entry(void *args)
{
int ret = 0;
// this thread will never exit
while(1) {
if(!is_sdcard_avail()) {
spdlog::error("no sd card valial. can't record local videos");
this_thread::sleep_for(10s);
continue;
}
uint64_t total = 0, avail = 0;
if(!get_sdcard_megabytes(total, avail)){
spdlog::error("failed to get sd card size");
continue;
}
/// send frame
if (elem.size > 0) {
auto sz = elem.size;
char *ptr = elem.buf;
ssize_t sent = 0;
while (elem.size > 0) {
ptr += sent;
sent = ::send(raw_socket_, ptr, elem.size > NUM_MAX_PACKET_BYTES? NUM_MAX_PACKET_BYTES:elem.size, 0);
if (sent <= 0) {
break;
spdlog::info("sd card: {}MB total, {}MB free", total, avail);
// calc num of slices
// reserve 0.5GB space
ssize_t maxSlices = (total - 512)/10;
OrderedList<long long> tsList(maxSlices, remove_ts_file);
gRecFilesList = &tsList;
// load existing videos
load_sd_files(tsList);
// setup sub
void *pSub = zmq_socket(pPubCtx, ZMQ_SUB);
// ret = 1;
// zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
// ret = 2;
// zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret));
// zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = zmq_connect(pSub, strPubUrl.c_str());
if(ret != 0) {
spdlog::error("failed connect frame pub: {}", strPubUrl);
exit(1);
}
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
spdlog::info("zmq sub successed (record)");
zmq_msg_t msg;
long long preTs = 0;
long long nowTs = 0;
uint32_t cnt = 0;
fstream * fp = nullptr;
while (1) {
zmq_msg_init(&msg);
ret = zmq_msg_recv(&msg, pSub, 0);
if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
continue;
}
ssize_t sz = zmq_msg_size(&msg);
char *ptr = (char*)zmq_msg_data(&msg);
/// we calc timestamps about every 2s
if(cnt % 30 == 0) {
nowTs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
}
if(nowTs - preTs > gConfigRecord.module.record.interval*1000) {
if(fp != nullptr) {
if(fp->is_open())
fp->close();
delete fp;
}
else {
//spdlog::debug("sent:{}, total:{}", sent, sz);
fp = new fstream(recFilePath+to_string(nowTs), std::ios::out|std::ios::binary);
if(!fp->is_open()){
spdlog::error("failed to open record file");
delete fp;
fp = nullptr;
}else{
tsList.insert(nowTs);
}
elem.size -= sent;
preTs = nowTs;
}
if (sent <= 0) {
spdlog::error("failed to send, peer down");
/// TODO: reconnection
bConnected = false;
bNeedRetry = true;
if(fp != nullptr) {
fp->write(ptr, sz);
}
cnt++;
zmq_msg_close(&msg);
}
else {
spdlog::error("frame size error");
}
if(logLevel <= spdlog::level::debug) {
printf("%u, p4 h:%08X\n", frameCntTotal, (int)pMem->handleMem);
}
MaQue_Demo_Mem_release(pMem->handleMem);
zmq_close (pSub);
/// DO NOT! do this
//zmq_ctx_destroy(pPubCtx);
}
}
/// verify request message
......@@ -468,66 +550,42 @@ void report_response_args(string topic, int code, string message, string cata, s
_report_response(topic, js.to_string());
}
int apply_config(json &js)
string verify_config(json &data)
{
auto hasSD = is_sdcard_avail();
/// verify parameters
string str, strWarn;
if(!js.contains(kMsgData)) {
str = fmt::format("no data object in config: {}", js.to_string());
}
else {
json data = js[kMsgData];
string veri[] = {kMsgConfigVgw, kMsgConfigMqtt, kMsgConfigUpload};
for(auto &k:veri) {
auto value = msg_field(data, k);
if(value.empty()) {
str = fmt::format("no {} field in data: {}", k, js.to_string());
string veri[] = {kMsgConfigVgw, kMsgConfigMqtt, kMsgConfigUpload};
for(auto &k:veri) {
auto value = msg_field(data, k);
if(value.empty()) {
str = fmt::format("no {} field in data: {}", k, data.to_string());
break;
}
else {
// parse host and port
auto uri = httplib::Uri::Parse(value);
if(uri.Host.empty()|| uri.Port.empty()) {
str = fmt::format("host addr invalid {}:{}, {}", k, value, data.to_string());
break;
}
else {
// parse host and port
auto uri = httplib::Uri::Parse(value);
if(uri.Host.empty()|| uri.Port.empty()) {
str = fmt::format("host addr invalid {}:{}, {}", k, value, js.to_string());
break;
}
// check network connections
int socket = 0;
auto rc = raw_connect(uri.Host, uri.Port, &socket);
if(rc < 0 || socket == 0) {
str = fmt::format("failed to connect to {}:{}, {}", k, value, js.to_string());
break;
}
else {
// we must close the test socket
::close(socket);
}
}
}
// check features field
json features = data.get_value_or<json>(kMsgConfigFeatures, json());
string feats[]= {kMsgConfigMotion, kMsgConfigRecord, kMsgConfigAi};
for(auto &k:feats) {
auto f = features.get_value_or<json>(k, json());
if(f.size() == 0) {
strWarn = fmt::format("{}, ", k);
// check network connections
int socket = 0;
auto rc = raw_connect(uri.Host, uri.Port, &socket);
if(rc < 0 || socket == 0) {
str = fmt::format("failed to connect to {}:{}, {}", k, value, data.to_string());
break;
}
if(!strWarn.empty()) {
strWarn = "no below feature in config, default configuration is applied: " + strWarn;
else {
// we must close the test socket
::close(socket);
}
}
}
}
if(!str.empty()) {
report_response_args(pub_topic, EV_MSG_ERROR_CONTENT_SYNTAX, str, kMsgCmdConfig, msg_field(js, kMsgRid), json());
spdlog::error(str);
return -EV_MSG_ERROR_CONTENT_SYNTAX;
}
// try to apply
return str;
}
/// handles all incoming request from cloud services
......@@ -588,14 +646,36 @@ void start_mqtt(void *arg)
client->subscribe(sub_topic + dev_sn, handle_mqtt_req, false);
}
//
void updateConfigFile(json &config){
ofstream ocfg_(configFilePath, std::ios::trunc);
ocfg_ << pretty_print(gJsonConfig);
}
void readConfigFile(json &config){
try{
ifstream cfg_(configFilePath);
gJsonConfig = json::parse(cfg_);
}catch(exception &e){
gJsonConfig = evutils::make_default_config();
spdlog::error("failed get config: {}, force to make default config:\n{}", e.what(), gJsonConfig.to_string());
updateConfigFile(gJsonConfig);
}
}
void create_sd_directories(){
// create record paths
for(auto &k:strSDCardReservedPaths){
system((string("mkdir -p ") + k).c_str());
}
}
///
int main(int argc, char *argv[])
{
int ret = XM_SUCCESS;
spdlog::info("test on hi3518ev300, {}", is_big_endian());
get_mac_addr(dev_sn);
spdlog::info("hi3518ev300, sn: {}, version: {}, BE: {}", dev_sn, evutils::version, is_big_endian());
// get env for host and port
auto host_ = getenv("VGW_HOST");
......@@ -617,6 +697,13 @@ int main(int argc, char *argv[])
/// TODO: check sd availability
bool bSDAvail = is_sdcard_avail();
if(bSDAvail) {
create_sd_directories();
}
/// TODO: load configuration
readConfigFile(gJsonConfig);
verify_config(gJsonConfig);
auto logLevel_ = getenv("LOG_LEVEL");
if(logLevel_) {
......@@ -636,31 +723,26 @@ int main(int argc, char *argv[])
}
}
MaQue_Demo_Mem_Init();
/// thread of utp time
auto thNtp = thread([] {
bool bGotTime = false;
time_t stm;
while(!bGotTime)
{
if(getNtpTime(&stm) >=0) {
spdlog::info("ntp got time");
::stime(&stm);
bGotTime = true;
}
else {
spdlog::warn("failed to get ntp time");
}
this_thread::sleep_for(chrono::seconds(20));
// blocking update time, since time is critical to every other component
bool bGotTime = false;
time_t stm;
while(!bGotTime)
{
if(getNtpTime(&stm) >=0) {
spdlog::info("ntp got time");
::stime(&stm);
bGotTime = true;
}
});
if(thNtp.joinable()) {
thNtp.detach();
else {
spdlog::warn("failed to get ntp time");
}
this_thread::sleep_for(chrono::seconds(3));
}
MaQueStartParam_s startParam = {MAQUE_VIDEO_STANDARD_PAL, {MAQUE_VIDEO_COMPRESS_H264, MAQUE_VIDEO_COMPRESS_H265}, "/mnt/sd/Config/"};
MaQue_Demo_Mem_Init();
MaQueStartParam_s startParam = {MAQUE_VIDEO_STANDARD_PAL, {MAQUE_VIDEO_COMPRESS_H264, MAQUE_VIDEO_COMPRESS_H265}};
memcpy(startParam.aWritableDir, strMaQuePath.c_str(), sizeof(startParam.aWritableDir));
ret = LibXmMaQue_System_startUp(&startParam);
spdlog::info("ret: {}", ret);
......@@ -678,10 +760,10 @@ int main(int argc, char *argv[])
spdlog::info("abt ret: {}", ret);
if (XM_SUCCESS == ret) {
spdlog::info(fmt::format("capbilities: supported codecs {0:#b},"
"max fhd/s {0:d}, max res: {0:d}, per ch mres: {0:d}, {0:d}, {0:d}, {0:d}",
int(capb.videoEncTypeMask),
int(capb.maxEncPowerX1080P), int(capb.eDecImageSizeMax), int(capb.astVidEncChnAbility[0].eCapSizeMax),
int(capb.astVidEncChnAbility[1].eCapSizeMax), int(capb.astVidEncChnAbility[2].eCapSizeMax), int(capb.astVidEncChnAbility[3].eCapSizeMax)));
"max fhd/s {0:d}, max res: {0:d}, per ch mres: {0:d}, {0:d}, {0:d}, {0:d}",
int(capb.videoEncTypeMask),
int(capb.maxEncPowerX1080P), int(capb.eDecImageSizeMax), int(capb.astVidEncChnAbility[0].eCapSizeMax),
int(capb.astVidEncChnAbility[1].eCapSizeMax), int(capb.astVidEncChnAbility[2].eCapSizeMax), int(capb.astVidEncChnAbility[3].eCapSizeMax)));
}
//
......@@ -692,23 +774,13 @@ int main(int argc, char *argv[])
cfg.nIFrameInterval = 10;
cfg.eImageQuality = (MaQueImageQuality_e)videoQuality;
cfg.eVidComp = MAQUE_VIDEO_COMPRESS_H264;
cfg.nBitRate = 200 * 8; // 200KB
cfg.eBitrateCtrl = MAQUE_BITRATE_CTRL_VBR;
cfg.nBitRate = 1024; // 1Mbps
ret = configure_stream((MaQueStreamChannel_e)0, &cfg);
spdlog::info("cfg stream ret: {}", ret);
///
/// record setting
if(enableRecord && bSDAvail) {
FILE *recFD = fopen(recFilePath, "w+");
if (!recFD) {
spdlog::error("fopen() Failed: {}", recFilePath);
bCanRecord = false;
}
else {
args.recFD = recFD;
bCanRecord = true;
}
}
/// TODO: load or make defualt configuration
gConfigRecord.module.record.interval = 60 * 2; // 2 minutes interval
ret = LibXmMaQue_VideoEnc_startStream(0, (MaQueStreamChannel_e)0, cb_frame_proc, (void *)&args);
if (XM_SUCCESS == ret) {
......@@ -716,25 +788,32 @@ int main(int argc, char *argv[])
}
signal(SIGINT, clean_up);
//signal(SIGTERM, clean_up);
// signal(SIGTERM, clean_up);
signal(SIGKILL, clean_up);
signal(SIGPIPE, SIG_IGN);
if (enablePush) {
ret = raw_connect(host, port, &raw_socket_);
if (ret < 0 || raw_socket_ <= 0) {
spdlog::error("failed connect to video gateway at {}:{}", host, port);
bConnected = false;
}
else {
bConnected = true;
spdlog::info("successfully connected to video gateway at {}:{}", host, port);
}
thread thPush = thread(frame_send_entry, &args);
spdlog::info("sizeof pkt header {}, sizeof tv {}", sizeof(evpacket_t), sizeof(timeval));
// frame dispatching
pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB);
ret = zmq_bind(pPub, strPubUrl.c_str());
if(ret < 0) {
spdlog::error("{} failed to create zmq pub topic: {}", strPubUrl);
exit(1);
}
/// TODO: configuration issues handling
if(msg_field(gJsonConfig, kMsgConfigVgw).empty()){
spdlog::error("missing vgw config");
}
thread thPush = thread(frame_send_entry, &args);
spdlog::info("sizeof pkt header {}, sizeof tv {}", sizeof(evpacket_t), sizeof(timeval));
if(thPush.joinable()){
thPush.detach();
}
else {
spdlog::warn("push video is not enabled in environ settings");
thread thVideoRecord = thread(record_video_entry, nullptr);
if(thVideoRecord.joinable()){
thVideoRecord.detach();
}
start_md_bd(&args);
......
#include <iostream>
#include <string>
#include <chrono>
#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include "utils.h"
#include "ref-memory.hpp"
#include <maque_system.h>
#include <maque_video_enc.h>
#include <maque_time.h>
#include "common.h"
#include "ntp.h"
#include <time.h>
#include "raw_tcp.h"
#include <evpacket.h>
#include "smart.h"
#include "motion.h"
#include <thread>
#include <rtsp_pusher.hpp>
using namespace std;
//
//#define ENALBE_RECORD
#define NUM_IFRAME_PICK 2
#define NUM_MAX_QUEQUE_SIZE 60
#define NUM_MAX_PACKET_BYTES 1000
static unsigned long long frameCntTotal = 0;
static unsigned long long frameCntIframe = 0;
static unsigned long long frameCntPframe = 0;
static unsigned long long frameCntIframePrev = 0;
static bool bPFrameAvail = false;
static bool bAvailable = false;
static bool enablePush = false;
static queue<DataItem> _frameQueue;
static mutex _mutFrame;
static condition_variable _condFrame;
static Notifier _notiFrame = {&_mutFrame, &_condFrame};
static CallBackArg args = {nullptr, &_notiFrame, &_frameQueue};
static int raw_socket_ = 0;
static bool bGotTime = false;
void clean_up(int sig){
spdlog::warn("clearn up called, with sig: {}", sig);
if(args.recFD){
fclose(args.recFD);
}
if(raw_socket_){
::close(raw_socket_);
}
while(args.dataq->size() > 0){
DataItem elem = args.dataq->front();
args.dataq->pop();
MaQueVideoEncFrameInfo_s *pMem = (MaQueVideoEncFrameInfo_s*) elem.ud;
MaQue_Demo_Mem_release(pMem->handleMem);
}
//stop_md_bd();
LibXmMaQue_System_destroy();
exit(1);
}
MaQueVideoEncodeCfg_s *init_stream_cfg(MaQueVideoEncodeCfg_s *cfg) {
cfg->eVidComp = -1;
cfg->eCapSize = -1;
cfg->eBitrateCtrl = -1;
cfg->eImageQuality = -1;
cfg->nFps = -1;
cfg->nBitRate = -1;
cfg->nIFrameInterval = -1;
cfg->iQpMin = -1;
cfg->iQpMax = -1;
return cfg;
}
int32_t configure_stream(MaQueStreamChannel_e eStreamChn, MaQueVideoEncodeCfg_s *cfg)
{
int32_t res;
MaQueVideoEncodeCfg_s oldCfg;
if (LibXmMaQue_VideoEnc_getCfg(0, eStreamChn, &oldCfg)){
spdlog::error("LibXmMaQue_VideoEncode_getCfg error");
return -1;
}
if(cfg->eVidComp != -1) {
oldCfg.eVidComp = cfg->eVidComp;
}
if(cfg->eCapSize != -1) {
oldCfg.eCapSize = cfg->eCapSize;
}
if(cfg->eBitrateCtrl != -1) {
oldCfg.eBitrateCtrl = cfg->eBitrateCtrl;
}
if(cfg->eImageQuality != -1) {
oldCfg.eImageQuality = cfg->eImageQuality;
}
if(cfg->nFps != -1){
oldCfg.nFps = cfg->nFps;
}
if(cfg->nBitRate != -1){
oldCfg.nBitRate = cfg->nBitRate;
}
if(cfg->nIFrameInterval != -1) {
oldCfg.nIFrameInterval = cfg->nIFrameInterval;
}
if (cfg->iQpMin > 0 && cfg->iQpMin <= 51
&& cfg->iQpMax > 0 && cfg->iQpMax <= 51)
{
oldCfg.iQpMin = cfg->iQpMin;
oldCfg.iQpMax = cfg->iQpMax;
oldCfg.bUseUserQp = 1;
}
spdlog::info("main: fps={}", oldCfg.nFps);
res = LibXmMaQue_VideoEnc_setCfg(0, eStreamChn, &oldCfg);
if(!res){
memcpy(cfg, &oldCfg, sizeof(oldCfg));
}
return res;
}
//
XM_S32 cb_frame_proc(XM_VOID *pUserArg, MaQueVideoEncFrameInfo_s *frame){
MaQueSystemTime_s *pstSysTime = &frame->stTimeStamp.stSysTime;
CallBackArg* args = (CallBackArg *)pUserArg;
struct timeval stTimeVal;
int ret;
//spdlog::info("new frame {}", frameCntTotal);
frameCntTotal++;
// stTimeVal.tv_sec = pstSysTime->day * 24 * 60 * 60 + pstSysTime->hour * 60 * 60
// + pstSysTime->minute * 60 + pstSysTime->second;
// stTimeVal.tv_usec = pstSysTime->second * 1000;
#ifdef ENALBE_RECORD
if(args->recFD)
{
fwrite(frame->pData, 1, frame->nDataLen, args->recFD);
}
#endif
if(enablePush && args->dataq && frame->eEncodeType == MAQUE_ENCODE_TYPE_H264) {
lock_guard<mutex> lock(*args->noti->mut);
if(args->dataq->size() >= NUM_MAX_QUEQUE_SIZE){
bAvailable = false;
spdlog::warn("dataq full");
}else if(args->dataq->size() <= NUM_MAX_QUEQUE_SIZE*2/3){
bAvailable = true;
}
if(frame->eSubType == MAQUE_FRAME_SUBTYPE_I){
frameCntIframe++;
}
if(bAvailable && frame->pData && frame->nDataLen > 0){
timeval tv;
//::gettimeofday(&tv,NULL);
//DataItem dt = {(char *)frame->pData - sizeof(evpacket_t), frame->nDataLen + sizeof(evpacket_t), (void*)frame};
DataItem dt = {(char *)frame->pData, frame->nDataLen, (void*)frame};
evpacket_t * pkt = (evpacket_t *)dt.buf;
memset(pkt, 0, sizeof(*pkt));
pkt->meta.magic[0] = 0xBE;
pkt->meta.magic[1] = 0xEF;
pkt->vpara.frame_type = (evframe_type_t)(frame->eSubType +1);
pkt->vpara.packet_type = (evpacket_type_t)(frame->eEncodeType);
pkt->vpara.res.width = htons(frame->nWidth);
pkt->vpara.res.height = htons(frame->nHeight);
//pkt->vpara.ts = tv;
pkt->length = htonl(frame->nDataLen);
//if(frame->eSubType == MAQUE_FRAME_SUBTYPE_I && (frameCntIframe-1) % NUM_IFRAME_PICK == 0) {
MaQue_Demo_Mem_addRef(frame->handleMem);
args->dataq->push(std::move(dt));
args->noti->cond->notify_all();
frameCntIframePrev = frameCntIframe;
// bPFrameAvail = true;
// frameCntPframe = 0;
// spdlog::debug("=====\nframe meter ic: {}, tc: {}, len: {}, p avail", frameCntIframe, frameCntTotal, frame->nDataLen);
// }else if(bPFrameAvail && frame->eSubType == MAQUE_FRAME_SUBTYPE_P && frameCntIframe == frameCntIframePrev){
// MaQue_Demo_Mem_addRef(frame->handleMem);
// args->dataq->push(std::move(dt));
// args->noti->cond->notify_all();
// ++frameCntPframe;
// spdlog::debug("=====\nframe meter pc: {}, tc: {}, len: {}, p avail", frameCntPframe, frameCntTotal, frame->nDataLen);
// // spdlog::info("pframe : {}, {}", frameCntIframe, frameCntIframePrev);
// }else{
// //spdlog::error("pframe not avail: {}, {}", frameCntIframe, frameCntIframePrev);
// bPFrameAvail = false;
// }
}else{
spdlog::warn("h264 not avail");
}
}
if(frame->handleMem != 0) {
MaQue_Demo_Mem_release(frame->handleMem);
}
return XM_SUCCESS;
}
void frame_send_entry(void * args){
CallBackArg * pvArg = (CallBackArg *)args;
unsigned long long frameCnt = 0;
while(1){
DataItem elem;
{
unique_lock<mutex> lk(*pvArg->noti->mut);
pvArg->noti->cond->wait(lk, [pvArg] {return !pvArg->dataq->empty();});
if(pvArg->dataq->empty()){
this_thread::sleep_for(chrono::milliseconds(500));
printf("dataq empty");
continue;
}
elem = pvArg->dataq->front();
//pvArg->dataq->pop();
frameCnt++;
}
MaQueVideoEncFrameInfo_s *pMem = (MaQueVideoEncFrameInfo_s *)elem.ud;
// send frame
if(elem.size <= 0 || elem.buf == nullptr || (elem.buf + sizeof(evpacket_t)) == nullptr) {
spdlog::error("invalid frame. addr: {0:x}, len: {0:d}", (uint32_t)elem.buf, elem.size);
pvArg->dataq->pop();
continue;
}
if(elem.size > 0){
char * ptr = elem.buf;
size_t sent = 0;
// while((elem.size -= sent) > 0 &&
// (sent = ::send(raw_socket_, (ptr+=sent), elem.size >
// NUM_MAX_PACKET_BYTES? NUM_MAX_PACKET_BYTES:elem.size, 0)) > 0) {
// //
// }
while(elem.size >0) {
ptr += sent;
//sent = ::send(raw_socket_, ptr, elem.size > NUM_MAX_PACKET_BYTES? NUM_MAX_PACKET_BYTES:elem.size, 0);
sent = ::send(raw_socket_, ptr, elem.size, 0);
if(sent <= 0){
break;
}
elem.size -= sent;
}
if(sent < 0){
spdlog::error("faile to send");
exit(1);
}
}else{
spdlog::error("size error or not got time");
}
MaQue_Demo_Mem_release(pMem->handleMem);
pvArg->dataq->pop();
}
}
//
int main(int argc, char *argv[]){
int ret = XM_SUCCESS;
spdlog::set_level(spdlog::level::debug);
spdlog::info("test on hi3518ev300");
char *host = "192.168.55.104", *port = "7123";
if(argc == 3){
host = argv[1];
port = argv[2];
enablePush = true;
}
// time_t stm;
// if(getNtpTime(&stm) >=0){
// if(stime(&stm) < 0){
// spdlog::error("failed to set system time");
// }else{
// spdlog::info("ntp got time");
// bGotTime = true;
// }
// }
MaQueStartParam_s startParam = {0, {MAQUE_VIDEO_COMPRESS_H264, MAQUE_VIDEO_COMPRESS_H265}, "/mnt/sd/Config/"};
ret = LibXmMaQue_System_startUp(&startParam);
spdlog::info("ret: {}", ret);
//
MaQueMemoryApi_s memApi = {MaQue_Demo_Mem_alloc, MaQue_Demo_Mem_release, MaQue_Demo_Mem_addRef, MaQue_Demo_Mem_setLength};
ret = LibXmMaQue_Mem_init(&memApi);
spdlog::info("mem ret: {}", ret);
MaQueSystemTime_s tm;
LibXmMaQue_Time_getCurrentTime(&tm);
LibXmMaQue_Time_print(&tm, "NULL");
const char * testUrl = "rtsp://evcloud.ilabservice.cloud/test_pusher";
//
MaQueCodeAbilities_s capb;
ret = LibXmMaQue_VideoEnc_getAbilities(0, &capb);
spdlog::info("abt ret: {}", ret);
if(XM_SUCCESS == ret){
spdlog::info(fmt::format("capbilities: supported codecs {0:#b}, \
max fhd/s {0:d}, max res: {0:d}, per ch mres: {0:d}, {0:d}, {0:d}, {0:d}", int(capb.videoEncTypeMask),
int(capb.maxEncPowerX1080P), int(capb.eDecImageSizeMax), int(capb.astVidEncChnAbility[0].eCapSizeMax),
int(capb.astVidEncChnAbility[1].eCapSizeMax), int(capb.astVidEncChnAbility[2].eCapSizeMax), int(capb.astVidEncChnAbility[3].eCapSizeMax)));
}
//
MaQueVideoEncodeCfg_s cfg;
init_stream_cfg(&cfg);
cfg.nFps = 10;
cfg.eIFrmIntvType=IFRAME_INTV_TYPE_TIME;
cfg.nIFrameInterval = 10;
cfg.eImageQuality = MAQUE_IMG_QUALITY_GOOD;
cfg.eVidComp = MAQUE_VIDEO_COMPRESS_H264;
ret = configure_stream(0, &cfg);
spdlog::info("cfg stream ret: {}", ret);
//
#ifdef ENALBE_RECORD
const char * const recFilePath = "/mnt/sd/record/rec.264";
FILE *recFD = fopen(recFilePath, "w+");
if(!recFD)
{
spdlog::error("fopen() Failed: {}", recFilePath);
LibXmMaQue_System_destroy();
exit(0);
}
args.recFD = recFD;
#endif
ret = LibXmMaQue_VideoEnc_startStream(0, 0, cb_frame_proc, (void*)&args);
if(XM_SUCCESS == ret) {
spdlog::info("created record task successfully");
}
signal(SIGINT, clean_up);
//signal(SIGTERM, clean_up);
signal(SIGKILL, clean_up);
RtspPusher pusher1(testUrl, &_notiFrame, &_frameQueue);
thread thPusher;
if(enablePush) {
// ret = raw_connect(host, port, &raw_socket_);
// if(ret <0 || raw_socket_ <= 0){
// spdlog::error("failed to create socket");
// exit(1);
// }
// thread thPush = thread(frame_send_entry, &args);
// spdlog::info("sizeof pkt header {}, sizeof tv {}", sizeof(evpacket_t), sizeof(timeval));
// thPush.detach();
pusher1.start(thPusher);
}
start_md_bd(&args);
thread thSmart = thread(maq_smart_task_entry, &args);
thSmart.join();
}
\ No newline at end of file
......@@ -22,7 +22,7 @@ int getNtpTime(time_t * txTm)
{
int socket_ = 0, n = 0, rv = 0; // Socket file descriptor and the n return result from writing/reading from the socket.
int portno = 123; // NTP UDP port number.
const char* host_name = "cn.pool.ntp.org"; // NTP server host-name.
const char* host_name = "time.nist.gov"; // NTP server host-name.
// Structure that defines the 48 byte NTP packet protocol.
typedef struct {
......
......@@ -33,6 +33,23 @@ int raw_connect(std::string host, std::string port, int *socket_, int recv_timeo
last_errno = errno;
continue;
}
struct timeval timeout;
timeout.tv_sec = recv_timeout;
timeout.tv_usec = 0;
if (setsockopt (*socket_, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_RCVTIMEO failed");
return -1;
}
timeout.tv_sec = send_timeout;
if (setsockopt (*socket_, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_SNDTIMEO failed");
return -1;
}
rv = ::connect(*socket_, rp->ai_addr, rp->ai_addrlen);
if (rv == 0) {
auto addr = (struct sockaddr_in *)rp->ai_addr;
......@@ -50,21 +67,6 @@ int raw_connect(std::string host, std::string port, int *socket_, int recv_timeo
rv = -1;
}
}
struct timeval timeout;
timeout.tv_sec = recv_timeout;
timeout.tv_usec = 0;
if (setsockopt (*socket_, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_RCVTIMEO failed");
return -1;
}
timeout.tv_sec = send_timeout;
if (setsockopt (*socket_, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_SNDTIMEO failed");
return -1;
}
::freeaddrinfo(addrinfo_result);
return rv;
......
......@@ -42,15 +42,15 @@ XM_S32 MaQue_Demo_Mem_setLength(XM_HANDLE handle, XM_U32 len)
XM_S32 MaQue_Demo_Mem_alloc(XM_HANDLE *pHandle, MaQueMemAllocParam_s *pstAllocParam)
{
pthread_mutex_lock(&g_mutexMem);
DemoMemory_s *pstMem = NULL;
pstMem = (DemoMemory_s *)malloc(sizeof(DemoMemory_s));
memset(pstMem, 0, sizeof(*pstMem));
if (!pstMem) {
spdlog::error("malloc failed!");
pthread_mutex_unlock(&g_mutexMem);
return -1;
}
memset(pstMem, 0, sizeof(*pstMem));
pthread_mutex_lock(&g_mutexMem);
switch(pstAllocParam->eMemType) {
case MAQUE_MEM_TYPE_VIDEO_ENC:
......@@ -61,7 +61,6 @@ XM_S32 MaQue_Demo_Mem_alloc(XM_HANDLE *pHandle, MaQueMemAllocParam_s *pstAllocPa
XM_U8 * raw = (XM_U8 *)malloc(pstAllocParam->nBufSize + sizeof(evpacket_t));
pstMem->pBuffer = raw + sizeof(evpacket_t);
//printf("malloc. raw: %08X, shifted: %08X\n", (uint32_t)raw, (uint32_t)pstMem->pBuffer);
if (raw) {
pstMem->index = 0xff;
pstMem->nBufSize = pstAllocParam->nBufSize;
......
......@@ -12,31 +12,31 @@ extern "C"
static int smartCnt = 0;
XM_S32 MaQue_JpegEnc_getFrame_callback (XM_VOID *pUserArg, MaQueSmartJpegFrame_s *pstJpegFrame)
{
if(smartCnt > 0) {
smartCnt--;
return 0;
}
smartCnt = 1000;
XM_CHAR acFile[256] = {0};
FILE *pFile;
static XM_S32 jpeg_cnt = 0;
// if(smartCnt > 0) {
// smartCnt--;
// return 0;
// }
// smartCnt = 1000;
// XM_CHAR acFile[256] = {0};
// FILE *pFile;
// static XM_S32 jpeg_cnt = 0;
printf("aClassName[%s] idx = %d, toltal = %d\n", pstJpegFrame->aClassName, pstJpegFrame->nIndex, pstJpegFrame->nToltalJpeg);
if (jpeg_cnt < 30000) {
sprintf(acFile, "ai/snap_%d.jpg", jpeg_cnt);
pFile = fopen(acFile, "wb");
if (pFile == NULL) {
spdlog::error("open file err");
return XM_FAILURE;
}
// if (jpeg_cnt < 30000) {
// sprintf(acFile, "ai/snap_%d.jpg", jpeg_cnt);
// pFile = fopen(acFile, "wb");
// if (pFile == NULL) {
// spdlog::error("open file err");
// return XM_FAILURE;
// }
fwrite(pstJpegFrame->pBuffer, pstJpegFrame->nDataLen, 1, pFile);
fflush(pFile);
// fwrite(pstJpegFrame->pBuffer, pstJpegFrame->nDataLen, 1, pFile);
// fflush(pFile);
fclose(pFile);
jpeg_cnt++;
}
// fclose(pFile);
// jpeg_cnt++;
// }
return XM_SUCCESS;
}
......
......@@ -6,6 +6,12 @@
#include <fstream>
#include <algorithm>
#include <sys/statvfs.h>
#include <mutex>
#include <set>
#include <zmq.h>
#include <string>
#include <spdlog/spdlog.h>
#include <fmt/format.h>
using namespace std;
using namespace jsoncons;
......@@ -52,8 +58,8 @@ const string kMsgSn = "sn";
char hostArr[] = "192.168.55.104";
char portArr[] = "7123";
char *host = hostArr, *port = portArr;
char recFilePathArr[] = "/mnt/sd/record/rec.264";
char *recFilePath = recFilePathArr;
string recFilePath = "/mnt/sd/records/";
const long long TS_2020 = 1577836800000L;
/// topics
string sub_topic = "evcamera/v1.0/request/";
......@@ -79,25 +85,32 @@ bool is_sdcard_avail(char *path = nullptr)
json make_default_config(){
return R"(
{
"sn": "A000000Z",
"vgw": "192.168.55.104:7123",
"mqtt": "admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883",
"upload": "evcloudsvc.ilabservice.cloud:10008",
"features": {
"motion": {
"enable": 1,
"region": {"min": [0.35,0.35], "max": [0.65, 0.65]},
"level": 3
},
"record":{
"enable": 1,
"interval": 300,
"duration": 24
"vgw":"evcloudsvc.ilabservice.cloud:7123",
"mqtt":"admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883",
"upload":"evcloudsvc.ilabservice.cloud:10010",
"features":{
"push":0,
"motion":{
"enabled":1,
"region":{
"maxX":1,
"maxY":1,
"minX":0,
"minY":0
},
"level":3
},
"recordLen":120,
"ai":{
"enable": 1,
"faceThresh": 0.6,
"humanThresh": 0.6
"enabled":1,
"faceThresh":0.75,
"humanThresh":0.63,
"region":{
"maxX":1,
"maxY":1,
"minX":0,
"minY":0
}
}
}
}
......@@ -119,6 +132,7 @@ void get_mac_addr(char *buf, char *intf = nullptr){
*(buf+ i++) = ::toupper(k);
}
}
*(buf+i) = 0;
}
bool get_sdcard_megabytes(uint64_t &total, uint64_t &free, char* path = nullptr)
......@@ -141,6 +155,228 @@ bool get_sdcard_megabytes(uint64_t &total, uint64_t &free, char* path = nullptr)
return true;
}
template <typename T>
using cb_remove_elem = void(*)(T elem);
template <typename TN>
class OrderedList {
private:
set<TN> list_;
size_t maxSize;
mutex mut;
TN oldestTs;
cb_remove_elem<TN> fn_remove;
unsigned long cntInsert = 0;
public:
OrderedList() = delete;
OrderedList(ssize_t maxSize, cb_remove_elem<TN> fn_remove):maxSize(maxSize), fn_remove(fn_remove){}
void insert(TN elem, cb_remove_elem<TN> fn=nullptr)
{
// list_.insert(lower_bound(list_.begin(), list_.end(), elem), elem);
if(cntInsert % 10 == 0) {
oldestTs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count() - 60*2*1000*maxSize;
spdlog::info("record: maxSlices {}, oldestTs {}", maxSize, oldestTs);
}
++cntInsert;
if(elem < oldestTs){
if(fn != nullptr){
(*fn)(elem);
}else if(fn_remove != nullptr){
(*fn_remove)(elem);
}
return;
}
if(list_.size() == 0) {
list_.insert(list_.begin(),elem);
return;
}
auto itr = list_.rbegin();
for(; itr != list_.rend(); itr++) {
if(*itr < elem) {
break;
}
}
if(itr == list_.rbegin() ) {
list_.insert(list_.end(), elem);
}
else {
list_.insert(itr.base(), elem);
}
if(list_.size() > maxSize) {
lock_guard<mutex> lg(mut);
auto ts = *(list_.begin());
list_.erase(list_.begin());
if(fn != nullptr){
(*fn)(ts);
}else if(fn_remove != nullptr){
(*fn_remove)(ts);
}else{
// no callback available
}
// auto baseName = videoFileTs2Name(ts);
// fs::path fname(this->urlOut + "/" + baseName + ".mp4");
// fs::remove(fname);
}
}
set<TN> findByRange(TN tss, TN tse, TN & offsetS, TN &offsetE){
set<TN> ret;
lock_guard<mutex> lg(mut);
if(list_.size() == 0) {
return ret;
}
TN first = *(list_.begin());
auto _it = list_.end();
TN end = *(--_it);
if(tse < first||tss > end) {
spdlog::info("range requested ({}, {}) is not in range existed ({}, {}).", tss, tse, first, end);
return ret;
}
first = end = 0;
int found = 0;
TN last = 0;
auto itr = list_.rbegin();
for(; itr != list_.rend(); itr++) {
if(*itr > tse) {
continue;
}
if(*itr <= tse) {
if(found != 1) {
spdlog::info("\t matched : {}, s:{}, e:{}", *itr, tss, tse);
found = 1;
// check the end offset, not guaranteed
if( itr != list_.rbegin()){
auto t = itr;
last = *(--t);
}
}
ret.insert(*itr);
if(tss >= *itr) {
break;
}
}
}
if(found == 1) {
auto itr = ret.begin();
offsetS = tss - *itr;
if(last != 0) {
offsetE = last - tse;
}
}
return ret;
}
};
/// returns negtive for failure, otherwise success
int setupDealer(void **ctx, void **s, string ident, string addr, int sndQS=0, int timeoutMs = -1) {
int ret = 0;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_DEALER);
ret = 1;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
ret = 20;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret));
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = 2;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_CNT, &ret, sizeof (ret));
if(sndQS!=0){
zmq_setsockopt(*s, ZMQ_SNDHWM, &sndQS, sizeof (sndQS));
}
if(timeoutMs != 0) {
if(timeoutMs == -1) {
timeoutMs = 5 *1000;
}
zmq_setsockopt(*s, ZMQ_RCVTIMEO, &timeoutMs, sizeof(timeoutMs));
}
ret = zmq_setsockopt(*s, ZMQ_IDENTITY, ident.c_str(), ident.size());
if(ret < 0) {
zmq_close(*s);
zmq_ctx_destroy(*ctx);
spdlog::debug("{} failed setsockopts ZMQ_ROUTING_ID to {}: {}", ident, addr, zmq_strerror(zmq_errno()));
}else{
ret = zmq_connect(*s, addr.c_str());
if(ret != 0) {
zmq_close(*s);
zmq_ctx_destroy(*ctx);
spdlog::error("{} failed connect dealer: {}", ident, addr);
}
}
return ret;
}
int setupRouter(void **ctx, void **s, string addr, int rcvQS=0){
int ret = 0;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_ROUTER);
ret = 1;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
ret = 5;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret));
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = 2;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_CNT, &ret, sizeof (ret));
if(rcvQS !=0) {
zmq_setsockopt(*s, ZMQ_RCVHWM, &rcvQS, sizeof(rcvQS));
}
ret = zmq_bind(*s, addr.c_str());
if(ret < 0) {
spdlog::debug("failed to bind zmq at {} for reason: {}, retrying load configuration...", addr, zmq_strerror(zmq_errno()));
}
return ret;
}
int z_recv_multiple(void *s, vector<uint8_t> &buf, int &frames) {
int64_t more = 1;
size_t more_size = sizeof(more);
int ret = 0;
int cnt = 0;
while(more > 0) {
cnt++;
zmq_msg_t msg;
ret = zmq_msg_init(&msg);
if(ret < 0) {
spdlog::debug("failed to receive multiple msg on zmq_msg_init: {}", zmq_strerror(zmq_errno()));
break;
}
ret = zmq_msg_recv(&msg, s, 0);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
buf.insert(buf.end(), (uint8_t*)zmq_msg_data(&msg), (uint8_t*)zmq_msg_data(&msg)+ret);
zmq_msg_close(&msg);
ret = zmq_getsockopt(s, ZMQ_RCVMORE, &more, &more_size);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
}
if(ret < 0 || (frames != 0 && cnt != frames)) {
spdlog::error("failed to recv msg: {}", ret < 0? zmq_strerror(ret): "invalid frames");
return -1;
}else{
frames = cnt;
}
return 0;
}
}
......
......@@ -11,16 +11,16 @@ endif()
set(COMM_INC_DIR ${VENDOR}/shared/include ${VENDOR}/x64/include ${SHAREDINC})
set(COMMON_LIB_DIR ${VENDOR}/shared/lib ${VENDOR}/x64/lib)
# list(APPEND COMMON_LIBS avformat avdevice avcodec swscale avfilter avutil swresample lzma x264 fmt uv pthread dl m z)
list(APPEND COMM_LIBS fmt uv pthread dl m z)
list(APPEND COMM_LIBS fmt zmq pthread dl m z)
list(APPEND VGW_LIBS avformat avcodec swscale avutil swresample lzma x264)
include_directories(${CMAKE_SOURCE_DIR} ${PROJECT_SOURCE_DIR} ${COMM_INC_DIR})
link_directories(${COMMON_LIB_DIR})
#add_library(mime STATIC mime.c)
#add_library(util STATIC utils.cpp)
add_executable(vgw videogateway.cc)
target_link_libraries(vgw PUBLIC ${VGW_LIBS} ${COMM_LIBS})
add_executable(agent agent.cc)
target_link_libraries(agent PUBLIC paho-mqtt3a ${COMM_LIBS})
# add_executable(test_mqtt test_mqtt_rd.cc)
# target_link_libraries(test_mqtt PUBLIC ${COMM_LIBS})
# add_executable(agent agent.cc)
# target_link_libraries(agent PUBLIC paho-mqtt3a ${COMM_LIBS})
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include <spdlog/spdlog.h>
#include <evpacket.h>
#include <sys/time.h>
#include <mutex>
#include <map>
#include <queue>
#include <zmq.h>
extern "C"
{
......@@ -23,24 +23,12 @@ using namespace std;
#define MAX_FRAME_SIZE 500000
#define MAX_RETRY_PUSH_INTV 450
string darwinUrl = "rtsp://evcloud.ilabservice.cloud:554/";
string darwinUrl = "rtsp://evcloudsvc.ilabservice.cloud:554/";
string bindPort = "7123";
string bindAddr = "0.0.0.0";
uv_loop_t *loop;
struct sockaddr_in addr;
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
typedef struct {
uv_tcp_t *handle;
evpacket_t hdr;
evpacket_video_t vpara;
char *buf;
unsigned int size;
int state; // 0 - unkown; 1 - receiving header; 2 - header ready, receiving body; 3 - body ready, processing
AVFormatContext *pAvCtx;
uint64_t packetId;
......@@ -49,42 +37,9 @@ typedef struct {
int64_t pts;
} packet_processor_t;
typedef struct {
uv_tcp_t handle;
packet_processor_t processor;
} packet_client;
typedef struct dataque_frame_t {
char devsn[12];
int codec;
int width;
int height;
int size;
uint8_t *buf;
} *dataque_frame_ptr_t;
typedef struct devinfo_t {
string devsn;
int state; // 0 - init, 1 - running, 2 - send err, 3 - request close, 4 - ready to close
queue<dataque_frame_t> *que;
AVFormatContext *ctx;
devinfo_t(string devsn, queue<dataque_frame_t> *que, AVFormatContext *ctx):devsn(devsn), que(que), ctx(ctx) {}
~devinfo_t()
{
// TODO: for each item in que, delete them
if(que)
delete que;
if(ctx)
avformat_free_context(ctx);
}
} *devinfo_ptr_t;
AVIOInterruptCB int_cb;
map<string, packet_client*> devConnMap;
bool is_big_endian(void)
{
union {
......@@ -95,57 +50,6 @@ bool is_big_endian(void)
return bint.c[0] == 1;
}
void free_write_req(uv_write_t *req)
{
write_req_t *wr = (write_req_t *)req;
free(wr->buf.base);
free(wr);
}
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
buf->base = (char *)malloc(suggested_size);
buf->len = suggested_size;
}
void on_closed(uv_handle_t *handle)
{
packet_client *pclient = (packet_client *)handle;
spdlog::error("closing client");
// TODO:
if(pclient->processor.buf) {
free(pclient->processor.buf);
}
if(pclient->processor.pAvCtx) {
if(pclient->processor.pAvCtx->pb) {
avio_closep(&pclient->processor.pAvCtx->pb);
}
avformat_free_context(pclient->processor.pAvCtx);
}
if(pclient->processor.hdr.meta.sn[0] != 0) {
/// NOTES: avoid corrupted stacks
pclient->processor.hdr.meta.sn[9] = 0;
string sn_ = string(pclient->processor.hdr.meta.sn);
if(devConnMap.count(sn_) != 0) {
devConnMap.erase(sn_);
}
}
pclient->processor.pAvCtx = nullptr;
delete pclient;
}
void on_written(uv_write_t *req, int status)
{
if (status) {
spdlog::error("Write error {}", uv_strerror(status));
}
free_write_req(req);
}
void debugHex(char *buf, int len)
{
int i = 0;
......@@ -202,21 +106,14 @@ AVFormatContext *rtsp_init(string rtsp_url, int codec, int height, int width, in
out_codecpar->format = AV_PIX_FMT_YUV420P;
//int_cb = {av_callback, nullptr};
pAVFormatRemux->interrupt_callback = int_cb;
/// NOTES: avio_open2 is not needed below, since we directly constructed the avformat ctx
// ret = avio_open2(&pAVFormatRemux->pb, rtsp_url.c_str(), AVIO_FLAG_WRITE, nullptr/*&pAVFormatRemux->interrupt_callback*/, &pOptsRemux);
// /// TODO: not supported protocol
// if(ret <0) {
// spdlog::error("failed to open2: {}: {}", rtsp_url, av_err2str(ret));
// if(rc) *rc = ret;
// return nullptr;
// }
//pAVFormatRemux->interrupt_callback = int_cb;
ret = avformat_write_header(pAVFormatRemux, &pOptsRemux);
if(ret < 0) {
spdlog::error("failed to writeheader: {}", rtsp_url);
if(rc) *rc = ret;
return nullptr;
}
spdlog::info("success connect to darwin {}", rtsp_url);
av_dict_free(&pOptsRemux);
......@@ -226,7 +123,7 @@ AVFormatContext *rtsp_init(string rtsp_url, int codec, int height, int width, in
int write_packet(packet_processor_t *processor, char *data, int len)
{
AVFormatContext *ctx = nullptr;
if(processor == nullptr||(ctx = processor->pAvCtx) == nullptr || ctx->streams == nullptr || *ctx->streams == nullptr) {
if(processor == nullptr||(ctx = processor->pAvCtx) == nullptr || ctx->streams == nullptr || *(ctx->streams) == nullptr) {
spdlog::error("no avformatctx or stream available for write");
return -1;
}
......@@ -240,7 +137,15 @@ int write_packet(packet_processor_t *processor, char *data, int len)
pkt.data = (uint8_t *)data;
pkt.size = len;
AVRational time_base;
time_base.den = processor->vpara.fps; // this is fps
char *fps = getenv("FPS");
if(processor->hdr.vpara.fps > 0 && processor->hdr.vpara.fps < 100){
time_base.den = processor->hdr.vpara.fps;
}else{
time_base.den = 15;
}
if(fps){
time_base.den = atoi(fps);
}
time_base.num = 1;
pkt.dts = av_rescale_q_rnd(++processor->pts, time_base, out_stream->time_base, (AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
pkt.pts = av_rescale_q(processor->pts, time_base, out_stream->time_base);
......@@ -249,327 +154,107 @@ int write_packet(packet_processor_t *processor, char *data, int len)
ret = av_write_frame(ctx, &pkt);
if (ret < 0) {
fprintf(stderr, "Error muxing packet\n");
spdlog::error("Error muxing packet: {}", av_err2str(ret));
}
//av_packet_unref(&pkt);
return ret;
}
void on_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf)
{
packet_client *pclient = (packet_client *)client;
char *data = buf->base;
lock_guard<mutex> lk(pclient->processor.mut);
if (nread > 0) {
evpacket_ptr_t pkt = (evpacket_ptr_t)data;
if (nread >= 2 && pkt->meta.magic[0] == (char)0xBE && pkt->meta.magic[1] == (char)0xEF) {
// new packet
pclient->processor.size = 0;
pclient->processor.state = 1; // rcv header
}
while (pclient->processor.state != 0 && nread > 0) {
switch (pclient->processor.state) {
// rcv header
case 1: {
if (nread + pclient->processor.size >= sizeof(evpacket_t)) {
// whole header ready
auto delta = sizeof(evpacket_t) - pclient->processor.size;
memcpy(&pclient->processor.hdr + pclient->processor.size, data, delta);
/// NOTES: since both ends use little-endian, we don't apply ntohl, ntohs to save clocks
//debugHex((char*)&pclient->processor.hdr, 64);
/// check crc first
uint16_t crc = pclient->processor.hdr.meta.crc;
pclient->processor.hdr.meta.crc = 0;
uint16_t crc_ = crc16((unsigned char*)&pclient->processor.hdr, sizeof(evpacket_t));
if (crc!=crc_||pclient->processor.hdr.meta.magic[0] != (char)0xBE || pclient->processor.hdr.meta.magic[1] != (char)0xEF || pclient->processor.hdr.length == 0) {
printf("invalid magic/len/cid. hdr: %02hhX%02hhX, len:%d, cid:%lu, sid:%lu. crc:%04hhX, crcc:%04hhX. BUG!!!\n", pclient->processor.hdr.meta.magic[0], pclient->processor.hdr.meta.magic[1], pclient->processor.hdr.length,
pclient->processor.hdr.meta.packet_id,pclient->processor.packetId,crc, crc_);
pclient->processor.size = 0;
pclient->processor.state = 1;
nread = 0;
continue;
}
else {
// printf("new packet: %02hhX%02hhX, len: %u, cid:%lu, sid:%lu\n", pclient->processor.hdr.meta.magic[0], pclient->processor.hdr.meta.magic[1], pclient->processor.hdr.length, pclient->processor.hdr.meta.packet_id,pclient->processor.packetId);
}
/// check packetid
if(pclient->processor.packetId == 0) {
pclient->processor.packetId = pclient->processor.hdr.meta.packet_id;
}
else {
/// ignore too large packets
if(/*pclient->processor.hdr.meta.packet_id <= pclient->processor.packetId ||*/ pclient->processor.hdr.length >= MAX_FRAME_SIZE) {
spdlog::error("invalid packetId or len, ignored. len: {}, server:{}, client:{}", pclient->processor.hdr.length, pclient->processor.packetId, pclient->processor.hdr.meta.packet_id );
pclient->processor.size = 0;
pclient->processor.state = 1;
nread = 0;
continue;
}
}
pclient->processor.buf = (char *)malloc(pclient->processor.hdr.length);
if (pclient->processor.buf == nullptr) {
spdlog::error("memroy issues {}:{}", __FILE__, __LINE__);
pclient->processor.size = 0;
pclient->processor.state = 1;
nread = 0;
continue;
}
/// reconnect every 30s
if (pclient->processor.pAvCtx == nullptr && (pclient->processor.failedCnt++ % MAX_RETRY_PUSH_INTV == 0)) {
spdlog::info("device sn: {}", pclient->processor.hdr.meta.sn);
int rc = 0;
pclient->processor.pAvCtx = rtsp_init(darwinUrl + string(pclient->processor.hdr.meta.sn),
AV_CODEC_ID_H264, pclient->processor.hdr.vpara.res.height, pclient->processor.hdr.vpara.res.width, &rc);
if(pclient->processor.pAvCtx == nullptr) {
spdlog::error("failed connect to rtsp server {}: {}", darwinUrl, av_err2str(rc));
}
else {
pclient->processor.failedCnt = 0;
string sn_ = string(pclient->processor.hdr.meta.sn);
if(devConnMap.count(sn_) == 0) {
devConnMap[sn_] = pclient;
}
}
}
/// copy vpara
if(pclient->processor.vpara.fps == 0) {
pclient->processor.vpara = pclient->processor.hdr.vpara;
}
nread -= delta;
data += delta;
pclient->processor.size = 0;
pclient->processor.state = 2; // header ready. rcv body
}
else {
// small header
memcpy(&pclient->processor.hdr + pclient->processor.size, data, nread);
//data += nread;
pclient->processor.size += nread;
nread = 0;
if (pclient->processor.size >= 2 && (pclient->processor.hdr.meta.magic[0] != (char)0xBE || pclient->processor.hdr.meta.magic[1] != (char)0xEF)) {
printf("invalid packet header: %02hhX%02hhX. BUG!!! %s:%d\n", pclient->processor.hdr.meta.magic[0], pclient->processor.hdr.meta.magic[1], __FILE__, __LINE__);
pclient->processor.size = 0;
pclient->processor.state = 1;
nread = 0;
continue;
}
// state remains in rcv header
spdlog::debug("small header. size:{}", pclient->processor.size);
}
}
break;
case 2: {
// full body
if (nread + pclient->processor.size >= pclient->processor.hdr.length) {
auto delta = pclient->processor.hdr.length - pclient->processor.size;
memcpy(pclient->processor.buf + pclient->processor.size, data, delta);
nread -= delta;
data += delta;
// TODO: handle body dispatch
// spdlog::debug("TODO: full body got. left for next header: {}", nread);
if (pclient->processor.pAvCtx != nullptr) {
auto ret = write_packet(&pclient->processor, pclient->processor.buf, pclient->processor.hdr.length);
pclient->processor.packetId = pclient->processor.hdr.meta.packet_id;
if (ret < 0) {
spdlog::error("failed to send packet");
// reset hand reconnect
if(pclient->processor.pAvCtx && pclient->processor.pAvCtx->pb) {
avio_closep(&pclient->processor.pAvCtx->pb);
}
if(pclient->processor.pAvCtx) {
avformat_free_context(pclient->processor.pAvCtx);
}
pclient->processor.pAvCtx = nullptr;
}
}
/// clear instance-local buffer
free(pclient->processor.buf);
pclient->processor.buf = nullptr;
// /// response two bytes
// write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
// char *res= (char*)malloc(2);
// res[0] = (char)0xDE;
// res[1] = (char)0xAD;
// req->buf = uv_buf_init(res, 2);
// uv_write((uv_write_t*) req, client, &req->buf, 1, on_written);
// next
pclient->processor.state = 1;
pclient->processor.size = 0;
}
else {
//TODO: handle buf. <<NEED BUGFIX HERE>>
if (pclient->processor.buf == nullptr) {
spdlog::error("error state {} {}", __FILE__, __LINE__);
exit(1);
}
memcpy(pclient->processor.buf + pclient->processor.size, data, nread);
pclient->processor.size += nread;
nread = 0;
//spdlog::debug("small body. size:{}", pclient->processor.size);
}
}
int setupRouter(void **ctx, void **s, string addr, int rcvQS=0){
int ret = 0;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_ROUTER);
ret = 1;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
ret = 5;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret));
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = 2;
zmq_setsockopt(*s, ZMQ_TCP_KEEPALIVE_CNT, &ret, sizeof (ret));
if(rcvQS !=0) {
zmq_setsockopt(*s, ZMQ_RCVHWM, &rcvQS, sizeof(rcvQS));
}
ret = zmq_bind(*s, addr.c_str());
if(ret < 0) {
spdlog::debug("failed to bind zmq at {} for reason: {}, retrying load configuration...", addr, zmq_strerror(zmq_errno()));
}
return ret;
}
int z_recv_multiple(void *s, vector<uint8_t> &buf, vector<int>&frameIdx) {
int64_t more = 1;
size_t more_size = sizeof(more);
int ret = 0;
int cnt = 0;
while(more > 0) {
cnt++;
zmq_msg_t msg;
ret = zmq_msg_init(&msg);
if(ret < 0) {
spdlog::debug("failed to receive multiple msg on zmq_msg_init: {}", zmq_strerror(zmq_errno()));
break;
case 0: {
// TODO: none
spdlog::warn("should be here state=0");
// force to waiting for header
pclient->processor.state = 1;
nread = 0;
}
}
ret = zmq_recvmsg(s, &msg, 0);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
default: {
spdlog::warn("should be here state=none");
// TODO: close and free resources
uv_close((uv_handle_t *)client, on_closed);
}
}
buf.insert(buf.end(), (uint8_t*)zmq_msg_data(&msg), (uint8_t*)zmq_msg_data(&msg)+ret);
// spdlog::info("buff size: {}", buf.size());
frameIdx.push_back(buf.size());
zmq_msg_close(&msg);
ret = zmq_getsockopt(s, ZMQ_RCVMORE, &more, &more_size);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
}
}
if (nread < 0) {
if (nread != UV_EOF) {
spdlog::error("Read error {}, closing", uv_err_name(nread));
}
else {
spdlog::error("read error {}", uv_err_name(nread));
uv_close((uv_handle_t *)client, on_closed);
}
if(ret < 0) {
spdlog::error("failed to recv msg: {}", ret < 0? zmq_strerror(ret): "invalid frames");
return -1;
}
free(buf->base);
return 0;
}
void on_connect(uv_stream_t *server, int status)
{
if (status < 0) {
spdlog::error("New connection error {}", uv_strerror(status));
packet_processor_t globalProcess = {0};
void dispatch(string sn, zmq_msg_t &msg){
evpacket_t *pkt = (evpacket_t *)zmq_msg_data(&msg);
globalProcess.packetId++;
//debugHex((char*)zmq_msg_data(&msg), 64);
auto crc = pkt->meta.crc;
pkt->meta.crc = 0;
auto crc_ = crc16((unsigned char*)pkt, sizeof(evpacket_t));
if(crc != crc_){
spdlog::error("invalid crc: {}", pkt->meta.packet_id);
return;
}
spdlog::info("client connected");
packet_client *client = new packet_client();
client->processor.handle = &client->handle;
uv_tcp_init(loop, (uv_tcp_t *)client);
if (uv_accept(server, (uv_stream_t *)client) == 0) {
uv_read_start((uv_stream_t *)client, alloc_buffer, on_read);
}
else {
spdlog::error("failed to accept");
uv_close((uv_handle_t *)client, on_closed);
}
}
/// NOTES: since libuv async-api already implements multi-thread internally, the following
/// 'bottom-half' implementaion class is never used.
// class RtspPusher {
// private:
// map<string, devinfo_t *> devMap;
// map<string, Notifier *> devNoti;
// int num_clients;
// string darwin_addr;
// protected:
// public:
// RtspPusher()
// {
// darwin_addr = "rtsp://evcloud.ilabservice.cloud:554/";
// }
// RtspPusher(string darwin_addr):darwin_addr(darwin_addr) {}
// ~RtspPusher()
// {
// /// NOTES: ATTENTION!!! N-phase shutdown
// for(auto &[k, v]:devNoti) {
// lock_guard<mutex> lk(*v->mut);
// if(devMap.count(k) != 0) {
// delete devMap[k];
// devMap.erase(k);
// }
// }
// for(auto &[k, v]:devNoti) {
// lock_guard<mutex> lk(*v->mut);
// delete v->cond;
// v->cond = nullptr;
// }
// for(auto &[k, v]:devNoti) {
// delete v->mut;
// delete v;
// devNoti.erase(k);
// }
// }
// int send(string devsn, dataque_frame_t && item)
// {
// int ret = 0;
// AVFormatContext *ctx = nullptr;
// if(devMap.count(devsn) == 0) {
// string url = darwin_addr + devsn;
// ctx = rtsp_init(url, item.codec, item.height, item.width, &ret);
// if(ctx == nullptr) {
// spdlog::error("failed to connect rtsp: {}", av_err2str(ret));
// return -1;
// }
// queue<dataque_frame_t> * qu = new queue<dataque_frame_t>();
// devMap.insert({devsn, new devinfo_t{devsn, qu, ctx}});
// //
// mutex *mut = new mutex();
// condition_variable *cv = new condition_variable();
// devNoti.insert({devsn, new Notifier({mut, cv})});
// }
// if(devMap.count(devsn) == 0 || devNoti.count(devsn) == 0) {
// spdlog::error("failed to get resources map for: {}, maybe stopped", devsn);
// return -2;
// }
// devinfo_t *info = devMap[devsn];
// Notifier *noti = devNoti[devsn];
// lock_guard<mutex> lock(*noti->mut);
// /// double check
// if(devMap.count(devsn) == 0 || devNoti.count(devsn) == 0||devNoti[devsn]->cond == nullptr) {
// spdlog::error("failed to get resources map for: {}, maybe stopped", devsn);
// return -2;
// }
// /// push new frame
// info->que->push(std::move(item));
// noti->cond->notify_one();
// }
// int run(thread &th){
// th = thread([]{
// // TODO: some multi-worker impl here.
// });
// if(th.joinable()){
// th.detach();
// }else{
// spdlog::error("failed to create or join worker thread");
// }
// }
// };
static bool bInSegv = false;
void clean_up(int sig)
{
if(sig == SIGSEGV && bInSegv == true) {
exit(1);
if(globalProcess.pAvCtx == nullptr && globalProcess.packetId %15*10 == 0){
int rc = 0;
globalProcess.pAvCtx = rtsp_init(darwinUrl + sn, 0, 1080, 1920, &rc);
if(globalProcess.pAvCtx == nullptr || rc < 0) {
spdlog::error("failed to connect to darwin: {}", darwinUrl);
}
memcpy(&globalProcess.hdr, zmq_msg_data(&msg), sizeof(globalProcess.hdr));
}
// try to cleanup before exit. (most concerned on socket closing for the edge node)
int ret = write_packet(&globalProcess, (char*)zmq_msg_data(&msg) + sizeof(evpacket_t), zmq_msg_size(&msg) - sizeof(evpacket_t) );
if(ret < 0) {
spdlog::error("failed to send packet!");
if(globalProcess.pAvCtx && globalProcess.pAvCtx->pb) {
avio_closep(&globalProcess.pAvCtx->pb);
}
if(globalProcess.pAvCtx) {
avformat_free_context(globalProcess.pAvCtx);
}
globalProcess.pAvCtx = nullptr;
}
}
int main()
......@@ -580,17 +265,6 @@ int main()
darwinUrl = string(darwinUrl_);
}
/// bind addr
auto bindAddr_ = getenv("HOST");
if(bindAddr_) {
bindAddr = bindAddr_;
}
auto bindPort_ = getenv("PORT");
if(bindPort_) {
bindPort = string(bindPort_);
}
auto logLevel_ = getenv("LOG_LEVEL");
if(logLevel_) {
spdlog::set_level(spdlog::level::from_str(logLevel_));
......@@ -600,17 +274,57 @@ int main()
spdlog::set_level(spdlog::level::debug);
}
loop = uv_default_loop();
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr(bindAddr.c_str(), stoi(bindPort), &addr);
uv_tcp_bind(&server, (const struct sockaddr *)&addr, 0);
spdlog::info("sizeof pkt header {}, sizeof tv {}, bigendian: {}", sizeof(evpacket_t), sizeof(timeval), is_big_endian());
int r = uv_listen((uv_stream_t *)&server, DEFAULT_BACKLOG, on_connect);
if (r) {
spdlog::error("Listen error {}", uv_strerror(r));
return 1;
string addr = "tcp://0.0.0.0:7123";
void *pRouterCtx = nullptr, *pRouter = nullptr/*, *pDealerCtx = nullptr, *pDealer = nullptr*/;
setupRouter(&pRouterCtx, &pRouter, addr, 10);
while(1){
int64_t more = 1;
size_t more_size = sizeof(more);
int ret = 0;
zmq_msg_t msg;
ret = zmq_msg_init(&msg);
if(ret < 0) {
spdlog::debug("failed to receive multiple msg on zmq_msg_init: {}", zmq_strerror(zmq_errno()));
break;
}
ret = zmq_msg_recv(&msg, pRouter, 0);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
string sn((char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
zmq_msg_close(&msg);
// ignore the first frame
ret = zmq_getsockopt(pRouter, ZMQ_RCVMORE, &more, &more_size);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
if(more > 0) {
ret = zmq_msg_init(&msg);
ret |= zmq_msg_recv(&msg, pRouter, 0);
if(ret < 0) {
spdlog::debug("z_recv_multiple: {}", zmq_strerror(zmq_errno()));
break;
}
dispatch(sn, msg);
}else{
spdlog::error("invalid packet fmt");
}
zmq_msg_close(&msg);
if(ret < 0) {
spdlog::error("failed to recv msg: {}", ret < 0? zmq_strerror(ret): "invalid frames");
return -1;
}
}
spdlog::info("started video gateway service from {}:{} to {}", bindAddr, bindPort, darwinUrl);
return uv_run(loop, UV_RUN_DEFAULT);
this_thread::sleep_for(2s);
zmq_close(pRouter);
zmq_ctx_destroy(pRouterCtx);
return 0;
}
libzmq @ a84ffa12
Subproject commit 2aa87c94cc8be57a878e2e3c6a0551e8fdf6c886
Subproject commit a84ffa12b2eb3569ced199660bac5ad128bff1f0
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论