提交 00fb7c60 authored 作者: blu's avatar blu

mqtt api: future style

上级 74dfdf95
......@@ -8,6 +8,7 @@
#include <thread>
#include <set>
#include <map>
#include <future>
extern "C" {
#include "MQTTAsync.h"
......@@ -54,9 +55,11 @@ class MqttPub {
if ((rc = MQTTAsync_sendMessage(client, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage %d\n", rc);
exit(EXIT_FAILURE);
string msg = fmt::format("Failed to start sendMessage: {}", MQTTAsync_strerror(rc));
spdlog::info(msg);
throw msg;
}
return 0;
}
~MqttPub(){
......@@ -81,7 +84,8 @@ class MqttHelper {
Destroyed
};
State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
/// State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
promise<State> state;
MqttHelper(string mqtt_addr, string id, int kai = 20, int cs = 1): mqtt_addr(mqtt_addr){
// make connection, throw excpetions
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
......@@ -112,11 +116,20 @@ class MqttHelper {
spdlog::error(msg);
throw StrException(msg);
}
State st = state.get_future().get();
if(st != State::Ready){
spdlog::error("failed to initilaze mqtt");
}else{
spdlog::info("initialze mqtt successfully");
}
state = promise<State>();
}
// can be called only state
int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){
assert(state != State::None);
// assert(state != State::None);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
// if(topics.contains(topic)){
......@@ -137,6 +150,7 @@ class MqttHelper {
}else{
spdlog::info("subscribe {} success", topic);
}
return 0;
}
MqttPub operator[](string topic){
......@@ -150,6 +164,7 @@ class MqttHelper {
}else{
spdlog::warn("no handler installed for topic: {}", topic);
}
return 0;
}
......@@ -159,11 +174,13 @@ class MqttHelper {
opts.onSuccess = on_disconn;
opts.context = this;
state = promise<State>();
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
spdlog::info("Failed to start disconnect: {}", MQTTAsync_strerror(rc));
}
state.get_future().get();
MQTTAsync_destroy(&client);
}
......@@ -175,16 +192,16 @@ void on_connlost(void *context, char *cause)
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
if (cause)
printf(" cause: %s\n", cause);
spdlog::error("mqtt connection lost: {}", cause? cause: "unkown reason");
printf("Reconnecting\n");
spdlog::info("reconnecting");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
string msg = fmt::format("Failed to start connect:", MQTTAsync_strerror(rc));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
}
}
......@@ -194,17 +211,13 @@ int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *mes
char* payloadptr;
MqttHelper *self = (MqttHelper *) context;
printf("Message arrived\n");
printf(" topic: %s\n", topic);
printf(" message: ");
spdlog::debug("new messge at {}:", topic);
string msg;
msg.resize(message->payloadlen + 1);
payloadptr = (char *)message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
memcpy(msg.data(), message->payload, message->payloadlen);
msg.data()[message->payloadlen] = 0;
spdlog::debug("\t{}", msg);
(*self)(string(topic), message->payload, message->payloadlen);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic);
......@@ -215,40 +228,50 @@ int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *mes
void on_disconn(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
self->state = MqttHelper::State::Disconnected;
printf("Successful disconnection\n");
// self->state = MqttHelper::State::Disconnected;
// if(self->state.get_future().valid()){
// self->state = std::promise<MqttHelper::State>();
// }
self->state.set_value(MqttHelper::State::Disconnected);
spdlog::info("successful disconnection");
}
void on_subscribed(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
printf("Subscribe succeeded\n");
spdlog::info("Subscribe succeeded");
}
void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
spdlog::error("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
string msg = fmt::format("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg);
//self->state.set_exception(StrException(msg));
}
void on_conn_fail(void* context, MQTTAsync_failureData* response)
{
MqttHelper *self = (MqttHelper *) context;
printf("Connect failed, rc %d\n", response ? response->code : 0);
string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::info(msg);
//self->state.set_exception(StrException(msg));
}
void on_sent(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
printf("Message with token value %d delivery confirmed\n", response->token);
spdlog::info("Message with token value {} delivery confirmed", response->token);
}
void on_connected(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
self->state = MqttHelper::State::Ready;
// if(self->state.get_future().valid()){
// self->state = std::promise<MqttHelper::State>();
// }
self->state.set_value(MqttHelper::State::Ready);
spdlog::info("connected");
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
......@@ -265,8 +288,7 @@ void on_connected(void* context, MQTTAsync_successData* response)
if ((rc = MQTTAsync_subscribe(self->client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
spdlog::info("Failed to start subscribe: {}", MQTTAsync_strerror(rc));
}else{
spdlog::info("successfully subscribed the test direct topic: {}", topic);
}
......@@ -278,21 +300,20 @@ void on_connected(void* context, MQTTAsync_successData* response)
int main(){
{
MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc");
while(hlp.state == MqttHelper::State::None){
spdlog::info("not state");
this_thread::sleep_for(chrono::milliseconds(200));
}
// while(hlp.state == MqttHelper::State::None){
// spdlog::info("not state");
// this_thread::sleep_for(chrono::milliseconds(200));
// }
hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("bbbb", NULL);
this_thread::sleep_for(chrono::seconds(5));
this_thread::sleep_for(chrono::seconds(4));
}
this_thread::sleep_for(chrono::seconds(5));
this_thread::sleep_for(chrono::seconds(3));
return 0;
}
#endif
#endif
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论