提交 e987496a authored 作者: blu's avatar blu

mqtt api: future style

上级 00fb7c60
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <set> #include <set>
#include <map> #include <map>
#include <future> #include <future>
#include <mutex>
extern "C" { extern "C" {
#include "MQTTAsync.h" #include "MQTTAsync.h"
...@@ -74,12 +75,17 @@ class MqttHelper { ...@@ -74,12 +75,17 @@ class MqttHelper {
private: private:
string mqtt_addr; string mqtt_addr;
map<string, on_msg_fun_ptr_t> topics; map<string, on_msg_fun_ptr_t> topics;
recursive_mutex mut;
protected: protected:
public: public:
MQTTAsync client; MQTTAsync client;
enum class State{ enum class State{
None, None,
Ready, Ready,
ConnOK,
ConnFail,
SubscribeOK,
SubScribeFail,
Disconnected, Disconnected,
Destroyed Destroyed
}; };
...@@ -130,6 +136,7 @@ class MqttHelper { ...@@ -130,6 +136,7 @@ class MqttHelper {
// can be called only state // can be called only state
int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){ int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){
// assert(state != State::None); // assert(state != State::None);
lock_guard<recursive_mutex> lk(mut);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc; int rc;
// if(topics.contains(topic)){ // if(topics.contains(topic)){
...@@ -150,6 +157,10 @@ class MqttHelper { ...@@ -150,6 +157,10 @@ class MqttHelper {
}else{ }else{
spdlog::info("subscribe {} success", topic); spdlog::info("subscribe {} success", topic);
} }
State st = state.get_future().get();
// TODO;
state= promise<State>();
return 0; return 0;
} }
...@@ -241,6 +252,7 @@ void on_subscribed(void* context, MQTTAsync_successData* response) ...@@ -241,6 +252,7 @@ void on_subscribed(void* context, MQTTAsync_successData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
spdlog::info("Subscribe succeeded"); spdlog::info("Subscribe succeeded");
self->state.set_value(MqttHelper::State::SubscribeOK);
} }
void on_subscribed_fail(void* context, MQTTAsync_failureData* response) void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
...@@ -249,6 +261,7 @@ void on_subscribed_fail(void* context, MQTTAsync_failureData* response) ...@@ -249,6 +261,7 @@ void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
string msg = fmt::format("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0)); string msg = fmt::format("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::error(msg); spdlog::error(msg);
//self->state.set_exception(StrException(msg)); //self->state.set_exception(StrException(msg));
self->state.set_value(MqttHelper::State::SubScribeFail);
} }
void on_conn_fail(void* context, MQTTAsync_failureData* response) void on_conn_fail(void* context, MQTTAsync_failureData* response)
...@@ -257,6 +270,7 @@ void on_conn_fail(void* context, MQTTAsync_failureData* response) ...@@ -257,6 +270,7 @@ void on_conn_fail(void* context, MQTTAsync_failureData* response)
string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0)); string msg = fmt::format("Connect failed: {}", MQTTAsync_strerror(response ? response->code : 0));
spdlog::info(msg); spdlog::info(msg);
//self->state.set_exception(StrException(msg)); //self->state.set_exception(StrException(msg));
self->state.set_value(MqttHelper::State::ConnFail);
} }
void on_sent(void* context, MQTTAsync_successData* response) void on_sent(void* context, MQTTAsync_successData* response)
...@@ -273,25 +287,6 @@ void on_connected(void* context, MQTTAsync_successData* response) ...@@ -273,25 +287,6 @@ void on_connected(void* context, MQTTAsync_successData* response)
// } // }
self->state.set_value(MqttHelper::State::Ready); self->state.set_value(MqttHelper::State::Ready);
spdlog::info("connected"); spdlog::info("connected");
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
// if(topics.contains(topic)){
// MQTTAsync_unsubscribe(topic);
// }
string topic = "test.direct.topic";
opts.onSuccess = on_subscribed;
opts.onFailure = on_subscribed_fail;
opts.context = self;
if ((rc = MQTTAsync_subscribe(self->client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{
spdlog::info("Failed to start subscribe: {}", MQTTAsync_strerror(rc));
}else{
spdlog::info("successfully subscribed the test direct topic: {}", topic);
}
} }
#define DEBUG #define DEBUG
...@@ -306,10 +301,9 @@ int main(){ ...@@ -306,10 +301,9 @@ int main(){
// } // }
hlp["hello"].pub("hello", 6, 1); hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("bbbb", NULL); hlp.subscribe("bbbb", NULL);
this_thread::sleep_for(chrono::seconds(4));
} }
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::seconds(4));
return 0; return 0;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论