提交 ec6fe5db authored 作者: blu's avatar blu

mqtt api: future style

上级 e987496a
#ifndef __MQTT_HELPER__ #ifndef __MQTT_HELPER__
#define __MQTT_HELPER__ #define __MQTT_HELPER__
#include <spdlog/spdlog.h>
#include <fmt/format.h>
#include <string> #include <string>
#include <chrono> #include <chrono>
#include <thread> #include <thread>
...@@ -10,7 +7,11 @@ ...@@ -10,7 +7,11 @@
#include <map> #include <map>
#include <future> #include <future>
#include <mutex> #include <mutex>
// #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
// #define SPDLOG_DEBUG_ON
// #define SPDLOG_TRACE_ON
#include <spdlog/spdlog.h>
#include <fmt/format.h>
extern "C" { extern "C" {
#include "MQTTAsync.h" #include "MQTTAsync.h"
} }
...@@ -25,6 +26,16 @@ void on_subscribed(void* context, MQTTAsync_successData* response); ...@@ -25,6 +26,16 @@ void on_subscribed(void* context, MQTTAsync_successData* response);
void on_conn_fail(void* context, MQTTAsync_failureData* response); void on_conn_fail(void* context, MQTTAsync_failureData* response);
void on_subscribed_fail(void* context, MQTTAsync_failureData* response); void on_subscribed_fail(void* context, MQTTAsync_failureData* response);
void on_sent(void* context, MQTTAsync_successData* response); void on_sent(void* context, MQTTAsync_successData* response);
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response);
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response);
class MqttHelper;
typedef void (*on_res_fun_ptr_t)(void *res);
typedef void (*on_msg_fun_ptr_t)(MqttHelper *hlp, const void * const data, int len, string topic);
template<typename R>
bool is_ready(std::future<R> const& f)
{ return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready; }
struct StrException : public std::exception struct StrException : public std::exception
{ {
...@@ -57,27 +68,28 @@ class MqttPub { ...@@ -57,27 +68,28 @@ class MqttPub {
if ((rc = MQTTAsync_sendMessage(client, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_sendMessage(client, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{ {
string msg = fmt::format("Failed to start sendMessage: {}", MQTTAsync_strerror(rc)); string msg = fmt::format("Failed to start sendMessage: {}", MQTTAsync_strerror(rc));
spdlog::info(msg); spdlog::error(msg);
throw msg; throw msg;
} }
return 0; return 0;
} }
/// alias for pub
int operator()(const char *msg, int len, int qos = 0, bool retained = false){
return pub(msg, len, qos, retained);
}
~MqttPub(){ ~MqttPub(){
} }
}; };
typedef void (*on_res_fun_ptr_t)(void *res);
typedef void (*on_msg_fun_ptr_t)(MqttPub pub, const void * const data, int len/*, on_res_fun_ptr_t success=nullptr*/);
class MqttHelper { class MqttHelper {
private: private:
string mqtt_addr;
map<string, on_msg_fun_ptr_t> topics; map<string, on_msg_fun_ptr_t> topics;
recursive_mutex mut; recursive_mutex mut;
protected: protected:
public: public:
string addr;
MQTTAsync client; MQTTAsync client;
enum class State{ enum class State{
None, None,
...@@ -86,20 +98,28 @@ class MqttHelper { ...@@ -86,20 +98,28 @@ class MqttHelper {
ConnFail, ConnFail,
SubscribeOK, SubscribeOK,
SubScribeFail, SubScribeFail,
UnsubscribeOk,
UnsubscribeFail,
Disconnected, Disconnected,
Destroyed Destroyed
}; };
struct AsyncResult {
State state;
int rc;
void *extra;
};
/// State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed /// State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
promise<State> state; promise<AsyncResult> state;
MqttHelper(string mqtt_addr, string id, int kai = 20, int cs = 1): mqtt_addr(mqtt_addr){ MqttHelper(string addr, string id, int kai = 20, int cs = 1): addr(addr){
// make connection, throw excpetions // make connection, throw excpetions
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc; int rc;
int ch; int ch;
string msg; string msg;
if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, mqtt_addr.c_str(), id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)){ if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, addr.c_str(), id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)){
msg = "failed to async create mqtt"; msg = "failed to async create mqtt";
spdlog::error(msg); spdlog::error(msg);
throw StrException(msg); throw StrException(msg);
...@@ -115,7 +135,7 @@ class MqttHelper { ...@@ -115,7 +135,7 @@ class MqttHelper {
conn_opts.onSuccess = on_connected; conn_opts.onSuccess = on_connected;
conn_opts.onFailure = on_conn_fail; conn_opts.onFailure = on_conn_fail;
conn_opts.context = this; conn_opts.context = this;
spdlog::info("trying to connect"); spdlog::info("trying to connect to {}", addr);
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{ {
msg = fmt::format("Failed to start connect: {}", MQTTAsync_strerror(rc)); msg = fmt::format("Failed to start connect: {}", MQTTAsync_strerror(rc));
...@@ -123,29 +143,54 @@ class MqttHelper { ...@@ -123,29 +143,54 @@ class MqttHelper {
throw StrException(msg); throw StrException(msg);
} }
State st = state.get_future().get(); auto ar = state.get_future().get();
if(st != State::Ready){ if(ar.state != State::Ready){
spdlog::error("failed to initilaze mqtt"); string msg = fmt::format("ailed to initilaze mqtt: {}", MQTTAsync_strerror(ar.rc));
spdlog::error(msg);
throw StrException(msg);
}else{ }else{
spdlog::info("initialze mqtt successfully"); spdlog::info("initialze mqtt {} successfully", addr);
} }
state = promise<State>();
// reset state
state = promise<AsyncResult>();
} }
// can be called only state // can be called only state
int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){ int subscribe(string topic, on_msg_fun_ptr_t on_msg, bool force = false){
// assert(state != State::None); // make sure other thread can't access state while this is in working
lock_guard<recursive_mutex> lk(mut); lock_guard<recursive_mutex> lk(mut);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc; int rc;
// if(topics.contains(topic)){
// MQTTAsync_unsubscribe(topic);
// }
if(topics.count(topic) > 0){ if(topics.count(topic) > 0){
spdlog::warn("already subscribed to {}", topic); string msg = fmt::format("already subscribed to {}", topic, addr);
return 0; if(force){
MQTTAsync_responseOptions opts;
opts.context = this;
opts.onFailure = on_unsubscribe_fail;
opts.onSuccess = on_unsubscribe_ok;
MQTTAsync_unsubscribe(client, topic.c_str(), &opts);
auto st = state.get_future().get();
state= promise<AsyncResult>();
if(st.state == State::UnsubscribeOk) {
msg += ", forced uninstall ok! ";
spdlog::info(msg);
}else if(st.state == State::UnsubscribeFail){
msg += ", froce unsintall failed! " + string(MQTTAsync_strerror(st.rc));
spdlog::error(msg);
return -1;
}else{
spdlog::error("unkown uninstall status");
}
}else{
msg += ", ingored since force = false";
spdlog::warn(msg);
return 0;
}
} }
topics[topic] = on_msg; topics[topic] = on_msg;
opts.onSuccess = on_subscribed; opts.onSuccess = on_subscribed;
opts.onFailure = on_subscribed_fail; opts.onFailure = on_subscribed_fail;
...@@ -153,25 +198,34 @@ class MqttHelper { ...@@ -153,25 +198,34 @@ class MqttHelper {
if ((rc = MQTTAsync_subscribe(client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_subscribe(client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{ {
spdlog::error("failed to subscribe {}: {}",topic, MQTTAsync_strerror(rc)); spdlog::error("failed to subscribe {}: {} at {}",topic, MQTTAsync_strerror(rc), addr);
}else{ }else{
spdlog::info("subscribe {} success", topic); spdlog::info("subscribe to {} success at {}", topic, addr);
} }
State st = state.get_future().get(); auto st = state.get_future().get();
// TODO; // TODO
state= promise<State>(); if(st.state == State::SubscribeOK) {
}else if(st.state == State::SubScribeFail){
string msg = fmt::format("failed to subscribe topic {}: {}", topic, MQTTAsync_strerror(st.rc));
spdlog::error(msg);
throw StrException(msg);
}
// reset state
state= promise<AsyncResult>();
return 0; return 0;
} }
/// for create pub topic
MqttPub operator[](string topic){ MqttPub operator[](string topic){
MqttPub pub(client, topic); MqttPub pub(client, topic);
return pub; return pub;
} }
/// process messages when on_msgarrvd
int operator()(string topic, const void* const data, int len) { int operator()(string topic, const void* const data, int len) {
if(topics.count(topic) >0 && topics[topic] != nullptr) { if(topics.count(topic) >0 && topics[topic] != nullptr) {
topics[topic](std::move(MqttPub(client, topic)), data, len); topics[topic](this, data, len, topic);
}else{ }else{
spdlog::warn("no handler installed for topic: {}", topic); spdlog::warn("no handler installed for topic: {}", topic);
} }
...@@ -185,10 +239,14 @@ class MqttHelper { ...@@ -185,10 +239,14 @@ class MqttHelper {
opts.onSuccess = on_disconn; opts.onSuccess = on_disconn;
opts.context = this; opts.context = this;
state = promise<State>(); // TODO: how to avoid the pending future state?
auto fu = state.get_future();
// wait for all other future state finished
while(is_ready(fu) && fu.valid());
state = promise<AsyncResult>();
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{ {
spdlog::info("Failed to start disconnect: {}", MQTTAsync_strerror(rc)); spdlog::error("failed to start disconnect: {}", MQTTAsync_strerror(rc));
} }
state.get_future().get(); state.get_future().get();
...@@ -205,7 +263,7 @@ void on_connlost(void *context, char *cause) ...@@ -205,7 +263,7 @@ void on_connlost(void *context, char *cause)
spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason"); spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason");
spdlog::info("reconnecting"); spdlog::warn("reconnecting {}", self->addr);
conn_opts.keepAliveInterval = 20; conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS)
...@@ -222,13 +280,12 @@ int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *mes ...@@ -222,13 +280,12 @@ int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *mes
char* payloadptr; char* payloadptr;
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
spdlog::debug("new messge at {}:", topic);
string msg; string msg;
msg.resize(message->payloadlen + 1); msg.resize(message->payloadlen + 1);
payloadptr = (char *)message->payload; payloadptr = (char *)message->payload;
memcpy(msg.data(), message->payload, message->payloadlen); memcpy(msg.data(), message->payload, message->payloadlen);
msg.data()[message->payloadlen] = 0; msg.data()[message->payloadlen] = 0;
spdlog::debug("\t{}", msg); spdlog::debug("new message: {}, at {}", msg, topic);
(*self)(string(topic), message->payload, message->payloadlen); (*self)(string(topic), message->payload, message->payloadlen);
MQTTAsync_freeMessage(&message); MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic); MQTTAsync_free(topic);
...@@ -243,67 +300,132 @@ void on_disconn(void* context, MQTTAsync_successData* response) ...@@ -243,67 +300,132 @@ void on_disconn(void* context, MQTTAsync_successData* response)
// if(self->state.get_future().valid()){ // if(self->state.get_future().valid()){
// self->state = std::promise<MqttHelper::State>(); // self->state = std::promise<MqttHelper::State>();
// } // }
self->state.set_value(MqttHelper::State::Disconnected);
spdlog::info("successful disconnection"); MqttHelper::AsyncResult as;
as.state = MqttHelper::State::Disconnected;
self->state.set_value(as);
spdlog::info("successful disconnected {}", self->addr);
} }
void on_subscribed(void* context, MQTTAsync_successData* response) void on_subscribed(void* context, MQTTAsync_successData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
spdlog::info("Subscribe succeeded"); //spdlog::debug("subscribe succeeded");
self->state.set_value(MqttHelper::State::SubscribeOK);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::SubscribeOK;
self->state.set_value(as);
} }
void on_subscribed_fail(void* context, MQTTAsync_failureData* response) void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0)); string msg = fmt::format("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg); // spdlog::error(msg);
//self->state.set_exception(StrException(msg)); MqttHelper::AsyncResult as;
self->state.set_value(MqttHelper::State::SubScribeFail); as.state = MqttHelper::State::SubScribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
}
void on_unsubscribe_ok(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeOk;
self->state.set_value(as);
}
void on_unsubscribe_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("unsubscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
MqttHelper::AsyncResult as;
as.state = MqttHelper::State::UnsubscribeFail;
as.rc = response?response->code:0;
self->state.set_value(as);
} }
void on_conn_fail(void* context, MQTTAsync_failureData* response) void on_conn_fail(void* context, MQTTAsync_failureData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0)); string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::info(msg); spdlog::error(msg);
//self->state.set_exception(StrException(msg)); //self->state.set_exception(StrException(msg));
self->state.set_value(MqttHelper::State::ConnFail); MqttHelper::AsyncResult as;
as.state = MqttHelper::State::ConnFail;
as.rc = response?response->code:0;
self->state.set_value(as);
} }
void on_sent(void* context, MQTTAsync_successData* response) void on_sent(void* context, MQTTAsync_successData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
spdlog::info("Message with token value {} delivery confirmed", response->token); spdlog::debug("Message delivery confirmed to {}", response->alt.pub.destinationName);
} }
void on_connected(void* context, MQTTAsync_successData* response) void on_connected(void* context, MQTTAsync_successData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
// if(self->state.get_future().valid()){ MqttHelper::AsyncResult as;
// self->state = std::promise<MqttHelper::State>(); as.state = MqttHelper::State::Ready;
// } self->state.set_value(as);
self->state.set_value(MqttHelper::State::Ready); spdlog::info("connected to mqtt {}", self->addr);
spdlog::info("connected");
} }
#define DEBUG #define DEBUG
#ifdef DEBUG #ifdef DEBUG
void my_on_msg1(MqttHelper *hlp, const void * const data, int len, string topic){
string msg;
msg.resize(len+1);
memcpy(msg.data(), data, len);
msg.data()[len] = 0;
spdlog::info("I1 recv on {}: {}", topic, msg);
(*hlp)[to_string(atoi(topic.data()) + 1)]("hello ack", 10);
}
void my_on_msg2(MqttHelper *hlp, const void * const data, int len, string topic){
string msg;
msg.resize(len+1);
memcpy(msg.data(), data, len);
msg.data()[len] = 0;
spdlog::info("I2 recv on {}: {}", topic, msg);
(*hlp)[to_string(atoi(topic.data()) + 1)]("hello ack", 10);
}
int main(){ int main(){
spdlog::set_level(spdlog::level::debug);
{ {
MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc"); MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc");
// while(hlp.state == MqttHelper::State::None){
// spdlog::info("not state");
// this_thread::sleep_for(chrono::milliseconds(200));
// }
hlp["hello"].pub("hello", 6, 1); hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("bbbb", NULL); hlp.subscribe("no_on_msg", NULL);
auto x = thread([&hlp]{
for(int i = 0; i < 20; i ++) {
hlp.subscribe(to_string(i), my_on_msg1);
}
hlp["hello"].pub("hello1", 7, 1);
});
auto y = thread([&hlp]{
for(int i = 0; i < 10; i ++) {
hlp.subscribe(to_string(i), my_on_msg2);
}
for(int i = 10; i < 30; i ++) {
hlp.subscribe(to_string(i), my_on_msg2, true);
}
hlp["hello"].pub("hello2", 7, 1);
});
x.join();
y.join();
this_thread::sleep_for(chrono::seconds(100));
} }
this_thread::sleep_for(chrono::seconds(4)); this_thread::sleep_for(chrono::seconds(2));
return 0; return 0;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论