提交 4bc5f699 authored 作者: zw.wang's avatar zw.wang

feat: [recorder] 修改录制线程日志打印

上级 d6d10999
...@@ -21,7 +21,8 @@ def command_line_runner(): ...@@ -21,7 +21,8 @@ def command_line_runner():
parser = get_parser() parser = get_parser()
args = vars(parser.parse_args()) args = vars(parser.parse_args())
log_init(args['worker'], debug=args['debug'], log_path=args['log_path'], backupCount=3) log_init(args['worker'], debug=args['debug'], log_path=args['log_path'], backupCount=3,
thread=True)
if args['worker'] == 'merger': if args['worker'] == 'merger':
from isc_video_record.merger import EventMergerJob from isc_video_record.merger import EventMergerJob
...@@ -37,7 +38,7 @@ def command_line_runner(): ...@@ -37,7 +38,7 @@ def command_line_runner():
t = Tasks() t = Tasks()
t.start() t.start()
else: else:
parser.print_help parser.print_help()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -48,8 +48,7 @@ class StreamRecorder: ...@@ -48,8 +48,7 @@ class StreamRecorder:
def do_work(conn, ch, delivery_tag, body): def do_work(conn, ch, delivery_tag, body):
thread_id = threading.get_ident() thread_id = threading.get_ident()
log.info('thread_id:%s: Delivery tag: %s Message body: %s', thread_id, log.info('Delivery tag: %s Message body: %s', delivery_tag, body)
delivery_tag, body)
ack = False ack = False
pipe = redis_connect() pipe = redis_connect()
...@@ -57,12 +56,12 @@ class StreamRecorder: ...@@ -57,12 +56,12 @@ class StreamRecorder:
try: try:
if pipe.set(camera_key, thread_id, nx=True, ex=100): if pipe.set(camera_key, thread_id, nx=True, ex=100):
log.info('thread_id:%s:setnx:%s', thread_id, camera_key) log.info('setnx:%s:ttl:100', camera_key)
process_message = ProcessMessage(camera_key, thread_id, body) process_message = ProcessMessage(camera_key, thread_id, body)
ack = process_message.process() ack = process_message.process()
else: else:
log.info('thread_id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code']) log.info('camera_code[%s] processing, end.', thread_id, body['camera_code'])
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
...@@ -71,7 +70,7 @@ class StreamRecorder: ...@@ -71,7 +70,7 @@ class StreamRecorder:
finally: finally:
try: try:
if pipe.get(camera_key) == str(thread_id): if pipe.get(camera_key) == str(thread_id):
log.info('delete:%s:%s', thread_id, camera_key) log.info('delete:%s', camera_key)
pipe.delete(camera_key) pipe.delete(camera_key)
pipe.close() pipe.close()
except Exception: except Exception:
...@@ -79,7 +78,7 @@ class StreamRecorder: ...@@ -79,7 +78,7 @@ class StreamRecorder:
cb = functools.partial(ack_message, ch, delivery_tag) cb = functools.partial(ack_message, ch, delivery_tag)
conn.add_callback_threadsafe(cb) conn.add_callback_threadsafe(cb)
log.info('thread_id:%s: finished processing MQ message. ack=%s', thread_id, ack) log.info('finished processing MQ message. ack=%s', ack)
def on_message(ch, method_frame, _header_frame, body, args): def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args (conn, thrds) = args
...@@ -125,11 +124,11 @@ class ProcessMessage: ...@@ -125,11 +124,11 @@ class ProcessMessage:
ack = False ack = False
events = mysql.get_untreated_events(self.body['db_table'], self.body['camera_code'], retry=True) events = mysql.get_untreated_events(self.body['db_table'], self.body['camera_code'], retry=True)
log.info('thread_id:%s:%s: events count: %s', self.thread_id, self.body['camera_code'], len(events)) log.info('%s: events count: %s', self.body['camera_code'], len(events))
for inx, event in enumerate(events): for inx, event in enumerate(events):
# 事件最小重试时间为120秒 # 事件最小重试时间为120秒
expire_time = int(max(120, (event['end_time'] - event['start_time']).total_seconds() * 2 + 1)) expire_time = int(max(120, (event['end_time'] - event['start_time']).total_seconds() * 5 + 1))
# 判定当前任务是否需要继续 # 判定当前任务是否需要继续
pipe = redis_connect() pipe = redis_connect()
...@@ -148,13 +147,11 @@ class ProcessMessage: ...@@ -148,13 +147,11 @@ class ProcessMessage:
if _continue: if _continue:
# 续约成功,本次任务继续 # 续约成功,本次任务继续
# 判定当前分布式锁是本线程设置的 # 判定当前分布式锁是本线程设置的
log.info('thread_id:%s:%s,ttl:%s', self.thread_id, self.camera_key, log.info('%s:%s,ttl:%s', self.camera_key, pipe.ttl(self.camera_key))
pipe.ttl(self.camera_key))
mysql.update_video_info(self.body['db_table'], event['video_id'], status=0) mysql.update_video_info(self.body['db_table'], event['video_id'], status=0)
self.recording(event) self.recording(event)
continue
else: else:
log.info('thread_id:%s:%s, expire failed', self.thread_id, self.camera_key) log.info('%s, expire failed', self.camera_key)
break break
else: else:
...@@ -162,7 +159,7 @@ class ProcessMessage: ...@@ -162,7 +159,7 @@ class ProcessMessage:
with redis_connect() as pipe: with redis_connect() as pipe:
if pipe.get(self.camera_key) == str(self.thread_id): if pipe.get(self.camera_key) == str(self.thread_id):
log.info('thread:%s:delete:%s', self.thread_id, self.camera_key) log.info('delete:%s', self.camera_key)
pipe.delete(self.camera_key) pipe.delete(self.camera_key)
return ack return ack
...@@ -178,8 +175,7 @@ class ProcessMessage: ...@@ -178,8 +175,7 @@ class ProcessMessage:
try: try:
aliyun_oss.oss_download_file(event['video_url'], part_file_name) aliyun_oss.oss_download_file(event['video_url'], part_file_name)
if os.path.isfile(part_file_name): if os.path.isfile(part_file_name):
log.info('thread:%s:%s: 上次中断文件%s', self.thread_id, event['camera_code'], log.info('%s: 上次中断文件%s', event['camera_code'], part_file_name)
part_file_name)
part_files_set.add(part_file_name) part_files_set.add(part_file_name)
else: else:
raise FileExistsError(part_file_name) raise FileExistsError(part_file_name)
...@@ -202,8 +198,8 @@ class ProcessMessage: ...@@ -202,8 +198,8 @@ class ProcessMessage:
part_files_set) part_files_set)
t2 = time.time() t2 = time.time()
video_info, _ = get_video_duration(record_result['file_name']) video_info, _ = get_video_duration(record_result['file_name'])
log.info('thread_id:%s:%s: time consuming: %s, duration: %s, size: %sM', log.info('%s: time consuming: %s, duration: %s, size: %sM',
self.thread_id, event['camera_code'], round(t2 - t1, 2), event['camera_code'], round(t2 - t1, 2),
time_to_seconds(video_info['duration']), video_info['size']) time_to_seconds(video_info['duration']), video_info['size'])
return self.update_video_info(record_result, video_info, event) return self.update_video_info(record_result, video_info, event)
...@@ -229,12 +225,11 @@ class ProcessMessage: ...@@ -229,12 +225,11 @@ class ProcessMessage:
if record_result.get('except') and record_result.get('offline'): if record_result.get('except') and record_result.get('offline'):
# 当前摄像头离线只增加下次重试时间 # 当前摄像头离线只增加下次重试时间
log.info('thread_id:%s:%s: camera offline.', self.thread_id, event['camera_code']) log.info('%s: camera offline.', event['camera_code'])
next_retry_time = now + timedelta(minutes=30) next_retry_time = now + timedelta(minutes=30)
remark = (remark or 'start') + '+offline' remark = (remark or 'start') + '+offline'
elif record_result.get('is_completed'): elif record_result.get('is_completed'):
log.info('thread_id:%s:%s: %s is completed.', log.info('%s: %s is completed.', event['camera_code'], record_result['file_name'])
self.thread_id, event['camera_code'], record_result['file_name'])
status = 1 status = 1
else: else:
next_retry_time = now + timedelta(minutes=30) next_retry_time = now + timedelta(minutes=30)
...@@ -263,15 +258,15 @@ class ProcessMessage: ...@@ -263,15 +258,15 @@ class ProcessMessage:
remark=remark remark=remark
) )
log.info('thread_id:%s:video_info: %s, url: %s, video_id: %s.%s', log.info('video_info: %s, url: %s, video_id: %s.%s', video_info, url, self.body['db_table'], event['video_id'])
self.thread_id, video_info, url, self.body['db_table'],
event['video_id'])
return True return True
def recorder(self, file_name, start_time, end_time, part_files_set=None): def recorder(self, file_name, start_time, end_time, part_files_set=None):
""" """
:param file_name:
:param start_time: 开始时间,上海时区 :param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区 :param end_time: 结束时间,上海时区
:param part_files_set:
""" """
playback_urls = [] playback_urls = []
res = {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time} res = {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time}
...@@ -288,7 +283,7 @@ class ProcessMessage: ...@@ -288,7 +283,7 @@ class ProcessMessage:
log.exception(e) log.exception(e)
res.update({'except': True, 'remark': e.__str__()}) res.update({'except': True, 'remark': e.__str__()})
log.info('thread_id:%s:%s: playback: %s', self.thread_id, self.body['camera_code'], playback_urls) log.info('%s: playback: %s', self.body['camera_code'], playback_urls)
if playback_urls: if playback_urls:
# 可以只通过一个回放流地址取到其他时间段的流 # 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0] playback_stream = playback_urls[0]
...@@ -301,7 +296,7 @@ class ProcessMessage: ...@@ -301,7 +296,7 @@ class ProcessMessage:
is_completed = False is_completed = False
part_files_set = part_files_set or set() part_files_set = part_files_set or set()
remark = '' remark = ''
while retry_count < 6 and playback_stream: while retry_count < 4 and playback_stream:
# 重试六次 # 重试六次
retry_count += 1 retry_count += 1
complete_duration = (end_time - start_time).total_seconds() complete_duration = (end_time - start_time).total_seconds()
...@@ -343,8 +338,7 @@ class ProcessMessage: ...@@ -343,8 +338,7 @@ class ProcessMessage:
concat(part_files, file_name, removed=True) concat(part_files, file_name, removed=True)
elif len(part_files) == 1: elif len(part_files) == 1:
shutil.move(part_files[0], file_name) shutil.move(part_files[0], file_name)
log.info('thread_id:%s:%s: The download is complete, file %s', log.info('%s: The download is complete, file %s', self.body['camera_code'], file_name)
self.thread_id, self.body['camera_code'], file_name)
res.update({ res.update({
'file_name': file_name, 'file_name': file_name,
'is_completed': is_completed, 'is_completed': is_completed,
...@@ -367,13 +361,13 @@ class ProcessMessage: ...@@ -367,13 +361,13 @@ class ProcessMessage:
end_time, stream['extra_args']) end_time, stream['extra_args'])
file_name = os.path.join( file_name = os.path.join(
video_path, 'rtmp_{}_{}_{}.mp4'.format(self.body['camera_code'], start_time, end_time)) video_path, 'rtmp_{}_{}_{}.mp4'.format(self.body['camera_code'], start_time, end_time))
log.info('thread_id:%s:%s:stream_url: %s', log.info('%s:stream_url: %s', self.body['camera_code'], stream_url)
self.thread_id, self.body['camera_code'], stream_url)
# TODO 多进程处理 # TODO 多进程处理
_, error_log = record_thread(stream_url, file_name, thread_name=self.body['camera_code']) _, error_log = record_thread(stream_url, file_name, thread_name=self.body['camera_code'])
return get_video_duration(file_name), error_log return get_video_duration(file_name), error_log
def write_retry_info_to_influx(self, camera_code, error_log): @staticmethod
def write_retry_info_to_influx(camera_code, error_log):
try: try:
influxdb.reconnect() influxdb.reconnect()
influxdb.write_points([{ influxdb.write_points([{
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论