提交 8b3c36e5 authored 作者: blu's avatar blu

test script

上级 b677ec31
{ "cameraId": "D72154040", "endTime": 1577082467999, "image": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550139137000-1577082467999/firstFrame.jpg", "length": 129, "startTime": 1550139137000, "video": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550139137000-1577082467999/1550139137000-1577082467999.mp4"}
{ "cameraId": "D72154040", "endTime": 1577081728999, "image": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550139207000-1577081728999/firstFrame.jpg", "length": 61, "startTime": 1550139207000, "video": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550139207000-1577081728999/1550139207000-1577081728999.mp4"}
\ No newline at end of file
import pandas as pd
import paho.mqtt.client as mqtt
from cerberus import Validator
import os, logging, time, datetime, threading, json, re
import pdb,traceback, sys
class VideoAiTestor:
VA_SCHEMAS = {
'task': {
'cameraId': {'type': 'string', 'dependencies': ['startTime', 'endTime']},
'startTime': {'type': 'integer', 'dependencies':'cameraId'},
'endTime': {'type': 'integer', 'dependencies':'cameraId'},
}
}
TASK_VALIDATOR = Validator(VA_SCHEMAS['task'], allow_unknown=True)
MQTT_HOST=os.getenv('MQTT_HOST','evcloudsvc.ilabservice.cloud')
MQTT_PORT=int(os.getenv('MQTT_PORT', 11883))
@staticmethod
def on_connect(client, userdata, flags, rc):
print("connected with result code "+str(rc))
topic = 'video.ai/v1.0/result'
client.subscribe(topic, qos=1)
print('subscribed to ', topic)
@staticmethod
def on_message(client, userdata, msg):
payload = msg.payload.decode('utf-8')
if userdata:
try:
jd = json.loads(payload)
userdata.callback(jd)
except Exception as e:
print('exception in process message:', e)
def callback(self,v):
key = v['target']['cameraId'] + str(v['target']['startTime'])
if key in self.videos:
self.cnt -= 1
self.results[key] = v
print("got result for {}: {}".format(key, v))
def __init__(self,cfg_path):
self.cfg_path = cfg_path
self.videos = {}
self.results = {}
self.cnt = 0
with open(self.cfg_path) as f:
for idx,line in enumerate(f):
v = json.loads(line)
if not self.TASK_VALIDATOR.validate(v):
raise
key = v['cameraId'] + str(v['startTime'])
self.videos[key] = v
self.cnt += 1
print("loaded {} lines of videos json".format(self.cnt))
self.subc = mqtt.Client("vatstsub",userdata=self)
self.subc.on_connect = self.on_connect
self.subc.on_message = self.on_message
self.subc.connect_async(self.MQTT_HOST, self.MQTT_PORT, 30)
self.subc.loop_start()
def run(self):
mc = mqtt.Client("vatstpub")
mc.connect(self.MQTT_HOST, self.MQTT_PORT)
for k, v in self.videos.items():
msg = json.dumps(v)
mc.publish('video.ai/v1.0/task', msg, qos=1)
print("published: {}".format(msg))
while self.cnt > 0:
time.sleep(3)
self.subc.loop_stop()
self.validate_result()
self.make_output()
# TODO: you should only customize this function
def validate_result(self):
# add your code here
pass
# TODO: you should only customize this function
def make_output(self):
# add your code here: customize your output, or write to file
for k,v in self.results.items():
print(json.dumps(v))
if __name__ == "__main__":
vt = VideoAiTestor('test_cfg.txt')
vt.run()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论