提交 81a400a1 authored 作者: zxd's avatar zxd

发送emqq

上级 214934c7
......@@ -44,4 +44,5 @@
}
]
}
]
\ No newline at end of file
]
# encoding=utf8
import json
import random
import threading
########################
# 虚拟终端上报数据脚本
########################
import paho.mqtt.client as mqtt
import json
import sys
import os
import time
from paho.mqtt import client as mqtt_client
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
sys.path.append("..")
client_id = f''
topic = "/a1hjFmQV0zo"
......@@ -46,6 +50,23 @@ telemetryTemplate = """
}}
"""
cmdResponse = """
{{
"devSN": "{devSn}",
"data": {{
"cmd": "{cmd}",
"status": "OK"
}},
"time": {time},
"id": "{id}",
"mqttTopic": "{topic}",
"type": "response",
"deviceName": "{devSn1}",
"group": "scione5",
"middleware": "hub"
}}
"""
def getTelemetry(param):
t = time.time()
......@@ -65,7 +86,7 @@ def connect_mqtt(ip, port):
if rc != 0:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client = mqtt.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
......@@ -74,6 +95,35 @@ def connect_mqtt(ip, port):
return client
# 连接MQTT服务器
def on_mqtt_connect(serverClient, ip, port):
serverClient.connect(ip, port)
serverClient.loop_start()
# 消息处理函数
def on_message_come(lient, userdata, msg):
print(msg.topic + " " + ":" + str(msg.payload))
msgJson = json.loads(msg.payload)
cmd = msgJson.get("cmd")
devSn = msgJson.get("devSN")
id = msgJson.get("id")
responseTopic = msg.topic + "/response"
info = cmdResponse.format(devSn=devSn, cmd=cmd, time=str(time.time()), id=id, topic=responseTopic, devSn1=devSn)
on_publish(lient, responseTopic, info, 1)
def on_publish(lient, topic, payload, qos):
lient.publish(topic, payload, qos)
# subscribe 消息
def on_subscribe(serverClient, serverTopic):
serverClient.subscribe(serverTopic, 1)
serverClient.on_message = on_message_come # 消息到来处理函数
def publish(client, msg, topicSn):
result = client.publish(topicSn, msg)
# result: [0, 1]
......@@ -84,8 +134,14 @@ def publish(client, msg, topicSn):
print(f"Failed to send message to topic {topicSn}")
def run(ip, port, msg, topicSn):
def run(ip, port, msg, topicSn, sn):
# 订阅
serverClient = connect_mqtt(ip=ip, port=port)
on_mqtt_connect(serverClient=serverClient, ip=ip, port=port)
on_subscribe(serverClient, serverTopic=topic + "/" + sn + "/user/group/cmd")
client = connect_mqtt(ip=ip, port=port)
client.loop_start()
publish(client, msg, topicSn)
......@@ -98,7 +154,7 @@ def test(data):
cmdJson = json.loads(cmd)
topicSn = cmdJson.get("mqttTopic")
# print(cmd)
run(data.get("emq_host"), data.get("emq_port"), cmd, topicSn)
run(data.get("emq_host"), data.get("emq_port"), cmd, topicSn, data.get("serial_no"))
time.sleep(10)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论