提交 bb388547 authored 作者: zw.wang's avatar zw.wang

chore: [recorder] 重构录制模块,排查多线程任务重复的问题

上级 3ec77629
......@@ -8,12 +8,11 @@ import pytz
import dynaconf
from datetime import timedelta, datetime
from redis.exceptions import ConnectionError
from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from isc_video_record.db import rabbitmq_connect, redis_connect
from isc_video_record.const import PROCESSING_CAMERA_KEY, PROCESSING_TOTAL_KEY
from isc_video_record.const import PROCESSING_CAMERA_KEY
from isc_video_record.utils.api_helper import IntelabApiHelper
from isc_video_record.utils import aliyun_oss
from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds
......@@ -35,12 +34,6 @@ class StreamRecorder:
self.channel = self.connection.channel()
@staticmethod
def set_nx(pipe, camera_key, thread_id, ex):
pipe.pipeline()
res = pipe.set(camera_key, thread_id, nx=True, ex=int(ex))
return res
def start(self):
log.info('binding to queue {}'.format(self.queue_name))
......@@ -55,7 +48,7 @@ class StreamRecorder:
def do_work(conn, ch, delivery_tag, body):
thread_id = threading.get_ident()
log.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id,
log.info('thread_id:%s: Delivery tag: %s Message body: %s', thread_id,
delivery_tag, body)
ack = False
......@@ -63,20 +56,30 @@ class StreamRecorder:
camera_key = PROCESSING_CAMERA_KEY.format(body['camera_code'])
try:
if self.set_nx(pipe, camera_key, thread_id, body['ex']):
ack = self.process_message(pipe, camera_key, thread_id, body)
if pipe.set(camera_key, thread_id, nx=True, ex=100):
log.info('thread_id:%s:setnx:%s', thread_id, camera_key)
process_message = ProcessMessage(camera_key, thread_id, body)
ack = process_message.process()
else:
log.info('Thread id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code'])
log.info('thread_id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code'])
except Exception as e:
log.exception(e)
send_alarm_to_developer('recorder_{}'.format(thread_id), e)
finally:
if pipe.get(camera_key) == str(thread_id):
pipe.delete(camera_key)
try:
if pipe.get(camera_key) == str(thread_id):
log.info('delete:%s:%s', thread_id, camera_key)
pipe.delete(camera_key)
pipe.close()
except Exception:
pass
cb = functools.partial(ack_message, ch, delivery_tag)
conn.add_callback_threadsafe(cb)
log.info('finished processing MQ message. ack=%s', ack)
log.info('thread_id:%s: finished processing MQ message. ack=%s', thread_id, ack)
def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args
......@@ -109,47 +112,64 @@ class StreamRecorder:
self.connection.close()
def process_message(self, pipe, camera_key, thread_id, body):
class ProcessMessage:
def __init__(self, camera_key, thread_id, body):
self.camera_key = camera_key
self.thread_id = thread_id
self.body = body
def process(self):
ack = False
events = mysql.get_untreated_events(body['db_table'], body['camera_code'], retry=True)
log.info('Thread_id:%s: events count: %s', thread_id, len(events))
events = mysql.get_untreated_events(self.body['db_table'], self.body['camera_code'], retry=True)
log.info('thread_id:%s:%s: events count: %s', self.thread_id, self.body['camera_code'], len(events))
pipe.incr(PROCESSING_TOTAL_KEY) # 当前录制进程数+1
for inx, event in enumerate(events):
log.info(event)
if pipe.keys(camera_key):
if pipe.get(camera_key) == str(thread_id):
# 判定当前分布式锁是本线程设置的
self.recording(thread_id, body['db_table'], event)
continue
if not self.set_nx(pipe, camera_key, thread_id, body['ex']):
_continue = False
expire_time = int(self.body['ex'] * 2 + 1)
pipe = redis_connect()
if pipe.ttl(self.camera_key) == -2:
if pipe.set(self.camera_key, self.thread_id, nx=True, ex=expire_time):
_continue = True
elif -1 < pipe.ttl(self.camera_key) < expire_time \
and pipe.get(self.camera_key) == str(self.thread_id) \
and pipe.expire(self.camera_key, expire_time):
_continue = True
if _continue:
# 判定当前分布式锁是本线程设置的
log.info('thread_id:%s:%s,ttl:%s', self.thread_id, self.camera_key,
pipe.ttl(self.camera_key))
self.recording(event)
continue
else:
break
else:
ack = True
try:
pipe.decr(PROCESSING_TOTAL_KEY) # 本次录制结束进程数-1
except ConnectionError:
with redis_connect() as pipe:
pipe.decr(PROCESSING_TOTAL_KEY)
with redis_connect() as pipe:
if pipe.get(self.camera_key) == str(self.thread_id):
log.info('thread:%s:delete:%s', self.thread_id, self.camera_key)
pipe.delete(self.camera_key)
return ack
def recording(self, thread_id, db_table, event):
def recording(self, event):
t1 = time.time()
record_result = self.recorder(
event['camera_code'],
event['start_time'].astimezone(tz), # 录制调用接口的过程中使用的是上海时区
event['end_time'].astimezone(tz),
thread_id
)
record_result = self.recorder(event['start_time'].astimezone(tz),
event['end_time'].astimezone(tz))
t2 = time.time()
video_info, error_log = get_video_duration(record_result['file_name'])
log.info('Thread_id:%s:%s: Time consuming: %s, duration: %s, size: %sM',
thread_id, event['camera_code'],
log.info('thread_id:%s:%s: time consuming: %s, duration: %s, size: %sM',
self.thread_id, event['camera_code'],
round(t2 - t1, 2), time_to_seconds(video_info['duration']),
video_info['size'])
url = ''
file_name = record_result['file_name'].split('/')[-1]
if record_result['file_name'] and os.path.isfile(record_result['file_name']):
......@@ -157,7 +177,7 @@ class StreamRecorder:
record_result['file_name'])
status = 1 if record_result['is_completed'] else 2
# TODO 失败的续录
mysql.update_video_info(db_table, event['video_id'], status,
mysql.update_video_info(self.body['db_table'], event['video_id'], status,
file_name=file_name, video_url=url, video_resolution=video_info['resolution'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc))
os.remove(record_result['file_name'])
......@@ -184,32 +204,29 @@ class StreamRecorder:
status = 4 # 状态4: 不再重试!
remark = event['remark'] + '+failed+end.'
mysql.update_video_info(db_table, event['video_id'], status,
mysql.update_video_info(self.body['db_table'], event['video_id'], status,
retry_info=event['retry_info'],
next_retry_time=next_retry_time,
remark=remark)
log.info('video_info: %s, url: %s, video_id: %s.%s, time: %s',
video_info, url, db_table,
log.info('thread_id:%s:video_info: %s, url: %s, video_id: %s.%s, time: %s',
self.thread_id, video_info, url, self.body['db_table'],
event['video_id'], round(time.time() - t1, 2))
return True
@staticmethod
def recorder(camera_code, start_time, end_time, thread_id=1):
def recorder(self, start_time, end_time):
"""
:param camera_code: 摄像头序列号
:param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区
:param thread_id:
"""
playback_urls = api_helper.get_cameras_playback_urls(
camera_code,
self.body['camera_code'],
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
log.info('thread_id:%s:%s: playback: %s', thread_id, camera_code, playback_urls)
log.info('thread_id:%s:%s: playback: %s', self.thread_id, self.body['camera_code'], playback_urls)
file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format(
camera_code,
self.body['camera_code'],
start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'),
end_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S')
))
......@@ -227,11 +244,10 @@ class StreamRecorder:
retry_count += 1
complete_duration = (end_time - start_time).total_seconds()
file_info, _ = StreamRecorder.stream_record(
playback_stream['stream_url'], start_time, end_time, camera_code)
file_info, _ = self.stream_record(playback_stream['stream_url'], start_time, end_time)
file_duration = time_to_seconds(file_info['duration'])
if not os.path.isfile(file_info['file_name']):
log.info('当前录制无文件输出:%s, 重试计数: %s', camera_code, retry_count)
log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count)
time.sleep(1)
continue
......@@ -239,7 +255,7 @@ class StreamRecorder:
# 视频文件时长小于完整时长
new_start_time = start_time + timedelta(seconds=file_duration)
part_file_name = os.path.join(video_path, 'ISC_{}_{}_{}_{}.mp4'.format(
camera_code,
self.body['camera_code'],
start_time.strftime('%Y%m%dT%H%M%S'),
new_start_time.strftime('%Y%m%dT%H%M%S'), part_num
))
......@@ -261,7 +277,7 @@ class StreamRecorder:
elif len(part_files) == 1:
shutil.move(part_files[0], file_name)
log.info('thread_id:%s:%s: The download is complete, file %s',
thread_id, camera_code, file_name)
self.thread_id, self.body['camera_code'], file_name)
return {
'file_name': file_name,
'is_completed': is_completed,
......@@ -269,8 +285,7 @@ class StreamRecorder:
'retry_count': retry_count
}
@staticmethod
def stream_record(stream, start_time, end_time, camera_code):
def stream_record(self, stream, start_time, end_time):
start_time = start_time.strftime('%Y%m%dT%H%M%S')
end_time = end_time.strftime('%Y%m%dT%H%M%S')
......@@ -281,9 +296,11 @@ class StreamRecorder:
start_time,
end_time, stream['extra_args'])
file_name = os.path.join(
video_path, 'rtmp_{}_{}.mp4'.format(start_time, end_time))
log.info('{} stream_url: %s', camera_code, stream_url)
record_thread(stream_url, file_name, thread_name=camera_code)
video_path, 'rtmp_{}_{}_{}.mp4'.format(self.body['camera_code'], start_time, end_time))
log.info('thread_id:%s:%s:stream_url: %s',
self.thread_id, self.body['camera_code'], stream_url)
# TODO 多进程处理
record_thread(stream_url, file_name, thread_name=self.body['camera_code'])
return get_video_duration(file_name)
......
......@@ -23,7 +23,7 @@ requires = [
setuptools.setup(
name='isc-video-record',
version='1.0.0a13',
version='1.0.0a15',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论