提交 c0ce7650 authored 作者: zw.wang's avatar zw.wang

fix: [recorder] 修复录制进程bug

上级 05d5903a
......@@ -139,7 +139,7 @@ def update_video_info(cursor, conn, db_table, video_id, status,
where id = %s
'''
sub_set = ''
if file_name:
if file_name and video_url:
sub_set += ', file_name = "{}", video_url="{}", video_resolution="{}", recovered_time="{}",' \
'expired_time=date_add(now(),interval 31 day)'\
.format(file_name, video_url, video_resolution, recovered_time)
......
......@@ -13,7 +13,7 @@ from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from isc_video_record.db import rabbitmq_connect, redis_connect, influxdb
from isc_video_record.const import PROCESSING_CAMERA_KEY
from isc_video_record.utils.api_helper import IntelabApiHelper
from isc_video_record.utils.api_helper import IntelabApiHelper, PlaybackUrlException
from isc_video_record.utils import aliyun_oss
from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds
from isc_video_record.utils.alarm_utils import send_alarm_to_developer
......@@ -95,7 +95,7 @@ class StreamRecorder:
on_message, args=(self.connection, threads))
# 设置消费能力
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 10))
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 50))
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name)
......@@ -129,8 +129,9 @@ class ProcessMessage:
for inx, event in enumerate(events):
expire_time = int((event['end_time'] - event['start_time']).total_seconds() * 2 + 1)
pipe = redis_connect()
# 判定当前任务是否需要继续
pipe = redis_connect()
if pipe.ttl(self.camera_key) > expire_time:
_continue = True
elif -1 < pipe.ttl(self.camera_key) < expire_time \
......@@ -148,6 +149,7 @@ class ProcessMessage:
# 判定当前分布式锁是本线程设置的
log.info('thread_id:%s:%s,ttl:%s', self.thread_id, self.camera_key,
pipe.ttl(self.camera_key))
mysql.update_video_info(self.body['db_table'], event['video_id'], status=0)
self.recording(event)
continue
else:
......@@ -172,50 +174,60 @@ class ProcessMessage:
video_info, _ = get_video_duration(record_result['file_name'])
log.info('thread_id:%s:%s: time consuming: %s, duration: %s, size: %sM',
self.thread_id, event['camera_code'],
round(t2 - t1, 2), time_to_seconds(video_info['duration']),
video_info['size'])
self.thread_id, event['camera_code'], round(t2 - t1, 2),
time_to_seconds(video_info['duration']), video_info['size'])
url = ''
now = datetime.now()
url = next_retry_time = None
remark = event.get('remark')
status = 2
file_name = record_result['file_name'].split('/')[-1]
if record_result['file_name'] and os.path.isfile(record_result['file_name']):
url = aliyun_oss.oss_upload_file('isc_record/' + file_name,
record_result['file_name'])
status = 1 if record_result['is_completed'] else 2
# TODO 失败的续录
mysql.update_video_info(self.body['db_table'], event['video_id'], status,
file_name=file_name, video_url=url, video_resolution=video_info['resolution'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc))
os.remove(record_result['file_name'])
else:
# TODO 离线事件重试的时间需要调整为3小时一次,但是普通录制失败的视频重试为30分钟
now = datetime.now()
if not event.get('retry_info'):
event['retry_info'] = '[]'
if not event.get('retry_info'):
event['retry_info'] = '[]'
event['retry_info'] = json.loads(event['retry_info'])
retry_count = len(event['retry_info'])
if os.path.isfile(record_result['file_name']):
url = aliyun_oss.oss_upload_file('isc_record/' + file_name, record_result['file_name'])
# TODO 上传失败,多次重试
if not url:
record_result['is_completed'] = False
os.remove(record_result['file_name'])
event['retry_info'] = json.loads(event['retry_info'])
retry_count = len(event['retry_info'])
next_retry_time = remark = None
if record_result.get('except') and record_result.get('offline'):
# 当前摄像头离线只增加下次重试时间
log.info('thread_id:%s:%s: camera offline.', self.thread_id, event['camera_code'])
next_retry_time = now + timedelta(minutes=30)
remark = (remark or 'start') + '+offline'
elif record_result.get('is_completed'):
log.info('thread_id:%s:%s: %s is completed.',
self.thread_id, event['camera_code'], record_result['file_name'])
status = 1
else:
next_retry_time = now + timedelta(minutes=30)
retry_count += 1
if retry_count > 5:
status = 4
remark = remark + '+failed+end.'
else:
remark = (remark or 'start') + '+failed'
event['retry_info'].append({
'retry_count': retry_count + 1,
'retry_count': retry_count,
'retry_time': now.strftime('%Y-%m-%d %H:%M:%S'),
'result': 'failed'
'result': record_result.get('remark')
})
if retry_count < 5:
# 无有效视频文件,标记事件状态 status = 2
status = 2
next_retry_time = now + timedelta(minutes=30)
remark = (event.get('remark') or 'start') + '+failed'
else:
status = 4 # 状态4: 不再重试!
remark = event['remark'] + '+failed+end.'
mysql.update_video_info(self.body['db_table'], event['video_id'], status,
retry_info=event['retry_info'],
next_retry_time=next_retry_time,
remark=remark)
mysql.update_video_info(
self.body['db_table'], event['video_id'],
status, file_name=file_name, video_url=url,
video_resolution=video_info['resolution'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc),
retry_info=event['retry_info'],
next_retry_time=next_retry_time,
remark=remark
)
log.info('thread_id:%s:video_info: %s, url: %s, video_id: %s.%s, time: %s',
self.thread_id, video_info, url, self.body['db_table'],
......@@ -227,28 +239,39 @@ class ProcessMessage:
:param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区
"""
playback_urls = api_helper.get_cameras_playback_urls(
self.body['camera_code'],
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
log.info('thread_id:%s:%s: playback: %s', self.thread_id, self.body['camera_code'], playback_urls)
file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format(
self.body['camera_code'],
start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'),
end_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S')
))
if len(playback_urls) > 0:
playback_urls = []
res = {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time}
try:
playback_urls = api_helper.get_cameras_playback_urls(
self.body['camera_code'],
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
except PlaybackUrlException as e:
res.update({'except': True, 'offline': True, 'remark': e.msg})
except Exception as e:
log.exception(e)
res.update({'except': True, 'remark': e.__str__})
log.info('thread_id:%s:%s: playback: %s', self.thread_id, self.body['camera_code'], playback_urls)
if playback_urls:
# 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0]
else:
# 取流失败,直接返回
return {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time}
self.write_retry_info_to_influx(self.body['camera_code'], res.get('remark'))
return res
part_num = retry_count = 0
is_completed = False
part_files_set = set()
while retry_count < 3:
while retry_count < 6:
# 重试六次
retry_count += 1
complete_duration = (end_time - start_time).total_seconds()
......@@ -289,12 +312,13 @@ class ProcessMessage:
shutil.move(part_files[0], file_name)
log.info('thread_id:%s:%s: The download is complete, file %s',
self.thread_id, self.body['camera_code'], file_name)
return {
res.update({
'file_name': file_name,
'is_completed': is_completed,
'recovered_time': start_time,
'retry_count': retry_count
}
})
return res
def stream_record(self, stream, start_time, end_time):
start_time = start_time.strftime('%Y%m%dT%H%M%S')
......
......@@ -24,7 +24,7 @@ requires = [
setuptools.setup(
name='isc-video-record',
version='1.0.0a20',
version='1.0.0a22',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
......@@ -21,9 +21,10 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
def main():
start_time = datetime(2021, 5, 23, 10, 13, 0).astimezone(tz)
end_time = datetime(2021, 5, 23, 10, 59, 0).astimezone(tz)
res = client.get_cameras_playback_urls('8b86a98f6c5143219d69c9c0344b156a',
start_time = datetime(2021, 5, 27, 9, 10, 30).astimezone(tz)
# start_time = datetime(2021, 5, 28, 9, 10, 59).astimezone(tz)
end_time = datetime(2021, 5, 27, 9, 30, 50).astimezone(tz)
res = client.get_cameras_playback_urls('8f50e406cad6489fac443e034d29a66f',
client.iso_format(start_time), client.iso_format(end_time))
print(res)
for event in res:
......@@ -31,8 +32,8 @@ def main():
cur_end_time = min(event['end_time'], end_time)
print(cur_start_time, cur_end_time)
# print(event)
# file_info, error_log = stream_record(event['stream_url'], cur_start_time, cur_end_time)
# print(file_info)
file_info, error_log = stream_record(event['stream_url'], cur_start_time, cur_end_time)
print(file_info)
def stream_record(stream, start_time, end_time):
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论