提交 ddf5eabc authored 作者: blu's avatar blu

mqtt api

上级 873bc09f
...@@ -73,10 +73,17 @@ class MqttHelper { ...@@ -73,10 +73,17 @@ class MqttHelper {
private: private:
string mqtt_addr; string mqtt_addr;
map<string, on_msg_fun_ptr_t> topics; map<string, on_msg_fun_ptr_t> topics;
MQTTAsync client;
protected: protected:
public: public:
int state = 0; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed MQTTAsync client;
enum class State{
None,
Ready,
Disconnected,
Destroyed
};
State state = State::None; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
MqttHelper(string mqtt_addr, string id, int kai = 20, int cs = 1): mqtt_addr(mqtt_addr){ MqttHelper(string mqtt_addr, string id, int kai = 20, int cs = 1): mqtt_addr(mqtt_addr){
// make connection, throw excpetions // make connection, throw excpetions
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
...@@ -107,34 +114,36 @@ class MqttHelper { ...@@ -107,34 +114,36 @@ class MqttHelper {
// can be called only state // can be called only state
int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){ int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){
assert(state); assert(state != State::None);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc; int rc;
// if(topics.contains(topic)){ // if(topics.contains(topic)){
// MQTTAsync_unsubscribe(topic); // MQTTAsync_unsubscribe(topic);
// } // }
if(topics.count(topic) > 0){ if(topics.count(topic) > 0){
spdlog::warn("already subscribed to {}", topic);
return 0; return 0;
} }
topics[topic] = on_msg; topics[topic] = on_msg;
opts.onSuccess = on_subscribed; opts.onSuccess = on_subscribed;
opts.onFailure = on_subscribed_fail; opts.onFailure = on_subscribed_fail;
opts.context = client; opts.context = this;
if ((rc = MQTTAsync_subscribe(client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_subscribe(client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start subscribe, return code %d\n", rc); spdlog::error("failed to subscribe {}: {}",topic, MQTTAsync_strerror(rc));
exit(EXIT_FAILURE); }else{
spdlog::info("subscribe {} success", topic);
} }
} }
MqttPub operator [](string topic){ MqttPub operator[](string topic){
MqttPub pub(client, topic); MqttPub pub(client, topic);
return pub; return pub;
} }
int operator()(string topic, const void* const data, int len) { int operator()(string topic, const void* const data, int len) {
if(topics.count(topic) >0) { if(topics.count(topic) >0 && topics[topic] != nullptr) {
topics[topic](std::move(MqttPub(client, topic)), data, len); topics[topic](std::move(MqttPub(client, topic)), data, len);
}else{ }else{
spdlog::warn("no handler installed for topic: {}", topic); spdlog::warn("no handler installed for topic: {}", topic);
...@@ -144,12 +153,13 @@ class MqttHelper { ...@@ -144,12 +153,13 @@ class MqttHelper {
~MqttHelper(){ ~MqttHelper(){
int rc; int rc;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
opts.onSuccess = on_disconn;
opts.context = this;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start disconnect, return code %d\n", rc); spdlog::info("Failed to start disconnect: {}", MQTTAsync_strerror(rc));
exit(EXIT_FAILURE);
} }
MQTTAsync_destroy(&client); MQTTAsync_destroy(&client);
...@@ -159,7 +169,7 @@ class MqttHelper { ...@@ -159,7 +169,7 @@ class MqttHelper {
void on_connlost(void *context, char *cause) void on_connlost(void *context, char *cause)
{ {
MQTTAsync client = (MQTTAsync)context; MqttHelper *self = (MqttHelper *) context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc; int rc;
...@@ -170,7 +180,7 @@ void on_connlost(void *context, char *cause) ...@@ -170,7 +180,7 @@ void on_connlost(void *context, char *cause)
printf("Reconnecting\n"); printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20; conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(self->client, &conn_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start connect, return code %d\n", rc); printf("Failed to start connect, return code %d\n", rc);
} }
...@@ -202,66 +212,81 @@ int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *mes ...@@ -202,66 +212,81 @@ int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *mes
void on_disconn(void* context, MQTTAsync_successData* response) void on_disconn(void* context, MQTTAsync_successData* response)
{ {
printf("Successful disconnection\n");
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
self->state = 2; self->state = MqttHelper::State::Disconnected;
printf("Successful disconnection\n");
} }
void on_subscribed(void* context, MQTTAsync_successData* response) void on_subscribed(void* context, MQTTAsync_successData* response)
{ {
MqttHelper *self = (MqttHelper *) context;
printf("Subscribe succeeded\n"); printf("Subscribe succeeded\n");
} }
void on_subscribed_fail(void* context, MQTTAsync_failureData* response) void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{ {
printf("Subscribe failed, rc %d\n", response ? response->code : 0); MqttHelper *self = (MqttHelper *) context;
spdlog::error("Subscribe failed: {}", MQTTAsync_strerror(response ? response->code : 0));
} }
void on_conn_fail(void* context, MQTTAsync_failureData* response) void on_conn_fail(void* context, MQTTAsync_failureData* response)
{ {
MqttHelper *self = (MqttHelper *) context;
printf("Connect failed, rc %d\n", response ? response->code : 0); printf("Connect failed, rc %d\n", response ? response->code : 0);
} }
void on_sent(void* context, MQTTAsync_successData* response) void on_sent(void* context, MQTTAsync_successData* response)
{ {
MQTTAsync client = (MQTTAsync)context; MqttHelper *self = (MqttHelper *) context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
printf("Message with token value %d delivery confirmed\n", response->token); printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = on_disconn;
opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
} }
void on_connected(void* context, MQTTAsync_successData* response) void on_connected(void* context, MQTTAsync_successData* response)
{ {
MqttHelper *self = (MqttHelper *) context; MqttHelper *self = (MqttHelper *) context;
self->state = 1; self->state = MqttHelper::State::Ready;
spdlog::info("connected"); spdlog::info("connected");
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
// if(topics.contains(topic)){
// MQTTAsync_unsubscribe(topic);
// }
string topic = "test.direct.topic";
opts.onSuccess = on_subscribed;
opts.onFailure = on_subscribed_fail;
opts.context = self;
if ((rc = MQTTAsync_subscribe(self->client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}else{
spdlog::info("successfully subscribed the test direct topic: {}", topic);
}
} }
#define DEBUG #define DEBUG
#ifdef DEBUG #ifdef DEBUG
int main(){ int main(){
{
MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc"); MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc");
while(!hlp.state){ while(hlp.state == MqttHelper::State::None){
spdlog::info("not state"); spdlog::info("not state");
this_thread::sleep_for(chrono::seconds(3)); this_thread::sleep_for(chrono::milliseconds(200));
} }
hlp["hello"].pub("hello", 6, 1); hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("bbbb", NULL); hlp.subscribe("bbbb", NULL);
this_thread::sleep_for(chrono::seconds(10)); this_thread::sleep_for(chrono::seconds(5));
}
this_thread::sleep_for(chrono::seconds(5));
return 0; return 0;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论