提交 a94108d9 authored 作者: zw.wang's avatar zw.wang

feat: 萤石云云录制

上级 b897fa12
......@@ -5,4 +5,10 @@ PROCESSING_TOTAL_KEY = 'hk_isc:recording:processing:total'
ACCOUNT_TOKEN_KEY = 'eviz:account:app_key:{}:token'
DEVICE_SERIAL_KEY = 'eviz:account:device:{}'
UNTREATED_JOB_KEY = 'eviz:camera:{}:cloud_download'
UNVERIFIED_FILE_KEY = 'eviz:unverified:{}:file:{}'
RECORDING_JOB_KEY = 'eviz:camera:{}:recording'
UNVERIFIED_EVENT_QUEUE = 'UNVERIFIED_EVENT_QUEUE'
RECORD_TASK_QUEUE = 'RECORD_TASK_QUEUE'
......@@ -137,7 +137,7 @@ def get_camera_info(cursor, conn, camera_code=None, platform='isc', video_plan_t
@query()
def insert_video_info(cursor, conn, db_table, device_code, start_time, end_time,
biz_type, service_type, status=0,
file_name=None, video_url=None, video_resolution=None, recovered_time=None):
file_name=None, video_url=None, video_resolution=None, recovered_time=None, record_type='event'):
sql = '''
insert {} (
device_code, file_name, start_time, end_time,
......
import json
import pytz
import time
import os
import dateutil.parser
import urllib.request
import dynaconf
from datetime import datetime, timedelta
from intelab_python_sdk.logger import log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from ils_common_video.utils.eviz_client import EvizVersionClient
from ils_common_video.utils.video_file import VideoFile
from ils_common_video.utils.record_utils import get_video_duration, time_to_seconds
from ils_common_video.utils.aliyun_oss import oss_upload_file
from ils_common_video.db.redis import redis_connect
from ils_common_video.const import UNTREATED_JOB_KEY, RECORDING_JOB_KEY
from ils_common_video.utils.alarm_utils import send_markdown
from ils_common_video.db.mysql import update_video_info
MAX_RECORDING_NUM = 3
videos_path = dynaconf.settings.get('VIDEOS_PATH')
os.makedirs(videos_path, exist_ok=True)
ysy_client = EvizVersionClient()
tz = pytz.timezone('Asia/Shanghai')
text = """
云录制重新录制失败任务提醒
- 诊所名称:{}
- 区域:{}
- 摄像头序列号: {}
- 事件开始事件:{}
- 事件结束事件:{}
- file_id: {}
- 创建时间:{}
- Token: {}
- 所属帐号: {}
---
"""
class DownloadFileJob:
def __init__(self):
self.project_id = 1
def start(self):
while True:
try:
self.run()
except Exception as e:
log.exception(e)
time.sleep(10)
def run(self):
pipe = redis_connect()
all_untreated_job_keys = pipe.keys(UNTREATED_JOB_KEY.format('*'))
all_recording_job_keys = pipe.keys(RECORDING_JOB_KEY.format('*'))
log.info('当前未处理摄像头个数%s, 正在录制中摄像头个数%s',
len(all_untreated_job_keys), len(all_recording_job_keys))
# 首先判定正在录制中的文件是否可以下载
for recording_key in all_recording_job_keys:
camera_code = recording_key.split(':')[2]
token = ysy_client.get_access_token(camera_code)
self._scan_recording_job(pipe, camera_code, recording_key, token)
for untreated_job_key in all_untreated_job_keys:
camera_code = untreated_job_key.split(':')[2]
recording_key = RECORDING_JOB_KEY.format(camera_code)
recording_job_count = pipe.llen(recording_key)
# 如果当前摄像头的正在录制进程数量为三,其他任务则需要等待
if recording_job_count >= MAX_RECORDING_NUM:
# self._scan_recording_job(pipe, camera_code, recording_key, token)
continue
token = ysy_client.get_access_token(camera_code)
now = datetime.now()
for _ in range(MAX_RECORDING_NUM - recording_job_count):
untreated_job = pipe.rpop(untreated_job_key)
if not untreated_job:
break
event = json.loads(untreated_job)
start_time = dateutil.parser.parse(event['start_time']).astimezone(tz)
end_time = dateutil.parser.parse(event['end_time']).astimezone(tz)
create_time = dateutil.parser.parse(event['create_time'])
# 设备离线了,延后1小时处理
if event.get('reason') == 'offline' and create_time > now:
pipe.lpush(untreated_job, json.dumps(event))
continue
record_res = ysy_client.record_cloud_video_save(
token, event['camera_code'], self.project_id, event['file_id'],
start_time.strftime('%Y%m%d%H%M%S'),
end_time.strftime('%Y%m%d%H%M%S'), rec_type='local')
meta = record_res.get('meta')
log.info(meta)
if meta.get('code') == 200:
log.info('add file_id: {}, start_time: {}, end_time: {}, duration: {}'.format(
event['file_id'], event['start_time'], event['end_time'], event['duration']))
event['create_time'] = now.strftime('%Y-%m-%d %H:%M:%S')
event['token'] = token
event['account'] = ysy_client.choice_account_of_device(event['camera_code'])
# 创建录制任务完成之后将任务丢到录制任务queue
pipe.lpush(recording_key, json.dumps(event))
continue
elif meta.get('code') == 400:
if '未搜索到本地录像' in meta.get('message'):
continue
elif meta.get('message') == '文件已经存在':
delete_res = ysy_client.delete_cloud_video_download_url(
token, event['file_id'], self.project_id)
if delete_res and '文件正在上传' in delete_res.get('meta', {'message': ''}).get('message'):
log.info('删除文件%s失败,文件正在上传', event['file_id'])
pipe.lpush(recording_key, json.dumps(event))
continue
elif '不在线' in meta.get('message'):
event['create_time'] = (now + timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')
event['reason'] = 'offline'
log.info('设备%s不在线', event['camera_code'])
else:
# 未知错误需要处理
log.warning(record_res)
# 任务丢回待处理队列
log.info('开启录制file_id %s 任务失败,重新将任务丢回待处理队列, %s', event['file_id'], event)
pipe.lpush(untreated_job_key, json.dumps(event))
pipe.close()
def _download_file_by_url(self, urls, base_file_name, file_id, duration, token):
file_duration = 0
error_log_tag = 0
videos = []
for inx, video_url in enumerate(urls):
file_name = '{}_{}.mp4'.format(base_file_name, str(inx))
file_full_path = os.path.join(videos_path, file_name)
try:
self.download_file(video_url, file_full_path)
if not os.path.isfile(file_full_path):
raise FileExistsError
videos.append(file_full_path)
except Exception as e:
log.exception(e)
error_log_tag = 1
break
video_info, _ = get_video_duration(file_full_path)
if video_info['bitrate'] == '0':
error_log_tag = 2
break
file_duration += time_to_seconds(video_info['duration'])
log.info('The download is complete, file: %s, %s, %s',
file_name, video_info['duration'], video_info['resolution'])
else:
if file_duration < int(duration) - 5:
log.info('File(%s) length is smaller than event length. %s < %s' % (
file_id, file_duration, duration))
log.info('---------------')
# TODO 视频实际长度和事件不符
# 下载完成
ysy_client.delete_cloud_video_download_url(token, file_id, self.project_id)
return videos, error_log_tag
def _scan_recording_job(self, pipe, camera_code, camera_recording_key, token):
for _ in range(pipe.llen(camera_recording_key)):
recording_job = json.loads(pipe.rpop(camera_recording_key))
now = datetime.now()
start_time = dateutil.parser.parse(recording_job['start_time'])
create_time = dateutil.parser.parse(recording_job['create_time'])
if create_time + timedelta(seconds=int(recording_job['duration'])) > now:
# 当前文件还在录制中,不用去查询下载情况
pipe.lpush(camera_recording_key, json.dumps(recording_job))
continue
base_file_name = '{}_{}'.format(camera_code,
start_time.astimezone(pytz.utc).strftime('%Y-%m-%d_%H-%M-%S'))
video_full_path = os.path.join(videos_path, base_file_name + '.mp4')
file_id = recording_job['file_id']
res = ysy_client.get_cloud_video_download_url(token, file_id, self.project_id)
log.info('%s, %s', file_id, res)
meta = res.get('meta')
if meta.get('code') == 404:
if meta.get('message') == '文件正在上传' \
and create_time + timedelta(hours=3) < now:
if 'FIX' in recording_job['file_id'] and recording_job.get('retry_count') > 5:
log.warning('file_id-%s云录制异常,发生钉钉报警,中止任务!',
recording_job['file_id'].split('FIX')[0])
send_markdown('异常云录制事件',
text.format(recording_job['camera_info']['department_name'],
recording_job['camera_info']['department_district'],
recording_job['camera_code'], recording_job['start_time'],
recording_job['end_time'], recording_job['file_id'],
recording_job['create_time'], recording_job['token'],
recording_job['account']))
else:
retry_count = recording_job.get('retry_count', 0)
recording_job['file_id'] = '{}FIX{}'.format(recording_job['file_id'], retry_count)
recording_job['create_time'] = now.strftime('%Y-%m-%d %H:%M:%S')
recording_job['retry_count'] = retry_count + 1
pipe.lpush(UNTREATED_JOB_KEY.format(camera_code), json.dumps(recording_job))
elif meta.get('message') == '文件上传失败':
recording_job['create_time'] = now.strftime('%Y-%m-%d %H:%M:%S')
pipe.lpush(UNTREATED_JOB_KEY.format(camera_code), json.dumps(recording_job))
else:
pipe.lpush(camera_recording_key, json.dumps(recording_job))
continue
elif meta.get('code') == 200 and res.get('data'):
# 文件录制完成下载视频文件
urls = res.get('data').get('urls')
log.info('file %s: 持续时间%s, 等待下载时间: %s' % (
file_id, recording_job['duration'],
(now - create_time).total_seconds()))
videos, error_tag = self._download_file_by_url(urls, base_file_name, recording_job['file_id'],
recording_job['duration'], token)
if error_tag == 1:
log.warning('当前文件file_id:%s下载失败,需要重新下载', recording_job['file_id'])
pipe.lpush(camera_recording_key, json.dumps(recording_job))
continue
elif error_tag == 2 or len(videos) == 0:
log.warning('当前文件file_id:%s为空,重新录制', recording_job['file_id'])
recording_job['create_time'] = now.strftime('%Y-%m-%d %H:%M:%S')
pipe.lpush(UNTREATED_JOB_KEY.format(camera_code), json.dumps(recording_job))
continue
else:
if len(videos) > 1:
# 多个视频进行合并
concat(videos, video_full_path, removed=True)
else:
os.renames(videos[0], video_full_path)
if not os.path.isfile(video_full_path):
# TODO 文件不存在时,表示合并或重命名错误需要重新处理一下
continue
video_file = VideoFile(video_full_path)
url = oss_upload_file('eviz_record/{}'.format(video_file.file_path), video_file.full_path)
update_video_info(recording_job['db_table'], recording_job['event_id'], 1,
file_name=video_file.file_name, video_url=url,
video_resolution=video_file.resolution, recovered_time=recording_job['end_time'])
@staticmethod
def download_file(url, local_file):
"""
下载url文件另存为local_file
"""
urllib.request.urlretrieve(url, local_file)
......@@ -435,6 +435,7 @@ class ProcessMessage:
_, error_log = record_thread(stream_url, file_name, thread_name=body['camera_code'])
video_info, video_error_log = judge_video_error(file_name)
if video_error_log:
# TODO 文件损害说明
log.warning('file: %s, error_log: %s', file_name, video_error_log)
video_duration = time_to_seconds(video_info['duration'])
if video_duration > 5:
......
......@@ -48,11 +48,19 @@ def command_line_runner():
elif args['worker'] == 'merger':
from ils_common_video.eviz_video.merger import main as record_main
record_main()
elif args['worker'] == 'downloader':
from ils_common_video.eviz_video.cloud_download import DownloadFileJob
dfj = DownloadFileJob()
dfj.start()
elif args['env'] == 'common':
if args['worker'] == 'filter':
from ils_common_video.video_filter.filter import VideoFilterProcess
fp = VideoFilterProcess()
fp.run()
elif args['worker'] == 'record_task':
from ils_common_video.record_task import RecordTaskListener
rtr = RecordTaskListener()
rtr.run()
else:
parser.print_help()
......
import json
import time
import dateutil.parser
from datetime import datetime
from intelab_python_sdk.logger import log
from ils_common_video.db.rabbitmq import rabbitmq_connect
from ils_common_video.const import RECORD_TASK_QUEUE, UNTREATED_JOB_KEY
from ils_common_video.db.redis import redis_connect
from ils_common_video.db import mysql
class RecordTaskListener:
def __init__(self):
self.queue_name = RECORD_TASK_QUEUE
self.connection = rabbitmq_connect()
self.channel = self.connection.channel()
def run(self):
log.info('启动分析进程')
log.info('binding to queue {}'.format(self.queue_name))
self.channel.queue_declare(queue=self.queue_name, durable=True)
def callback(ch, method, properties, body):
log.info('received MQ message {}'.format(body))
try:
body = json.loads(body)
# TODO 摄像头没有绑定业务的需要处理吗?
camera_info = mysql.get_camera_info(camera_code=body['camera_code'], platform=None)
if camera_info:
platform = camera_info[0]['platform']
camera_info[0].update({
'ex': body['end_time'] - body['start_time'],
'retry': True
})
body['start_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(body['start_time']))
body['end_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(body['end_time']))
camera_info[0].update(body)
event_id = self.insert_pending_event(camera_info[0])
camera_info[0].update({'event_id': event_id})
if platform == 'eviz':
# 萤石云的云录制,直接设置任务key就可以
self.set_eviz_cloud_record_job(camera_info[0])
else:
# ISC协议摄像头需要丢进任务队列
self.channel.basic_publish(exchange='', routing_key='ISC_CLOUD_RECORD',
body=json.dumps(camera_info[0], ensure_ascii=False))
log.info('finished processing MQ message')
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
log.exception(e)
log.error('finished processing MQ message, error')
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=callback,
queue=self.queue_name)
log.info(' [*] Waiting for messages. To exit press CTRL+C')
try:
self.channel.start_consuming()
except KeyboardInterrupt:
log.info('MQ connection closed by user')
except Exception as e:
log.exception('MQ connection closed unexpectedly. %s', e)
raise
self.connection.close()
@staticmethod
def insert_pending_event(body):
event_id = mysql.insert_video_info(
body['db_table'], body['device_code'], body['start_time'],
body['end_time'], biz_type=body['biz_type'],
service_type=body['service_type'], status=3, record_type='task'
)
return event_id
@staticmethod
def set_eviz_cloud_record_job(body):
pipe = redis_connect()
start_time = dateutil.parser.parse(body['start_time'])
end_time = dateutil.parser.parse(body['end_time'])
file_id = body['camera_code'] + start_time.strftime('%Y%m%d%H%M%S')
event_duration = int((end_time - start_time).total_seconds())
log.info('file_id: {}, start_time: {}, end_time: {}, duration: {}'.format(
file_id, start_time, end_time, event_duration))
# TODO 可以使用异步任务框架
body.update({
'file_id': file_id,
'create_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'duration': event_duration,
'status': 0,
})
pipe.lpush(UNTREATED_JOB_KEY.format(body['camera_code']), json.dumps(body, ensure_ascii=False))
pipe.close()
......@@ -117,7 +117,7 @@ def judge_video_error(file_name):
def get_video_duration(file_name, start_time='00:00:00', key_word='block unavailable'):
resolution, duration, bitrate, media_type = ['0x0'], ['00:00:00'], ['0'], 'VideoHandler'
size, error_log = 0, ''
size, error_log, fps = 0, '', ['0']
log_buffer = subprocess.getoutput(
'ffmpeg -y -ss {} -i {} -t 1 -vframes 1 /tmp/tmp.jpg'.format(start_time, file_name))
......@@ -134,6 +134,7 @@ def get_video_duration(file_name, start_time='00:00:00', key_word='block unavail
duration = re.findall(r'Duration: (\d\d:\d\d:\d\d)\.\d\d', log_buffer) or duration
bitrate = re.findall(r'bitrate: (\d+) kb/s', log_buffer) or bitrate
resolution = re.findall(r'(\d{3,4}x\d{3,4})', log_buffer) or resolution
fps = re.findall(r', (\d+) fps,', log_buffer) or fps
if 'SoundHandler' in log_buffer:
media_type += '+SoundHandler'
......@@ -143,6 +144,11 @@ def get_video_duration(file_name, start_time='00:00:00', key_word='block unavail
video_info = {
'duration': duration[0], 'bitrate': bitrate[0],
'resolution': resolution[0], 'media_type': media_type,
'file_name': file_name, 'size': round(size, 2)
'file_name': file_name, 'size': round(size, 2),
'fps': fps[0]
}
return video_info, error_log
if __name__ == '__main__':
print(get_video_duration('/tmp/videos/D86639983/D86639983_2021-08-12_14-06-42.mp4'))
......@@ -10,8 +10,8 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
# with open('camera_infos.json', 'r+') as f:
# json.dump(client.get_cameras(), f, indent=4, ensure_ascii=False)
#
for camera in client.get_cameras():
for camera in [{'indexCode': '9ce74c6cfa114f6c86f97bd58377409d'}]:
print(camera)
url = client.get_camera_preview_url(camera['indexCode'], protocol='rtsp', stream_type=1)
url = client.get_camera_preview_url(camera['indexCode'], protocol='rtsp', stream_type=0)
print('摄像头{}的直播地址为{}'.format(camera['indexCode'], url.get('url')))
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论