提交 ead11e58 authored 作者: blu's avatar blu

video upload & motion detection

上级 0b173acb
...@@ -80,6 +80,6 @@ payload: { ...@@ -80,6 +80,6 @@ payload: {
```json ```json
{ "time":1567669674, "cmd":"upload_video", "rid":"001231554A20", "data":{ "start":1590542800, "end":1590542900, "type":6 } } { "time":1567669674, "cmd":"upload_video", "rid":"001231554A20", "data":{ "start":1590716600, "end":1590716700, "type":6 } }
``` ```
...@@ -45,7 +45,7 @@ SN := {UINT8*SN_LEN} # 摄像头SN, 长度为 ...@@ -45,7 +45,7 @@ SN := {UINT8*SN_LEN} # 摄像头SN, 长度为
START_TS := TS # 事件开始时间 START_TS := TS # 事件开始时间
END_TS := TS # 时间结束时间 END_TS := TS # 时间结束时间
TYPE := UINT8 # 0 - 未定义, 1 - AI人形, 2 - AI人脸, 3 - 移动侦测, TYPE := UINT8 # 0 - 未定义, 1 - AI人形, 2 - AI人脸, 3 - 移动侦测,
# 4 - 覆盖, 5 - 覆盖解除, 6 - 视频获取 # 4 - 覆盖, 5 - 覆盖解除, 6 - 视频获取, 7 - AI和移动综合
# 其他 - 由发起请求的微服务自定义 # 其他 - 由发起请求的微服务自定义
FILE_META := FILE_TS,SIZE FILE_META := FILE_TS,SIZE
FILE_TS := TS # 文件开始时间 FILE_TS := TS # 文件开始时间
......
...@@ -203,12 +203,18 @@ ntp_LIB_DEPENDS:STATIC= ...@@ -203,12 +203,18 @@ ntp_LIB_DEPENDS:STATIC=
//Dependencies for target //Dependencies for target
ptz_LIB_DEPENDS:STATIC= ptz_LIB_DEPENDS:STATIC=
//Dependencies for target
record_upload_LIB_DEPENDS:STATIC=
//Dependencies for target //Dependencies for target
smart_LIB_DEPENDS:STATIC= smart_LIB_DEPENDS:STATIC=
//Dependencies for target //Dependencies for target
tcp_client_LIB_DEPENDS:STATIC= tcp_client_LIB_DEPENDS:STATIC=
//Dependencies for target
upload_LIB_DEPENDS:STATIC=
//Dependencies for target //Dependencies for target
util_LIB_DEPENDS:STATIC= util_LIB_DEPENDS:STATIC=
......
...@@ -24,7 +24,8 @@ add_library(mqtthelper STATIC ../include/mqtt_helper.cc) ...@@ -24,7 +24,8 @@ add_library(mqtthelper STATIC ../include/mqtt_helper.cc)
add_library(motion STATIC motion.cc) add_library(motion STATIC motion.cc)
add_library(smart STATIC smart.cc) add_library(smart STATIC smart.cc)
add_library(ptz STATIC ptz.cc) add_library(ptz STATIC ptz.cc)
add_library(record_upload STATIC record_upload.cc)
add_executable(evcamera main.cc) add_executable(evcamera main.cc)
target_link_libraries(evcamera PUBLIC ntp paho-mqtt3a mqtthelper tcp_client evutils smart motion ptz zmq ${COMMON_LIBS} ${XM_LIBS}) target_link_libraries(evcamera PUBLIC ntp paho-mqtt3a mqtthelper tcp_client evutils smart motion record_upload ptz zmq ${COMMON_LIBS} ${XM_LIBS})
add_executable(test_fsm test_fsm.cc) add_executable(test_fsm test_fsm.cc)
\ No newline at end of file
...@@ -4,6 +4,18 @@ using namespace std; ...@@ -4,6 +4,18 @@ using namespace std;
using namespace jsoncons; using namespace jsoncons;
using namespace jsoncons::literals; using namespace jsoncons::literals;
unsigned short crc16(const unsigned char* data_p, unsigned char length){
unsigned char x;
unsigned short crc = 0xFFFF;
while (length--){
x = crc >> 8 ^ *data_p++;
x ^= x>>4;
crc = (crc << 8) ^ ((unsigned short)(x << 12)) ^ ((unsigned short)(x <<5)) ^ ((unsigned short)x);
}
return crc;
}
namespace evutils { namespace evutils {
const char consts::version[] = "EVC20200429"; const char consts::version[] = "EVC20200429";
const string consts::kMsgCmd = "cmd"; const string consts::kMsgCmd = "cmd";
...@@ -55,6 +67,7 @@ const string consts::pub_topic_report = "/evcamera/v1.0/report"; ...@@ -55,6 +67,7 @@ const string consts::pub_topic_report = "/evcamera/v1.0/report";
const string consts::mqtt_url = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883"; const string consts::mqtt_url = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883";
const string consts::vgw_addr = "tcp://evcloudsvc.ilabservice.cloud:7123"; const string consts::vgw_addr = "tcp://evcloudsvc.ilabservice.cloud:7123";
const string consts::upload_addr = "tcp://evcloudsvc.ilabservice.cloud:10009"; const string consts::upload_addr = "tcp://evcloudsvc.ilabservice.cloud:10009";
const string consts::strPubUrl = "inproc://frame";
consts &consts::self() consts &consts::self()
{ {
......
...@@ -15,6 +15,19 @@ ...@@ -15,6 +15,19 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <jsoncons/json.hpp> #include <jsoncons/json.hpp>
struct StrException : public std::exception {
std::string s;
StrException(std::string ss) : s(ss) {}
~StrException() throw () {} // Updated
const char* what() const throw()
{
return s.c_str();
}
};
unsigned short crc16(const unsigned char* data_p, unsigned char length);
namespace evutils { namespace evutils {
using namespace std; using namespace std;
using namespace jsoncons; using namespace jsoncons;
...@@ -70,6 +83,7 @@ typedef struct consts{ ...@@ -70,6 +83,7 @@ typedef struct consts{
static const string mqtt_url; static const string mqtt_url;
static const string vgw_addr; static const string vgw_addr;
static const string upload_addr; static const string upload_addr;
static const string strPubUrl;
static consts& self(); static consts& self();
}consts; }consts;
...@@ -167,7 +181,7 @@ public: ...@@ -167,7 +181,7 @@ public:
} }
} }
set<TN> findByRange(TN tss, TN tse, TN & offsetS, TN &offsetE) set<TN> findByRange(TN tss, TN tse, TN & offsetS, TN &offsetE, int lagSecs, int *status = nullptr)
{ {
set<TN> ret; set<TN> ret;
lock_guard<mutex> lg(mut); lock_guard<mutex> lg(mut);
...@@ -179,8 +193,18 @@ public: ...@@ -179,8 +193,18 @@ public:
auto _it = list_.end(); auto _it = list_.end();
TN end = *(--_it); TN end = *(--_it);
if(tse < first||tss > end) { if(tse < first||tss > end || tse > end || end - tse < lagSecs*1000) {
int s = 0;
spdlog::info("range requested ({}, {}) is not in range existed ({}, {}).", tss, tse, first, end); spdlog::info("range requested ({}, {}) is not in range existed ({}, {}).", tss, tse, first, end);
if(tse < first) {
s = -1;
}else{
s = 1;
}
if(status){
*status = s;
}
return ret; return ret;
} }
...@@ -218,6 +242,9 @@ public: ...@@ -218,6 +242,9 @@ public:
if(last != 0) { if(last != 0) {
offsetE = last - tse; offsetE = last - tse;
} }
if(status){
*status = 0;
}
} }
return ret; return ret;
......
...@@ -13,6 +13,7 @@ typedef struct ev_region_t { ...@@ -13,6 +13,7 @@ typedef struct ev_region_t {
} ev_region_t; } ev_region_t;
typedef struct ev_module_config_t { typedef struct ev_module_config_t {
MqttHelper *pClient; MqttHelper *pClient;
void **pPubCtx;
struct module{ struct module{
struct record{ struct record{
int interval; // in seconds int interval; // in seconds
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "internal_types.h" #include "internal_types.h"
#include "ptz.h" #include "ptz.h"
#include <queue> #include <queue>
#include "record_upload.h"
using namespace std; using namespace std;
using namespace jsoncons; using namespace jsoncons;
...@@ -68,7 +69,6 @@ MqttHelper *gMqttClient = nullptr; ...@@ -68,7 +69,6 @@ MqttHelper *gMqttClient = nullptr;
// zmq // zmq
void *pPubCtx = nullptr; void *pPubCtx = nullptr;
void *pPub = nullptr; void *pPub = nullptr;
const string strPubUrl = "inproc://frame";
void *pSub = nullptr; void *pSub = nullptr;
void *pDealerCtx = nullptr; void *pDealerCtx = nullptr;
void *pDealer = nullptr; void *pDealer = nullptr;
...@@ -85,12 +85,14 @@ char *port = portArr; ...@@ -85,12 +85,14 @@ char *port = portArr;
// //
OrderedList<int64_t> *gRecFilesList = nullptr; OrderedList<int64_t> *gRecFilesList = nullptr;
typedef struct upload_item_t {
int64_t tss; namespace _upvariable_{
int64_t tse; mutex mut;
int type; condition_variable cv;
set<int64_t> slices; queue<upload_item_t> que;
} upload_item_t; };
thread_upload_args_t gUploadArgs{&_upvariable_::mut, &_upvariable_::cv, &_upvariable_::que, &gJsonConfig, dev_sn, &gRecFilesList};
// //
string strMaQuePath = "/mnt/sd/Config/"; string strMaQuePath = "/mnt/sd/Config/";
...@@ -102,11 +104,7 @@ vector<string> get_sdcard_resered_dirs() ...@@ -102,11 +104,7 @@ vector<string> get_sdcard_resered_dirs()
return {consts::recFilePath, strMaQuePath}; return {consts::recFilePath, strMaQuePath};
} }
// ev_module_config_t gConfigRecord;
// ev_module_config_t gConfigAI;
// ev_module_config_t gConfigMotion;
ev_module_config_t gConfigSystem; ev_module_config_t gConfigSystem;
string gMqttAddr, gVgwAddr, gUploadAddr;
enum EV_MSG_ERROR_CODE { enum EV_MSG_ERROR_CODE {
EV_MSG_ERROR_NONE, EV_MSG_ERROR_NONE,
...@@ -305,9 +303,9 @@ void frame_send_entry(void *args) ...@@ -305,9 +303,9 @@ void frame_send_entry(void *args)
continue; continue;
} }
ret = zmq_connect(pSub, strPubUrl.c_str()); ret = zmq_connect(pSub, consts::strPubUrl.c_str());
if(ret != 0) { if(ret != 0) {
spdlog::error("failed connect frame pub: {}", strPubUrl); spdlog::error("failed connect frame pub: {}", consts::strPubUrl);
continue; continue;
} }
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0); ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
...@@ -347,144 +345,6 @@ void frame_send_entry(void *args) ...@@ -347,144 +345,6 @@ void frame_send_entry(void *args)
} }
} }
void print_ts_files(OrderedList<int64_t> &_list)
{
spdlog::info("print file ts {}:", _list.items().size());
int cnt = 0;
printf("\t");
for(auto &k:_list.items()) {
cnt++;
printf("%lld, ", k);
if(cnt %6 == 0) {
printf("\n\t");
}
}
spdlog::info("\nlist_videos done!");
}
void load_sd_files(OrderedList<int64_t> &_list)
{
auto now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
try {
string fname, baseName;
for (const auto & entry : fs::directory_iterator(consts::recFilePath)) {
fname = entry.path().c_str();
if(entry.file_size() == 0 || !entry.is_regular_file()||entry.path().extension() != ".h264") {
spdlog::warn("load_sd_files skipped {} (empty/directory/!h264)", entry.path().c_str());
fs::remove(fname);
continue;
}
int64_t ts = stoll(entry.path().stem().string());
if(ts < consts::TS_2020) {
/// TODO: files records when offline with no valid time got
fs::remove(entry.path());
spdlog::warn("remove records having invalid timestamp: {}", ts);
continue;
}
_list.insert(ts);
}
}
catch(exception &e) {
spdlog::error("{}:{} load_sd_files exception : {}", __FILE__, __LINE__, e.what());
}
}
void remove_ts_file(int64_t ts)
{
string fname = consts::recFilePath + to_string(ts) + ".h264";
fs::remove(fname);
}
void record_video_entry(ev_module_config_t *pArgs)
{
int ret = 0;
// this thread will never exit
while(1) {
if(!is_sdcard_avail()) {
spdlog::error("no sd card valial. can't record local videos");
this_thread::sleep_for(10s);
continue;
}
uint64_t total = 0, avail = 0;
if(!get_sdcard_megabytes(total, avail)) {
spdlog::error("failed to get sd card size");
continue;
}
spdlog::info("sd card: {}MB total, {}MB free", total, avail);
// calc num of slices
// reserve 0.5GB space
// multipled by 1.5 because videos take less space in midnight (variable bitrate)
ssize_t maxSlices = (ssize_t)((total - 512)/10*1.5);
gRecFilesList = new OrderedList<int64_t>(maxSlices, remove_ts_file);
// load existing videos
load_sd_files(*gRecFilesList);
// setup sub
void *pSub = zmq_socket(pPubCtx, ZMQ_SUB);
// ret = 1;
// zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE, &ret, sizeof (ret));
// ret = 2;
// zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_IDLE, &ret, sizeof (ret));
// zmq_setsockopt(pSub, ZMQ_TCP_KEEPALIVE_INTVL, &ret, sizeof (ret));
ret = zmq_connect(pSub, strPubUrl.c_str());
if(ret != 0) {
spdlog::error("failed connect frame pub: {}", strPubUrl);
exit(1);
}
ret = zmq_setsockopt(pSub, ZMQ_SUBSCRIBE, "", 0);
spdlog::info("zmq sub successed (record)");
zmq_msg_t msg;
int64_t preTs = 0;
int64_t nowTs = 0;
uint32_t cnt = 0;
fstream * fp = nullptr;
while (1) {
zmq_msg_init(&msg);
ret = zmq_msg_recv(&msg, pSub, 0);
if(ret < 0) {
spdlog::error("failed to recv zmq msg: {}", zmq_strerror(ret));
continue;
}
ssize_t sz = zmq_msg_size(&msg);
char *ptr = (char*)zmq_msg_data(&msg);
/// we calc timestamps about every 2s
if(cnt % 30 == 0) {
nowTs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
}
if(nowTs - preTs > pArgs->module.record.interval *1000) {
if(fp != nullptr) {
if(fp->is_open())
fp->close();
delete fp;
}
fp = new fstream(consts::recFilePath+to_string(nowTs) + ".h264", std::ios::out|std::ios::binary);
if(!fp->is_open()) {
spdlog::error("failed to open record file");
delete fp;
fp = nullptr;
}
else {
gRecFilesList->insert(nowTs);
}
preTs = nowTs;
}
if(fp != nullptr) {
fp->write(ptr, sz);
}
cnt++;
zmq_msg_close(&msg);
}
zmq_close (pSub);
/// DO NOT! do this
//zmq_ctx_destroy(pPubCtx);
}
}
/// verify request message /// verify request message
string verify_request(json &js) string verify_request(json &js)
{ {
...@@ -565,26 +425,26 @@ string apply_config(json &data) ...@@ -565,26 +425,26 @@ string apply_config(json &data)
// vgw // vgw
if(gJsonConfig.contains(consts::kMsgConfigVgw)) { if(gJsonConfig.contains(consts::kMsgConfigVgw)) {
gVgwAddr = gJsonConfig[consts::kMsgConfigVgw].as<string>(); // gVgwAddr = gJsonConfig[consts::kMsgConfigVgw].as<string>();
} }
else { else {
gVgwAddr = consts::vgw_addr; gJsonConfig[consts::kMsgConfigVgw] = consts::vgw_addr;
} }
// mqtt // mqtt
if(gJsonConfig.contains(consts::kMsgConfigMqtt)) { if(gJsonConfig.contains(consts::kMsgConfigMqtt)) {
gMqttAddr = gJsonConfig[consts::kMsgConfigMqtt].as<string>(); // gMqttAddr = gJsonConfig[consts::kMsgConfigMqtt].as<string>();
} }
else { else {
gMqttAddr = consts::mqtt_url; gJsonConfig[consts::kMsgConfigMqtt] = consts::mqtt_url;
} }
// uploader // uploader
if(gJsonConfig.contains(consts::kMsgConfigUpload)) { if(gJsonConfig.contains(consts::kMsgConfigUpload)) {
gUploadAddr = gJsonConfig[consts::kMsgConfigUpload].as<string>(); //gUploadAddr = gJsonConfig[consts::kMsgConfigUpload].as<string>();
} }
else { else {
gUploadAddr = consts::upload_addr; gJsonConfig[consts::kMsgConfigUpload] = consts::upload_addr;
} }
if(gJsonConfig.contains(consts::kMsgConfigFeatures)) { if(gJsonConfig.contains(consts::kMsgConfigFeatures)) {
...@@ -679,158 +539,6 @@ string apply_config(json &data) ...@@ -679,158 +539,6 @@ string apply_config(json &data)
} }
return rc; return rc;
} }
///
#define MAGIC_HEAD0 ((uint8_t)0xBE)
#define MAGIC_HEAD1 ((uint8_t)0xEF)
#define MATIC_TAIL0 ((uint8_t)0xFF)
#define MAGIC_TAIL1 ((uint8_t)0xFF)
#define MAGIC_TAIL2 ((uint8_t)0xDE)
#define MAGIC_TAIL3 ((uint8_t)0xAD)
#define EV_UPLOAD_MAX_BUF_SIZE 1200
string raw_send(int s, const void *buf, size_t len){
string rc;
ssize_t sent = 0;
while(len > 0) {
sent = ::send(s, buf, len, 0);
if(sent <= 0){
break;
}
len -= sent;
}
if(len > 0 || sent<=0){
rc = fmt::format("failed to send: {}", strerror(errno));
}
return rc;
}
///
string upload_video(int64_t tss, int64_t tse, set<int64_t> slices, int8_t type){
string rc;
uint8_t * buf = (uint8_t *)malloc(EV_UPLOAD_MAX_BUF_SIZE);
int s = 0;
try{
auto up = gJsonConfig[consts::kMsgConfigUpload].as<string>();
auto uri = httplib::Uri::Parse(up);
if(uri.Host.empty() || uri.Port.empty()){
rc = fmt::format("invalid upload url configure: {}", up);
}else{
auto i = raw_connect(uri.Host, uri.Port, &s);
if(i < 0 || s == 0) {
rc = fmt::format("failed to connect to {}", up);
}else{
auto metaLen = 2 + 12 + 8 + 8 + 1 + 4 + (8 + 4)*slices.size();
auto headLen = 2 + metaLen + 2;
if(headLen > EV_UPLOAD_MAX_BUF_SIZE) {
rc = fmt::format("too many files to upload, ignored: {}", slices.size());
}else{
vector<uint32_t> fsize;
for(auto &k:slices){
struct stat st;
if(stat((string(consts::recFilePath) + to_string(k) + ".h264").c_str(), &st) < 0){
rc = fmt::format("failed to get file {}.h264: {}", k, strerror(errno));
break;
}else{
fsize.push_back(st.st_size);
}
}
if(!rc.empty()){
throw StrException(rc);
}
int totalLen = 0;
buf[0] = MAGIC_HEAD0; // 1, 0
buf[1] = MAGIC_HEAD1; // 1, 1
totalLen += 2;
*((uint16_t *)(buf+totalLen)) = 12; // 2, 2
totalLen += 2;
memcpy(buf+totalLen, dev_sn, 12); // 12, 4
totalLen += 12;
*((uint64_t *)(buf+totalLen)) = tss; // 8, 16
totalLen += 8;
*((uint64_t *)(buf+totalLen)) = tse; // 8, 24
totalLen += 8;
buf[totalLen]= type; // 1, 32
totalLen += 1;
*((uint32_t *)(buf+totalLen)) = slices.size(); // 4, 33
totalLen += 4;
int idx = 0;
for(auto &k:slices){
*((uint64_t *)(buf + totalLen)) = k; // 8, 37
totalLen += 8;
*((uint32_t *)(buf + totalLen)) = fsize[idx]; // 4, 45
totalLen += 4;
idx++;
}
auto crc_ = crc16(buf, totalLen);
memcpy(buf+totalLen, &crc_, 2);
totalLen +=2;
assert(totalLen = headLen);
spdlog::info("header len: {}, crc: {}", totalLen, crc_);
// send header
rc = raw_send(s, buf, totalLen);
if(!rc.empty()){
rc = fmt::format("{}, {}:{}", rc, host, port);
}else{
// construct body
idx = 0;
for(auto &k:slices){
int readsize = 1;
uint32_t total_sent = 0;
auto fpath = string(consts::recFilePath) + to_string(k) + ".h264";
memcpy(buf, &fsize[idx], 4);
FILE *fp = fopen(fpath.c_str(), "rb");
if(!fp){
rc = fmt::format("failed open file : {}", fpath);
break;
}else{
spdlog::info("sending file {}: {}", fpath, fsize[idx]);
for(int i = 0; readsize > 0; i++){
if(i == 0){
readsize = ::fread(buf+4, 1, EV_UPLOAD_MAX_BUF_SIZE - 4, fp) + 4;
}else{
readsize = ::fread(buf, 1, EV_UPLOAD_MAX_BUF_SIZE, fp);
}
if((i == 0 && readsize > 4) ||(i !=0 && readsize > 0)) {
rc = raw_send(s, buf, readsize);
if(!rc.empty()){
break;
}
}else{
break;
}
spdlog::info("sent bytes: {}", readsize);
total_sent += readsize;
}
::fclose(fp);
if(readsize < 0){
rc = fmt::format("error read file: {}", strerror(errno));
spdlog::error(rc);
break;
}else{
spdlog::info("sent file {}: total {}, sent {}", fpath, fsize[idx], total_sent - 4);
}
}
idx++;
}
}
}
}
}
}catch(exception &e){
rc = fmt::format("failed to send to {}:{}: {}, {}:{}", host, port, e.what(), __FILE__, __LINE__);
}
free(buf);
return rc;
}
/// handles all incoming request from cloud services /// handles all incoming request from cloud services
void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string topic) void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string topic)
...@@ -838,6 +546,8 @@ void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string t ...@@ -838,6 +546,8 @@ void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string t
char msg[len+1] = {0}; char msg[len+1] = {0};
memcpy(msg, data, len); memcpy(msg, data, len);
spdlog::info("received request on {}:\n{}", topic, (char*)msg); spdlog::info("received request on {}:\n{}", topic, (char*)msg);
auto now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
try { try {
auto js = json::parse(msg); auto js = json::parse(msg);
auto str = verify_request(js); auto str = verify_request(js);
...@@ -896,11 +606,6 @@ void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string t ...@@ -896,11 +606,6 @@ void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string t
else { else {
auto tss = data["start"].as<int64_t>(); auto tss = data["start"].as<int64_t>();
auto tse = data["end"].as<int64_t>(); auto tse = data["end"].as<int64_t>();
// // guess unit in ms or s
// if(tss/100000000000 == 0) {
// tss = tss * 1000;
// tse = tse * 1000;
// }
bool isTsValid = false; bool isTsValid = false;
if(tss <= tse && tss > consts::TS_2020 && tse < consts::TS_MAX) { if(tss <= tse && tss > consts::TS_2020 && tse < consts::TS_MAX) {
...@@ -921,24 +626,30 @@ void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string t ...@@ -921,24 +626,30 @@ void handle_mqtt_req(MqttHelper *hlp, const void * const data, int len, string t
} }
else { else {
int64_t offsetE = 0, offsetS = 0; int64_t offsetE = 0, offsetS = 0;
auto res = gRecFilesList->findByRange(tss, tse, offsetS, offsetE); int status = -2;
if(res.empty()) { auto res = gRecFilesList->findByRange(tss, tse, offsetS, offsetE, gConfigSystem.module.record.interval, &status);
MqttMgr::report_response_args(gMqttClient, consts::pub_topic_response + rid, -2, "no video found", cmd, rid, data); if(res.empty() && status < 0) {
MqttMgr::report_response_args(gMqttClient, consts::pub_topic_response + rid, EV_MSG_ERROR_INVALID_PARAM, "no video found: timestamp too far in the past", cmd, rid, data);
} }
else { else {
MqttMgr::report_response_args(gMqttClient, consts::pub_topic_response + rid, 0, "OK", cmd, rid, data); //print_ts_files(*gRecFilesList);
print_ts_files(*gRecFilesList);
printf("matched (%lld, %lld) with offsets %lld, %lld:\n\t", tss, tse, offsetS, offsetE); printf("matched (%lld, %lld) with offsets %lld, %lld:\n\t", tss, tse, offsetS, offsetE);
for(auto &k: res) { for(auto &k: res) {
printf("%lld, ", k); printf("%lld, ", k);
} }
printf("\n"); printf("\n");
/// TODO: upload videos /// TODO: upload videos
auto rc = upload_video(tss, tse, res, 6); upload_item_t item{tss, tse, 6, -1, 0};
if(!rc.empty()){ {
spdlog::error(rc); lock_guard<mutex> lg(*gUploadArgs.mut);
MqttMgr::report_response_args(gMqttClient, consts::pub_topic_response + rid, EV_MSG_ERROR_EXCEPTION, rc, cmd, rid, data); gUploadArgs.que->push(item);
gUploadArgs.cv->notify_one();
} }
string extra;
if(status > 0){
extra = ", but it is scheduled in the future";
}
MqttMgr::report_response_args(gMqttClient, consts::pub_topic_response + rid, 0, string("request accepted") + extra, cmd, rid, data);
} }
} }
} }
...@@ -1142,6 +853,8 @@ int main(int argc, char *argv[]) ...@@ -1142,6 +853,8 @@ int main(int argc, char *argv[])
// //
MaQueVideoEncodeCfg_s cfg; MaQueVideoEncodeCfg_s cfg;
init_stream_cfg(&cfg); init_stream_cfg(&cfg);
gConfigSystem.pPubCtx = &pPubCtx;
gConfigSystem.pClient = nullptr;
cfg.nFps = gConfigSystem.module.sys.fps; cfg.nFps = gConfigSystem.module.sys.fps;
cfg.eIFrmIntvType = IFRAME_INTV_TYPE_TIME; cfg.eIFrmIntvType = IFRAME_INTV_TYPE_TIME;
cfg.nIFrameInterval = 10; cfg.nIFrameInterval = 10;
...@@ -1168,9 +881,9 @@ int main(int argc, char *argv[]) ...@@ -1168,9 +881,9 @@ int main(int argc, char *argv[])
// frame dispatching // frame dispatching
pPubCtx = zmq_ctx_new(); pPubCtx = zmq_ctx_new();
pPub = zmq_socket(pPubCtx, ZMQ_PUB); pPub = zmq_socket(pPubCtx, ZMQ_PUB);
ret = zmq_bind(pPub, strPubUrl.c_str()); ret = zmq_bind(pPub, consts::strPubUrl.c_str());
if(ret < 0) { if(ret < 0) {
spdlog::error("{} failed to create zmq pub topic: {}", strPubUrl); spdlog::error("{} failed to create zmq pub topic: {}", consts::strPubUrl);
exit(1); exit(1);
} }
/// TODO: configuration issues handling /// TODO: configuration issues handling
...@@ -1189,6 +902,13 @@ int main(int argc, char *argv[]) ...@@ -1189,6 +902,13 @@ int main(int argc, char *argv[])
thVideoRecord.detach(); thVideoRecord.detach();
} }
// start upload thread
thread thUpload = thread(upload_svc_entry, &gUploadArgs);
if(thUpload.joinable()){
thUpload.detach();
spdlog::info("started video upload service");
}
start_md_bd(&gConfigSystem); start_md_bd(&gConfigSystem);
/// subscribe to mqtt /// subscribe to mqtt
gMqttClient = start_mqtt(nullptr); gMqttClient = start_mqtt(nullptr);
......
...@@ -7,8 +7,10 @@ ...@@ -7,8 +7,10 @@
#include <sml.hpp> #include <sml.hpp>
#include <map> #include <map>
#include <string> #include <string>
#include "record_upload.h"
namespace sml = boost::sml; namespace sml = boost::sml;
extern thread_upload_args_t gUploadArgs;
namespace md{ namespace md{
/// types /// types
...@@ -23,6 +25,7 @@ namespace md{ ...@@ -23,6 +25,7 @@ namespace md{
const string kKeyN= "n"; const string kKeyN= "n";
const string kKeyP= "p"; const string kKeyP= "p";
const int kMaxCntMotionInPre = 5; // about 20s const int kMaxCntMotionInPre = 5; // about 20s
const int kMaxDuration = 60*20; // max 20 minutes duration to force generating event videos
/// helpers /// helpers
auto event_inspection(auto e, std::map<const std::string, int> &m){ auto event_inspection(auto e, std::map<const std::string, int> &m){
...@@ -102,10 +105,13 @@ namespace md{ ...@@ -102,10 +105,13 @@ namespace md{
spdlog::info("post -n-> none, {}", timeEnd/1000); spdlog::info("post -n-> none, {}", timeEnd/1000);
} }
/// TODO: force event generation when max time duration reached upload_item_t item{timeStart, timeEnd, 7, -1, timeEnd};
{
lock_guard<mutex> lg(*gUploadArgs.mut);
gUploadArgs.que->push(item);
gUploadArgs.cv->notify_one();
}
spdlog::info("event start:{}, end:{}, hasMotion: {}", timeStart, timeEnd, hasMotion);
/// TODO: generate event for uploading
}; };
const auto action_pre2pre = [this](auto e){ const auto action_pre2pre = [this](auto e){
cntMotionOnly++; cntMotionOnly++;
...@@ -114,6 +120,22 @@ namespace md{ ...@@ -114,6 +120,22 @@ namespace md{
timeStart = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count(); timeStart = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
} }
}; };
const auto may_generate_event = [this]{
auto now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
if(timeStart != 0 && (now - timeStart) >= kMaxDuration *1000){
upload_item_t item{timeStart, now, 6, -1, now};
{
lock_guard<mutex> lg(*gUploadArgs.mut);
gUploadArgs.que->push(item);
gUploadArgs.cv->notify_one();
}
timeStart = now;
return true;
}
return false;
};
using namespace sml; using namespace sml;
return make_transition_table( return make_transition_table(
*"init"_s = "none"_s, *"init"_s = "none"_s,
...@@ -121,28 +143,35 @@ namespace md{ ...@@ -121,28 +143,35 @@ namespace md{
"none"_s + event<motion>/[]{spdlog::info("none -m-> pre");} = "pre"_s, "none"_s + event<motion>/[]{spdlog::info("none -m-> pre");} = "pre"_s,
"none"_s + event<people>/action_none2in = "in"_s, "none"_s + event<people>/action_none2in = "in"_s,
"none"_s + event<mpboth>/action_none2in = "in"_s, "none"_s + event<mpboth>/action_none2in = "in"_s,
// pre // pre
"pre"_s + event<none>/[]{spdlog::info("pre -n-> none");} = "none"_s, "pre"_s + event<none>/[]{spdlog::info("pre -n-> none");} = "none"_s,
"pre"_s + event<motion>/action_pre2pre = "pre"_s, "pre"_s + event<motion>/action_pre2pre = "pre"_s,
"pre"_s + event<people>/action_pre2in = "in"_s, "pre"_s + event<people>/action_pre2in = "in"_s,
"pre"_s + event<mpboth>/action_pre2in = "in"_s, "pre"_s + event<mpboth>/action_pre2in = "in"_s,
// in // in
"in"_s + event<none> [guard_in2post]/[]{spdlog::info("in -n-> post");} = "post"_s, "in"_s + event<none> [guard_in2post]/[]{spdlog::info("in -n-> post");} = "post"_s,
"in"_s + event<motion> [guard_in2post]/[]{spdlog::info("in -m-> post");} = "post"_s, "in"_s + event<motion> [guard_in2post]/[]{spdlog::info("in -m-> post");} = "post"_s,
"in"_s + event<people> [guard_in2post]/[]{spdlog::info("in -p-> post");} = "post"_s, "in"_s + event<people> [guard_in2post]/[]{spdlog::info("in -p-> post");} = "post"_s,
"in"_s + event<mpboth>/guard_in2post, "in"_s + event<mpboth>/guard_in2post,
"in"_s + sml::on_entry<_> / may_generate_event,
// post // post
"post"_s + event<mpboth>/guard_post2none = "in"_s, "post"_s + event<mpboth>/guard_post2none = "in"_s,
"post"_s + event<people> [guard_post2none]/action_post2none = "none"_s, "post"_s + event<people> [guard_post2none]/action_post2none = "none"_s,
"post"_s + event<motion> [guard_post2none]/action_post2none = "none"_s, "post"_s + event<motion> [guard_post2none]/action_post2none = "none"_s,
"post"_s + event<none> [guard_post2none]/action_post2none = "none"_s, "post"_s + event<none> [guard_post2none]/action_post2none = "none"_s,
// "post"_s + sml::on_entry<_> / may_generate_event,
//
"none"_s + sml::on_entry<_> / [this]{hasMotion = false,timeStart = 0;} "none"_s + sml::on_entry<_> / [this]{hasMotion = false,timeStart = 0;}
); );
} }
private: private:
bool hasMotion{false}; bool hasMotion{false};
int cntMotionOnly = 0; int cntMotionOnly = 0;
uint64_t timeStart{0}; int64_t timeStart{0};
std::map<const std::string, int>guards{{"m", 0}, {"p", 0}, {"n", 0}}; std::map<const std::string, int>guards{{"m", 0}, {"p", 0}, {"n", 0}};
}; };
} }
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
#include <fcntl.h> #include <fcntl.h>
#include "raw_tcp.h" #include "raw_tcp.h"
using namespace std;
int raw_connect(std::string host, std::string port, int *socket_, int recv_timeout, int send_timeout) int raw_connect(std::string host, std::string port, int *socket_, int recv_timeout, int send_timeout)
{ {
int rv = 0; int rv = 0;
...@@ -71,3 +73,20 @@ int raw_connect(std::string host, std::string port, int *socket_, int recv_timeo ...@@ -71,3 +73,20 @@ int raw_connect(std::string host, std::string port, int *socket_, int recv_timeo
::freeaddrinfo(addrinfo_result); ::freeaddrinfo(addrinfo_result);
return rv; return rv;
} }
string raw_send(int s, const void *buf, size_t len){
string rc;
ssize_t sent = 0;
while(len > 0) {
sent = ::send(s, buf, len, 0);
if(sent <= 0){
break;
}
len -= sent;
}
if(len > 0 || sent<=0){
rc = fmt::format("failed to send: {}", strerror(errno));
}
return rc;
}
\ No newline at end of file
...@@ -9,6 +9,6 @@ ...@@ -9,6 +9,6 @@
#include <mutex> #include <mutex>
#include "common.h" #include "common.h"
using namespace std;
int raw_connect(std::string host, std::string port, int *socket_, int recv_timeout = 3, int send_timeout = 3); int raw_connect(std::string host, std::string port, int *socket_, int recv_timeout = 3, int send_timeout = 3);
std::string raw_send(int s, const void *buf, size_t len);
#endif #endif
\ No newline at end of file
...@@ -39,7 +39,6 @@ XM_S32 MaQue_JpegEnc_getFrame_callback (XM_VOID *pUserArg, MaQueSmartJpegFrame_s ...@@ -39,7 +39,6 @@ XM_S32 MaQue_JpegEnc_getFrame_callback (XM_VOID *pUserArg, MaQueSmartJpegFrame_s
return XM_SUCCESS; return XM_SUCCESS;
} }
void maq_smart_task_entry(ev_module_config_t *pArg) void maq_smart_task_entry(ev_module_config_t *pArg)
{ {
if(!pArg->module.ai.enabled) { if(!pArg->module.ai.enabled) {
...@@ -210,22 +209,30 @@ void maq_smart_task_entry(ev_module_config_t *pArg) ...@@ -210,22 +209,30 @@ void maq_smart_task_entry(ev_module_config_t *pArg)
int deltaMotionCnt = motionCnt_ - motionCntLast; int deltaMotionCnt = motionCnt_ - motionCntLast;
motionCntLast = motionCnt_; motionCntLast = motionCnt_;
spdlog::info("deltaTimeMs {}", deltaTimeMs); if(deltaTimeMs == 0){
spdlog::warn("no video frames in last ~5 seconds");
}else{
if(deltaMotionCnt >= motionCntThresh) { if(deltaMotionCnt >= motionCntThresh) {
hasMotion = true; hasMotion = true;
} }
if(hasMotion && hasHuman) { if(hasMotion && hasHuman) {
spdlog::info("ai_task: deltaTimeMs {}, mpboth", deltaTimeMs);
fsm.process_event(md::mpboth{}); fsm.process_event(md::mpboth{});
} }
else if(hasMotion) { else if(hasMotion) {
spdlog::info("ai_task: deltaTimeMs {}, motion", deltaTimeMs);
fsm.process_event(md::motion{}); fsm.process_event(md::motion{});
} }
else if(hasHuman) { else if(hasHuman) {
spdlog::info("ai_task: deltaTimeMs {}, people", deltaTimeMs);
fsm.process_event(md::people{}); fsm.process_event(md::people{});
} }
else { else {
spdlog::info("ai_task: deltaTimeMs {}, none", deltaTimeMs);
fsm.process_event(md::none{}); fsm.process_event(md::none{});
} }
}
// reset // reset
windowSecs = kMotionDetectWindowSeconds; windowSecs = kMotionDetectWindowSeconds;
hasHuman = false; hasHuman = false;
......
...@@ -49,18 +49,6 @@ typedef struct evpacket_t { ...@@ -49,18 +49,6 @@ typedef struct evpacket_t {
uint32_t length; uint32_t length;
} *evpacket_ptr_t; } *evpacket_ptr_t;
static inline unsigned short crc16(const unsigned char* data_p, unsigned char length){
unsigned char x;
unsigned short crc = 0xFFFF;
while (length--){
x = crc >> 8 ^ *data_p++;
x ^= x>>4;
crc = (crc << 8) ^ ((unsigned short)(x << 12)) ^ ((unsigned short)(x <<5)) ^ ((unsigned short)x);
}
return crc;
}
#endif #endif
\ No newline at end of file
...@@ -48,15 +48,6 @@ typedef void (*on_msg_fun_ptr_t)(MqttHelper *hlp, const void * const data, int l ...@@ -48,15 +48,6 @@ typedef void (*on_msg_fun_ptr_t)(MqttHelper *hlp, const void * const data, int l
template<typename R> template<typename R>
bool is_ready(std::future<R> const& f); bool is_ready(std::future<R> const& f);
struct StrException : public std::exception {
std::string s;
StrException(std::string ss) : s(ss) {}
~StrException() throw () {} // Updated
const char* what() const throw()
{
return s.c_str();
}
};
class MqttPub { class MqttPub {
private: private:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论