提交 44cf2efd authored 作者: blu's avatar blu

nop

上级 279ce782
/**
* @file mqtt_helper.cc
*
* @brief this is the main file impelents mqtt helpers
*
* @ingroup evcamera
*
* @author Bruce Lu
* Contact: lzbgt@icloud.com
*
*/
#include <mqtt_helper.hpp>
#include <string>
#include <chrono>
#include <thread>
#include <set>
#include <map>
#include <future>
#include <mutex>
#include <httplib.h>
// #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
// #define SPDLOG_DEBUG_ON
// #define SPDLOG_TRACE_ON
#include <spdlog/spdlog.h>
#include <fmt/format.h>
extern "C" {
#include <MQTTAsync.h>
}
using namespace std;
class MqttHelper;
typedef void (*on_res_fun_ptr_t)(void *res);
typedef void (*on_msg_fun_ptr_t)(MqttHelper *hlp, const void * const data, int len, string topic);
template<typename R>
bool is_ready(std::future<R> const& f)
{
return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
json make_lastwill_msg(string sn)
{
json js = R"(
{
"category":"lastwill",
"code":0,
"data":{
},
"msg":"I'M OFFLINE",
"rid":"",
"sn":"001231554A20",
"time":1589537073532,
"type":"report"
}
)"_json;
js["time"] = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
js["sn"] = sn;
return js;
}
json make_online_msg(string sn)
{
json js = R"(
{
"category":"online",
"code":0,
"data":{
},
"msg":"I'M ONLINE",
"rid":"",
"sn":"001231554A20",
"time":1589537073532,
"type":"report"
}
)"_json;
js["time"] = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
js["sn"] = sn;
return js;
}
void on_connlost(void *context, char *cause)
{
MqttHelper *self = (MqttHelper *) context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc = -1;
spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason");
spdlog::warn("reconnecting {}", self->addr);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
int cnt = 0;
const int kRetryIntervalMaxSeconds = 60 * 60;
const int kRetryIntervalMinSeconds = 10;
time_t t;
srand((unsigned) time(&t));
int sleepTime = 10;
while(rc != MQTTASYNC_SUCCESS){
if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS) {
string msg = fmt::format("mqtt failed to reconnect {}: {}",self->mqtt_url, MQTTAsync_strerror(rc));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
sleepTime = cnt* kRetryIntervalMinSeconds;
this_thread::sleep_for(chrono::seconds(sleepTime));
sleepTime++;
}else{
break;
}
}
}
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
MqttHelper *self = (MqttHelper *) context;
(*self)(string(topic), message->payload, message->payloadlen);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic);
return 1;
}
void on_disconn(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
// self->state = MqttHelper::State::Disconnected;
// if(self->state.get_future().valid()){
// self->state = std::promise<MqttHelper::State>();
// }
auto js = make_lastwill_msg(self->id);
auto str = js.dump();
(*self)["hello"].pub(str.c_str(), str.size(), 1, true);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Disconnected;
self->state.set_value(as);
spdlog::info("successful disconnected {}", self->addr);
}
void on_subscribed(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
//spdlog::debug("subscribe succeeded");
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubscribeOK;
self->state.set_value(as);
}
void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
//string msg = fmt::format("subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
// spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubScribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeOk;
self->state.set_value(as);
}
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("unsubscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_conn_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::ConnFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_sent(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
spdlog::debug("Message delivery confirmed, tok: {}", response->token);
}
void on_connected(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Ready;
self->state.set_value(as);
spdlog::info("connected to mqtt {}", self->addr);
}
map<string, MqttHelper *> MqttMgr::insts;
mutex MqttMgr::mut;
#ifdef TEST_MQTT
#include <httplib.h>
void my_on_msg1(MqttHelper *hlp, const void * const data, int len, string topic)
{
string msg;
msg.resize(len+1);
memcpy((void*)&msg[0], data, len);
*(char*)(&msg[0] +len) = 0;
spdlog::info("I1 recv on {}: {}", topic, msg);
(*hlp)[to_string(stoi(topic) + 1)]("hello ack1", 11);
}
void my_on_msg2(MqttHelper *hlp, const void * const data, int len, string topic)
{
string msg;
msg.resize(len+1);
memcpy((void*)&msg[0], data, len);
*(char*)(&msg[0] +len) = 0;
spdlog::info("I2 recv on {}: {}", topic, msg);
(*hlp)[to_string(stoi(topic) + 1)]("hello ack2", 11);
}
int main()
{
string mqtt_url = "tcp://admin:vJ3zHqWrHbrqxVMT@evcloudsvc.ilabservice.cloud:11883";
string mqtt_cid = "testaabbc";
auto tmpEnv = getenv("MQTT_URL");
if(tmpEnv) {
mqtt_url = string(tmpEnv);
}
tmpEnv = getenv("CID");
if(tmpEnv) {
mqtt_cid = string(tmpEnv);
}
spdlog::set_level(spdlog::level::debug);
{
try {
MqttHelper hlp(mqtt_url, mqtt_cid);
hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("no_on_msg", NULL);
auto x = thread([&hlp] {
for(int i = 0; i < 20; i ++)
{
hlp.subscribe(to_string(i), my_on_msg1);
}
hlp["hello"].pub("hello1", 7, 1);
});
auto y = thread([&hlp] {
for(int i = 0; i < 10; i ++)
{
hlp.subscribe(to_string(i), my_on_msg2);
}
for(int i = 10; i < 30; i ++)
{
hlp.subscribe(to_string(i), my_on_msg2, true);
}
hlp["hello"].pub("hello2", 7, 1);
});
x.join();
y.join();
this_thread::sleep_for(chrono::seconds(20));
}
catch(exception &e) {
spdlog::error("failed to connect mqtt: {}, {}", mqtt_url, e.what());
exit(1);
}
}
this_thread::sleep_for(chrono::seconds(2));
return 0;
}
#endif
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论