提交 32500cf5 authored 作者: lc.zhou's avatar lc.zhou

Merge branch 'develop'

from .influxdb import influxdb # noqa
from .redis import redis_connect # noqa
from .rabbitmq import rabbitmq_connect # noqa
from .rabbitmq import rabbitmq_connect,send_message_to_queue # noqa
......@@ -304,3 +304,18 @@ def update_camera_network_quality(cursor, conn, camera_code, network_quality):
'''
cursor.execute(sql, [network_quality, camera_code])
conn.commit()
@query(cursor_dict=True)
def getWaitFilterEvent(cursor, conn):
sql = '''
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_1` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_2` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_3` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_4` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_5` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_6` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_7` WHERE `status` = 5 UNION
SELECT `id` , `service_type` , `biz_type` , `device_code` ,`file_name`, `video_url` , `start_time` , `end_time` FROM `common_video_1`.`video_data_motion_0` WHERE `status` = 5
'''
cursor.execute(sql)
return cursor.fetchall()
import dynaconf
import pika
import json
from retrying import retry
......@@ -13,3 +14,13 @@ def rabbitmq_connect():
connection = pika.BlockingConnection(
pika.URLParameters('amqp://{user}:{password}@{host}:{port}/%2F'.format(**amqp_config)))
return connection
@retry(stop_max_attempt_number=3, wait_random_min=100, wait_random_max=1000)
def send_message_to_queue(queue_name, data):
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
# 发送mq信息
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(data, ensure_ascii=False))
connection.close()
......@@ -12,7 +12,7 @@ from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from intelab_python_sdk.ffmpeg.ffmpeg_prune import prune as ffmpeg_prune
from ils_common_video.db import rabbitmq_connect, redis_connect, influxdb
from ils_common_video.db import rabbitmq_connect, redis_connect, influxdb, send_message_to_queue
from ils_common_video.const import PROCESSING_CAMERA_KEY, CAMERA_CALLBACK_KEY
from ils_common_video.utils.api_helper import IntelabApiHelper, PlaybackUrlException
from ils_common_video.utils.isc_client import HikVisionClient
......@@ -60,10 +60,10 @@ class StreamRecorder:
log.info('Delivery tag: %s Message body: %s', delivery_tag, body)
ack = False
pipe = redis_connect()
camera_key = PROCESSING_CAMERA_KEY.format(body['camera_code'])
try:
pipe = redis_connect()
if pipe.set(camera_key, thread_id, nx=True, ex=100):
log.info('setnx:%s:ttl:100', camera_key)
# 针对当前摄像头开始录制
......@@ -100,7 +100,7 @@ class StreamRecorder:
on_message, args=(self.connection, threads))
# 设置消费能力
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 150))
self.channel.basic_qos(prefetch_count=dynaconf.settings.get('PREFETCH_COUNT', 190))
self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name)
......@@ -124,20 +124,20 @@ class ProcessMessage:
@staticmethod
def set_expired_time(camera_key, expired_time, thread_id):
# 判定当前任务是否需要继续
pipe = redis_connect()
if pipe.ttl(camera_key) > expired_time:
_continue = True
elif -1 < pipe.ttl(camera_key) < expired_time \
and pipe.get(camera_key) == str(thread_id) \
and pipe.expire(camera_key, expired_time):
_continue = True
elif pipe.ttl(camera_key) == -2 \
and pipe.set(camera_key, thread_id, nx=True, ex=expired_time):
_continue = True
else:
_continue = False
with redis_connect() as pipe:
if pipe.ttl(camera_key) > expired_time:
_continue = True
elif -1 < pipe.ttl(camera_key) < expired_time \
and pipe.get(camera_key) == str(thread_id) \
and pipe.expire(camera_key, expired_time):
_continue = True
elif pipe.ttl(camera_key) == -2 \
and pipe.set(camera_key, thread_id, nx=True, ex=expired_time):
_continue = True
else:
_continue = False
log.info('%s,ttl:%s', camera_key, pipe.ttl(camera_key))
log.info('%s,ttl:%s', camera_key, pipe.ttl(camera_key))
return _continue
@staticmethod
......@@ -318,18 +318,9 @@ class ProcessMessage:
}
queue_name = 'UNVERIFIED_EVENT_QUEUE'
try:
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
# 发送mq信息
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(video_data, ensure_ascii=False))
except:
log.error("send filter error,content is: %s", json.dumps(video_data, ensure_ascii=False))
send_markdown('rabbitmq连接异常',
'recorder_{}:rabbitmq连接异常\n{}'.format(dynaconf.settings.get('SERVICE_NAME', 0),
json.dumps(video_data, ensure_ascii=False)))
finally:
connection.close()
send_message_to_queue(queue_name, video_data)
except Exception as e:
log.exception(e)
@staticmethod
def stream_to_video(body, playback_stream, start_time, end_time, part_files_set):
......
import os
import re
import oss2
from intelab_python_sdk.logger import log
import urllib.request
from functools import lru_cache, wraps
from dynaconf import settings
from oss2.exceptions import NoSuchKey
from oss2.exceptions import NoSuchKey,RequestError
local_endpoint = 'https://oss-cn-{}-internal.aliyuncs.com' # 局域网配置
......
......@@ -22,7 +22,7 @@ from ils_common_video.utils.alarm_utils import send_alarm_to_developer
class VideoFilterProcess:
def __init__(self):
self.queue_name = UNVERIFIED_EVENT_QUEUE
self.queue_name = settings.get('UNVERIFIED_QUEUE')
self.connection = rabbitmq_connect()
self.channel = self.connection.channel()
......
from ils_common_video.db import rabbitmq_connect, redis_connect, influxdb, mysql
from ils_common_video.utils.video_file import VideoFile
import json
import dynaconf
queue_name = 'UNVERIFIED_EVENT_QUEUE'
events = mysql.getWaitFilterEvent()
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(queue_name, durable=True)
for event in events:
if not event.get('file_name'):
continue
video_file = VideoFile('/tmp/videos/isc-record/' + event.get('file_name'))
video_data = {
'device_code': event.get('device_code'),
'event_id': event.get('id'),
'video_url': event.get('video_url'), # 视频播放地址
'sn': event.get('device_code'),
'full_path': video_file.full_path,
'db_table': "common_video_1.video_data_motion_" + str(event.get('biz_type') % 8),
'biz_type': event.get('biz_type'),
'service_type': event.get('service_type'),
'file_name': video_file.file_name,
'start_time': video_file.start_time.strftime('%Y-%m-%d %H:%M:%S'),
'end_time': video_file.end_time.strftime('%Y-%m-%d %H:%M:%S'),
'date': video_file.date,
'detection_region': "",
'bitrate': video_file.bitrate,
'resolution': video_file.resolution,
'duration': video_file.duration,
'cloud_storage': video_file.file_path, # 云存储key
'network_quality': video_file.network_quality,
'network_quality_grade': video_file.network_quality_grade,
}
# 发送mq信息
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(video_data, ensure_ascii=False))
print("send filter success,content is:" + json.dumps(video_data, ensure_ascii=False))
connection.close()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论