提交 4babc9c5 authored 作者: zw.wang's avatar zw.wang

feat: [recorder] 录制任务重构,修复多文件同一个链接的问题

上级 1fe18c06
......@@ -10,6 +10,7 @@ import dynaconf
from datetime import timedelta, datetime
from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from intelab_python_sdk.ffmpeg.ffmpeg_prune import prune as ffmpeg_prune
from isc_video_record.db import rabbitmq_connect, redis_connect, influxdb
from isc_video_record.const import PROCESSING_CAMERA_KEY
......@@ -95,7 +96,7 @@ class StreamRecorder:
on_message, args=(self.connection, threads))
# 设置消费能力
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 80))
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 300))
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name)
......@@ -268,42 +269,10 @@ class ProcessMessage:
return True
@staticmethod
def recorder(body, file_name, start_time, end_time, part_files_set=None):
"""
:param body:
:param file_name:
:param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区
:param part_files_set:
"""
playback_urls = []
res = {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time}
try:
playback_urls = hik_client.get_cameras_playback_urls(
body['camera_index'],
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
except PlaybackUrlException as e:
res.update({'except': True, 'offline': True, 'remark': e.msg})
except Exception as e:
log.exception(e)
res.update({'except': True, 'remark': e.__str__()})
send_alarm_to_developer('recorder-{}'.format(threading.get_ident()), e)
log.info('%s: playback: %s', body['camera_code'], playback_urls)
if playback_urls:
# 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0]
else:
# 取流失败
ProcessMessage.write_retry_info_to_influx(body['camera_code'], res.get('remark'))
playback_stream = None
def stream_to_video(body, playback_stream, start_time, end_time, part_files_set):
part_num = retry_count = 1
is_completed = False
part_files_set = part_files_set or set()
remark = ''
while retry_count < 6 and playback_stream:
# 重试六次
......@@ -330,7 +299,6 @@ class ProcessMessage:
start_time = new_start_time
part_num += 1
retry_count = 1
else:
# 录制完成
part_files_set.add(file_info['file_name'])
......@@ -340,6 +308,49 @@ class ProcessMessage:
ProcessMessage.write_retry_info_to_influx(body['camera_code'], error_log)
return start_time, is_completed, {'retry_count': retry_count, 'remark': remark}
@staticmethod
def recorder(body, file_name, start_time, end_time, part_files_set=None):
"""
:param body:
:param file_name:
:param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区
:param part_files_set:
"""
playback_urls = []
res = {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time}
try:
playback_urls = hik_client.get_cameras_playback_urls(
body['camera_index'],
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
except PlaybackUrlException as e:
res.update({'except': True, 'offline': True, 'remark': e.msg})
except Exception as e:
log.exception(e)
res.update({'except': True, 'remark': e.__str__()})
send_alarm_to_developer('recorder-{}'.format(threading.get_ident()), e)
log.info('%s: playback: %s', body['camera_code'], playback_urls)
if not playback_urls:
ProcessMessage.write_retry_info_to_influx(body['camera_code'], res.get('remark'))
playback_urls = []
is_completed, recovered_time, retry_info = False, start_time, {}
part_files_set = part_files_set or set()
for playback_stream in playback_urls:
start_time = max(start_time, playback_stream['start_time'])
end_time = min(end_time, playback_stream['end_time'])
recovered_time, is_completed, retry_info = ProcessMessage.stream_to_video(
body, playback_stream, start_time, end_time, part_files_set)
if not is_completed:
log.info('摄像头%s在%s, %s录制未完成', body['camera_code'], start_time, end_time)
break
part_files = sorted(list(part_files_set))
if len(part_files) > 1:
log.info('摄像头%s存在多个视频文件%s,将进行合并', body['camera_code'], part_files)
......@@ -352,11 +363,11 @@ class ProcessMessage:
res.update({
'file_name': file_name,
'is_completed': is_completed,
'recovered_time': start_time,
'retry_count': retry_count
'recovered_time': recovered_time,
'retry_count': retry_info.get('retry_count')
})
if remark:
res['remark'] = remark
if retry_info.get('remark'):
res['remark'] = retry_info.get('remark')
part_files_set.clear()
return res
......@@ -374,8 +385,18 @@ class ProcessMessage:
# TODO 出现ffmpeg进程阻塞的情况
_, error_log = record_thread(stream_url, file_name, thread_name=body['camera_code'])
video_info, video_error_log = judge_video_error(file_name)
# TODO 查看出现损坏的视频发送报警
if video_error_log:
video_duration = time_to_seconds(video_info['duration'])
if video_duration > 5:
tmp_file_name = file_name + '.tmp.mp4'
ffmpeg_prune(file_name, file_name, duration=video_duration - 5)
if os.path.isfile(tmp_file_name):
shutil.move(tmp_file_name, file_name)
video_info, video_error_log = judge_video_error(file_name)
else:
# 小于5秒不要了
os.remove(file_name)
send_alarm_to_developer('recorder', 'file_name: {}, error_log: {}'.format(file_name, video_error_log),
mobiles=['15131601294'])
return video_info, error_log
......
......@@ -12,6 +12,7 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log', protocol='rtmp
'-v', 'info',
'-use_wallclock_as_timestamps', '1',
'-rtsp_transport', 'tcp',
'-stimeout', '10000000',
'-i', stream_url,
'-c', 'copy',
'-f', 'mp4',
......@@ -23,7 +24,7 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log', protocol='rtmp
'-y',
'-v', 'info',
'-rtbufsize', '1m',
'-rw_timeout', '20000000',
'-rw_timeout', '10000000',
'-i', stream_url,
'-movflags', 'faststart+frag_keyframe', # 使mp4支持渐进式下载
'-c', 'copy',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论