提交 2b6c18dc authored 作者: zw.wang's avatar zw.wang

fix: 修复录制过程中重试和设置分布式锁失效等问题

上级 badec519
...@@ -78,7 +78,16 @@ def query(cursor_dict=False): ...@@ -78,7 +78,16 @@ def query(cursor_dict=False):
@query(cursor_dict=True) @query(cursor_dict=True)
def get_camera_info(cursor, conn): def get_camera_info(cursor, conn, camera_code=None):
if camera_code:
_filter = 'where camera_info.device_code = "{}"'.format(camera_code)
else:
_filter = '''
where `is_valid` = 1
and biz_type is not null
and `point_index_code` is not null
'''
sql = ''' sql = '''
select select
id as camera_id, id as camera_id,
...@@ -94,12 +103,9 @@ def get_camera_info(cursor, conn): ...@@ -94,12 +103,9 @@ def get_camera_info(cursor, conn):
from camera_info from camera_info
join camera_ai_config cac join camera_ai_config cac
on camera_info.id = cac.camera_info_id on camera_info.id = cac.camera_info_id
where {}
`is_valid` = 1
and biz_type is not null
and `point_index_code` is not null
order by create_time; order by create_time;
''' '''.format(_filter)
cursor.execute(sql) cursor.execute(sql)
cameras = cursor.fetchall() cameras = cursor.fetchall()
if not cameras: if not cameras:
......
...@@ -128,7 +128,8 @@ class ProcessMessage: ...@@ -128,7 +128,8 @@ class ProcessMessage:
log.info('thread_id:%s:%s: events count: %s', self.thread_id, self.body['camera_code'], len(events)) log.info('thread_id:%s:%s: events count: %s', self.thread_id, self.body['camera_code'], len(events))
for inx, event in enumerate(events): for inx, event in enumerate(events):
expire_time = int((event['end_time'] - event['start_time']).total_seconds() * 2 + 1) # 事件最小重试时间为120秒
expire_time = int(max(120, (event['end_time'] - event['start_time']).total_seconds() * 2 + 1))
# 判定当前任务是否需要继续 # 判定当前任务是否需要继续
pipe = redis_connect() pipe = redis_connect()
...@@ -177,6 +178,8 @@ class ProcessMessage: ...@@ -177,6 +178,8 @@ class ProcessMessage:
try: try:
aliyun_oss.oss_download_file(event['video_url'], part_file_name) aliyun_oss.oss_download_file(event['video_url'], part_file_name)
if os.path.isfile(part_file_name): if os.path.isfile(part_file_name):
log.info('thread:%s:%s: 上次中断文件%s', self.thread_id, event['camera_code'],
part_file_name)
part_files_set.add(part_file_name) part_files_set.add(part_file_name)
else: else:
raise FileExistsError(part_file_name) raise FileExistsError(part_file_name)
...@@ -236,9 +239,11 @@ class ProcessMessage: ...@@ -236,9 +239,11 @@ class ProcessMessage:
else: else:
next_retry_time = now + timedelta(minutes=30) next_retry_time = now + timedelta(minutes=30)
retry_count += 1 retry_count += 1
if retry_count > 10: if (next_retry_time - event['start_time']) > timedelta(days=3):
# 重试最长时间为3天
status = 4 status = 4
remark = remark + '+failed+end.' remark = remark + '+failed+end.'
# TODO 钉钉异常告警
else: else:
remark = (remark or 'start') + '+failed' remark = (remark or 'start') + '+failed'
...@@ -288,20 +293,24 @@ class ProcessMessage: ...@@ -288,20 +293,24 @@ class ProcessMessage:
# 可以只通过一个回放流地址取到其他时间段的流 # 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0] playback_stream = playback_urls[0]
else: else:
# 取流失败,直接返回 # 取流失败
self.write_retry_info_to_influx(self.body['camera_code'], res.get('remark')) self.write_retry_info_to_influx(self.body['camera_code'], res.get('remark'))
return res playback_stream = None
part_num = retry_count = 1 part_num = retry_count = 1
is_completed = False is_completed = False
part_files_set = part_files_set or set() part_files_set = part_files_set or set()
while retry_count < 6: remark = ''
while retry_count < 6 and playback_stream:
# 重试六次 # 重试六次
retry_count += 1 retry_count += 1
complete_duration = (end_time - start_time).total_seconds() complete_duration = (end_time - start_time).total_seconds()
(file_info, _), error_log = self.stream_record(playback_stream['stream_url'], (file_info, _), error_log = self.stream_record(playback_stream['stream_url'],
start_time, end_time) start_time, end_time)
if error_log:
remark += '\n' + error_log
file_duration = time_to_seconds(file_info['duration']) file_duration = time_to_seconds(file_info['duration'])
if not os.path.isfile(file_info['file_name']): if not os.path.isfile(file_info['file_name']):
log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count) log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count)
...@@ -342,6 +351,8 @@ class ProcessMessage: ...@@ -342,6 +351,8 @@ class ProcessMessage:
'recovered_time': start_time, 'recovered_time': start_time,
'retry_count': retry_count 'retry_count': retry_count
}) })
if remark:
res['remark'] = remark
return res return res
def stream_record(self, stream, start_time, end_time): def stream_record(self, stream, start_time, end_time):
......
...@@ -48,8 +48,8 @@ daily_text = """ ## {}日报, 共{}路摄像头 ...@@ -48,8 +48,8 @@ daily_text = """ ## {}日报, 共{}路摄像头
● 总视频时长为: {} \n ● 总视频时长为: {} \n
● 录制完成时长: {} \n ● 录制完成时长: {} \n
● 稍后重试时长: {} \n ● 稍后重试时长: {} \n
不再重试时长: {} \n 多次失败时长: {} \n
● 暂未处理时长: {} \n
[摄像头视频时长详情]({}) [摄像头视频时长详情]({})
""" """
...@@ -223,6 +223,7 @@ class Tasks: ...@@ -223,6 +223,7 @@ class Tasks:
file_name = 'isc-daily-{}.xlsx'.format(end_time_cst.strftime('%Y%m%d')) file_name = 'isc-daily-{}.xlsx'.format(end_time_cst.strftime('%Y%m%d'))
camera_count = 0 camera_count = 0
daily_untreated_duration = 0
daily_total_duration = daily_done_duration = 0 daily_total_duration = daily_done_duration = 0
daily_failed_duration = daily_retry_duration = 0 daily_failed_duration = daily_retry_duration = 0
results = [] results = []
...@@ -270,6 +271,7 @@ class Tasks: ...@@ -270,6 +271,7 @@ class Tasks:
daily_done_duration += done daily_done_duration += done
daily_failed_duration += unknown daily_failed_duration += unknown
daily_retry_duration += failed daily_retry_duration += failed
daily_untreated_duration += untreated
camera_count += 1 camera_count += 1
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
...@@ -286,6 +288,7 @@ class Tasks: ...@@ -286,6 +288,7 @@ class Tasks:
get_time_str(daily_done_duration), get_time_str(daily_done_duration),
get_time_str(daily_retry_duration), get_time_str(daily_retry_duration),
get_time_str(daily_failed_duration), get_time_str(daily_failed_duration),
get_time_str(daily_untreated_duration),
url url
) )
) )
......
...@@ -62,22 +62,8 @@ class IntelabApiHelper: ...@@ -62,22 +62,8 @@ class IntelabApiHelper:
for pre_event in pre_events: for pre_event in pre_events:
# 合并时间间隔较短的事件 # 合并时间间隔较短的事件
# TODO online=0的处理 if pre_event.get('online', 1) == '0':
''' raise PlaybackUrlException({'message': 'offline', 'code': 404})
{
"deviceType": null,
"regionIndexCode": "bb488f00-486e-4f6d-afde-24e22a13c427",
"collectTime": "2021-06-19T18:32:27.000+08:00",
"deviceIndexCode": null, "ip": null, "regionName": "\u9e7f\u57ce\u533a",
"indexCode": "7a22574c7dae4f869b1f3dca55587175",
"cn": "\u9e7f\u57ce\u533a\u5434\u7acb\u695a\u53e3\u8154\u706d\u83cc\u95f4",
"treatyType": "1", "manufacturer": null, "port": null,
"online": 0
}
'''
if 'beginTime' not in pre_event:
log.exception(ValueError(json.dumps(pre_event)))
continue
start_time = dateutil.parser.parse(pre_event['beginTime']) start_time = dateutil.parser.parse(pre_event['beginTime'])
end_time = dateutil.parser.parse(pre_event['endTime']) end_time = dateutil.parser.parse(pre_event['endTime'])
......
...@@ -24,7 +24,7 @@ requires = [ ...@@ -24,7 +24,7 @@ requires = [
setuptools.setup( setuptools.setup(
name='isc-video-record', name='isc-video-record',
version='1.0.0b6', version='1.0.0b7',
description='ISC motion detection playback video stream recording service.', description='ISC motion detection playback video stream recording service.',
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
......
import os
import pytz
from datetime import datetime
from intelab_python_sdk.logger import log_init
from dynaconf import settings
from isc_video_record.utils.isc_client import HikVisionClient
from isc_video_record.utils.api_helper import IntelabApiHelper
tz = pytz.timezone('Asia/Shanghai')
log_init(__name__, True, './log')
video_path = '/tmp/videos/isc-record'
os.makedirs(video_path, exist_ok=True)
config = settings.get('ISC')
client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
config.get('HOST'), config.get('PORT'))
def main():
start_time = datetime(2021, 6, 23, 12, 15, 29).astimezone(tz)
end_time = datetime(2021, 6, 23, 12, 17, 32).astimezone(tz)
camera_index = '85a04a664af448f79cba3693cae4b4fd'
res = client.get_cameras_playback_urls(
camera_index,
client.iso_format(start_time), client.iso_format(end_time)
)
print(res)
api_helper = IntelabApiHelper()
res = api_helper.get_cameras_playback_urls(
'G11576055',
client.iso_format(start_time), client.iso_format(end_time)
)
print(res)
if __name__ == '__main__':
main()
...@@ -23,10 +23,10 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'), ...@@ -23,10 +23,10 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
def main(): def main():
start_time = datetime(2021, 6, 18, 8, 27, 21).astimezone(tz) start_time = datetime(2021, 6, 23, 12, 15, 29).astimezone(tz)
# start_time = datetime(2021, 5, 28, 9, 10, 59).astimezone(tz) # start_time = datetime(2021, 5, 28, 9, 10, 59).astimezone(tz)
end_time = datetime(2021, 6, 18, 8, 30, 50).astimezone(tz) end_time = datetime(2021, 6, 23, 12, 17, 32).astimezone(tz)
camera_index = 'f134d02a55f2402c8175cd10b6c57f97' camera_index = '1f1878fd4e8b4e60b2adb1c0290458e8'
results = [] results = []
try: try:
......
import json
from isc_video_record.db import mysql
from isc_video_record.db.rabbitmq import rabbitmq_connect
queue_name = 'ISC_RECORD_JOB'
@mysql.query()
def reset_event_status(cursor, conn, event_id, db_table):
sql = '''
update {} set status = 3, retry_info = null
where id = %s
'''.format(db_table)
cursor.execute(sql, [event_id])
conn.commit()
@mysql.query()
def update_event_status(cursor, conn, inx):
sql = '''
update common_video_1.video_data_motion_{}
set status = 1, `update_time` = now()
where status > 1 and `recovered_time` = end_time
order by `start_time` desc ;
'''.format(inx)
cursor.execute(sql)
conn.commit()
def main():
camera_code = 'G11575797'
event_id = 37274
camera_info = mysql.get_camera_info(camera_code=camera_code)[0]
reset_event_status(event_id, camera_info['db_table'])
body = {
'camera_code': camera_info['device_code'],
'camera_index': camera_info['point_index_code'],
'db_table': camera_info['db_table'],
'ex': 100
}
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
channel.basic_publish(exchange='', routing_key=queue_name,
body=json.dumps(body, ensure_ascii=False))
connection.close()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论