提交 5d8f8d00 authored 作者: blu's avatar blu

nop

上级 1e162fa4
......@@ -49,13 +49,14 @@ add_library(zmqhelper STATIC inc/zmqhelper.cpp)
add_library(dirmon STATIC dirmon.cpp)
add_library(post STATIC postfile.cpp)
add_library(util STATIC inc/utils.cpp)
add_library(mqtthelper STATIC mqtt_helper.cc)
# order matters, local lib first
LIST(APPEND COMM_LIBS zmqhelper util fmt zmq)
LIST(APPEND AV_LIBS avformat swscale avcodec avutil swresample)
add_executable(evcloudsvc evcloudsvc.cpp)
target_link_libraries(evcloudsvc PUBLIC database leveldb ${COMM_LIBS} ${EXTRA_LIBS})
target_link_libraries(evcloudsvc PUBLIC database leveldb paho-mqtt3a mqtthelper ${COMM_LIBS} ${EXTRA_LIBS})
add_executable(evdaemon evdaemon.cpp)
target_link_libraries(evdaemon PUBLIC database leveldb ${COMM_LIBS} ${SSH2LIB} ${EXTRA_LIBS})
......
......@@ -143,19 +143,6 @@ evmlmotion/fast:
$(MAKE) $(MAKESILENT) -f CMakeFiles/evmlmotion.dir/build.make CMakeFiles/evmlmotion.dir/build
.PHONY : evmlmotion/fast
#=============================================================================
# Target rules for targets named evslicer
# Build rule for target.
evslicer: cmake_check_build_system
$(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 evslicer
.PHONY : evslicer
# fast build rule for target.
evslicer/fast:
$(MAKE) $(MAKESILENT) -f CMakeFiles/evslicer.dir/build.make CMakeFiles/evslicer.dir/build
.PHONY : evslicer/fast
#=============================================================================
# Target rules for targets named evpuller
......@@ -221,6 +208,32 @@ evcloudsvc/fast:
$(MAKE) $(MAKESILENT) -f CMakeFiles/evcloudsvc.dir/build.make CMakeFiles/evcloudsvc.dir/build
.PHONY : evcloudsvc/fast
#=============================================================================
# Target rules for targets named evslicer
# Build rule for target.
evslicer: cmake_check_build_system
$(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 evslicer
.PHONY : evslicer
# fast build rule for target.
evslicer/fast:
$(MAKE) $(MAKESILENT) -f CMakeFiles/evslicer.dir/build.make CMakeFiles/evslicer.dir/build
.PHONY : evslicer/fast
#=============================================================================
# Target rules for targets named mqtthelper
# Build rule for target.
mqtthelper: cmake_check_build_system
$(MAKE) $(MAKESILENT) -f CMakeFiles/Makefile2 mqtthelper
.PHONY : mqtthelper
# fast build rule for target.
mqtthelper/fast:
$(MAKE) $(MAKESILENT) -f CMakeFiles/mqtthelper.dir/build.make CMakeFiles/mqtthelper.dir/build
.PHONY : mqtthelper/fast
#=============================================================================
# Target rules for targets named zmqhelper
......@@ -623,6 +636,33 @@ inc/zmqhelper.cpp.s:
$(MAKE) $(MAKESILENT) -f CMakeFiles/zmqhelper.dir/build.make CMakeFiles/zmqhelper.dir/inc/zmqhelper.cpp.s
.PHONY : inc/zmqhelper.cpp.s
mqtt_helper.o: mqtt_helper.cc.o
.PHONY : mqtt_helper.o
# target to build an object file
mqtt_helper.cc.o:
$(MAKE) $(MAKESILENT) -f CMakeFiles/mqtthelper.dir/build.make CMakeFiles/mqtthelper.dir/mqtt_helper.cc.o
.PHONY : mqtt_helper.cc.o
mqtt_helper.i: mqtt_helper.cc.i
.PHONY : mqtt_helper.i
# target to preprocess a source file
mqtt_helper.cc.i:
$(MAKE) $(MAKESILENT) -f CMakeFiles/mqtthelper.dir/build.make CMakeFiles/mqtthelper.dir/mqtt_helper.cc.i
.PHONY : mqtt_helper.cc.i
mqtt_helper.s: mqtt_helper.cc.s
.PHONY : mqtt_helper.s
# target to generate assembly for a file
mqtt_helper.cc.s:
$(MAKE) $(MAKESILENT) -f CMakeFiles/mqtthelper.dir/build.make CMakeFiles/mqtthelper.dir/mqtt_helper.cc.s
.PHONY : mqtt_helper.cc.s
postfile.o: postfile.cpp.o
.PHONY : postfile.o
......@@ -668,6 +708,7 @@ help:
@echo "... evpusher"
@echo "... evslicer"
@echo "... evwifi"
@echo "... mqtthelper"
@echo "... post"
@echo "... util"
@echo "... zmqhelper"
......@@ -707,6 +748,9 @@ help:
@echo "... inc/zmqhelper.o"
@echo "... inc/zmqhelper.i"
@echo "... inc/zmqhelper.s"
@echo "... mqtt_helper.o"
@echo "... mqtt_helper.i"
@echo "... mqtt_helper.s"
@echo "... postfile.o"
@echo "... postfile.i"
@echo "... postfile.s"
......
......@@ -19,6 +19,7 @@ update: 2019/09/10
#include "utils.hpp"
#include "inc/zmqhelper.hpp"
#include "fmt/format.h"
#include "mqtt_helper.hpp"
using namespace std;
using namespace httplib;
......@@ -31,7 +32,6 @@ using namespace zmqhelper;
#define NUM_MAX_REPORT_HISTORY 5
class EvCloudSvc {
private:
Server svr;
......
This source diff could not be displayed because it is too large. You can view the blob instead.
mqtt_helper.hpp
\ No newline at end of file
/**
* mqtt_helper.hpp
*
* simplifies api for mqtt
* simplified apis for mqtt
*
* Bruce.Lu @20200415
*/
#ifndef __MQTT_HELPER__
#define __MQTT_HELPER__
#pragma once
#include <string>
#include <chrono>
#include <thread>
#include <set>
#include <map>
#include <future>
#include <mutex>
// #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
// #define SPDLOG_DEBUG_ON
// #define SPDLOG_TRACE_ON
#include <spdlog/spdlog.h>
#include <map>
#include <httplib.h>
#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include "inc/json.hpp"
#include "inc/httplib.h"
extern "C" {
#include "MQTTAsync.h"
#include <MQTTAsync.h>
}
using namespace std;
void on_connected(void* context, MQTTAsync_successData* response);
void on_connlost(void *context, char *cause);
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message);
void on_disconn(void* context, MQTTAsync_successData* response);
void on_subscribed(void* context, MQTTAsync_successData* response);
void on_conn_fail(void* context, MQTTAsync_failureData* response);
void on_subscribed_fail(void* context, MQTTAsync_failureData* response);
void on_sent(void* context, MQTTAsync_successData* response);
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response);
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response);
using namespace nlohmann;
#define EV_MAX_PRINTABLE_SIZE 512
extern void on_connected(void* context, MQTTAsync_successData* response);
extern void on_connlost(void *context, char *cause);
extern int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message);
extern void on_disconn(void* context, MQTTAsync_successData* response);
extern void on_subscribed(void* context, MQTTAsync_successData* response);
extern void on_conn_fail(void* context, MQTTAsync_failureData* response);
extern void on_subscribed_fail(void* context, MQTTAsync_failureData* response);
extern void on_sent(void* context, MQTTAsync_successData* response);
extern void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response);
extern void on_unsubscribe_ok(void* context, MQTTAsync_successData* response);
extern json make_lastwill_msg(string sn);
extern json make_online_msg(string sn);
class MqttHelper;
typedef void (*on_res_fun_ptr_t)(void *res);
typedef void (*on_msg_fun_ptr_t)(MqttHelper *hlp, const void * const data, int len, string topic);
template<typename R>
bool is_ready(std::future<R> const& f)
{
return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
bool is_ready(std::future<R> const& f);
struct StrException : public std::exception {
std::string s;
StrException(std::string ss) : s(ss) {}
~StrException() throw () {} // Updated
const char* what() const throw()
{
return s.c_str();
}
};
class MqttPub {
private:
......@@ -103,6 +96,20 @@ private:
recursive_mutex mut;
protected:
public:
struct StrException : public std::exception {
std::string s;
StrException(std::string ss) : s(ss) {}
~StrException() throw () {} // Updated
const char* what() const throw()
{
return s.c_str();
}
};
string mqtt_url;
string id;
string topic_lastwill;
string topic_report;
string addr;
MQTTAsync client;
enum class State {
......@@ -126,7 +133,7 @@ public:
/// State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
promise<AsyncResult> state;
MqttHelper(string addr, string id, int kai = 20, int cs = 1): addr(addr)
MqttHelper(string mqtt_url, string id, string topic_lastwill, string topic_report, int kai = 20, int cs = 1): mqtt_url(mqtt_url), id(id), topic_lastwill(topic_lastwill), topic_report(topic_report)
{
// make connection, throw excpetions
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
......@@ -134,7 +141,14 @@ public:
int rc;
int ch;
string msg;
if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, addr.c_str(), id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) {
auto uri = httplib::Uri::Parse(mqtt_url);
if(!uri.Host.empty() && !uri.Port.empty()) {
addr = "tcp://" + uri.Host + ":" + uri.Port;
}
spdlog::debug("mqtt url: proto: {}, host: {}, port: {}, user: {}, pass: {}", uri.Protocol, uri.Host, uri.Port, uri.User, uri.Password);
if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, addr.c_str(), ("EVC"+id).c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)) {
msg = "failed to async create mqtt";
spdlog::error(msg);
throw StrException(msg);
......@@ -147,9 +161,25 @@ public:
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
/// TODO:
if(!uri.User.empty() && !uri.Password.empty()) {
conn_opts.username = uri.User.c_str();
conn_opts.password = uri.Password.c_str();
}
conn_opts.onSuccess = on_connected;
conn_opts.onFailure = on_conn_fail;
MQTTAsync_willOptions lwm = MQTTAsync_willOptions_initializer;
/// template for lastwill message
auto js = make_lastwill_msg(id);
lwm.message = js.dump().c_str();
lwm.qos = 1;
lwm.retained = 1;
lwm.topicName = nullptr;
if(!topic_lastwill.empty()) {
lwm.topicName = topic_lastwill.c_str();
}
conn_opts.context = this;
conn_opts.will = &lwm;
spdlog::info("trying to connect to {}", addr);
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
msg = fmt::format("Failed to start connect: {}", MQTTAsync_strerror(rc));
......@@ -255,7 +285,6 @@ public:
return 0;
}
~MqttHelper()
{
int rc;
......@@ -278,185 +307,147 @@ public:
};
void on_connlost(void *context, char *cause)
{
MqttHelper *self = (MqttHelper *) context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason");
/// just a simple wrapper around MqttHelper
class MqttMgr {
public:
static map<string, MqttHelper *> insts;
static mutex mut;
public:
spdlog::warn("reconnecting {}", self->addr);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS) {
string msg = fmt::format("Failed to start connect:", MQTTAsync_strerror(rc));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
static MqttHelper* get_instance(string mqtt_url, string devsn, string topic_lastwill="", string topic_report="")
{
auto key = mqtt_url + "/" + devsn;
MqttHelper *ret = nullptr;
lock_guard<mutex> lg(MqttMgr::mut);
try {
if(MqttMgr::insts.count(key) == 0) {
if(topic_lastwill.empty()) {
// TODO:
topic_lastwill = "";
}
}
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
MqttHelper *self = (MqttHelper *) context;
string msg;
msg.resize(message->payloadlen + 1);
payloadptr = (char *)message->payload;
memcpy(msg.data(), message->payload, message->payloadlen);
msg.data()[message->payloadlen] = 0;
spdlog::debug("new message: {}, at {}", msg, topic);
(*self)(string(topic), message->payload, message->payloadlen);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic);
return 1;
}
void on_disconn(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
// self->state = MqttHelper::State::Disconnected;
// if(self->state.get_future().valid()){
// self->state = std::promise<MqttHelper::State>();
// }
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Disconnected;
self->state.set_value(as);
spdlog::info("successful disconnected {}", self->addr);
}
void on_subscribed(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
//spdlog::debug("subscribe succeeded");
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubscribeOK;
self->state.set_value(as);
}
void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
//string msg = fmt::format("subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
// spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubScribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeOk;
self->state.set_value(as);
}
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("unsubscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_conn_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::ConnFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_sent(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
spdlog::debug("Message delivery confirmed, tok: {}", response->token);
}
if(topic_report.empty()) {
topic_report = "";
}
void on_connected(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Ready;
self->state.set_value(as);
spdlog::info("connected to mqtt {}", self->addr);
}
auto inst = new MqttHelper(mqtt_url, devsn, topic_lastwill, topic_report);
auto js = make_online_msg(devsn);
auto msg = js.dump();
auto rc = (*inst)[topic_report].pub(msg.c_str(), msg.size(), 2, false);
ret = inst;
MqttMgr::insts[key] = inst;
}
else {
ret = MqttMgr::insts[key];
}
}
catch(exception &e) {
spdlog::error("failed to create mqtt adaptor: {}", e.what());
}
#define DEBUG
#ifdef DEBUG
return ret;
}
void my_on_msg1(MqttHelper *hlp, const void * const data, int len, string topic)
{
string msg;
msg.resize(len+1);
memcpy(msg.data(), data, len);
msg.data()[len] = 0;
spdlog::info("I1 recv on {}: {}", topic, msg);
(*hlp)[to_string(atoi(topic.data()) + 1)]("hello ack1", 11);
}
static void remove(string mqtt_url, string devsn)
{
auto key = mqtt_url + "/" + devsn;
lock_guard<mutex> lg(MqttMgr::mut);
if(MqttMgr::insts.count(key) == 0) {
void my_on_msg2(MqttHelper *hlp, const void * const data, int len, string topic)
{
string msg;
msg.resize(len+1);
memcpy(msg.data(), data, len);
msg.data()[len] = 0;
spdlog::info("I2 recv on {}: {}", topic, msg);
(*hlp)[to_string(atoi(topic.data()) + 1)]("hello ack2", 11);
}
}
else {
delete MqttMgr::insts[key];
MqttMgr::insts.erase(key);
}
}
int main()
{
spdlog::set_level(spdlog::level::debug);
MqttMgr() {}
~MqttMgr()
{
MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc");
hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("no_on_msg", NULL);
auto x = thread([&hlp] {
for(int i = 0; i < 20; i ++)
{
hlp.subscribe(to_string(i), my_on_msg1);
for(auto &x:insts) {
if(x.second) {
delete x.second;
}
insts.erase(x.first);
}
}
hlp["hello"].pub("hello1", 7, 1);
});
auto y = thread([&hlp] {
for(int i = 0; i < 10; i ++)
/// low level api for report and response
static void _report_response(MqttHelper *client, string topic, string message)
{
hlp.subscribe(to_string(i), my_on_msg2);
// auto client = MqttMgr::get_instance(mqtt_url, dev_sn);
if(client == nullptr) {
spdlog::error("invalid mqtt client {}");
return;
}
for(int i = 10; i < 30; i ++)
auto rc = (*client)[topic].pub(message.c_str(), message.size(), 2, false);
if(message.size() > EV_MAX_PRINTABLE_SIZE) {
string ellipsis = " <...> ";
message = message.substr(0, EV_MAX_PRINTABLE_SIZE/2 - ellipsis.size()) + ellipsis + message.substr(message.size() -1 - EV_MAX_PRINTABLE_SIZE/2, EV_MAX_PRINTABLE_SIZE/2);
}
if(rc < 0) {
spdlog::error("failed to pub mqtt message on {}: {}", topic, message);
}
else {
spdlog::info("successfully pub message at {}:{}", topic, message);
}
}
/// no data field. only code and msg field
static void report_response_args(MqttHelper *client, string topic, int code, string message, string cata, string rid, json &&data)
{
hlp.subscribe(to_string(i), my_on_msg2, true);
json js;
/// current timestamp
auto ts = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
js["time"] = ts;
/// rid
js["rid"] = rid;
/// type
if(rid.empty()) {
js["type"] = "report";
}
else {
js["type"] = "response";
}
js["category"] = cata;
///
js["code"] = code;
js["msg"] = message;
js["sn"] = client->id;
if(!data.empty()) {
js["data"] = data;
}
hlp["hello"].pub("hello2", 7, 1);
});
x.join();
y.join();
this_thread::sleep_for(chrono::seconds(100));
_report_response(client, topic, js.dump());
}
this_thread::sleep_for(chrono::seconds(2));
return 0;
}
static void report_response_args(MqttHelper *client, string topic, int code, string message, string cata, string rid, json &data)
{
json js;
/// current timestamp
auto ts = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
js["time"] = ts;
/// rid
js["rid"] = rid;
/// type
if(rid.empty()) {
js["type"] = "report";
}
else {
js["type"] = "response";
}
js["category"] = cata;
///
js["code"] = code;
js["msg"] = message;
js["sn"] = client->id;
if(!data.empty()) {
js["data"] = data;
}
#endif
_report_response(client, topic, js.dump());
}
};
#endif
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论