# NOTE(review): `flask.escape` is deprecated/removed in recent Flask releases
# (use `markupsafe.escape`) -- confirm against the pinned Flask version.
from flask import Flask, escape, request, jsonify, g, url_for, current_app
import paho.mqtt.client as mqtt
from cerberus import schema_registry, Validator
from celery import Celery
from azure.storage.fileshare import ShareFileClient
import random
import pika
import requests
import os
import yaml
import logging
import time
import datetime
import threading
import json
import subprocess
import shlex
import re
import string
import pdb
import traceback
import sys

# Seed from the OS entropy source / current time. Passing a datetime object
# (as the code used to) raises TypeError on Python 3.11+, where the seed must
# be None, int, float, str, bytes or bytearray.
random.seed()

"""
va task schema:
{
    "cameraId": "D72154040",
    "endTime": 1577267418999,
    "image": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550143347000-1577267418999/firstFrame.jpg",
    "length": 260,
    "startTime": 1550143347000,
    "video": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550143347000-1577267418999/1550143347000-1577267418999.mp4"
}
"""

# Cerberus schemas. In 'task', cameraId / startTime / endTime depend on each
# other (a camera-based task must carry all three); none of the fields is
# marked as required.
VA_SCHEMAS = {
    'task': {
        'cameraId': {'type': 'string', 'dependencies': ['startTime', 'endTime']},
        'startTime': {'type': 'integer', 'dependencies': 'cameraId'},
        'endTime': {'type': 'integer', 'dependencies': 'cameraId'},
        'length': {'type': 'integer'},
        'image': {'type': 'string'},
        'video': {'type': 'string'}
    },
}

# Azure file-share credentials, overridable through the environment.
# SECURITY NOTE(review): a real-looking storage account key is hard-coded as
# the fallback for AAK. It is kept here only so existing deployments keep
# working; rotate the key and supply AAK via the environment / a secret store,
# then remove this default.
aan = os.getenv('AAN', 'ilsvideostablediag')
aak = os.getenv(
    'AAK',
    'rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==')
CONNSTR = "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix=core.chinacloudapi.cn".format(
    aan, aak)
SHARENAME = os.getenv('SHARE', 'pre-data')

# MQTT broker settings.
MQTT_HOST = os.getenv('MQTT_HOST', 'evcloud.ilabservice.cloud')
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
MQTT_CID = os.getenv('MQTT_CID', None)
MQTT_USER = os.getenv('MQTT_USER', None)
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', None)
REDIS_ADDR = os.getenv('REDIS', 'redis://localhost:6379')

# Location / name of the detector binary and its config directory.
workd = os.getenv('BIN_DIR', '../')
binName = os.getenv('BIN_NAME', 'detector ')
binPrefix = os.getenv('BIN_PRE', '')
configDir = os.getenv('CFG_DIR', workd)

# Camera-config REST API and RabbitMQ settings.
API_HOST = os.getenv("API_HOST", None)
API_CAMERA_CFG_URI = os.getenv("API_CAMERA_CFG_URI", "/video/analysis/camera/{}/model")
RABBITMQ_URI = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/")
RABBITMQ_TOPIC = os.getenv("RABBITMQ_TOPIC", "*.camera.model")
RABBITMQ_EXHANGE = os.getenv("RABBITMQ_EXHANGE", "video")

# In-process cache: camera serial number -> config dict received from
# RabbitMQ / the REST API, or 0 when the API was queried and had no region.
CAMERA_CONFIG_MAP = {}


def rabCallback(ch, method, prop, body):
    """RabbitMQ consumer callback: cache a camera config by serial number.

    The serial number is the first dot-separated segment of the routing key;
    the JSON-decoded message body is stored in CAMERA_CONFIG_MAP under it.
    Any exception is printed and swallowed.
    """
    try:
        print("received rabbitmsg. routing key: {} , body: {}".format(method.routing_key, body))
        # str.split always yields at least one element, so [0] is safe.
        sn = method.routing_key.split('.')[0]
        if sn:
            CAMERA_CONFIG_MAP[sn] = json.loads(body)
        print("CAMERA_CONFIG_MAP: \n{}".format(json.dumps(CAMERA_CONFIG_MAP)))
    except Exception as e:
        print("failed to process callback {}: {} - {}".format(e, method, body))


def initRabbit():
    """Connect to RabbitMQ and register rabCallback on a fresh queue.

    Declares the topic exchange RABBITMQ_EXHANGE, declares a server-named
    queue, binds it with RABBITMQ_TOPIC and registers the consumer.

    Returns:
        The pika channel (consuming has not been started yet), or None when
        RABBITMQ_URI is empty.
    """
    rabChan = None
    if RABBITMQ_URI:
        print("connecting rabbitmq {}".format(RABBITMQ_URI))
        parameters = pika.URLParameters(RABBITMQ_URI)
        connection = pika.BlockingConnection(parameters)
        rabChan = connection.channel()
        rabChan.exchange_declare(exchange=RABBITMQ_EXHANGE, exchange_type="topic")
        qr = rabChan.queue_declare('', durable=True)
        rabChan.queue_bind(exchange=RABBITMQ_EXHANGE,
                           queue=qr.method.queue, routing_key=RABBITMQ_TOPIC)
        rabChan.basic_consume(
            queue=qr.method.queue, on_message_callback=rabCallback, auto_ack=True)
    return rabChan


print("CONFIG: \n\tMQTT: {}:{}\n\tBIN_NAME: {}".format(
    MQTT_HOST, MQTT_PORT, binName))


def downloadFile(ipcSn, dirName, fileName, destDir):
    """Download `<ipcSn>/<dirName>/<fileName>` from the Azure file share.

    The file is written to `<destDir>/<fileName>`.
    """
    file_path = ipcSn + '/' + dirName + '/' + fileName
    destPath = destDir + '/' + fileName
    print("downloading {}: {} {} {}".format(
        destPath, ipcSn, dirName, file_path))
    with ShareFileClient.from_connection_string(
            conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path) as fc:
        with open(destPath, "wb") as f:
            data = fc.download_file()
            data.readinto(f)


def uploadFile(ipcSn, dirName, fileName, srcPath):
    """Upload `<srcPath>/<fileName>` to `<ipcSn>/<dirName>/<fileName>` on the share."""
    file_path = ipcSn + '/' + dirName + '/' + fileName
    # Use the client as a context manager (as downloadFile does) so its
    # underlying connection is released after the upload.
    with ShareFileClient.from_connection_string(
            conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path) as fc:
        with open(srcPath + '/' + fileName, "rb") as source_file:
            fc.upload_file(source_file)


class VAMMQTTClient:
    """MQTT task subscriber that also runs the RabbitMQ camera-config consumer."""

    # The callback for when the client receives a CONNACK response from the server.
    @staticmethod
    def on_connect(client, userdata, flags, rc):
        now = datetime.datetime.now()
        print(now, "Connected with result code "+str(rc))
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        topic = '$queue/video.ai/v1.0/task'
        client.subscribe(topic, qos=1)
        print(datetime.datetime.now(), 'subscribed to ', topic)

    # The callback for when a PUBLISH message is received from the server.
    @staticmethod
    def on_message(client, userdata, msg):
        payload = msg.payload.decode('utf-8')
        print(msg.topic+" " + payload)
        # userdata is the task callback passed to __init__.
        if userdata:
            try:
                jd = json.loads(payload)
                userdata(jd)
            except Exception as e:
                print('exception in process message:', e)

    @staticmethod
    def on_disconnect(client, userdata, rc):
        print("disconnected")

    def __init__(self, callback, host=MQTT_HOST, port=MQTT_PORT):
        '''
        Parameters
            callback: callable invoked with the JSON-decoded payload of each
                      task message (stored as the MQTT client's userdata).
            host:     MQTT broker host.
            port:     MQTT broker port.
        '''
        self.client = mqtt.Client(
            MQTT_CID if MQTT_CID else "vamqtt", userdata=callback)
        if MQTT_USER and MQTT_PASSWORD:
            self.client.username_pw_set(username=MQTT_USER, password=MQTT_PASSWORD)
        self.client.on_connect = VAMMQTTClient.on_connect
        self.client.on_message = VAMMQTTClient.on_message
        # on_disconnect was defined but never registered.
        self.client.on_disconnect = VAMMQTTClient.on_disconnect
        self.client.connect_async(host, port, 30)
        self.client.loop_start()

        self.rabChan = initRabbit()
        self.rabThread = None
        # initRabbit() returns None when RabbitMQ is disabled; only start the
        # consumer thread when there is a channel to consume from.
        if self.rabChan is not None:
            self.rabThread = threading.Thread(target=self.rabChan.start_consuming)
            self.rabThread.start()


app = Flask(__name__, static_url_path='', static_folder='web/main/dist')
logger = app.logger
app.config['broker_url'] = REDIS_ADDR
app.config['result_backend'] = REDIS_ADDR

worker = Celery(app.name, broker=app.config['broker_url'])
worker.conf.update(app.config)
worker.conf.update(
    task_serializer='json',
    result_serializer='json',
    enable_utc=True)


def getRegionConfig(ipcSN):
    """Return the detection region configured for camera `ipcSN`, or None.

    Looks in CAMERA_CONFIG_MAP first. When nothing is cached and API_HOST is
    set, queries the camera-config API and caches the returned `data`; if the
    API yields no region, 0 is cached so the API is not queried again for
    this camera. Any exception is printed and None is returned.
    """
    region = None
    res = None
    # API_HOST defaults to None; only build the URL when it is configured.
    reqUrl = API_HOST + API_CAMERA_CFG_URI.format(ipcSN) if API_HOST else None
    try:
        cached = CAMERA_CONFIG_MAP.get(ipcSN)
        if isinstance(cached, dict):
            region = cached.get('region')
        elif cached is None and API_HOST:
            # Bounded wait so a stalled API cannot block the caller forever.
            res = requests.get(reqUrl, timeout=10)
            if res.status_code == 200:
                ret = res.json()
                cfg = ret.get('data') if isinstance(ret, dict) else None
                if isinstance(cfg, dict) and 'region' in cfg:
                    region = cfg["region"]
                    CAMERA_CONFIG_MAP[ipcSN] = cfg
            if CAMERA_CONFIG_MAP.get(ipcSN) is None:
                CAMERA_CONFIG_MAP[ipcSN] = 0
        print("getRegionConfig: {}, {}, {}: {}".format(
            ipcSN, reqUrl, res.text if res is not None else None, json.dumps(region)))
    except Exception as e:
        print("failed to get camera config {}: {}".format(ipcSN, e))
    return region


def take_task(task):
    """Validate a task and queue it for analysis.

    Parameters:
        task: decoded JSON task (see VA_SCHEMAS['task']).

    Returns:
        dict with 'code' (0 ok, 1 invalid), 'msg', and on validation failure
        'data' holding the validator errors.
    """
    ret = {'code': 0, 'msg': 'ok'}
    print("taking task", json.dumps(task))
    taskValidator = Validator(VA_SCHEMAS['task'], allow_unknown=True)
    if not isinstance(task, dict):
        # e.g. a missing / non-object request body: the validator only
        # accepts mappings.
        ret['code'] = 1
        ret['msg'] = 'invalid request body'
        ret['data'] = {}
    elif not taskValidator.validate(task):
        ret['code'] = 1
        ret['msg'] = 'invalid request body'
        ret['data'] = taskValidator.errors
    else:
        # cameraId is not required by the schema, so it may be absent.
        cameraId = task.get('cameraId')
        task["region"] = getRegionConfig(cameraId) if cameraId else None
        print("got region for ipc {}: {}".format(cameraId, json.dumps(task["region"])))
        video_analysis.apply_async(args=[task])
    print(json.dumps(ret))
    return ret


@app.route('/api/video.ai/v1.0/task', methods=['POST'])
def new_task():
    """HTTP entry point: submit the JSON request body as a task."""
    ret = take_task(request.json)
    return jsonify(ret)


@worker.task(acks_late=True)
def video_analysis(data):
    """Celery task: run the detector on a camera video and publish the result.

    For a task carrying 'cameraId', the video
    `<cameraId>/<startTime>-<endTime>/<startTime>-<endTime>.mp4` is downloaded
    from the Azure share into a per-task temp directory, the detector binary
    is run on it, and its output is parsed for the first detection line.
    `result.json` (and the detection image, if any) are uploaded next to the
    video, and the result is published on MQTT topic 'video.ai/v1.0/result'.

    Returns:
        dict with 'code' (0 ok, 1 no video, 2 http not implemented,
        -1 exception), 'msg', 'target' (the input) and
        'data.humanDetect' ('found', and on detection 'level', 'image', 'time').

    Raises:
        Whatever the result upload or MQTT publish raises; the temp directory
        is removed regardless.
    """
    import shutil  # local import: not among the file-level imports

    ret = {
        'code': 0,
        'msg': 'ok',
        'target': data,
        'data': {'humanDetect': {'found': 0}},
    }
    print(json.dumps(data))

    imageName = None
    # Pre-bind so the post-processing below can tell how far the try got.
    ipcSN = None
    dirName = None
    downloadDir = None
    try:
        if 'cameraId' in data:
            # get the video from azure storage
            ipcSN = data["cameraId"]
            dirName = "{}-{}".format(data["startTime"], data["endTime"])
            fileName = dirName + '.mp4'
            strRand = "{}-{}".format(data["startTime"], ''.join(
                random.choice(string.ascii_letters) for _ in range(6)))
            downloadDir = "{}/{}/".format(os.getenv('DL_DIR', workd) + '/' + ipcSN, strRand)
            # No shell involved: cameraId comes from the request.
            os.makedirs(downloadDir, exist_ok=True)
            downloadFile(ipcSN, dirName, fileName, downloadDir)
            print("downloaded file {} into {}".format(fileName, downloadDir))

            # analyze
            # Only the operator-configured prefix/binary is shell-split; the
            # request-derived paths are passed as single argv entries.
            prefix = binPrefix + ' ' if binPrefix else binPrefix
            cmdArgs = shlex.split(prefix + workd + '/' + binName)
            cmdArgs += [downloadDir + fileName, '-c', configDir,
                        '-o', downloadDir + '/detect.jpg']
            region = data.get('region')
            if region:
                cmdArgs += ['-s', "{},{},{},{}".format(
                    region['minX'], region['minY'], region['maxX'], region['maxY'])]
            else:
                print("no region config")
            cmdLine = ' '.join(cmdArgs)
            print(cmdLine, '\n\n', cmdArgs)
            output = subprocess.check_output(cmdArgs)
            print(output)

            # parse: stop at the first line reporting a detection
            for line in output.decode('utf-8').split('\n'):
                print("\n", line)
                m = re.match(
                    r".*? found (\w+) ([\d\.]+) .*? image: .*?/([_\w\d]+.jpg).*?time: ([\d\.]+)", line)
                if not m:
                    continue
                ret['data']['humanDetect']['found'] = 1
                ret['data']['humanDetect']['level'] = m.group(2)
                imageName = m.group(3)
                ret['data']['humanDetect']['image'] = ipcSN + \
                    '/' + dirName + '/' + m.group(3)
                # The pattern admits decimals, which int() alone rejects.
                ret['data']['humanDetect']['time'] = int(float(m.group(4)))
                print('found {}: {}, img: {}, elapse: {}'.format(
                    m.group(1), m.group(2), m.group(3), m.group(4)))
                break
        elif 'video' in data:
            # http
            ret['code'] = 2
            ret['msg'] = 'not impelemented yet for http'
        else:
            # no video
            ret['code'] = 1
            ret['msg'] = 'no video specified'
    except Exception as e:
        print("exception in va worker: {}".format(e))
        ret['code'] = -1
        ret['msg'] = str(e)

    try:
        # Result files can only be written/uploaded when the working
        # directory (and therefore ipcSN / dirName) was established.
        if downloadDir is not None and os.path.isdir(downloadDir):
            # write json
            jsonFile = downloadDir + '/' + 'result.json'
            with open(jsonFile, 'w') as outfile:
                json.dump(ret, outfile)
            # upload
            uploadFile(ipcSN, dirName, 'result.json', downloadDir)
            if ret['data']['humanDetect']['found'] != 0 and imageName:
                uploadFile(ipcSN, dirName, imageName, downloadDir)

        # pub msg
        mc = mqtt.Client((MQTT_CID + '-pub') if MQTT_CID else "vamqtt-pub")
        if MQTT_USER and MQTT_PASSWORD:
            mc.username_pw_set(username=MQTT_USER, password=MQTT_PASSWORD)
        mc.connect(MQTT_HOST, MQTT_PORT)
        mc.publish('video.ai/v1.0/result', json.dumps(ret), qos=1)
    finally:
        # Always drop the per-task temp directory, even if upload/publish failed.
        if downloadDir is not None:
            shutil.rmtree(downloadDir, ignore_errors=True)
    return ret


if __name__ == '__main__':
    mq = VAMMQTTClient(take_task)
    app.config['mc'] = mq.client
    app.run(host='0.0.0.0', port=5000)