提交 191c4c19 authored 作者: lc.zhou's avatar lc.zhou

Merge branch 'develop'

# Conflicts:
#	ils_common_video/isc_video/recorder.py
...@@ -40,14 +40,13 @@ class StreamRecorder: ...@@ -40,14 +40,13 @@ class StreamRecorder:
def __init__(self): def __init__(self):
self.queue_name = hk_config.get('QUEUE_NAME') self.queue_name = hk_config.get('QUEUE_NAME')
self.connection = rabbitmq_connect() self.connection = rabbitmq_connect()
self.downstream_mq = rabbitmq_connect()
self.downstream_mq_channel = self.downstream_mq.channel()
self.channel = self.connection.channel() self.channel = self.connection.channel()
def start(self): def start(self):
log.info('binding to queue {}'.format(self.queue_name)) log.info('binding to queue {}'.format(self.queue_name))
self.channel.queue_declare(queue=self.queue_name, durable=True) self.channel.queue_declare(queue=self.queue_name, durable=True)
self.downstream_mq_channel.queue_declare('UNVERIFIED_EVENT_QUEUE', durable=True)
def ack_message(ch, delivery_tag): def ack_message(ch, delivery_tag):
"""Note that `ch` must be the same pika channel instance via which """Note that `ch` must be the same pika channel instance via which
...@@ -56,7 +55,7 @@ class StreamRecorder: ...@@ -56,7 +55,7 @@ class StreamRecorder:
if ch.is_open: if ch.is_open:
ch.basic_ack(delivery_tag) ch.basic_ack(delivery_tag)
def do_work(channel, conn, ch, delivery_tag, body): def do_work(conn, ch, delivery_tag, body):
thread_id = threading.get_ident() thread_id = threading.get_ident()
log.info('Delivery tag: %s Message body: %s', delivery_tag, body) log.info('Delivery tag: %s Message body: %s', delivery_tag, body)
ack = False ack = False
...@@ -68,13 +67,13 @@ class StreamRecorder: ...@@ -68,13 +67,13 @@ class StreamRecorder:
if pipe.set(camera_key, thread_id, nx=True, ex=100): if pipe.set(camera_key, thread_id, nx=True, ex=100):
log.info('setnx:%s:ttl:100', camera_key) log.info('setnx:%s:ttl:100', camera_key)
# 针对当前摄像头开始录制 # 针对当前摄像头开始录制
ack = ProcessMessage.process(channel, body, camera_key) ack = ProcessMessage.process(body, camera_key)
else: else:
log.info('camera_code[%s] processing, end.', body['camera_code']) log.info('camera_code[%s] processing, end.', body['camera_code'])
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
send_alarm_to_developer('recorder_{}'.format(thread_id), e) send_alarm_to_developer('recorder_{}'.format(dynaconf.settings.get('SERVICE_NAME',0)), e)
finally: finally:
try: try:
...@@ -92,7 +91,7 @@ class StreamRecorder: ...@@ -92,7 +91,7 @@ class StreamRecorder:
def on_message(ch, method_frame, _header_frame, body, args): def on_message(ch, method_frame, _header_frame, body, args):
(conn, _threads) = args (conn, _threads) = args
t = threading.Thread(target=do_work, args=( t = threading.Thread(target=do_work, args=(
self.downstream_mq_channel, conn, ch, method_frame.delivery_tag, json.loads(body))) conn, ch, method_frame.delivery_tag, json.loads(body)))
t.start() t.start()
_threads.append(t) _threads.append(t)
...@@ -101,7 +100,7 @@ class StreamRecorder: ...@@ -101,7 +100,7 @@ class StreamRecorder:
on_message, args=(self.connection, threads)) on_message, args=(self.connection, threads))
# 设置消费能力 # 设置消费能力
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 180)) self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 150))
self.channel.basic_consume(on_message_callback=on_message_callback, self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name) queue=self.queue_name)
...@@ -118,7 +117,6 @@ class StreamRecorder: ...@@ -118,7 +117,6 @@ class StreamRecorder:
thread.join() thread.join()
self.connection.close() self.connection.close()
self.downstream_mq.close()
class ProcessMessage: class ProcessMessage:
...@@ -143,7 +141,7 @@ class ProcessMessage: ...@@ -143,7 +141,7 @@ class ProcessMessage:
return _continue return _continue
@staticmethod @staticmethod
def process(channel, body, camera_key): def process(body, camera_key):
thread_id = threading.get_ident() thread_id = threading.get_ident()
ack = False ack = False
...@@ -158,7 +156,7 @@ class ProcessMessage: ...@@ -158,7 +156,7 @@ class ProcessMessage:
# 续约成功,本次任务继续 # 续约成功,本次任务继续
mysql.update_video_info(body['db_table'], event['video_id'], status=0) mysql.update_video_info(body['db_table'], event['video_id'], status=0)
# 开始摄像头录制 # 开始摄像头录制
ProcessMessage.recording(channel, body, event) ProcessMessage.recording(body, event)
else: else:
log.info('%s, expire failed', camera_key) log.info('%s, expire failed', camera_key)
break break
...@@ -183,7 +181,7 @@ class ProcessMessage: ...@@ -183,7 +181,7 @@ class ProcessMessage:
return os.path.join(video_path, '_'.join(elements) + '.mp4') return os.path.join(video_path, '_'.join(elements) + '.mp4')
@staticmethod @staticmethod
def recording(channel, body, event): def recording(body, event):
part_files_set = set() part_files_set = set()
if event['recovered_time'] and event['recovered_time'] != event['start_time']: if event['recovered_time'] and event['recovered_time'] != event['start_time']:
# 如果恢复时间存在且不等于开始时间 那么说明该视频录制中断过 # 如果恢复时间存在且不等于开始时间 那么说明该视频录制中断过
...@@ -217,10 +215,10 @@ class ProcessMessage: ...@@ -217,10 +215,10 @@ class ProcessMessage:
log.info('%s: time consuming: %s, duration: %s, size: %sM', log.info('%s: time consuming: %s, duration: %s, size: %sM',
event['camera_code'], round(t2 - t1, 2), event['camera_code'], round(t2 - t1, 2),
time_to_seconds(video_info['duration']), video_info['size']) time_to_seconds(video_info['duration']), video_info['size'])
return ProcessMessage.update_video_info(channel, body, record_result, video_info, event) return ProcessMessage.update_video_info(body, record_result, video_info, event)
@staticmethod @staticmethod
def update_video_info(channel, body, record_result, video_info, event): def update_video_info(body, record_result, video_info, event):
now = datetime.now() now = datetime.now()
url = next_retry_time = None url = next_retry_time = None
...@@ -279,7 +277,7 @@ class ProcessMessage: ...@@ -279,7 +277,7 @@ class ProcessMessage:
remark=remark remark=remark
) )
if status == 5: if status == 5:
ProcessMessage.send_unverified_file(channel, VideoFile(record_result['file_name']), ProcessMessage.send_unverified_file(VideoFile(record_result['file_name']),
event['video_id'], url, event['camera_code'], event['video_id'], url, event['camera_code'],
event['biz_type'], event['service_type'], body['db_table']) event['biz_type'], event['service_type'], body['db_table'])
...@@ -287,7 +285,7 @@ class ProcessMessage: ...@@ -287,7 +285,7 @@ class ProcessMessage:
return True return True
@staticmethod @staticmethod
def send_unverified_file(channel, video_file, event_id, video_url, def send_unverified_file(video_file, event_id, video_url,
device_code, biz_type, service_type, db_table, detection_region=''): device_code, biz_type, service_type, db_table, detection_region=''):
log.info('添加待处理文件%s,到分析队列,开始时间%s, 结束时间%s', video_file.file_name, log.info('添加待处理文件%s,到分析队列,开始时间%s, 结束时间%s', video_file.file_name,
video_file.start_time, video_file.end_time) video_file.start_time, video_file.end_time)
...@@ -318,10 +316,13 @@ class ProcessMessage: ...@@ -318,10 +316,13 @@ class ProcessMessage:
'network_quality': video_file.network_quality, 'network_quality': video_file.network_quality,
'network_quality_grade': video_file.network_quality_grade, 'network_quality_grade': video_file.network_quality_grade,
} }
# 发送mq信息
queue_name = 'UNVERIFIED_EVENT_QUEUE' queue_name = 'UNVERIFIED_EVENT_QUEUE'
channel.basic_publish(exchange='', routing_key=queue_name, connection = rabbitmq_connect()
body=json.dumps(video_data, ensure_ascii=False)) channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
# 发送mq信息
channel.basic_publish(exchange='', routing_key=queue_name,body=json.dumps(video_data, ensure_ascii=False))
connection.close()
@staticmethod @staticmethod
def stream_to_video(body, playback_stream, start_time, end_time, part_files_set): def stream_to_video(body, playback_stream, start_time, end_time, part_files_set):
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论