"""Test harness that publishes video.ai tasks over MQTT and collects results.

Tasks are read from a file containing one JSON object per line, published to
the ``video.ai/v1.0/task`` topic, and matched against messages received on
the ``video.ai/v1.0/result`` topic.
"""
import datetime
import json
import logging
import os
import pdb
import re
import sys
import threading
import time
import traceback

import pandas as pd
import paho.mqtt.client as mqtt
from cerberus import Validator


class VideoAiTestor:
    """Publish a batch of video tasks and wait for one result per task.

    Attributes are populated by ``__init__``:

    * ``videos``  - task dicts keyed by ``cameraId + str(startTime)``.
    * ``results`` - result messages received so far, using the same keys.
    * ``cnt``     - number of tasks that have not produced a result yet.
    * ``subc``    - MQTT client subscribed to the result topic.
    """

    # Cerberus schema for one task line; unknown extra fields are allowed by
    # the validator below.
    VA_SCHEMAS = {
        'task': {
            'cameraId': {'type': 'string', 'dependencies': ['startTime', 'endTime']},
            'startTime': {'type': 'integer', 'dependencies': 'cameraId'},
            'endTime': {'type': 'integer', 'dependencies': 'cameraId'},
        }
    }
    TASK_VALIDATOR = Validator(VA_SCHEMAS['task'], allow_unknown=True)
    # Broker location, overridable through the environment.
    MQTT_HOST = os.getenv('MQTT_HOST', 'evcloudsvc.ilabservice.cloud')
    MQTT_PORT = int(os.getenv('MQTT_PORT', 11883))

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """MQTT connect handler: subscribe to the result topic with QoS 1."""
        print("connected with result code "+str(rc))
        topic = 'video.ai/v1.0/result'
        client.subscribe(topic, qos=1)
        print('subscribed to ', topic)

    @staticmethod
    def on_message(client, userdata, msg):
        """MQTT message handler: decode the JSON payload and hand it to
        ``userdata.callback``.

        Messages are ignored when no userdata is attached to the client.
        Any failure while decoding or processing is reported and swallowed
        so that one bad message does not stop the handler (best effort).
        """
        payload = msg.payload.decode('utf-8')
        if userdata:
            try:
                jd = json.loads(payload)
                userdata.callback(jd)
            except Exception as e:
                print('exception in process message:', e)

    @staticmethod
    def _task_key(task):
        """Return the lookup key of a task-like dict: cameraId + startTime."""
        return task['cameraId'] + str(task['startTime'])

    def callback(self, v):
        """Record result message ``v`` if it belongs to one of our tasks.

        The task is identified through ``v['target']``. Results for unknown
        tasks are ignored. The pending counter is decremented only for the
        first result of a task: the subscription uses QoS 1, so the same
        result may arrive more than once, and counting it twice would let
        ``run`` finish while other tasks are still outstanding.

        Raises:
            KeyError: if ``v`` lacks ``target``, ``cameraId`` or
                ``startTime`` (caught and reported by ``on_message``).
        """
        key = self._task_key(v['target'])
        if key not in self.videos:
            return
        if key not in self.results:
            self.cnt -= 1
        # Keep the most recent message for the task.
        self.results[key] = v
        print("left {}, got result for {}: {}".format(self.cnt, key, v))

    def __init__(self, cfg_path):
        """Load the tasks from ``cfg_path`` and start the result subscriber.

        Args:
            cfg_path: path of a text file with one JSON task object per line.

        Raises:
            ValueError: if a line fails schema validation (this also covers
                malformed JSON, since ``json.JSONDecodeError`` subclasses
                ``ValueError``).
        """
        self.cfg_path = cfg_path
        self.videos = {}
        self.results = {}
        self.cnt = 0
        self._load_tasks()
        print("loaded {} lines of videos json".format(self.cnt))

        # The subscriber connects in the background; this instance is passed
        # as userdata so on_message can reach callback().
        self.subc = mqtt.Client("vatstsub", userdata=self)
        self.subc.on_connect = self.on_connect
        self.subc.on_message = self.on_message
        self.subc.connect_async(self.MQTT_HOST, self.MQTT_PORT, 30)
        self.subc.loop_start()

    def _load_tasks(self):
        """Fill ``self.videos`` / ``self.cnt`` from the config file."""
        with open(self.cfg_path, encoding='utf-8') as f:
            for idx, line in enumerate(f, start=1):
                if not line.strip():
                    # Blank lines (e.g. trailing ones) carry no task.
                    continue
                v = json.loads(line)
                if not self.TASK_VALIDATOR.validate(v):
                    raise ValueError(
                        "invalid task json at {}:{}: {}".format(
                            self.cfg_path, idx, self.TASK_VALIDATOR.errors))
                key = self._task_key(v)
                if key in self.videos:
                    print("duplicated video json, ignored: {}".format(json.dumps(v)))
                    continue
                self.videos[key] = v
                self.cnt += 1

    def run(self):
        """Publish every loaded task, wait for all results, then report.

        Blocks (polling every 3 seconds) until ``self.cnt`` drops to zero;
        there is no timeout, so a missing result keeps this waiting forever.
        """
        # NOTE(review): the subscriber was started asynchronously in __init__
        # and nothing here waits for its subscription before publishing -
        # confirm results cannot be emitted before the subscription is active.
        mc = mqtt.Client("vatstpub")
        # Allow all tasks to be in flight at once.
        mc.max_inflight_messages_set(self.cnt)
        mc.connect(self.MQTT_HOST, self.MQTT_PORT)
        for v in self.videos.values():
            msg = json.dumps(v)
            mc.publish('video.ai/v1.0/task', msg, qos=1)
            print("published: {}".format(msg))

        while self.cnt > 0:
            time.sleep(3)

        self.subc.loop_stop()
        self.validate_result()
        self.make_output()

    # TODO: you should only customize this function
    def validate_result(self):
        """Hook for checking the collected results; does nothing by default."""
        # add your code here
        pass

    # TODO: you should only customize this function
    def make_output(self):
        """Hook for reporting; by default prints each result as a JSON line."""
        # add your code here: customize your output, or write to file
        for v in self.results.values():
            print(json.dumps(v))


if __name__ == "__main__":
    vt = VideoAiTestor('test_cfg.txt')
    vt.run()