提交 77a8949e authored 作者: zw.wang's avatar zw.wang

fix: [filter] 修复filter处理文件时间较长时,RABBITMQ 连接断开的问题

上级 0402a325
import os
import time
import json
import threading
import functools
from datetime import timedelta
from intelab_python_sdk.ffmpeg.ffmpeg_prune import prune
......@@ -14,6 +16,7 @@ from ils_common_video.const import UNVERIFIED_EVENT_QUEUE
from ils_common_video.capability.movement_verify import file_filter
from ils_common_video.utils.aliyun_oss import oss_upload_file, oss_download_file, oss_delete_file
from ils_common_video.utils.video_file import VideoFile
from ils_common_video.utils.alarm_utils import send_alarm_to_developer
class VideoFilterProcess:
......@@ -33,31 +36,48 @@ class VideoFilterProcess:
# self.connection.call_later(5, lambda: self.channel.stop_consuming())
self.channel.queue_declare(queue=self.queue_name, durable=True)
def callback(ch, method, properties, body):
log.info('received MQ message {}'.format(body))
try:
body = json.loads(body)
def ack_message(ch, delivery_tag):
"""Note that `ch` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if ch.is_open:
ch.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def do_work(conn, ch, delivery_tag, body):
thread_id = threading.get_ident()
log.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id,
delivery_tag, body)
try:
ret = self.process_message(body)
self.process_message(json.loads(body))
except Exception as e:
log.error('视频文件%s分析过程出错', body['full_path'])
log.exception(e)
ret = False
send_alarm_to_developer('common-filter', e)
if ret:
cb = functools.partial(ack_message, ch, delivery_tag)
conn.add_callback_threadsafe(cb)
log.info('finished processing MQ message')
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag)
except Exception as e:
log.exception(e)
log.error('finished processing MQ message, error')
def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args
delivery_tag = method_frame.delivery_tag
t = threading.Thread(target=do_work, args=(
conn, ch, delivery_tag, body))
t.start()
thrds.append(t)
threads = []
on_message_callback = functools.partial(
on_message, args=(self.connection, threads))
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=callback,
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name)
log.info(' [*] Waiting for messages. To exit press CTRL+C')
log.info('[*] Waiting for messages. To exit press CTRL+C')
try:
self.channel.start_consuming()
except KeyboardInterrupt:
......@@ -66,6 +86,9 @@ class VideoFilterProcess:
log.exception('MQ connection closed unexpectedly. %s', e)
raise
for thread in threads:
thread.join()
self.connection.close()
@staticmethod
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论