"""RabbitMQ consumer that records ISC camera playback streams.

Each message from the ``ISC_RECORD_JOB`` queue names a camera and a DB table.
For that camera the untreated events are read from MySQL, the matching
playback stream is recorded to a local mp4 file, uploaded to Aliyun OSS and
the result is written back to MySQL.  A redis key per camera is used as a
distributed lock so that only one worker handles a camera at a time.
"""
import json
import os
import shutil
import time
import threading
import functools
import pytz
import dynaconf
from datetime import timedelta, datetime
from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from isc_video_record.db import rabbitmq_connect, redis_connect
from isc_video_record.const import PROCESSING_CAMERA_KEY
from isc_video_record.utils.api_helper import IntelabApiHelper
from isc_video_record.utils import aliyun_oss
from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds
from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.db import mysql

tz = pytz.timezone('Asia/Shanghai')
video_path = '/data/videos/isc-record'
os.makedirs(video_path, exist_ok=True)
api_helper = IntelabApiHelper()

# Maximum number of consecutive recording attempts that make no progress
# before ``ProcessMessage.recorder`` gives up.
_MAX_RECORD_ATTEMPTS = 3


def _lock_owned_by(pipe, camera_key, thread_id):
    """Return True when the redis lock ``camera_key`` holds ``thread_id``.

    The stored value is compared as text.  A ``bytes`` reply is decoded first
    so the comparison also works when the redis client does not decode
    responses (NOTE(review): whether ``redis_connect`` enables
    ``decode_responses`` is not visible in this file).
    """
    holder = pipe.get(camera_key)
    if isinstance(holder, bytes):
        holder = holder.decode()
    return holder == str(thread_id)


class StreamRecorder:
    """Consumes recording jobs from RabbitMQ and runs each one in a thread."""

    def __init__(self):
        self.queue_name = 'ISC_RECORD_JOB'
        self.connection = rabbitmq_connect()
        self.channel = self.connection.channel()

    def start(self):
        """Declare the queue and consume messages until interrupted.

        Every message is handed to a worker thread.  On ``KeyboardInterrupt``
        the still-running workers are joined and the connection is closed; any
        other exception from the consume loop is logged and re-raised.
        """
        log.info('binding to queue {}'.format(self.queue_name))
        self.channel.queue_declare(queue=self.queue_name, durable=True)

        def ack_message(ch, delivery_tag):
            """Note that `ch` must be the same pika channel instance via which
            the message being ACKed was retrieved (AMQP protocol constraint).
            """
            if ch.is_open:
                ch.basic_ack(delivery_tag)

        def do_work(conn, ch, delivery_tag, body):
            """Process one decoded message in a worker thread.

            Takes the per-camera redis lock (initial expiry 100 seconds), runs
            ``ProcessMessage.process`` and finally schedules the ACK on the
            connection's own thread.  The message is acknowledged whatever the
            outcome; the ``ack`` flag is only logged.
            """
            thread_id = threading.get_ident()
            log.info('thread_id:%s: Delivery tag: %s Message body: %s', thread_id, delivery_tag, body)
            ack = False
            pipe = redis_connect()
            camera_key = PROCESSING_CAMERA_KEY.format(body['camera_code'])
            try:
                if pipe.set(camera_key, thread_id, nx=True, ex=100):
                    log.info('thread_id:%s:setnx:%s', thread_id, camera_key)
                    process_message = ProcessMessage(camera_key, thread_id, body)
                    ack = process_message.process()
                else:
                    log.info('thread_id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code'])
            except Exception as e:
                log.exception(e)
                send_alarm_to_developer('recorder_{}'.format(thread_id), e)
            finally:
                # Best-effort lock release: a failure here must not prevent
                # the ACK below, but it is logged instead of being swallowed.
                try:
                    if _lock_owned_by(pipe, camera_key, thread_id):
                        log.info('delete:%s:%s', thread_id, camera_key)
                        pipe.delete(camera_key)
                    pipe.close()
                except Exception:
                    log.exception('thread_id:%s: failed to release lock %s', thread_id, camera_key)
                # The ACK has to be issued from the connection's thread.
                cb = functools.partial(ack_message, ch, delivery_tag)
                conn.add_callback_threadsafe(cb)
            log.info('thread_id:%s: finished processing MQ message. ack=%s', thread_id, ack)

        def on_message(ch, method_frame, _header_frame, body, args):
            """Consumer callback: decode the JSON body and start a worker."""
            (conn, thrds) = args
            body = json.loads(body)
            delivery_tag = method_frame.delivery_tag
            # Forget workers that already finished so the list does not grow
            # without bound while the consumer keeps running.
            thrds[:] = [worker for worker in thrds if worker.is_alive()]
            t = threading.Thread(target=do_work, args=(conn, ch, delivery_tag, body))
            t.start()
            thrds.append(t)

        threads = []
        on_message_callback = functools.partial(on_message, args=(self.connection, threads))
        # Limit how many unacknowledged messages are delivered to this consumer.
        self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 10))
        self.channel.basic_consume(on_message_callback=on_message_callback, queue=self.queue_name)
        log.info('[*] Waiting for messages. To exit press CTRL+C')
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            log.info('MQ connection closed by user')
        except Exception as e:
            log.exception('MQ connection closed unexpectedly. %s', e)
            raise
        for thread in threads:
            thread.join()
        self.connection.close()


class ProcessMessage:
    """Handles all untreated events of one camera for one MQ message."""

    def __init__(self, camera_key, thread_id, body):
        self.camera_key = camera_key
        self.thread_id = thread_id
        self.body = body

    def process(self):
        """Record every untreated event of the camera named in the message.

        Before each event the camera lock is renewed; when the renewal fails
        the remaining events are skipped.

        :return: True when all events were processed, False when the loop
            stopped early because the lock could not be renewed.
        """
        ack = False
        events = mysql.get_untreated_events(self.body['db_table'], self.body['camera_code'], retry=True)
        log.info('thread_id:%s:%s: events count: %s', self.thread_id, self.body['camera_code'], len(events))
        for event in events:
            # Lease length: twice the event duration plus one second.
            expire_time = int((event['end_time'] - event['start_time']).total_seconds() * 2 + 1)
            if not self._renew_lock(expire_time):
                log.info('thread_id:%s:%s, expire failed', self.thread_id, self.camera_key)
                break
            # Lease renewed: the lock is considered held by this thread, so
            # the task continues.
            self.recording(event)
        else:
            ack = True
        with redis_connect() as pipe:
            if _lock_owned_by(pipe, self.camera_key, self.thread_id):
                log.info('thread:%s:delete:%s', self.thread_id, self.camera_key)
                pipe.delete(self.camera_key)
        return ack

    def _renew_lock(self, expire_time):
        """Make sure the camera lock lives for at least ``expire_time`` seconds.

        The TTL is read once and then:

        * TTL already >= ``expire_time``: nothing to do;
        * 0 <= TTL < ``expire_time``: extend it, only if this thread owns it;
        * TTL == -2 (key missing, per the redis TTL command): take the lock
          again with SET NX;
        * anything else (e.g. -1, key without expiry): renewal fails.

        :return: True when the lock is held long enough, False otherwise.
        """
        with redis_connect() as pipe:
            ttl = pipe.ttl(self.camera_key)
            if ttl >= expire_time:
                renewed = True
            elif -1 < ttl < expire_time:
                renewed = bool(
                    _lock_owned_by(pipe, self.camera_key, self.thread_id)
                    and pipe.expire(self.camera_key, expire_time)
                )
            elif ttl == -2:
                renewed = bool(pipe.set(self.camera_key, self.thread_id, nx=True, ex=expire_time))
            else:
                renewed = False
            if renewed:
                log.info('thread_id:%s:%s,ttl:%s', self.thread_id, self.camera_key, pipe.ttl(self.camera_key))
            return renewed

    def recording(self, event):
        """Record one event, upload the file and store the result in MySQL.

        When a video file was produced it is uploaded to OSS, the video row is
        updated (status 1 = complete, 2 = incomplete) and the local file is
        removed.  Otherwise a failed attempt is appended to ``retry_info``; up
        to five earlier attempts a retry is scheduled three hours later
        (status 2), after that the event is given up (status 4).

        :param event: event row; reads ``start_time``, ``end_time``,
            ``camera_code``, ``video_id``, ``retry_info`` and ``remark``.
        :return: always True.
        """
        t1 = time.time()
        record_result = self.recorder(event['start_time'].astimezone(tz), event['end_time'].astimezone(tz))
        t2 = time.time()
        recorded_path = record_result['file_name']
        video_info, _ = get_video_duration(recorded_path)
        log.info('thread_id:%s:%s: time consuming: %s, duration: %s, size: %sM',
                 self.thread_id, event['camera_code'], round(t2 - t1, 2),
                 time_to_seconds(video_info['duration']), video_info['size'])
        url = ''
        file_name = recorded_path.split('/')[-1]
        if recorded_path and os.path.isfile(recorded_path):
            url = aliyun_oss.oss_upload_file('isc_record/' + file_name, recorded_path)
            status = 1 if record_result['is_completed'] else 2
            # TODO: resume recording of incomplete videos.
            mysql.update_video_info(self.body['db_table'], event['video_id'], status,
                                    file_name=file_name, video_url=url,
                                    video_resolution=video_info['resolution'],
                                    recovered_time=record_result['recovered_time'].astimezone(pytz.utc))
            os.remove(recorded_path)
        else:
            now = datetime.now()
            event['retry_info'] = json.loads(event.get('retry_info') or '[]')
            retry_count = len(event['retry_info'])
            next_retry_time = None
            event['retry_info'].append({
                'retry_count': retry_count + 1,
                'retry_time': now.strftime('%Y-%m-%d %H:%M:%S'),
                'result': 'failed'
            })
            # A missing or empty remark starts from 'start' in both branches.
            previous_remark = event.get('remark') or 'start'
            if retry_count < 5:
                # No valid video file: mark the event with status 2 and
                # schedule another attempt.
                status = 2
                next_retry_time = now + timedelta(hours=3)
                remark = previous_remark + '+failed'
            else:
                status = 4  # Status 4: no more retries.
                remark = previous_remark + '+failed+end.'
            mysql.update_video_info(self.body['db_table'], event['video_id'], status,
                                    retry_info=event['retry_info'],
                                    next_retry_time=next_retry_time, remark=remark)
        log.info('thread_id:%s:video_info: %s, url: %s, video_id: %s.%s, time: %s',
                 self.thread_id, video_info, url, self.body['db_table'], event['video_id'],
                 round(time.time() - t1, 2))
        return True

    def recorder(self, start_time, end_time):
        """Record the playback stream between two instants into one mp4 file.

        When a recording comes out shorter than requested, the part is kept
        and recording resumes from where it stopped; all parts are merged at
        the end.

        :param start_time: start time, Asia/Shanghai timezone
        :param end_time: end time, Asia/Shanghai timezone
        :return: dict with ``file_name`` (target path), ``is_completed``,
            ``recovered_time`` (instant up to which video was obtained) and
            ``retry_count``.
        """
        playback_urls = api_helper.get_cameras_playback_urls(
            self.body['camera_code'],
            IntelabApiHelper.iso_format(start_time),
            IntelabApiHelper.iso_format(end_time)
        )
        log.info('thread_id:%s:%s: playback: %s', self.thread_id, self.body['camera_code'], playback_urls)
        file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format(
            self.body['camera_code'],
            start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'),
            end_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S')
        ))
        if not playback_urls:
            return {
                'file_name': file_name,
                'is_completed': False,
                'recovered_time': start_time,
                'retry_count': 0
            }
        # A single playback stream URL can be used to fetch the streams of
        # the other time ranges as well.
        playback_stream = playback_urls[0]

        part_num = retry_count = 0
        is_completed = False
        part_files_set = set()
        # Give up after _MAX_RECORD_ATTEMPTS consecutive attempts without
        # progress; the counter restarts whenever a part was obtained.
        while retry_count < _MAX_RECORD_ATTEMPTS:
            retry_count += 1
            complete_duration = (end_time - start_time).total_seconds()
            file_info, _ = self.stream_record(playback_stream['stream_url'], start_time, end_time)
            file_duration = time_to_seconds(file_info['duration'])
            if not os.path.isfile(file_info['file_name']):
                log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count)
                time.sleep(1)
                continue
            if file_duration < complete_duration - 2:
                # The video file is shorter than the full duration.
                if file_duration <= 0:
                    # Nothing usable was recorded: count it as a failed
                    # attempt.  Resetting the counter here would loop forever
                    # because start_time would not advance.
                    log.info('thread_id:%s:%s: empty recording %s, retry count: %s',
                             self.thread_id, self.body['camera_code'],
                             file_info['file_name'], retry_count)
                    os.remove(file_info['file_name'])
                    time.sleep(1)
                    continue
                new_start_time = start_time + timedelta(seconds=file_duration)
                part_file_name = os.path.join(video_path, 'ISC_{}_{}_{}_{}.mp4'.format(
                    self.body['camera_code'],
                    start_time.strftime('%Y%m%dT%H%M%S'),
                    new_start_time.strftime('%Y%m%dT%H%M%S'),
                    part_num
                ))
                shutil.move(file_info['file_name'], part_file_name)
                part_files_set.add(part_file_name)
                start_time = new_start_time
                part_num += 1
                retry_count = 0
            else:
                part_files_set.add(file_info['file_name'])
                is_completed = True
                start_time = end_time
                break

        part_files = sorted(part_files_set)
        if len(part_files) > 1:
            concat(part_files, file_name, removed=True)
        elif len(part_files) == 1:
            shutil.move(part_files[0], file_name)
        log.info('thread_id:%s:%s: The download is complete, file %s',
                 self.thread_id, self.body['camera_code'], file_name)
        return {
            'file_name': file_name,
            'is_completed': is_completed,
            'recovered_time': start_time,
            'retry_count': retry_count
        }

    def stream_record(self, stream, start_time, end_time):
        """Record one stream segment to a local file and probe the result.

        :param stream: mapping with ``protocol`` and ``url``; ``extra_args``
            is also read when the protocol is not ``rtsp``.
        :param start_time: segment start (datetime)
        :param end_time: segment end (datetime)
        :return: whatever ``get_video_duration`` returns for the output file.
        """
        start_time = start_time.strftime('%Y%m%dT%H%M%S')
        end_time = end_time.strftime('%Y%m%dT%H%M%S')
        if stream['protocol'] == 'rtsp':
            stream_url = stream['url']
        else:
            stream_url = '{}?beginTime={}&endTime={}&{}'.format(
                stream['url'], start_time, end_time, stream['extra_args'])
        file_name = os.path.join(
            video_path, 'rtmp_{}_{}_{}.mp4'.format(self.body['camera_code'], start_time, end_time))
        log.info('thread_id:%s:%s:stream_url: %s', self.thread_id, self.body['camera_code'], stream_url)
        # TODO: handle this with multiple processes.
        record_thread(stream_url, file_name, thread_name=self.body['camera_code'])
        return get_video_duration(file_name)


if __name__ == '__main__':
    log_init('recorder', False, '/var/log/event_vss')
    stream_recorder = StreamRecorder()
    stream_recorder.start()