提交 6dc496cc authored 作者: lc.zhou's avatar lc.zhou

ils-video-recorder:重新声明一个全局的rabbitmq连接用于发送消息

上级 4942a681
...@@ -40,13 +40,14 @@ class StreamRecorder: ...@@ -40,13 +40,14 @@ class StreamRecorder:
def __init__(self): def __init__(self):
self.queue_name = hk_config.get('QUEUE_NAME') self.queue_name = hk_config.get('QUEUE_NAME')
self.connection = rabbitmq_connect() self.connection = rabbitmq_connect()
self.downstream_mq = rabbitmq_connect()
self.downstream_mq_channel = self.downstream_mq.channel()
self.channel = self.connection.channel() self.channel = self.connection.channel()
def start(self): def start(self):
log.info('binding to queue {}'.format(self.queue_name)) log.info('binding to queue {}'.format(self.queue_name))
self.channel.queue_declare(queue=self.queue_name, durable=True) self.channel.queue_declare(queue=self.queue_name, durable=True)
self.downstream_mq_channel.queue_declare('UNVERIFIED_EVENT_QUEUE', durable=True)
def ack_message(ch, delivery_tag): def ack_message(ch, delivery_tag):
"""Note that `ch` must be the same pika channel instance via which """Note that `ch` must be the same pika channel instance via which
...@@ -55,7 +56,7 @@ class StreamRecorder: ...@@ -55,7 +56,7 @@ class StreamRecorder:
if ch.is_open: if ch.is_open:
ch.basic_ack(delivery_tag) ch.basic_ack(delivery_tag)
def do_work(conn, ch, delivery_tag, body): def do_work(channel, conn, ch, delivery_tag, body):
thread_id = threading.get_ident() thread_id = threading.get_ident()
log.info('Delivery tag: %s Message body: %s', delivery_tag, body) log.info('Delivery tag: %s Message body: %s', delivery_tag, body)
ack = False ack = False
...@@ -67,7 +68,7 @@ class StreamRecorder: ...@@ -67,7 +68,7 @@ class StreamRecorder:
if pipe.set(camera_key, thread_id, nx=True, ex=100): if pipe.set(camera_key, thread_id, nx=True, ex=100):
log.info('setnx:%s:ttl:100', camera_key) log.info('setnx:%s:ttl:100', camera_key)
# 针对当前摄像头开始录制 # 针对当前摄像头开始录制
ack = ProcessMessage.process(body, camera_key) ack = ProcessMessage.process(channel, body, camera_key)
else: else:
log.info('camera_code[%s] processing, end.', body['camera_code']) log.info('camera_code[%s] processing, end.', body['camera_code'])
...@@ -91,7 +92,7 @@ class StreamRecorder: ...@@ -91,7 +92,7 @@ class StreamRecorder:
def on_message(ch, method_frame, _header_frame, body, args): def on_message(ch, method_frame, _header_frame, body, args):
(conn, _threads) = args (conn, _threads) = args
t = threading.Thread(target=do_work, args=( t = threading.Thread(target=do_work, args=(
conn, ch, method_frame.delivery_tag, json.loads(body))) self.downstream_mq_channel, conn, ch, method_frame.delivery_tag, json.loads(body)))
t.start() t.start()
_threads.append(t) _threads.append(t)
...@@ -117,6 +118,7 @@ class StreamRecorder: ...@@ -117,6 +118,7 @@ class StreamRecorder:
thread.join() thread.join()
self.connection.close() self.connection.close()
self.downstream_mq.close()
class ProcessMessage: class ProcessMessage:
...@@ -141,7 +143,7 @@ class ProcessMessage: ...@@ -141,7 +143,7 @@ class ProcessMessage:
return _continue return _continue
@staticmethod @staticmethod
def process(body, camera_key): def process(channel, body, camera_key):
thread_id = threading.get_ident() thread_id = threading.get_ident()
ack = False ack = False
...@@ -156,7 +158,7 @@ class ProcessMessage: ...@@ -156,7 +158,7 @@ class ProcessMessage:
# 续约成功,本次任务继续 # 续约成功,本次任务继续
mysql.update_video_info(body['db_table'], event['video_id'], status=0) mysql.update_video_info(body['db_table'], event['video_id'], status=0)
# 开始摄像头录制 # 开始摄像头录制
ProcessMessage.recording(body, event) ProcessMessage.recording(channel, body, event)
else: else:
log.info('%s, expire failed', camera_key) log.info('%s, expire failed', camera_key)
break break
...@@ -181,7 +183,7 @@ class ProcessMessage: ...@@ -181,7 +183,7 @@ class ProcessMessage:
return os.path.join(video_path, '_'.join(elements) + '.mp4') return os.path.join(video_path, '_'.join(elements) + '.mp4')
@staticmethod @staticmethod
def recording(body, event): def recording(channel, body, event):
part_files_set = set() part_files_set = set()
if event['recovered_time'] and event['recovered_time'] != event['start_time']: if event['recovered_time'] and event['recovered_time'] != event['start_time']:
# 如果恢复时间存在且不等于开始时间 那么说明该视频录制中断过 # 如果恢复时间存在且不等于开始时间 那么说明该视频录制中断过
...@@ -215,10 +217,10 @@ class ProcessMessage: ...@@ -215,10 +217,10 @@ class ProcessMessage:
log.info('%s: time consuming: %s, duration: %s, size: %sM', log.info('%s: time consuming: %s, duration: %s, size: %sM',
event['camera_code'], round(t2 - t1, 2), event['camera_code'], round(t2 - t1, 2),
time_to_seconds(video_info['duration']), video_info['size']) time_to_seconds(video_info['duration']), video_info['size'])
return ProcessMessage.update_video_info(body, record_result, video_info, event) return ProcessMessage.update_video_info(channel, body, record_result, video_info, event)
@staticmethod @staticmethod
def update_video_info(body, record_result, video_info, event): def update_video_info(channel, body, record_result, video_info, event):
now = datetime.now() now = datetime.now()
url = next_retry_time = None url = next_retry_time = None
...@@ -277,7 +279,7 @@ class ProcessMessage: ...@@ -277,7 +279,7 @@ class ProcessMessage:
remark=remark remark=remark
) )
if status == 5: if status == 5:
ProcessMessage.send_unverified_file(VideoFile(record_result['file_name']), ProcessMessage.send_unverified_file(channel, VideoFile(record_result['file_name']),
event['video_id'], url, event['camera_code'], event['video_id'], url, event['camera_code'],
event['biz_type'], event['service_type'], body['db_table']) event['biz_type'], event['service_type'], body['db_table'])
...@@ -285,12 +287,8 @@ class ProcessMessage: ...@@ -285,12 +287,8 @@ class ProcessMessage:
return True return True
@staticmethod @staticmethod
def send_unverified_file(video_file, event_id, video_url, def send_unverified_file(channel, video_file, event_id, video_url,
device_code, biz_type, service_type, db_table, detection_region=''): device_code, biz_type, service_type, db_table, detection_region=''):
queue_name = 'UNVERIFIED_EVENT_QUEUE'
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
log.info('添加待处理文件%s,到分析队列,开始时间%s, 结束时间%s', video_file.file_name, log.info('添加待处理文件%s,到分析队列,开始时间%s, 结束时间%s', video_file.file_name,
video_file.start_time, video_file.end_time) video_file.start_time, video_file.end_time)
log.info('视频文件%s的网络质量为%s, 评级为%s', log.info('视频文件%s的网络质量为%s, 评级为%s',
...@@ -320,13 +318,11 @@ class ProcessMessage: ...@@ -320,13 +318,11 @@ class ProcessMessage:
'network_quality': video_file.network_quality, 'network_quality': video_file.network_quality,
'network_quality_grade': video_file.network_quality_grade, 'network_quality_grade': video_file.network_quality_grade,
} }
# 发送mq信息 # 发送mq信息
queue_name = 'UNVERIFIED_EVENT_QUEUE'
channel.basic_publish(exchange='', routing_key=queue_name, channel.basic_publish(exchange='', routing_key=queue_name,
body=json.dumps(video_data, ensure_ascii=False)) body=json.dumps(video_data, ensure_ascii=False))
connection.close()
@staticmethod @staticmethod
def stream_to_video(body, playback_stream, start_time, end_time, part_files_set): def stream_to_video(body, playback_stream, start_time, end_time, part_files_set):
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论