提交 c1953038 authored 作者: blu's avatar blu

evslicer: reset slice time stamps

上级 670be3ac
...@@ -481,12 +481,13 @@ private: ...@@ -481,12 +481,13 @@ private:
//av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0); //av_dict_set(&pOptsRemux, "movflags", "frag_keyframe+empty_moov+default_base_moof", 0);
av_dict_set(&pOptsRemux, "c:v", "libx264", 0); av_dict_set(&pOptsRemux, "c:v", "libx264", 0);
//av_dict_set(&pOptsRemux, "brand", "mp42", 0); //av_dict_set(&pOptsRemux, "brand", "mp42", 0);
//av_dict_set(&pOptsRemux, "movflags", "faststart", 0); av_dict_set(&pOptsRemux, "movflags", "faststart+frag_keyframe", 0);
av_dict_set(&pOptsRemux, "strftime", "1", 0); av_dict_set(&pOptsRemux, "strftime", "1", 0);
av_dict_set(&pOptsRemux, "segment_format", "mp4", 0); av_dict_set(&pOptsRemux, "segment_format", "mp4", 0);
av_dict_set(&pOptsRemux, "f", "segment", 0); av_dict_set(&pOptsRemux, "f", "segment", 0);
av_dict_set(&pOptsRemux, "segment_time", to_string(seconds).data(), 0); av_dict_set(&pOptsRemux, "segment_time", to_string(seconds).data(), 0);
av_dict_set(&pOptsRemux, "segment_wrap", to_string(numSlices).data(), 0); av_dict_set(&pOptsRemux, "segment_wrap", to_string(numSlices).data(), 0);
av_dict_set(&pOptsRemux, "reset_timestamps", "1", 0);
return ret; return ret;
} }
......
...@@ -4,8 +4,21 @@ from cerberus import schema_registry, Validator ...@@ -4,8 +4,21 @@ from cerberus import schema_registry, Validator
from celery import Celery from celery import Celery
from azure.storage.fileshare import ShareFileClient from azure.storage.fileshare import ShareFileClient
import random import random
import os, yaml, logging, time, datetime, threading, json, subprocess, shlex, re, string import requests
import pdb,traceback, sys import os
import yaml
import logging
import time
import datetime
import threading
import json
import subprocess
import shlex
import re
import string
import pdb
import traceback
import sys
random.seed(datetime.datetime.now()) random.seed(datetime.datetime.now())
...@@ -23,94 +36,109 @@ va task schema: ...@@ -23,94 +36,109 @@ va task schema:
""" """
VA_SCHEMAS = { VA_SCHEMAS = {
'task': { 'task': {
'cameraId': {'type': 'string', 'dependencies': ['startTime', 'endTime']}, 'cameraId': {'type': 'string', 'dependencies': ['startTime', 'endTime']},
'startTime': {'type': 'integer', 'dependencies':'cameraId'}, 'startTime': {'type': 'integer', 'dependencies': 'cameraId'},
'endTime': {'type': 'integer', 'dependencies':'cameraId'}, 'endTime': {'type': 'integer', 'dependencies': 'cameraId'},
'length': {'type': 'integer'}, 'length': {'type': 'integer'},
'image': {'type': 'string'}, 'image': {'type': 'string'},
'video': {'type': 'string'} 'video': {'type': 'string'}
}, },
} }
aan = os.getenv('AAN', 'ilsvideostablediag') aan = os.getenv('AAN', 'ilsvideostablediag')
aak = os.getenv('AAK', 'rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==') aak = os.getenv(
CONNSTR="DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix=core.chinacloudapi.cn".format(aan,aak) 'AAK', 'rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==')
CONNSTR = "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix=core.chinacloudapi.cn".format(
aan, aak)
#CONNSTR1= 'DefaultEndpointsProtocol=https;AccountName=ilsvideostablediag;AccountKey=rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==;EndpointSuffix=core.chinacloudapi.cn' #CONNSTR1= 'DefaultEndpointsProtocol=https;AccountName=ilsvideostablediag;AccountKey=rWeA/cUiWAsDqGHO0lfDB5eDHNZxCChrH0pMvICdNJR6tt+hE2tHlSl9kUEjqyOY6cztPWaaRbbeoI47uNEeWA==;EndpointSuffix=core.chinacloudapi.cn'
#if CONNSTR1 != CONNSTR: # if CONNSTR1 != CONNSTR:
# print("======\n\n======= no valid key\n{}\n{}".format(CONNSTR1, CONNSTR)) # print("======\n\n======= no valid key\n{}\n{}".format(CONNSTR1, CONNSTR))
SHARENAME=os.getenv('SHARE','pre-data') SHARENAME = os.getenv('SHARE', 'pre-data')
MQTT_HOST=os.getenv('MQTT_HOST','evcloud.ilabservice.cloud') MQTT_HOST = os.getenv('MQTT_HOST', 'evcloud.ilabservice.cloud')
MQTT_PORT=int(os.getenv('MQTT_PORT', 1883)) MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
REDIS_ADDR = os.getenv('REDIS', 'redis://localhost:6379') REDIS_ADDR = os.getenv('REDIS', 'redis://localhost:6379')
workd = os.getenv('BIN_DIR', '../') workd = os.getenv('BIN_DIR', '../')
binName = os.getenv('BIN_NAME', 'detector ') binName = os.getenv('BIN_NAME', 'detector ')
binPrefix = os.getenv('BIN_PRE', '') binPrefix = os.getenv('BIN_PRE', '')
configDir = os.getenv('CFG_DIR', workd) configDir = os.getenv('CFG_DIR', workd)
API_HOST = None
API_CAMERA_CFG_URI = "/video/analysis/camera/{}/model"
RABBITMQ_TOPIC = None
CAMERA_CONFIG_MAP = {}
print("CONFIG: \n\tMQTT: {}:{}\n\tBIN_NAME: {}".format(
MQTT_HOST, MQTT_PORT, binName))
print("CONFIG: \n\tMQTT: {}:{}\n\tBIN_NAME: {}".format(MQTT_HOST, MQTT_PORT, binName))
def downloadFile(ipcSn, dirName, fileName, destDir): def downloadFile(ipcSn, dirName, fileName, destDir):
file_path=ipcSn + '/'+dirName+'/'+fileName file_path = ipcSn + '/'+dirName+'/'+fileName
destDir = destDir + '/' + fileName destDir = destDir + '/' + fileName
print("downloading {}: {} {} {}".format(destDir, ipcSn, dirName, file_path)) print("downloading {}: {} {} {}".format(
with ShareFileClient.from_connection_string(conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path) as fc: destDir, ipcSn, dirName, file_path))
with open(destDir, "wb") as f: with ShareFileClient.from_connection_string(conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path) as fc:
data = fc.download_file() with open(destDir, "wb") as f:
data.readinto(f) data = fc.download_file()
data.readinto(f)
def uploadFile(ipcSn, dirName, fileName, srcPath): def uploadFile(ipcSn, dirName, fileName, srcPath):
file_path=ipcSn + '/'+dirName+'/' + fileName file_path = ipcSn + '/'+dirName+'/' + fileName
fc = ShareFileClient.from_connection_string(conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path) fc = ShareFileClient.from_connection_string(
with open(srcPath + '/' + fileName, "rb") as source_file: conn_str=CONNSTR, share_name=SHARENAME, file_path=file_path)
fc.upload_file(source_file) with open(srcPath + '/' + fileName, "rb") as source_file:
fc.upload_file(source_file)
class VAMMQTTClient: class VAMMQTTClient:
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
@staticmethod @staticmethod
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc)) print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and # Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed. # reconnect then subscriptions will be renewed.
topic = '$queue/video.ai/v1.0/task' topic = '$queue/video.ai/v1.0/task'
client.subscribe(topic, qos=1) client.subscribe(topic, qos=1)
print('subscribed to ', topic) print('subscribed to ', topic)
#client.subscribe("$queue/video.ai/v1.0/task", qos=1) #client.subscribe("$queue/video.ai/v1.0/task", qos=1)
# The callback for when a PUBLISH message is received from the server. # The callback for when a PUBLISH message is received from the server.
@staticmethod @staticmethod
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
payload = msg.payload.decode('utf-8') payload = msg.payload.decode('utf-8')
print(msg.topic+" "+ payload) print(msg.topic+" " + payload)
if userdata: if userdata:
try: try:
jd = json.loads(payload) jd = json.loads(payload)
userdata(jd) userdata(jd)
except Exception as e: except Exception as e:
print('exception in process message:', e) print('exception in process message:', e)
#extype, value, tb = sys.exc_info() #extype, value, tb = sys.exc_info()
#traceback.print_exc() # traceback.print_exc()
#pdb.post_mortem(tb) # pdb.post_mortem(tb)
@staticmethod @staticmethod
def on_disconnect(client, userdata, rc): def on_disconnect(client, userdata, rc):
#topic = "video.ai/v1.0/task" #topic = "video.ai/v1.0/task"
#client.publish(topic, payload=None, qos=1, retian=False) #client.publish(topic, payload=None, qos=1, retian=False)
print("disconnected") print("disconnected")
def __init__(self, callback, host = MQTT_HOST, port = MQTT_PORT):
''' def __init__(self, callback, host=MQTT_HOST, port=MQTT_PORT):
Parameters '''
''' Parameters
self.client = mqtt.Client("vamqtt",userdata=callback) #, protocol=mqtt.MQTTv5) '''
self.client.on_connect = VAMMQTTClient.on_connect self.client = mqtt.Client(
self.client.on_message = VAMMQTTClient.on_message "vamqtt", userdata=callback) # , protocol=mqtt.MQTTv5)
self.client.connect_async(host, port, 30) self.client.on_connect = VAMMQTTClient.on_connect
self.client.loop_start() self.client.on_message = VAMMQTTClient.on_message
self.client.connect_async(host, port, 30)
self.client.loop_start()
app = Flask(__name__, app = Flask(__name__,
static_url_path='', static_url_path='',
static_folder='web/main/dist') static_folder='web/main/dist')
logger = app.logger logger = app.logger
app.config['broker_url'] = REDIS_ADDR app.config['broker_url'] = REDIS_ADDR
...@@ -118,108 +146,124 @@ app.config['result_backend'] = REDIS_ADDR ...@@ -118,108 +146,124 @@ app.config['result_backend'] = REDIS_ADDR
worker = Celery(app.name, broker=app.config['broker_url']) worker = Celery(app.name, broker=app.config['broker_url'])
worker.conf.update(app.config) worker.conf.update(app.config)
worker.conf.update( worker.conf.update(
task_serializer='json', task_serializer='json',
#accept_content=['json'], # accept_content=['json'],
result_serializer='json', result_serializer='json',
#timezone='Europe/Oslo', # timezone='Europe/Oslo',
enable_utc=True) enable_utc=True)
def take_task(task): def take_task(task):
ret = {'code': 0,'msg': 'ok'} ret = {'code': 0, 'msg': 'ok'}
print("taking task", json.dumps(task)) print("taking task", json.dumps(task))
taskValidator = Validator(VA_SCHEMAS['task'], allow_unknown=True) taskValidator = Validator(VA_SCHEMAS['task'], allow_unknown=True)
if not taskValidator.validate(task): if not taskValidator.validate(task):
ret['code'] = 1 ret['code'] = 1
ret['msg'] = 'invalid request body' ret['msg'] = 'invalid request body'
ret['data'] = taskValidator.errors ret['data'] = taskValidator.errors
else: else:
# process # process
video_analysis.apply_async(args=[task]) video_analysis.apply_async(args=[task])
print(json.dumps(ret)) print(json.dumps(ret))
return ret return ret
@app.route('/api/video.ai/v1.0/task', methods=['POST']) @app.route('/api/video.ai/v1.0/task', methods=['POST'])
def new_task(): def new_task():
ret = take_task(request.json) ret = take_task(request.json)
return jsonify(ret); return jsonify(ret)
@worker.task(acks_late=True) @worker.task(acks_late=True)
def video_analysis(data): def video_analysis(data):
ret = {'code': 0, 'msg': 'ok'} ret = {'code': 0, 'msg': 'ok'}
ret['target'] = data ret['target'] = data
print(json.dumps(data)) print(json.dumps(data))
imageName = None imageName = None
try: try:
if 'cameraId' in data: # azure storage if 'cameraId' in data: # azure storage
# get azure storage video # get azure storage video
ipcSN = data["cameraId"] ipcSN = data["cameraId"]
dirName = "{}-{}".format(data["startTime"],data["endTime"]) dirName = "{}-{}".format(data["startTime"], data["endTime"])
fileName = dirName + '.mp4' fileName = dirName + '.mp4'
strRand = "{}-{}".format(data["startTime"], ''.join(random.choice(string.ascii_letters) for i in range(6))) strRand = "{}-{}".format(data["startTime"], ''.join(
downloadDir = "{}/{}/".format(os.getenv('DL_DIR', workd) + '/' + ipcSN, strRand) random.choice(string.ascii_letters) for i in range(6)))
os.system('mkdir -p ' + downloadDir) downloadDir = "{}/{}/".format(os.getenv('DL_DIR',
downloadFile(ipcSN, dirName, fileName, downloadDir) workd) + '/' + ipcSN, strRand)
print("downloaded file {} into {}".format(fileName, downloadDir)) os.system('mkdir -p ' + downloadDir)
# analyze downloadFile(ipcSN, dirName, fileName, downloadDir)
#cmdLine = '/Users/blu/work/opencv-projects/opencv-yolo/detector /Users/blu/work/opencv-projects/opencv-yolo/web/1550143347000-1577267418999.mp4 -c /Users/blu/work/opencv-projects/opencv-yolo/' res = requests.get(API_HOST + API_CAMERA_CFG_URI)
prefix = binPrefix + ' ' if binPrefix else binPrefix try:
cmdLine = prefix + workd + '/' + binName + ' ' + downloadDir + fileName + ' -c ' + configDir + ' -o ' + downloadDir + '/detect.jpg' if res.status_code == 200:
cmdArgs = shlex.split(cmdLine) pass
print(cmdLine, '\n\n', cmdArgs) except Exception as e:
output = subprocess.check_output(cmdArgs) print("failed to get camera config {}: {}".format(ipcSN, e))
print(output)
ret['data'] = {}
ret['data']['humanDetect'] = {}
ret['data']['humanDetect']['found'] = 0
# parse
for line in output.decode('utf-8').split('\n'):
print("\n", line)
m = re.match(r".*? found (\w+) ([\d\.]+) .*? image: .*?/([_\w\d]+.jpg).*?time: ([\d\.]+)", line)
if m:
ret['data']['humanDetect']['found'] = 1
ret['data']['humanDetect']['level'] = m.group(2)
imageName = m.group(3)
ret['data']['humanDetect']['image'] = ipcSN + '/' + dirName + '/' + m.group(3)
ret['data']['humanDetect']['time'] = int(m.group(4))
print('found {}: {}, img: {}, elapse: {}'.format(m.group(1), m.group(2), m.group(3), m.group(4)))
break
else:
ret['data']['humanDetect']['found'] = 0
elif 'video' in data: # http
ret['code'] = 2
ret['msg'] = 'not impelemented yet for http'
else: # no video
ret['code'] = 1
ret['msg'] = 'no video specified'
except Exception as e:
print("exception in va worker: {}".format(e));
ret['code'] = -1
ret['msg'] = str(e)
#extype, value, tb = sys.exc_info()
#traceback.print_exc()
# write json
jsonFile = downloadDir + '/' + 'result.json'
with open(jsonFile, 'w') as outfile:
json.dump(ret, outfile)
# upload
uploadFile(ipcSN, dirName, 'result.json', downloadDir)
if ret['data']['humanDetect']['found'] != 0:
uploadFile(ipcSN, dirName, imageName, downloadDir)
# pub msg
mc = mqtt.Client("vamqtt-pub")
mc.connect(MQTT_HOST, MQTT_PORT)
mc.publish('video.ai/v1.0/result', json.dumps(ret), qos=1)
try:
os.system('rm -fr ' + downloadDir)
except Exception as e:
print('cascaded exception in va: {}'.format(e))
#raise Exception()
return ret
if __name__ == '__main__': print("downloaded file {} into {}".format(fileName, downloadDir))
mq = VAMMQTTClient(take_task) # analyze
app.config['mc'] = mq.client #cmdLine = '/Users/blu/work/opencv-projects/opencv-yolo/detector /Users/blu/work/opencv-projects/opencv-yolo/web/1550143347000-1577267418999.mp4 -c /Users/blu/work/opencv-projects/opencv-yolo/'
app.run(host='0.0.0.0', port = '5000') prefix = binPrefix + ' ' if binPrefix else binPrefix
cmdLine = prefix + workd + '/' + binName + ' ' + downloadDir + \
fileName + ' -c ' + configDir + ' -o ' + downloadDir + '/detect.jpg'
cmdArgs = shlex.split(cmdLine)
print(cmdLine, '\n\n', cmdArgs)
output = subprocess.check_output(cmdArgs)
print(output)
ret['data'] = {}
ret['data']['humanDetect'] = {}
ret['data']['humanDetect']['found'] = 0
# parse
for line in output.decode('utf-8').split('\n'):
print("\n", line)
m = re.match(
r".*? found (\w+) ([\d\.]+) .*? image: .*?/([_\w\d]+.jpg).*?time: ([\d\.]+)", line)
if m:
ret['data']['humanDetect']['found'] = 1
ret['data']['humanDetect']['level'] = m.group(2)
imageName = m.group(3)
ret['data']['humanDetect']['image'] = ipcSN + \
'/' + dirName + '/' + m.group(3)
ret['data']['humanDetect']['time'] = int(m.group(4))
print('found {}: {}, img: {}, elapse: {}'.format(
m.group(1), m.group(2), m.group(3), m.group(4)))
break
else:
ret['data']['humanDetect']['found'] = 0
elif 'video' in data: # http
ret['code'] = 2
ret['msg'] = 'not impelemented yet for http'
else: # no video
ret['code'] = 1
ret['msg'] = 'no video specified'
except Exception as e:
print("exception in va worker: {}".format(e))
ret['code'] = -1
ret['msg'] = str(e)
#extype, value, tb = sys.exc_info()
# traceback.print_exc()
# write json
jsonFile = downloadDir + '/' + 'result.json'
with open(jsonFile, 'w') as outfile:
json.dump(ret, outfile)
# upload
uploadFile(ipcSN, dirName, 'result.json', downloadDir)
if ret['data']['humanDetect']['found'] != 0:
uploadFile(ipcSN, dirName, imageName, downloadDir)
# pub msg
mc = mqtt.Client("vamqtt-pub")
mc.connect(MQTT_HOST, MQTT_PORT)
mc.publish('video.ai/v1.0/result', json.dumps(ret), qos=1)
try:
os.system('rm -fr ' + downloadDir)
except Exception as e:
print('cascaded exception in va: {}'.format(e))
#raise Exception()
return ret
if __name__ == '__main__':
mq = VAMMQTTClient(take_task)
app.config['mc'] = mq.client
app.run(host='0.0.0.0', port='5000')
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论