提交 1fe18c06 authored 作者: zw.wang's avatar zw.wang

feat: [recorder] 修改录制类方法为静态函数,修复视频文件时间错误

上级 09c4d42d
......@@ -62,9 +62,7 @@ class StreamRecorder:
try:
if pipe.set(camera_key, thread_id, nx=True, ex=100):
log.info('setnx:%s:ttl:100', camera_key)
process_message = ProcessMessage(camera_key, body)
ack = process_message.process()
ack = ProcessMessage.process(body, camera_key)
else:
log.info('camera_code[%s] processing, end.', body['camera_code'])
......@@ -86,13 +84,11 @@ class StreamRecorder:
log.info('finished processing MQ message. ack=%s', ack)
def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args
body = json.loads(body)
delivery_tag = method_frame.delivery_tag
(conn, _threads) = args
t = threading.Thread(target=do_work, args=(
conn, ch, delivery_tag, body))
conn, ch, method_frame.delivery_tag, json.loads(body)))
t.start()
thrds.append(t)
_threads.append(t)
threads = []
on_message_callback = functools.partial(
......@@ -120,62 +116,70 @@ class StreamRecorder:
class ProcessMessage:
def __init__(self, camera_key, body):
self.camera_key = camera_key
self.thread_id = threading.get_ident()
self.body = body
@staticmethod
def set_expired_time(camera_key, expired_time, thread_id):
# 判定当前任务是否需要继续
pipe = redis_connect()
if pipe.ttl(camera_key) > expired_time:
_continue = True
elif -1 < pipe.ttl(camera_key) < expired_time \
and pipe.get(camera_key) == str(thread_id) \
and pipe.expire(camera_key, expired_time):
_continue = True
elif pipe.ttl(camera_key) == -2 \
and pipe.set(camera_key, thread_id, nx=True, ex=expired_time):
_continue = True
else:
_continue = False
def process(self):
log.info('%s,ttl:%s', camera_key, pipe.ttl(camera_key))
return _continue
@staticmethod
def process(body, camera_key):
thread_id = threading.get_ident()
ack = False
events = mysql.get_untreated_events(self.body['db_table'], self.body['camera_code'], retry=True)
log.info('%s: events count: %s', self.body['camera_code'], len(events))
events = mysql.get_untreated_events(body['db_table'], body['camera_code'], retry=True)
log.info('%s: events count: %s', body['camera_code'], len(events))
for inx, event in enumerate(events):
for event in events:
# 事件最小重试时间为120秒
expire_time = int(max(120, (event['end_time'] - event['start_time']).total_seconds() * 5 + 1))
# 判定当前任务是否需要继续
pipe = redis_connect()
if pipe.ttl(self.camera_key) > expire_time:
_continue = True
elif -1 < pipe.ttl(self.camera_key) < expire_time \
and pipe.get(self.camera_key) == str(self.thread_id) \
and pipe.expire(self.camera_key, expire_time):
_continue = True
elif pipe.ttl(self.camera_key) == -2 \
and pipe.set(self.camera_key, self.thread_id, nx=True, ex=expire_time):
_continue = True
else:
_continue = False
if _continue:
expired_time = int(max(120, (event['end_time'] - event['start_time']).total_seconds() * 5 + 1))
if ProcessMessage.set_expired_time(camera_key, expired_time, thread_id):
# 续约成功,本次任务继续
# 判定当前分布式锁是本线程设置的
log.info('%s,ttl:%s', self.camera_key, pipe.ttl(self.camera_key))
mysql.update_video_info(self.body['db_table'], event['video_id'], status=0)
self.recording(event)
mysql.update_video_info(body['db_table'], event['video_id'], status=0)
ProcessMessage.recording(body, event)
else:
log.info('%s, expire failed', self.camera_key)
log.info('%s, expire failed', camera_key)
break
else:
ack = True
with redis_connect() as pipe:
if pipe.get(self.camera_key) == str(self.thread_id):
log.info('delete:%s', self.camera_key)
pipe.delete(self.camera_key)
if pipe.get(camera_key) == str(thread_id):
log.info('delete:%s', camera_key)
pipe.delete(camera_key)
return ack
def recording(self, event):
@staticmethod
def gen_file_name(camera_code, start_time, end_time, prefix='ISC', part_num=None):
elements = [
prefix, camera_code,
start_time.strftime('%Y%m%dT%H%M%S'),
end_time.strftime('%Y%m%d%H%M%S')
]
if part_num is not None:
elements.append(str(part_num))
return os.path.join(video_path, '_'.join(elements) + '.mp4')
@staticmethod
def recording(body, event):
part_files_set = set()
if event['recovered_time'] and event['recovered_time'] != event['start_time']:
part_file_name = os.path.join(video_path, 'ISC_{}_{}_{}_{}.mp4'.format(
self.body['camera_code'],
event['start_time'].strftime('%Y%m%dT%H%M%S'),
event['recovered_time'].strftime('%Y%m%dT%H%M%S'), '0'
))
part_file_name = ProcessMessage.gen_file_name(
body['camera_code'], event['start_time'], event['recovered_time'], part_num=0)
# 下载上次失败的视频文件
try:
aliyun_oss.oss_download_file(event['video_url'], part_file_name)
......@@ -191,24 +195,21 @@ class ProcessMessage:
event['recovered_time'] = event['start_time']
t1 = time.time()
full_file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format(
self.body['camera_code'],
event['start_time'].strftime('%Y%m%dT%H%M%S'),
event['end_time'].strftime('%Y%m%dT%H%M%S')
))
record_result = self.recorder(full_file_name,
event['recovered_time'].astimezone(tz),
event['end_time'].astimezone(tz),
part_files_set)
full_file_name = ProcessMessage.gen_file_name(body['camera_code'], event['start_time'], event['end_time'])
record_result = ProcessMessage.recorder(
body, full_file_name,
event['recovered_time'].astimezone(tz),
event['end_time'].astimezone(tz), part_files_set)
t2 = time.time()
video_info, _ = get_video_duration(record_result['file_name'])
log.info('%s: time consuming: %s, duration: %s, size: %sM',
event['camera_code'], round(t2 - t1, 2),
time_to_seconds(video_info['duration']), video_info['size'])
return self.update_video_info(record_result, video_info, event)
return ProcessMessage.update_video_info(body, record_result, video_info, event)
def update_video_info(self, record_result, video_info, event):
@staticmethod
def update_video_info(body, record_result, video_info, event):
now = datetime.now()
url = next_retry_time = None
......@@ -254,7 +255,7 @@ class ProcessMessage:
})
mysql.update_video_info(
self.body['db_table'], event['video_id'],
body['db_table'], event['video_id'],
status, file_name=file_name, video_url=url,
video_resolution=video_info['resolution'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc),
......@@ -263,11 +264,13 @@ class ProcessMessage:
remark=remark
)
log.info('video_info: %s, url: %s, video_id: %s.%s', video_info, url, self.body['db_table'], event['video_id'])
log.info('video_info: %s, url: %s, video_id: %s.%s', video_info, url, body['db_table'], event['video_id'])
return True
def recorder(self, file_name, start_time, end_time, part_files_set=None):
@staticmethod
def recorder(body, file_name, start_time, end_time, part_files_set=None):
"""
:param body:
:param file_name:
:param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区
......@@ -278,7 +281,7 @@ class ProcessMessage:
try:
playback_urls = hik_client.get_cameras_playback_urls(
self.body['camera_index'],
body['camera_index'],
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
......@@ -287,44 +290,42 @@ class ProcessMessage:
except Exception as e:
log.exception(e)
res.update({'except': True, 'remark': e.__str__()})
send_alarm_to_developer('recorder-{}'.format(self.thread_id), e)
send_alarm_to_developer('recorder-{}'.format(threading.get_ident()), e)
log.info('%s: playback: %s', self.body['camera_code'], playback_urls)
log.info('%s: playback: %s', body['camera_code'], playback_urls)
if playback_urls:
# 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0]
else:
# 取流失败
self.write_retry_info_to_influx(self.body['camera_code'], res.get('remark'))
ProcessMessage.write_retry_info_to_influx(body['camera_code'], res.get('remark'))
playback_stream = None
part_num = retry_count = 1
is_completed = False
part_files_set = part_files_set or set()
remark = ''
while retry_count < 4 and playback_stream:
while retry_count < 6 and playback_stream:
# 重试六次
retry_count += 1
complete_duration = (end_time - start_time).total_seconds()
file_info, error_log = self.stream_record(playback_stream['stream_url'],
start_time, end_time)
file_info, error_log = ProcessMessage.stream_record(body, playback_stream['stream_url'],
start_time, end_time)
if error_log:
remark += ' ' + error_log
file_duration = time_to_seconds(file_info['duration'])
if not os.path.isfile(file_info['file_name']):
retry_count = 4
log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count)
# retry_count = 4
log.info('当前录制无文件输出:%s, 重试计数: %s', body['camera_code'], retry_count)
elif file_duration < complete_duration - 2:
# 视频文件时长小于完整时长
new_start_time = start_time + timedelta(seconds=file_duration)
part_file_name = os.path.join(video_path, 'ISC_{}_{}_{}_{}.mp4'.format(
self.body['camera_code'],
start_time.strftime('%Y%m%dT%H%M%S'),
new_start_time.strftime('%Y%m%dT%H%M%S'), part_num
))
part_file_name = ProcessMessage.gen_file_name(
body['camera_code'], start_time, new_start_time, part_num=part_num)
shutil.move(file_info['file_name'], part_file_name)
log.info('摄像头%s的part_file: %s', body['camera_code'], part_file_name)
part_files_set.add(part_file_name)
start_time = new_start_time
......@@ -337,15 +338,17 @@ class ProcessMessage:
start_time = end_time
break
self.write_retry_info_to_influx(self.body['camera_code'], error_log)
ProcessMessage.write_retry_info_to_influx(body['camera_code'], error_log)
part_files = sorted(list(part_files_set))
if len(part_files) > 1:
log.info('摄像头%s存在多个视频文件%s,将进行合并', body['camera_code'], part_files)
concat(part_files, file_name, removed=True)
elif len(part_files) == 1:
log.info('摄像头%s重命名%s为%s', body['camera_code'], part_files[0], file_name)
shutil.move(part_files[0], file_name)
log.info('%s: The download is complete, file %s, is_completed: %s',
self.body['camera_code'], file_name, is_completed)
body['camera_code'], file_name, is_completed)
res.update({
'file_name': file_name,
'is_completed': is_completed,
......@@ -354,23 +357,22 @@ class ProcessMessage:
})
if remark:
res['remark'] = remark
part_files_set.clear()
return res
def stream_record(self, stream, start_time, end_time):
start_time = start_time.strftime('%Y%m%dT%H%M%S')
end_time = end_time.strftime('%Y%m%dT%H%M%S')
@staticmethod
def stream_record(body, stream, start_time, end_time):
if stream['protocol'] == 'rtsp':
stream_url = stream['url']
else:
stream_url = '{}?beginTime={}&endTime={}&{}'.format(stream['url'],
start_time,
end_time, stream['extra_args'])
file_name = os.path.join(
video_path, 'rtmp_{}_{}_{}.mp4'.format(self.body['camera_code'], start_time, end_time))
log.info('%s:stream_url: %s', self.body['camera_code'], stream_url)
stream_url = '{}?beginTime={}&endTime={}&{}'.format(
stream['url'], start_time.strftime('%Y%m%dT%H%M%S'),
end_time.strftime('%Y%m%dT%H%M%S'), stream['extra_args'])
file_name = ProcessMessage.gen_file_name(body['camera_code'], start_time, end_time, prefix='rtmp')
log.info('%s:stream_url: %s', body['camera_code'], stream_url)
# TODO 出现ffmpeg进程阻塞的情况
_, error_log = record_thread(stream_url, file_name, thread_name=self.body['camera_code'])
_, error_log = record_thread(stream_url, file_name, thread_name=body['camera_code'])
video_info, video_error_log = judge_video_error(file_name)
# TODO 查看出现损坏的视频发送报警
if video_error_log:
......
......@@ -24,7 +24,7 @@ requires = [
setuptools.setup(
name='isc-video-record',
version='1.0.0b14',
version='1.0.0b16',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论