提交 873bc09f authored 作者: blu's avatar blu

mqtt test

上级 715e60fd
mqtt_helper.hpp
\ No newline at end of file
#ifndef __MQTT_HELPER__
#define __MQTT_HELPER__
#include <spdlog/spdlog.h>
#include <fmt/format.h>
#include <string>
#include <chrono>
#include <thread>
#include <set>
#include <map>
extern "C" {
#include "MQTTAsync.h"
}
using namespace std;
void on_connected(void* context, MQTTAsync_successData* response);
void on_connlost(void *context, char *cause);
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message);
void on_disconn(void* context, MQTTAsync_successData* response);
void on_subscribed(void* context, MQTTAsync_successData* response);
void on_conn_fail(void* context, MQTTAsync_failureData* response);
void on_subscribed_fail(void* context, MQTTAsync_failureData* response);
void on_sent(void* context, MQTTAsync_successData* response);
struct StrException : public std::exception
{
std::string s;
StrException(std::string ss) : s(ss) {}
~StrException() throw () {} // Updated
const char* what() const throw() { return s.c_str(); }
};
class MqttPub {
private:
MQTTAsync client = nullptr;
string topic;
protected:
public:
MqttPub(MQTTAsync client, string topic):client(client), topic(topic){}
int pub(const char *msg, int len, int qos = 0, bool retained = false){
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
opts.onSuccess = on_sent;
opts.context = client;
pubmsg.payload = (void *)msg;
pubmsg.payloadlen =len;
pubmsg.qos = qos;
pubmsg.retained = retained;
if ((rc = MQTTAsync_sendMessage(client, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage %d\n", rc);
exit(EXIT_FAILURE);
}
}
~MqttPub(){
}
};
typedef void (*on_res_fun_ptr_t)(void *res);
typedef void (*on_msg_fun_ptr_t)(MqttPub pub, const void * const data, int len/*, on_res_fun_ptr_t success=nullptr*/);
class MqttHelper {
private:
string mqtt_addr;
map<string, on_msg_fun_ptr_t> topics;
MQTTAsync client;
protected:
public:
int state = 0; // 0: initial ; 1: ready; 2: disconnected; 3: destroyed
MqttHelper(string mqtt_addr, string id, int kai = 20, int cs = 1): mqtt_addr(mqtt_addr){
// make connection, throw excpetions
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc;
int ch;
if(MQTTASYNC_SUCCESS != MQTTAsync_create(&client, mqtt_addr.c_str(), id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL)){
spdlog::error("failed to async create mqtt");
//TODO
}
if(MQTTASYNC_SUCCESS != MQTTAsync_setCallbacks(client, this, on_connlost, on_msgarrvd, NULL)){
spdlog::error("failed to async setcallbacks");
//TODO
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = on_connected;
conn_opts.onFailure = on_conn_fail;
conn_opts.context = this;
spdlog::info("trying to connect");
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
// can be called only state
int subscribe(string topic, on_msg_fun_ptr_t on_msg/*, on_res_fun_ptr_t on_success = nullptr, on_res_fun_ptr_t on_failure = nullptr*/){
assert(state);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
// if(topics.contains(topic)){
// MQTTAsync_unsubscribe(topic);
// }
if(topics.count(topic) > 0){
return 0;
}
topics[topic] = on_msg;
opts.onSuccess = on_subscribed;
opts.onFailure = on_subscribed_fail;
opts.context = client;
if ((rc = MQTTAsync_subscribe(client, topic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
MqttPub operator [](string topic){
MqttPub pub(client, topic);
return pub;
}
int operator()(string topic, const void* const data, int len) {
if(topics.count(topic) >0) {
topics[topic](std::move(MqttPub(client, topic)), data, len);
}else{
spdlog::warn("no handler installed for topic: {}", topic);
}
}
~MqttHelper(){
int rc;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
MQTTAsync_destroy(&client);
}
};
void on_connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
if (cause)
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
}
}
int on_msgarrvd(void *context, char *topic, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
MqttHelper *self = (MqttHelper *) context;
printf("Message arrived\n");
printf(" topic: %s\n", topic);
printf(" message: ");
payloadptr = (char *)message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
(*self)(string(topic), message->payload, message->payloadlen);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic);
return 1;
}
void on_disconn(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
MqttHelper *self = (MqttHelper *) context;
self->state = 2;
}
void on_subscribed(void* context, MQTTAsync_successData* response)
{
printf("Subscribe succeeded\n");
}
void on_subscribed_fail(void* context, MQTTAsync_failureData* response)
{
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
}
void on_conn_fail(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
}
void on_sent(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = on_disconn;
opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
void on_connected(void* context, MQTTAsync_successData* response)
{
MqttHelper *self = (MqttHelper *) context;
self->state = 1;
spdlog::info("connected");
}
#define DEBUG
#ifdef DEBUG
int main(){
MqttHelper hlp("tcp://evcloud.ilabservice.cloud:1883", "testaabbc");
while(!hlp.state){
spdlog::info("not state");
this_thread::sleep_for(chrono::seconds(3));
}
hlp["hello"].pub("hello", 6, 1);
hlp.subscribe("bbbb", NULL);
this_thread::sleep_for(chrono::seconds(10));
return 0;
}
#endif
#endif
\ No newline at end of file
paho.mqtt.c @ fbf98282
Subproject commit fbf9828200f46e212189d98eaedf8e11281e409a
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论