提交 b92bb722 authored 作者: blu's avatar blu

recording & refactoring

上级 2c0181b7
......@@ -185,12 +185,18 @@ evcamera_BINARY_DIR:STATIC=/mnt/shared/evcamera/hi3518
//Value Computed by CMake
evcamera_SOURCE_DIR:STATIC=/mnt/shared/evcamera/hi3518
//Dependencies for target
evutils_LIB_DEPENDS:STATIC=
//Dependencies for target
mime_LIB_DEPENDS:STATIC=
//Dependencies for target
motion_LIB_DEPENDS:STATIC=
//Dependencies for target
mqtthelper_LIB_DEPENDS:STATIC=
//Dependencies for target
ntp_LIB_DEPENDS:STATIC=
......
......@@ -20,15 +20,11 @@ link_directories(${COMMON_LIB_DIR} /root/xiongmai/arm-himix100-linux/target/lib/
#add_library(mime STATIC mime.c)
add_library(ntp STATIC ntp.cc)
add_library(tcp_client STATIC raw_tcp.cc)
add_library(smart STATIC smart.cc)
add_library(evutils STATIC evutils.cc)
add_library(mqtthelper STATIC ../include/mqtt_helper.cc)
add_library(motion STATIC motion.cc)
add_library(smart STATIC smart.cc)
#add_library(util STATIC utils.cpp)
add_executable(test_mqtt test_mqtt.cc)
target_link_libraries(test_mqtt PUBLIC paho-mqtt3a fmt m dl pthread)
add_executable(evcamera main.cc)
target_link_libraries(evcamera PUBLIC ntp paho-mqtt3a tcp_client smart motion zmq ${COMMON_LIBS} ${XM_LIBS})
add_executable(ntp_client ntp_main.cc)
target_link_libraries(ntp_client PUBLIC ntp)
target_link_libraries(evcamera PUBLIC ntp paho-mqtt3a mqtthelper tcp_client evutils smart motion zmq ${COMMON_LIBS} ${XM_LIBS})
#ifndef __EV_UTILS_HPP__
#define __EV_UTILS_HPP__
#include <string>
#include <jsoncons/json.hpp>
#include <fstream>
#include <algorithm>
#include <sys/statvfs.h>
#include <mutex>
#include <set>
#include <zmq.h>
#include <string>
#include <spdlog/spdlog.h>
#include <fmt/format.h>
#include "evutils.hpp"
using namespace std;
using namespace jsoncons;
using namespace jsoncons::literals;
namespace evutils {
const char version[] = "EVC20200429";
/// for mqtt proto: json payload
const string kMsgCmd = "cmd";
const string kMsgCmdConfig = "config";
const string kMsgConfigVgw = "vgw";
const string kMsgConfigMqtt = "mqtt";
const string kMsgConfigUpload = "upload";
const string kMsgConfigFeatures = "features";
const string kMsgConfigEnable = "enable";
const string kMsgConfigLevel = "level";
const string kMsgConfigInterval = "interval";
const string kMsgConfigDuration = "duration";
const string kMsgConfigHumanThreash = "humanThreash";
const string kMsgConfigFaceThreash = "faceThresh";
const string kMsgConfigRegion = "region";
const string kMsgConfigMotion = "motion";
const string kMsgConfigRecord = "record";
const string kMsgConfigAi = "ai";
const string kMsgCmdUploadVideo = "upload_video";
const string kMsgCmdStatus = "status";
const string kMsgCmdGetConfig = "get_config";
const string kMsgCmdReboot = "reboot";
const string kMsgCmdPtz = "ptz";
const string kMsgType = "type";
const string kMsgCata = "cata";
const string kMsgTypeReport = "report";
const string kMsgTypeResponse = "response";
const string kMsgTime = "time";
const string kMsgData = "data";
const string kMsgRid = "rid";
const string kMsgCode = "code";
const string kMsgMsg = "msg";
const string kMsgSn = "sn";
/// default video gateway info
char hostArr[] = "192.168.55.104";
char portArr[] = "7123";
char *host = hostArr, *port = portArr;
string recFilePath = "/mnt/sd/records/";
const long long TS_2020 = 1577836800000L;
const char consts::version[] = "EVC20200429";
const string consts::kMsgCmd = "cmd";
const string consts::kMsgCmdConfig = "config";
const string consts::kMsgConfigVgw = "vgw";
const string consts::kMsgConfigMqtt = "mqtt";
const string consts::kMsgConfigUpload = "upload";
const string consts::kMsgConfigFeatures = "features";
const string consts::kMsgConfigEnable = "enable";
const string consts::kMsgConfigLevel = "level";
const string consts::kMsgConfigInterval = "interval";
const string consts::kMsgConfigDuration = "duration";
const string consts::kMsgConfigHumanThreash = "humanThreash";
const string consts::kMsgConfigFaceThreash = "faceThresh";
const string consts::kMsgConfigRegion = "region";
const string consts::kMsgConfigMotion = "motion";
const string consts::kMsgConfigRecord = "record";
const string consts::kMsgConfigAi = "ai";
const string consts::kMsgCmdUploadVideo = "upload_video";
const string consts::kMsgCmdStatus = "status";
const string consts::kMsgCmdGetConfig = "get_config";
const string consts::kMsgCmdReboot = "reboot";
const string consts::kMsgCmdPtz = "ptz";
const string consts::kMsgType = "type";
const string consts::kMsgCata = "category";
const string consts::kMsgTypeReport = "report";
const string consts::kMsgTypeResponse = "response";
const string consts::kMsgTime = "time";
const string consts::kMsgData = "data";
const string consts::kMsgRid = "rid";
const string consts::kMsgCode = "code";
const string consts::kMsgMsg = "msg";
const string consts::kMsgSn = "sn";
const string consts::recFilePath = "/mnt/sd/records/";
const long long consts::TS_2020 = 1577836800000L;
/// topics
string sub_topic = "evcamera/v1.0/request/";
string pub_topic = "evcamera/v1.0/res_rep";
string mqtt_url = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883";
const string consts::sub_topic = "evcamera/v1.0/request/";
const string consts::pub_topic_response = "evcamera/v1.0/response/";
const string consts::pub_topic_report = "evcamera/v1.0/report";
const string consts::mqtt_url = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883";
consts &consts::self(){
static consts cons;
return cons;
}
inline string msg_field(const json &msg, const string &field)
string msg_field(const json &msg, const string &field)
{
return msg.get_value_or<string>(field, "");
}
bool is_sdcard_avail(char *path = nullptr)
bool is_sdcard_avail(char *path)
{
char p[] = "/sys/block/mmcblk0/size";
if(path == nullptr){
......@@ -117,7 +101,7 @@ json make_default_config(){
)"_json;
}
void get_mac_addr(char *buf, char *intf = nullptr){
void get_mac_addr(char *buf, char *intf){
char p[] = "/sys/class/net/eth0/address";
if(intf == nullptr){
intf = p;
......@@ -135,7 +119,7 @@ void get_mac_addr(char *buf, char *intf = nullptr){
*(buf+i) = 0;
}
bool get_sdcard_megabytes(uint64_t &total, uint64_t &free, char* path = nullptr)
bool get_sdcard_megabytes(uint64_t &total, uint64_t &free, char* path )
{
struct statvfs stat;
char p[] = "/mnt/sd";
......@@ -155,133 +139,8 @@ bool get_sdcard_megabytes(uint64_t &total, uint64_t &free, char* path = nullptr)
return true;
}
template <typename T>
using cb_remove_elem = void(*)(T elem);
template <typename TN>
class OrderedList {
private:
set<TN> list_;
size_t maxSize;
mutex mut;
TN oldestTs;
cb_remove_elem<TN> fn_remove;
unsigned long cntInsert = 0;
public:
OrderedList() = delete;
OrderedList(ssize_t maxSize, cb_remove_elem<TN> fn_remove):maxSize(maxSize), fn_remove(fn_remove){}
void insert(TN elem, cb_remove_elem<TN> fn=nullptr)
{
// list_.insert(lower_bound(list_.begin(), list_.end(), elem), elem);
if(cntInsert % 10 == 0) {
oldestTs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count() - 60*2*1000*maxSize;
spdlog::info("record: maxSlices {}, oldestTs {}", maxSize, oldestTs);
}
++cntInsert;
if(elem < oldestTs){
if(fn != nullptr){
(*fn)(elem);
}else if(fn_remove != nullptr){
(*fn_remove)(elem);
}
return;
}
if(list_.size() == 0) {
list_.insert(list_.begin(),elem);
return;
}
auto itr = list_.rbegin();
for(; itr != list_.rend(); itr++) {
if(*itr < elem) {
break;
}
}
if(itr == list_.rbegin() ) {
list_.insert(list_.end(), elem);
}
else {
list_.insert(itr.base(), elem);
}
if(list_.size() > maxSize) {
lock_guard<mutex> lg(mut);
auto ts = *(list_.begin());
list_.erase(list_.begin());
if(fn != nullptr){
(*fn)(ts);
}else if(fn_remove != nullptr){
(*fn_remove)(ts);
}else{
// no callback available
}
// auto baseName = videoFileTs2Name(ts);
// fs::path fname(this->urlOut + "/" + baseName + ".mp4");
// fs::remove(fname);
}
}
set<TN> findByRange(TN tss, TN tse, TN & offsetS, TN &offsetE){
set<TN> ret;
lock_guard<mutex> lg(mut);
if(list_.size() == 0) {
return ret;
}
TN first = *(list_.begin());
auto _it = list_.end();
TN end = *(--_it);
if(tse < first||tss > end) {
spdlog::info("range requested ({}, {}) is not in range existed ({}, {}).", tss, tse, first, end);
return ret;
}
first = end = 0;
int found = 0;
TN last = 0;
auto itr = list_.rbegin();
for(; itr != list_.rend(); itr++) {
if(*itr > tse) {
continue;
}
if(*itr <= tse) {
if(found != 1) {
spdlog::info("\t matched : {}, s:{}, e:{}", *itr, tss, tse);
found = 1;
// check the end offset, not guaranteed
if( itr != list_.rbegin()){
auto t = itr;
last = *(--t);
}
}
ret.insert(*itr);
if(tss >= *itr) {
break;
}
}
}
if(found == 1) {
auto itr = ret.begin();
offsetS = tss - *itr;
if(last != 0) {
offsetE = last - tse;
}
}
return ret;
}
};
/// returns negtive for failure, otherwise success
int setupDealer(void **ctx, void **s, string ident, string addr, int sndQS=0, int timeoutMs = -1) {
int setupDealer(void **ctx, void **s, string ident, string addr, int sndQS, int timeoutMs) {
int ret = 0;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_DEALER);
......@@ -319,7 +178,7 @@ int setupDealer(void **ctx, void **s, string ident, string addr, int sndQS=0, in
return ret;
}
int setupRouter(void **ctx, void **s, string addr, int rcvQS=0){
int setupRouter(void **ctx, void **s, string addr, int rcvQS){
int ret = 0;
*ctx = zmq_ctx_new();
*s = zmq_socket(*ctx, ZMQ_ROUTER);
......@@ -378,6 +237,4 @@ int z_recv_multiple(void *s, vector<uint8_t> &buf, int &frames) {
}
}
#endif
\ No newline at end of file
}
\ No newline at end of file
#ifndef __EV_UTILS_HPP__
#define __EV_UTILS_HPP__
#pragma once
#include <string>
#include <jsoncons/json.hpp>
#include <fstream>
#include <algorithm>
#include <sys/statvfs.h>
#include <mutex>
#include <set>
#include <zmq.h>
#include <string>
#include <spdlog/spdlog.h>
#include <fmt/format.h>
#include <jsoncons/json.hpp>
namespace evutils {
using namespace std;
using namespace jsoncons;
typedef struct consts{
/// for mqtt proto: json payload
static const char version[];
static const string kMsgCmd;
static const string kMsgCmdConfig;
static const string kMsgConfigVgw;
static const string kMsgConfigMqtt;
static const string kMsgConfigUpload;
static const string kMsgConfigFeatures;
static const string kMsgConfigEnable;
static const string kMsgConfigLevel;
static const string kMsgConfigInterval;
static const string kMsgConfigDuration;
static const string kMsgConfigHumanThreash;
static const string kMsgConfigFaceThreash;
static const string kMsgConfigRegion;
static const string kMsgConfigMotion;
static const string kMsgConfigRecord;
static const string kMsgConfigAi;
static const string kMsgCmdUploadVideo;
static const string kMsgCmdStatus;
static const string kMsgCmdGetConfig;
static const string kMsgCmdReboot;
static const string kMsgCmdPtz;
static const string kMsgType;
static const string kMsgCata;
static const string kMsgTypeReport;
static const string kMsgTypeResponse;
static const string kMsgTime;
static const string kMsgData;
static const string kMsgRid;
static const string kMsgCode;
static const string kMsgMsg;
static const string kMsgSn;
static const string recFilePath;
static const long long TS_2020;
/// topics
static const string sub_topic;
static const string pub_topic_response;
static const string pub_topic_report;
static const string mqtt_url;
static consts& self();
}consts;
extern string msg_field(const json &msg, const string &field);
extern bool is_sdcard_avail(char *path = nullptr);
extern json make_default_config();
extern void get_mac_addr(char *buf, char *intf = nullptr);
extern bool get_sdcard_megabytes(uint64_t &total, uint64_t &free, char* path = nullptr);
extern int setupDealer(void **ctx, void **s, string ident, string addr, int sndQS=0, int timeoutMs = -1);
extern int setupRouter(void **ctx, void **s, string addr, int rcvQS=0);
extern int z_recv_multiple(void *s, vector<uint8_t> &buf, int &frames);
template <typename T>
using cb_remove_elem = void(*)(T elem);
template <typename TN>
class OrderedList {
private:
set<TN> list_;
size_t maxSize;
mutex mut;
TN oldestTs;
cb_remove_elem<TN> fn_remove;
unsigned long cntInsert = 0;
public:
OrderedList() = delete;
OrderedList(ssize_t maxSize, cb_remove_elem<TN> fn_remove):maxSize(maxSize), fn_remove(fn_remove) {}
void insert(TN elem, cb_remove_elem<TN> fn=nullptr)
{
// list_.insert(lower_bound(list_.begin(), list_.end(), elem), elem);
if(cntInsert % 10 == 0) {
oldestTs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count() - 60*2*1000*maxSize;
spdlog::info("record: maxSlices {}, oldestTs {}", maxSize, oldestTs);
}
++cntInsert;
if(elem < oldestTs) {
if(fn != nullptr) {
(*fn)(elem);
}
else if(fn_remove != nullptr) {
(*fn_remove)(elem);
}
return;
}
if(list_.size() == 0) {
list_.insert(list_.begin(),elem);
return;
}
auto itr = list_.rbegin();
for(; itr != list_.rend(); itr++) {
if(*itr < elem) {
break;
}
}
if(itr == list_.rbegin() ) {
list_.insert(list_.end(), elem);
}
else {
list_.insert(itr.base(), elem);
}
if(list_.size() > maxSize) {
lock_guard<mutex> lg(mut);
auto ts = *(list_.begin());
list_.erase(list_.begin());
if(fn != nullptr) {
(*fn)(ts);
}
else if(fn_remove != nullptr) {
(*fn_remove)(ts);
}
else {
// no callback available
}
}
}
set<TN> findByRange(TN tss, TN tse, TN & offsetS, TN &offsetE)
{
set<TN> ret;
lock_guard<mutex> lg(mut);
if(list_.size() == 0) {
return ret;
}
TN first = *(list_.begin());
auto _it = list_.end();
TN end = *(--_it);
if(tse < first||tss > end) {
spdlog::info("range requested ({}, {}) is not in range existed ({}, {}).", tss, tse, first, end);
return ret;
}
first = end = 0;
int found = 0;
TN last = 0;
auto itr = list_.rbegin();
for(; itr != list_.rend(); itr++) {
if(*itr > tse) {
continue;
}
if(*itr <= tse) {
if(found != 1) {
spdlog::info("\t matched : {}, s:{}, e:{}", *itr, tss, tse);
found = 1;
// check the end offset, not guaranteed
if( itr != list_.rbegin()) {
auto t = itr;
last = *(--t);
}
}
ret.insert(*itr);
if(tss >= *itr) {
break;
}
}
}
if(found == 1) {
auto itr = ret.begin();
offsetS = tss - *itr;
if(last != 0) {
offsetE = last - tse;
}
}
return ret;
}
set<TN> & items()
{
return list_;
}
};
}
#endif
\ No newline at end of file
差异被折叠。
差异被折叠。
差异被折叠。
/**
* Mqtt main implementation file
* Bruce.Lu 20200506
* sub topic for request: evcamera/v1.0/request/<SN>
* pub topic for report/response: evcamera/v1.0/response
*/
#ifndef __EV_MQTT_MGR___
#define __EV_MQTT_MGR___
#include <mqtt_helper.hpp>
#include <mutex>
#include <map>
using namespace std;
/// just a simple wrapper around MqttHelper
class MqttMgr {
public:
static map<string, MqttHelper *> insts;
static mutex mut;
public:
static MqttHelper* get_instance(string mqtt_url, string devsn){
auto key = mqtt_url + "/" + devsn;
lock_guard<mutex> lg(MqttMgr::mut);
try{
if(MqttMgr::insts.count(key) == 0){
auto inst = new MqttHelper(mqtt_url, "evc." + devsn);
MqttMgr::insts[key] = inst;
}else{
//
}
}catch(exception &e) {
spdlog::error("failed to create mqtt adaptor: {}", e.what());
}
return MqttMgr::insts[key];
}
static void remove(string mqtt_url, string devsn){
auto key = mqtt_url + "/" + devsn;
lock_guard<mutex> lg(MqttMgr::mut);
if(MqttMgr::insts.count(key) == 0){
}else{
delete MqttMgr::insts[key];
MqttMgr::insts.erase(key);
}
}
MqttMgr(){}
~MqttMgr(){
for(auto &x:insts){
if(x.second){
delete x.second;
}
insts.erase(x.first);
}
}
};
map<string, MqttHelper *> MqttMgr::insts;
mutex MqttMgr::mut;
#endif
......@@ -84,6 +84,22 @@ int getNtpTime(time_t * txTm)
last_errno = errno;
continue;
}
struct timeval timeout;
timeout.tv_sec = 2;
timeout.tv_usec = 0;
if (setsockopt (socket_, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_RCVTIMEO failed");
return -1;
}
timeout.tv_sec = 2;
if (setsockopt (socket_, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
spdlog::error("setsockopt SO_SNDTIMEO failed");
return -1;
}
rv = ::connect(socket_, rp->ai_addr, rp->ai_addrlen);
if (rv == 0) {
auto addr = (struct sockaddr_in *)rp->ai_addr;
......
#include "smart.h"
#include <spdlog/spdlog.h>
extern "C"
{
......@@ -7,6 +8,8 @@ extern "C"
#include <maque_osd.h>
}
#include "mqtt_helper.hpp"
#define DIV_UP(x, a) ( ((x) + ((a) - 1) ) / a )
static int smartCnt = 0;
......@@ -297,4 +300,4 @@ void maq_smart_task_entry(void *pArg)
spdlog::error("LibXmMaQue_SmartDestory() Failed!\n");
return;
}
}
\ No newline at end of file
}
#ifndef __SMART_H__
#define __SMART_H__
#ifndef __EV_SMART_HPP__
#define __EV_SMART_HPP__
#pragma once
void maq_smart_task_entry(void *pArg);
#endif
\ No newline at end of file
/**
* mqtt_helper.hpp
*
* simplifies api for mqtt
*
* Bruce.Lu @20200415
*/
#include <mqtt_helper.hpp>
#include <string>
#include <chrono>
#include <thread>
#include <set>
#include <map>
#include <future>
#include <mutex>
#include <httplib.h>
// #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
// #define SPDLOG_DEBUG_ON
// #define SPDLOG_TRACE_ON
#include <spdlog/spdlog.h>
#include <fmt/format.h>
extern "C" {
#include <MQTTAsync.h>
}
using namespace std;
void on_connected(void* context, MQTTAsync_successData* response);
void on_connlost(void *context, char *cause);
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message);
void on_disconn(void* context, MQTTAsync_successData* response);
void on_subscribed(void* context, MQTTAsync_successData* response);
void on_conn_fail(void* context, MQTTAsync_failureData* response);
void on_subscribed_fail(void* context, MQTTAsync_failureData* response);
void on_sent(void* context, MQTTAsync_successData* response);
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response);
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response);
class MqttHelper;
typedef void (*on_res_fun_ptr_t)(void *res);
typedef void (*on_msg_fun_ptr_t)(MqttHelper *hlp, const void * const data, int len, string topic);
template<typename R>
bool is_ready(std::future<R> const& f)
{
return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
void on_connlost(void *context, char *cause)
{
MqttHelper *self = (MqttHelper *) context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason");
spdlog::warn("reconnecting {}", self->addr);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS) {
string msg = fmt::format("Failed to start connect:", MQTTAsync_strerror(rc));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
}
}
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
MqttHelper *self = (MqttHelper *) context;
(*self)(string(topic), message->payload, message->payloadlen);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic);
return 1;
}
void on_disconn(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
// self->state = MqttHelper::State::Disconnected;
// if(self->state.get_future().valid()){
// self->state = std::promise<MqttHelper::State>();
// }
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Disconnected;
self->state.set_value(as);
spdlog::info("successful disconnected {}", self->addr);
}
void on_subscribed(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
//spdlog::debug("subscribe succeeded");
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubscribeOK;
self->state.set_value(as);
}
void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
//string msg = fmt::format("subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
// spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubScribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeOk;
self->state.set_value(as);
}
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("unsubscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_conn_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::ConnFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_sent(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
spdlog::debug("Message delivery confirmed, tok: {}", response->token);
}
void on_connected(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Ready;
self->state.set_value(as);
spdlog::info("connected to mqtt {}", self->addr);
}
map<string, MqttHelper *> MqttMgr::insts;
mutex MqttMgr::mut;
#ifdef TEST_MQTT
#include <httplib.h>
void my_on_msg1(MqttHelper *hlp, const void * const data, int len, string topic)
{
string msg;
msg.resize(len+1);
memcpy((void*)&msg[0], data, len);
*(char*)(&msg[0] +len) = 0;
spdlog::info("I1 recv on {}: {}", topic, msg);
(*hlp)[to_string(stoi(topic) + 1)]("hello ack1", 11);
}
void my_on_msg2(MqttHelper *hlp, const void * const data, int len, string topic)
{
string msg;
msg.resize(len+1);
memcpy((void*)&msg[0], data, len);
*(char*)(&msg[0] +len) = 0;
spdlog::info("I2 recv on {}: {}", topic, msg);
(*hlp)[to_string(stoi(topic) + 1)]("hello ack2", 11);
}
int main()
{
string mqtt_url = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883";
string mqtt_cid = "testaabbc";
auto tmpEnv = getenv("MQTT_URL");
if(tmpEnv) {
mqtt_url = string(tmpEnv);
}
tmpEnv = getenv("CID");
if(tmpEnv) {
mqtt_cid = string(tmpEnv);
}
spdlog::set_level(spdlog::level::debug);
{
try {
MqttHelper hlp(mqtt_url, mqtt_cid);
hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("no_on_msg", NULL);
auto x = thread([&hlp] {
for(int i = 0; i < 20; i ++)
{
hlp.subscribe(to_string(i), my_on_msg1);
}
hlp["hello"].pub("hello1", 7, 1);
});
auto y = thread([&hlp] {
for(int i = 0; i < 10; i ++)
{
hlp.subscribe(to_string(i), my_on_msg2);
}
for(int i = 10; i < 30; i ++)
{
hlp.subscribe(to_string(i), my_on_msg2, true);
}
hlp["hello"].pub("hello2", 7, 1);
});
x.join();
y.join();
this_thread::sleep_for(chrono::seconds(20));
}
catch(exception &e) {
spdlog::error("failed to connect mqtt: {}, {}", mqtt_url, e.what());
exit(1);
}
}
this_thread::sleep_for(chrono::seconds(2));
return 0;
}
#endif
差异被折叠。
差异被折叠。
//---------------------------------------------------------------------------------------
//
// ghc::filesystem - A C++17-like filesystem implementation for C++11/C++14
//
//---------------------------------------------------------------------------------------
//
// Copyright (c) 2018, Steffen Schümann <s.schuemann@pobox.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors
// may be used to endorse or promote products derived from this software without
// specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//---------------------------------------------------------------------------------------
// fs_fwd.hpp - The forwarding header for the header/implementation seperated usage of
// ghc::filesystem.
// This file can be include at any place, where ghc::filesystem api is needed while
// not bleeding implementation details (e.g. system includes) into the global namespace,
// as long as one cpp includes fs_impl.hpp to deliver the matching implementations.
//---------------------------------------------------------------------------------------
#ifndef GHC_FILESYSTEM_FWD_H
#define GHC_FILESYSTEM_FWD_H
#define GHC_FILESYSTEM_FWD
#include <ghc/filesystem.hpp>
#endif // GHC_FILESYSTEM_FWD_H
//---------------------------------------------------------------------------------------
//
// ghc::filesystem - A C++17-like filesystem implementation for C++11/C++14
//
//---------------------------------------------------------------------------------------
//
// Copyright (c) 2018, Steffen Schümann <s.schuemann@pobox.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors
// may be used to endorse or promote products derived from this software without
// specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//---------------------------------------------------------------------------------------
// fs_impl.hpp - The implementation header for the header/implementation seperated usage of
// ghc::filesystem.
// This file can be used to hide the implementation of ghc::filesystem into a single cpp.
// The cpp has to include this before including fs_fwd.hpp directly or via a different
// header to work.
//---------------------------------------------------------------------------------------
#define GHC_FILESYSTEM_IMPLEMENTATION
#include <ghc/filesystem.hpp>
//---------------------------------------------------------------------------------------
//
// ghc::filesystem - A C++17-like filesystem implementation for C++11/C++14
//
//---------------------------------------------------------------------------------------
//
// Copyright (c) 2018, Steffen Schümann <s.schuemann@pobox.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors
// may be used to endorse or promote products derived from this software without
// specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//---------------------------------------------------------------------------------------
// fs_std.hpp - The dynamic switching header that includes std::filesystem if detected
// or ghc::filesystem if not, and makes the resulting API available in the
// namespace fs.
//---------------------------------------------------------------------------------------
#ifndef GHC_FILESYSTEM_STD_H
#if defined(__cplusplus) && __cplusplus >= 201703L && defined(__has_include)
#if __has_include(<filesystem>)
#define GHC_USE_STD_FS
#include <filesystem>
namespace fs {
using namespace std::filesystem;
using ifstream = std::ifstream;
using ofstream = std::ofstream;
using fstream = std::fstream;
}
#endif
#endif
#ifndef GHC_USE_STD_FS
#define GHC_WIN_WSTRING_STRING_TYPE
#include <ghc/filesystem.hpp>
namespace fs {
using namespace ghc::filesystem;
using ifstream = ghc::filesystem::ifstream;
using ofstream = ghc::filesystem::ofstream;
using fstream = ghc::filesystem::fstream;
}
#endif
#endif // GHC_FILESYSTEM_STD_H
//---------------------------------------------------------------------------------------
//
// ghc::filesystem - A C++17-like filesystem implementation for C++11/C++14
//
//---------------------------------------------------------------------------------------
//
// Copyright (c) 2018, Steffen Schümann <s.schuemann@pobox.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors
// may be used to endorse or promote products derived from this software without
// specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//---------------------------------------------------------------------------------------
// fs_std_fwd.hpp - The forwarding header for the header/implementation seperated usage of
// ghc::filesystem that uses std::filesystem if it detects it.
// This file can be include at any place, where fs::filesystem api is needed while
// not bleeding implementation details (e.g. system includes) into the global namespace,
// as long as one cpp includes fs_std_impl.hpp to deliver the matching implementations.
//---------------------------------------------------------------------------------------
#ifndef GHC_FILESYSTEM_STD_FWD_H
#define GHC_FILESYSTEM_STD_FWD_H
#if defined(__cplusplus) && __cplusplus >= 201703L && defined(__has_include)
#if __has_include(<filesystem>)
#define GHC_USE_STD_FS
#include <filesystem>
namespace fs {
using namespace std::filesystem;
using ifstream = std::ifstream;
using ofstream = std::ofstream;
using fstream = std::fstream;
}
#endif
#endif
#ifndef GHC_USE_STD_FS
#define GHC_WIN_WSTRING_STRING_TYPE
#define GHC_FILESYSTEM_FWD
#include <ghc/filesystem.hpp>
namespace fs {
using namespace ghc::filesystem;
using ifstream = ghc::filesystem::ifstream;
using ofstream = ghc::filesystem::ofstream;
using fstream = ghc::filesystem::fstream;
}
#endif
#endif // GHC_FILESYSTEM_STD_FWD_H
//---------------------------------------------------------------------------------------
//
// ghc::filesystem - A C++17-like filesystem implementation for C++11/C++14
//
//---------------------------------------------------------------------------------------
//
// Copyright (c) 2018, Steffen Schümann <s.schuemann@pobox.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors
// may be used to endorse or promote products derived from this software without
// specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//---------------------------------------------------------------------------------------
// fs_std_impl.hpp - The implementation header for the header/implementation seperated usage of
// ghc::filesystem that does nothing if std::filesystem is detected.
// This file can be used to hide the implementation of ghc::filesystem into a single cpp.
// The cpp has to include this before including fs_std_fwd.hpp directly or via a different
// header to work.
//---------------------------------------------------------------------------------------
#if defined(__cplusplus) && __cplusplus >= 201703L && defined(__has_include)
#if __has_include(<filesystem>)
#define GHC_USE_STD_FS
#endif
#endif
#ifndef GHC_USE_STD_FS
#define GHC_WIN_WSTRING_STRING_TYPE
#define GHC_FILESYSTEM_IMPLEMENTATION
#include <ghc/filesystem.hpp>
#endif
差异被折叠。
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* This file is deprecated, and all its functionality provided by zmq.h */
/* Note that -Wpedantic compilation requires GCC to avoid using its custom
extensions such as #warning, hence the trick below. Also, pragmas for
warnings or other messages are not standard, not portable, and not all
compilers even have an equivalent concept.
So in the worst case, this include file is treated as silently empty. */
#if defined(__clang__) || defined(__GNUC__) || defined(__GNUG__) \
|| defined(_MSC_VER)
#if defined(__GNUC__) || defined(__GNUG__)
#pragma GCC diagnostic push
#pragma GCC diagnostic warning "-Wcpp"
#pragma GCC diagnostic ignored "-Werror"
#pragma GCC diagnostic ignored "-Wall"
#endif
#pragma message( \
"Warning: zmq_utils.h is deprecated. All its functionality is provided by zmq.h.")
#if defined(__GNUC__) || defined(__GNUG__)
#pragma GCC diagnostic pop
#endif
#endif
# libzmq.la - a libtool library file
# Generated by libtool (GNU libtool) 2.4.6 Debian-2.4.6-2
#
# Please DO NOT delete this file!
# It is necessary for linking the library.
# The name that we can dlopen(3).
dlname='libzmq.so.5'
# Names of this library.
library_names='libzmq.so.5.2.2 libzmq.so.5 libzmq.so'
# The name of the static archive.
old_library='libzmq.a'
# Linker flags that cannot go in dependency_libs.
inherited_linker_flags=''
# Libraries that this one depends upon.
dependency_libs=' -lrt -lpthread'
# Names of additional weak libraries provided by this library
weak_library_names=''
# Version information for libzmq.
current=7
age=2
revision=2
# Is this an already installed library?
installed=yes
# Should we warn about portability when linking against -modules?
shouldnotlink=no
# Files to dlopen/dlpreopen
dlopen=''
dlpreopen=''
# Directory that this library needs to be installed in:
libdir='/mnt/shared/evcamera/vendor/libzmq../x64/lib'
prefix=/mnt/shared/evcamera/vendor/libzmq../x64
exec_prefix=${prefix}
libdir=${exec_prefix}/lib
includedir=${prefix}/include
Name: libzmq
Description: 0MQ c++ library
Version: 4.3.2
Libs: -L${libdir} -lzmq
Libs.private: -lstdc++ -lpthread -lrt
Requires.private:
Cflags: -I${includedir} -DZMQ_BUILD_DRAFT_API=1
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论