提交 3ef56023 authored 作者: blu's avatar blu

object detection: revise

上级 7b282b1f
...@@ -72,9 +72,13 @@ API_CAMERA_CFG_URI = os.getenv("API_CAMERA_CFG_URI", "/video/analysis/camera/{}/ ...@@ -72,9 +72,13 @@ API_CAMERA_CFG_URI = os.getenv("API_CAMERA_CFG_URI", "/video/analysis/camera/{}/
RABBITMQ_URI = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/") RABBITMQ_URI = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/")
RABBITMQ_TOPIC = os.getenv("RABBITMQ_TOPIC", "*.camera.model") RABBITMQ_TOPIC = os.getenv("RABBITMQ_TOPIC", "*.camera.model")
RABBITMQ_EXHANGE = os.getenv("RABBITMQ_EXHANGE", "video") RABBITMQ_EXHANGE = os.getenv("RABBITMQ_EXHANGE", "video")
AI_QUEUE_TASK = os.getenv('AI_TASK', 'videoai.v1.task')
AI_QUEUE_RESULT = os.getenv('AI_RESULT', 'videoai.v1.result')
AI_EXCHANGE = os.getenv('AI_XHG', 'videoai')
CAMERA_CONFIG_MAP = {} CAMERA_CONFIG_MAP = {}
def rabCallback(ch, method, prop, body): def rabModelCallback(ch, method, prop, body):
try: try:
print("received rabbitmsg. routing key: {} , body: {}".format(method.routing_key, body)) print("received rabbitmsg. routing key: {} , body: {}".format(method.routing_key, body))
rk = method.routing_key.split('.') rk = method.routing_key.split('.')
...@@ -85,25 +89,56 @@ def rabCallback(ch, method, prop, body): ...@@ -85,25 +89,56 @@ def rabCallback(ch, method, prop, body):
CAMERA_CONFIG_MAP[sn] = json.loads(body) CAMERA_CONFIG_MAP[sn] = json.loads(body)
print("CAMERA_CONFIG_MAP: \n{}".format(json.dumps(CAMERA_CONFIG_MAP))) print("CAMERA_CONFIG_MAP: \n{}".format(json.dumps(CAMERA_CONFIG_MAP)))
except Exception as e: except Exception as e:
#extype, value, tb = sys.exc_info()
#traceback.print_exc() #traceback.print_exc()
#pdb.post_mortem(tb) #pdb.post_mortem(tb)
print("failed to process callback {}: {} - {}".format(e, method, body)) print("failed to process callback {}: {} - {}".format(e, method, body))
def initRabbit(): def rabAiTaskCallback(ch, method, prop, body):
rabChan = None try:
print("received rabbitmsg. routing key: {} , body: {}".format(method.routing_key, body))
take_task(json.loads(body))
except Exception as e:
extype, value, tb = sys.exc_info()
traceback.print_exc()
pdb.post_mortem(tb)
print("failed to process callback {}: {} - {}".format(e, method, body))
def rabbit_camera_model():
if RABBITMQ_URI:
print("connecting rabbitmq {}".format(RABBITMQ_URI))
parameters = pika.URLParameters(RABBITMQ_URI) #("amqp://ilabservice:iLabServiceOps123456@40.73.40.246:5672/")
connection = pika.BlockingConnection(parameters)
print("rabbitmq connection established")
# camera model channel
rabChanModel = connection.channel()
rabChanModel.exchange_declare(exchange=RABBITMQ_EXHANGE, exchange_type="topic")
modelQueue = rabChanModel.queue_declare('', durable=True)
rabChanModel.queue_bind(exchange=RABBITMQ_EXHANGE,
queue=modelQueue.method.queue, routing_key=RABBITMQ_TOPIC)
rabChanModel.basic_consume(
queue=modelQueue.method.queue, on_message_callback=rabModelCallback, auto_ack=True)
print("bound camera model")
rabChanModel.start_consuming()
def rabbit_ai_task():
if RABBITMQ_URI: if RABBITMQ_URI:
print("connecting rabbitmq {}".format(RABBITMQ_URI)) print("connecting rabbitmq {}".format(RABBITMQ_URI))
parameters = pika.URLParameters(RABBITMQ_URI) #("amqp://ilabservice:iLabServiceOps123456@40.73.40.246:5672/") parameters = pika.URLParameters(RABBITMQ_URI) #("amqp://ilabservice:iLabServiceOps123456@40.73.40.246:5672/")
connection = pika.BlockingConnection(parameters) connection = pika.BlockingConnection(parameters)
rabChan = connection.channel()
rabChan.exchange_declare(exchange=RABBITMQ_EXHANGE, exchange_type="topic")
qr = rabChan.queue_declare('', durable=True)
rabChan.queue_bind(exchange=RABBITMQ_EXHANGE,
queue=qr.method.queue, routing_key=RABBITMQ_TOPIC)
rabChan.basic_consume(
queue=qr.method.queue, on_message_callback=rabCallback, auto_ack=True)
return rabChan
# ai analysis task channel
rabChanAiTask = connection.channel()
if AI_EXCHANGE:
rabChanAiTask.exchange_declare(exchange=AI_EXCHANGE, exchange_type="direct")
rabChanAiTask.queue_declare(queue=AI_QUEUE_TASK, durable=True)
rabChanAiTask.queue_bind(exchange=AI_EXCHANGE,
queue=AI_QUEUE_TASK, routing_key=AI_QUEUE_TASK)
rabChanAiTask.basic_consume(
queue=AI_QUEUE_TASK, on_message_callback=rabAiTaskCallback, auto_ack=True)
print("bound ai task")
rabChanAiTask.start_consuming()
print("CONFIG: \n\tMQTT: {}:{}\n\tBIN_NAME: {}".format( print("CONFIG: \n\tMQTT: {}:{}\n\tBIN_NAME: {}".format(
MQTT_HOST, MQTT_PORT, binName)) MQTT_HOST, MQTT_PORT, binName))
...@@ -128,6 +163,10 @@ def uploadFile(ipcSn, dirName, fileName, srcPath): ...@@ -128,6 +163,10 @@ def uploadFile(ipcSn, dirName, fileName, srcPath):
class VAMMQTTClient: class VAMMQTTClient:
rabChanModel = None
rabChanAiTask = None
th1 =None
th2 = None
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
@staticmethod @staticmethod
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
...@@ -173,8 +212,9 @@ class VAMMQTTClient: ...@@ -173,8 +212,9 @@ class VAMMQTTClient:
self.client.on_message = VAMMQTTClient.on_message self.client.on_message = VAMMQTTClient.on_message
self.client.connect_async(host, port, 30) self.client.connect_async(host, port, 30)
self.client.loop_start() self.client.loop_start()
self.rabChan = initRabbit()
th = threading.Thread(target=lambda: self.rabChan.start_consuming()).start() self.th1 = threading.Thread(target=rabbit_camera_model).start()
self.th2 = threading.Thread(target=rabbit_ai_task).start()
app = Flask(__name__, app = Flask(__name__,
static_url_path='', static_url_path='',
...@@ -195,6 +235,7 @@ worker.conf.update( ...@@ -195,6 +235,7 @@ worker.conf.update(
def getRegionConfig(ipcSN): def getRegionConfig(ipcSN):
region = None region = None
res = None res = None
if API_HOST:
reqUrl = API_HOST + API_CAMERA_CFG_URI.format(ipcSN) reqUrl = API_HOST + API_CAMERA_CFG_URI.format(ipcSN)
try: try:
if ipcSN in CAMERA_CONFIG_MAP and type(CAMERA_CONFIG_MAP[ipcSN]) is dict: if ipcSN in CAMERA_CONFIG_MAP and type(CAMERA_CONFIG_MAP[ipcSN]) is dict:
...@@ -208,7 +249,7 @@ def getRegionConfig(ipcSN): ...@@ -208,7 +249,7 @@ def getRegionConfig(ipcSN):
CAMERA_CONFIG_MAP[ipcSN] = ret["data"] CAMERA_CONFIG_MAP[ipcSN] = ret["data"]
if CAMERA_CONFIG_MAP.get(ipcSN) is None: if CAMERA_CONFIG_MAP.get(ipcSN) is None:
CAMERA_CONFIG_MAP[ipCSN] = 0 CAMERA_CONFIG_MAP[ipCSN] = 0
print("getRegionConfig: {}, {}, {}: {}".format(ipcSN, reqUrl, res.text if res and (text in res) else None, json.dumps(region))) print("getRegionConfig: {}, {}, {}: {}".format(ipcSN, reqUrl, res.text if res and ('text' in res) else None, json.dumps(region)))
except Exception as e: except Exception as e:
print("failed to get camera config {}: {}".format(ipcSN, e)) print("failed to get camera config {}: {}".format(ipcSN, e))
return region return region
...@@ -316,11 +357,24 @@ def video_analysis(data): ...@@ -316,11 +357,24 @@ def video_analysis(data):
mc.username_pw_set(username=MQTT_USER,password=MQTT_PASSWORD) mc.username_pw_set(username=MQTT_USER,password=MQTT_PASSWORD)
mc.connect(MQTT_HOST, MQTT_PORT) mc.connect(MQTT_HOST, MQTT_PORT)
mc.publish('video.ai/v1.0/result', json.dumps(ret), qos=1) mc.publish('video.ai/v1.0/result', json.dumps(ret), qos=1)
print("mqtt message published")
# rabbitmq
if RABBITMQ_URI:
parameters = pika.URLParameters(RABBITMQ_URI)
connection = pika.BlockingConnection(parameters)
rabChanResult = connection.channel()
rabChanResult.exchange_declare(exchange=AI_EXCHANGE, exchange_type="direct")
rabChanResult.queue_declare(queue=AI_QUEUE_RESULT, durable=True)
rabChanResult.queue_bind(exchange=AI_EXCHANGE,
queue=AI_QUEUE_RESULT, routing_key=AI_QUEUE_RESULT)
rabChanResult.basic_publish(exchange =AI_EXCHANGE,routing_key=AI_QUEUE_RESULT,body=json.dumps(ret),
properties=pika.BasicProperties(delivery_mode = 2))
print("rabbitmq message published")
rabChanResult.close()
try: try:
os.system('rm -fr ' + downloadDir) os.system('rm -fr ' + downloadDir)
except Exception as e: except Exception as e:
print('cascaded exception in va: {}'.format(e)) print('cascaded exception in va: {}'.format(e))
#raise Exception()
return ret return ret
......
from flask import Flask, escape, request, jsonify, g, url_for, current_app
import paho.mqtt.client as mqtt
from cerberus import schema_registry, Validator
from celery import Celery
from azure.storage.fileshare import ShareFileClient
import random
import pika
import requests
import os
import yaml
import logging
import time
import datetime
import threading
import json
import subprocess
import shlex
import re
import string
import pdb
import traceback
import sys
random.seed(datetime.datetime.now())
"""
va task schema:
{
"cameraId": "D72154040",
"endTime": 1577267418999,
"image": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550143347000-1577267418999/firstFrame.jpg",
"length": 260,
"startTime": 1550143347000,
"video": "http://evcloudsvc.ilabservice.cloud/video/D72154040/1550143347000-1577267418999/1550143347000-1577267418999.mp4"
}
"""
VA_SCHEMAS = {
'task': {
'cameraId': {'type': 'string', 'dependencies': ['startTime', 'endTime']},
'startTime': {'type': 'integer', 'dependencies': 'cameraId'},
'endTime': {'type': 'integer', 'dependencies': 'cameraId'},
'length': {'type': 'integer'},
'image': {'type': 'string'},
'video': {'type': 'string'}
},
}
aan = os.getenv('AAN', 'ilsvideostablediag')
aak = os.getenv(
'AAK', 'rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==')
CONNSTR = "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix=core.chinacloudapi.cn".format(
aan, aak)
#CONNSTR1= 'DefaultEndpointsProtocol=https;AccountName=ilsvideostablediag;AccountKey=rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==;EndpointSuffix=core.chinacloudapi.cn'
# if CONNSTR1 != CONNSTR:
# print("======\n\n======= no valid key\n{}\n{}".format(CONNSTR1, CONNSTR))
SHARENAME = os.getenv('SHARE', 'pre-data')
MQTT_HOST = os.getenv('MQTT_HOST', 'evcloud.ilabservice.cloud')
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
MQTT_CID = os.getenv('MQTT_CID', None)
MQTT_USER=os.getenv('MQTT_USER', None)
MQTT_PASSWORD=os.getenv('MQTT_PASSWORD', None)
REDIS_ADDR = os.getenv('REDIS', 'redis://localhost:6379')
workd = os.getenv('BIN_DIR', '../')
binName = os.getenv('BIN_NAME', 'detector ')
binPrefix = os.getenv('BIN_PRE', '')
configDir = os.getenv('CFG_DIR', workd)
API_HOST = os.getenv("API_HOST", None)
API_CAMERA_CFG_URI = os.getenv("API_CAMERA_CFG_URI", "/video/analysis/camera/{}/model")
RABBITMQ_URI = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/")
RABBITMQ_TOPIC = os.getenv("RABBITMQ_TOPIC", "*.camera.model")
RABBITMQ_EXHANGE = os.getenv("RABBITMQ_EXHANGE", "video")
AI_QUEUE_TASK = os.getenv('AI_TASK', 'videoai.v1.task')
AI_QUEUE_RESULT = os.getenv('AI_RESULT', 'videoai.v1.result')
AI_EXCHANGE = os.getenv('AI_XHG', 'videoai')
CAMERA_CONFIG_MAP = {}
def rabModelCallback(ch, method, prop, body):
try:
print("received rabbitmsg. routing key: {} , body: {}".format(method.routing_key, body))
rk = method.routing_key.split('.')
sn = None
if len(rk):
sn = rk[0]
if sn:
CAMERA_CONFIG_MAP[sn] = json.loads(body)
print("CAMERA_CONFIG_MAP: \n{}".format(json.dumps(CAMERA_CONFIG_MAP)))
except Exception as e:
#extype, value, tb = sys.exc_info()
#traceback.print_exc()
#pdb.post_mortem(tb)
print("failed to process callback {}: {} - {}".format(e, method, body))
def rabAiTaskCallback(ch, method, prop, body):
try:
print("received rabbitmsg. routing key: {} , body: {}".format(method.routing_key, body))
take_task(json.loads(body))
except Exception as e:
extype, value, tb = sys.exc_info()
traceback.print_exc()
pdb.post_mortem(tb)
print("failed to process callback {}: {} - {}".format(e, method, body))
def rabbit_camera_model():
if RABBITMQ_URI:
print("connecting rabbitmq {}".format(RABBITMQ_URI))
parameters = pika.URLParameters(RABBITMQ_URI) #("amqp://ilabservice:iLabServiceOps123456@40.73.40.246:5672/")
connection = pika.BlockingConnection(parameters)
print("rabbitmq connection established")
# camera model channel
rabChanModel = connection.channel()
rabChanModel.exchange_declare(exchange=RABBITMQ_EXHANGE, exchange_type="topic")
modelQueue = rabChanModel.queue_declare('', durable=True)
rabChanModel.queue_bind(exchange=RABBITMQ_EXHANGE,
queue=modelQueue.method.queue, routing_key=RABBITMQ_TOPIC)
rabChanModel.basic_consume(
queue=modelQueue.method.queue, on_message_callback=rabModelCallback, auto_ack=True)
print("bound camera model")
rabChanModel.start_consuming()
def rabbit_ai_task():
if RABBITMQ_URI:
print("connecting rabbitmq {}".format(RABBITMQ_URI))
parameters = pika.URLParameters(RABBITMQ_URI) #("amqp://ilabservice:iLabServiceOps123456@40.73.40.246:5672/")
connection = pika.BlockingConnection(parameters)
# ai analysis task channel
rabChanAiTask = connection.channel()
if AI_EXCHANGE:
rabChanAiTask.exchange_declare(exchange=AI_EXCHANGE, exchange_type="direct")
rabChanAiTask.queue_declare(queue=AI_QUEUE_TASK, durable=True)
rabChanAiTask.queue_bind(exchange=AI_EXCHANGE,
queue=AI_QUEUE_TASK, routing_key=AI_QUEUE_TASK)
rabChanAiTask.basic_consume(
queue=AI_QUEUE_TASK, on_message_callback=rabAiTaskCallback, auto_ack=True)
print("bound ai task")
rabChanAiTask.start_consuming()
print("CONFIG: \n\tMQTT: {}:{}\n\tBIN_NAME: {}".format(
MQTT_HOST, MQTT_PORT, binName))
def downloadFile(ipcSn, dirName, fileName, destDir):
file_path = ipcSn + '/'+dirName+'/'+fileName
destDir = destDir + '/' + fileName
print("downloading {}: {} {} {}".format(
destDir, ipcSn, dirName, file_path))
with ShareFileClient.from_connection_string(conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path) as fc:
with open(destDir, "wb") as f:
data = fc.download_file()
data.readinto(f)
def uploadFile(ipcSn, dirName, fileName, srcPath):
file_path = ipcSn + '/'+dirName+'/' + fileName
fc = ShareFileClient.from_connection_string(
conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path)
with open(srcPath + '/' + fileName, "rb") as source_file:
fc.upload_file(source_file)
class VAMMQTTClient:
rabChanModel = None
rabChanAiTask = None
th1 =None
th2 = None
# The callback for when the client receives a CONNACK response from the server.
@staticmethod
def on_connect(client, userdata, flags, rc):
now = datetime.datetime.now()
print(now, "Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
topic = '$queue/video.ai/v1.0/task'
client.subscribe(topic, qos=1)
print(datetime.datetime.now(), 'subscribed to ', topic)
#client.subscribe("$queue/video.ai/v1.0/task", qos=1)
# The callback for when a PUBLISH message is received from the server.
@staticmethod
def on_message(client, userdata, msg):
payload = msg.payload.decode('utf-8')
print(msg.topic+" " + payload)
if userdata:
try:
jd = json.loads(payload)
userdata(jd)
except Exception as e:
print('exception in process message:', e)
#extype, value, tb = sys.exc_info()
# traceback.print_exc()
# pdb.post_mortem(tb)
@staticmethod
def on_disconnect(client, userdata, rc):
#topic = "video.ai/v1.0/task"
#client.publish(topic, payload=None, qos=1, retian=False)
print("disconnected")
def __init__(self, callback, host=MQTT_HOST, port=MQTT_PORT):
'''
Parameters
'''
self.client = mqtt.Client(
MQTT_CID if MQTT_CID else "vamqtt", userdata=callback) # , protocol=mqtt.MQTTv5)
if MQTT_USER and MQTT_PASSWORD:
self.client.username_pw_set(username=MQTT_USER,password=MQTT_PASSWORD)
self.client.on_connect = VAMMQTTClient.on_connect
self.client.on_message = VAMMQTTClient.on_message
self.client.connect_async(host, port, 30)
self.client.loop_start()
self.th1 = threading.Thread(target=rabbit_camera_model).start()
self.th2 = threading.Thread(target=rabbit_ai_task).start()
app = Flask(__name__,
static_url_path='',
static_folder='web/main/dist')
logger = app.logger
app.config['broker_url'] = REDIS_ADDR
app.config['result_backend'] = REDIS_ADDR
worker = Celery(app.name, broker=app.config['broker_url'])
worker.conf.update(app.config)
worker.conf.update(
task_serializer='json',
# accept_content=['json'],
result_serializer='json',
# timezone='Europe/Oslo',
enable_utc=True)
def getRegionConfig(ipcSN):
region = None
res = None
if API_HOST:
reqUrl = API_HOST + API_CAMERA_CFG_URI.format(ipcSN)
try:
if ipcSN in CAMERA_CONFIG_MAP and type(CAMERA_CONFIG_MAP[ipcSN]) is dict:
region = CAMERA_CONFIG_MAP[ipcSN]['region']
elif CAMERA_CONFIG_MAP.get(ipcSN) is None and API_HOST:
res = requests.get(reqUrl)
if res.status_code == 200:
ret = res.json()
if 'data' in ret and 'region' in ret['data']:
region = ret["data"]["region"]
CAMERA_CONFIG_MAP[ipcSN] = ret["data"]
if CAMERA_CONFIG_MAP.get(ipcSN) is None:
CAMERA_CONFIG_MAP[ipCSN] = 0
print("getRegionConfig: {}, {}, {}: {}".format(ipcSN, reqUrl, res.text if res and (text in res) else None, json.dumps(region)))
except Exception as e:
print("failed to get camera config {}: {}".format(ipcSN, e))
return region
def take_task(task):
ret = {'code': 0, 'msg': 'ok'}
print("taking task", json.dumps(task))
taskValidator = Validator(VA_SCHEMAS['task'], allow_unknown=True)
if not taskValidator.validate(task):
ret['code'] = 1
ret['msg'] = 'invalid request body'
ret['data'] = taskValidator.errors
else:
# process
task["region"] = getRegionConfig(task['cameraId'])
print("got region for ipc {}: {}".format(task['cameraId'], json.dumps(task["region"])))
video_analysis.apply_async(args=[task])
print(json.dumps(ret))
return ret
@app.route('/api/video.ai/v1.0/task', methods=['POST'])
def new_task():
ret = take_task(request.json)
return jsonify(ret)
@worker.task(acks_late=True)
def video_analysis(data):
ret = {'code': 0, 'msg': 'ok'}
ret['target'] = data
ret['data'] = {}
ret['data']['humanDetect'] = {}
ret['data']['humanDetect']['found'] = 0
print(json.dumps(data))
imageName = None
try:
if 'cameraId' in data: # azure storage
# get azure storage video
ipcSN = data["cameraId"]
dirName = "{}-{}".format(data["startTime"], data["endTime"])
fileName = dirName + '.mp4'
strRand = "{}-{}".format(data["startTime"], ''.join(
random.choice(string.ascii_letters) for i in range(6)))
downloadDir = "{}/{}/".format(os.getenv('DL_DIR',
workd) + '/' + ipcSN, strRand)
os.system('mkdir -p ' + downloadDir)
downloadFile(ipcSN, dirName, fileName, downloadDir)
print("downloaded file {} into {}".format(fileName, downloadDir))
# analyze
#cmdLine = '/Users/blu/work/opencv-projects/opencv-yolo/detector /Users/blu/work/opencv-projects/opencv-yolo/web/1550143347000-1577267418999.mp4 -c /Users/blu/work/opencv-projects/opencv-yolo/'
prefix = binPrefix + ' ' if binPrefix else binPrefix
cmdLine = prefix + workd + '/' + binName + ' ' + downloadDir + \
fileName + ' -c ' + configDir + ' -o ' + downloadDir + '/detect.jpg'
region = data['region']
if region:
cmdLine += " -s {},{},{},{}".format(region['minX'], region['minY'], region['maxX'], region['maxY'])
else:
print("no region config")
cmdArgs = shlex.split(cmdLine)
print(cmdLine, '\n\n', cmdArgs)
output = subprocess.check_output(cmdArgs)
print(output)
# parse
for line in output.decode('utf-8').split('\n'):
print("\n", line)
m = re.match(
r".*? found (\w+) ([\d\.]+) .*? image: .*?/([_\w\d]+.jpg).*?time: ([\d\.]+)", line)
if m:
ret['data']['humanDetect']['found'] = 1
ret['data']['humanDetect']['level'] = m.group(2)
imageName = m.group(3)
ret['data']['humanDetect']['image'] = ipcSN + \
'/' + dirName + '/' + m.group(3)
ret['data']['humanDetect']['time'] = int(m.group(4))
print('found {}: {}, img: {}, elapse: {}'.format(
m.group(1), m.group(2), m.group(3), m.group(4)))
break
else:
ret['data']['humanDetect']['found'] = 0
elif 'video' in data: # http
ret['code'] = 2
ret['msg'] = 'not impelemented yet for http'
else: # no video
ret['code'] = 1
ret['msg'] = 'no video specified'
except Exception as e:
print("exception in va worker: {}".format(e))
ret['code'] = -1
ret['msg'] = str(e)
#extype, value, tb = sys.exc_info()
# traceback.print_exc()
# write json
jsonFile = downloadDir + '/' + 'result.json'
with open(jsonFile, 'w') as outfile:
json.dump(ret, outfile)
# upload
uploadFile(ipcSN, dirName, 'result.json', downloadDir)
if ret['data']['humanDetect']['found'] != 0:
uploadFile(ipcSN, dirName, imageName, downloadDir)
# pub msg
mc = mqtt.Client((MQTT_CID + '-pub') if MQTT_CID else "vamqtt-pub")
if MQTT_USER and MQTT_PASSWORD:
mc.username_pw_set(username=MQTT_USER,password=MQTT_PASSWORD)
mc.connect(MQTT_HOST, MQTT_PORT)
mc.publish('video.ai/v1.0/result', json.dumps(ret), qos=1)
print("mqtt message published")
# rabbitmq
if RABBITMQ_URI:
parameters = pika.URLParameters(RABBITMQ_URI)
connection = pika.BlockingConnection(parameters)
rabChanResult = connection.channel()
rabChanResult.exchange_declare(exchange=AI_EXCHANGE, exchange_type="direct")
rabChanResult.queue_declare(queue=AI_QUEUE_RESULT, durable=True)
rabChanResult.queue_bind(exchange=AI_EXCHANGE,
queue=AI_QUEUE_RESULT, routing_key=AI_QUEUE_RESULT)
rabChanResult.basic_publish(exchange =AI_EXCHANGE,routing_key=AI_QUEUE_RESULT,body=json.dumps(ret),
properties=pika.BasicProperties(delivery_mode = 2))
print("rabbitmq message published")
rabChanResult.close()
try:
os.system('rm -fr ' + downloadDir)
except Exception as e:
print('cascaded exception in va: {}'.format(e))
return ret
if __name__ == '__main__':
mq = VAMMQTTClient(take_task)
app.config['mc'] = mq.client
app.run(host='0.0.0.0', port='5000')
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论