提交 6c0044ef authored 作者: zw.wang's avatar zw.wang

feat: [merger] 新增对ai_config的配置支持

上级 e5090f72
import pytz import pytz
import json import json
import time import time
import dynaconf
from datetime import datetime, timedelta from datetime import datetime, timedelta
...@@ -9,12 +8,11 @@ from isc_video_record.db import influxdb ...@@ -9,12 +8,11 @@ from isc_video_record.db import influxdb
from intelab_python_sdk.logger import log from intelab_python_sdk.logger import log
from isc_video_record.db import rabbitmq_connect, redis_connect, mysql from isc_video_record.db import rabbitmq_connect, mysql
from isc_video_record.db.mysql import get_camera_info, insert_video_info from isc_video_record.db.mysql import get_camera_info, insert_video_info
from isc_video_record.utils.alarm_utils import send_alarm_to_developer from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.utils.api_helper import IntelabApiHelper, PlaybackUrlException from isc_video_record.utils.api_helper import IntelabApiHelper, PlaybackUrlException
from isc_video_record.utils.pre_event import PreEvent from isc_video_record.utils.pre_event import PreEvent
from isc_video_record.const import LAST_CHECK_TIME_KEY
api_helper = IntelabApiHelper() api_helper = IntelabApiHelper()
...@@ -51,11 +49,13 @@ class EventMergerJob: ...@@ -51,11 +49,13 @@ class EventMergerJob:
return last_check_time return last_check_time
@staticmethod @staticmethod
def get_camera_local_events(camera, now): def get_camera_local_events(camera, now, full_day, movement):
""" """
获取摄像头本地移动侦测事件 获取摄像头本地移动侦测事件
:param camera: :param camera:
:param now: uct now :param now: uct now
:param full_day:
:param movement:
""" """
last_check_time = EventMergerJob.get_scan_time(camera, now) last_check_time = EventMergerJob.get_scan_time(camera, now)
events = [] events = []
...@@ -74,7 +74,7 @@ class EventMergerJob: ...@@ -74,7 +74,7 @@ class EventMergerJob:
# TODO 需要重新处理事件 # TODO 需要重新处理事件
now = last_check_time + timedelta(hours=2) now = last_check_time + timedelta(hours=2)
log.warning('查询%s事件跨度较大.', camera['device_code']) log.warning('查询%s事件跨度较大.', camera['device_code'])
return EventMergerJob.get_camera_local_events(camera, now) return EventMergerJob.get_camera_local_events(camera, now, full_day, movement)
except Exception as e: except Exception as e:
# TODO 网络请求错误重试 # TODO 网络请求错误重试
log.exception(e) log.exception(e)
...@@ -82,13 +82,15 @@ class EventMergerJob: ...@@ -82,13 +82,15 @@ class EventMergerJob:
return events return events
@staticmethod @staticmethod
def get_alarm_events(camera, now): def get_alarm_events(camera, now, full_day, movement):
""" """
摄像头录制计划为全天录制时 摄像头录制计划为全天录制时
查询指定时间段内摄像头是否有移动告警消息并合并成事件 查询指定时间段内摄像头是否有移动告警消息并合并成事件
:param camera: :param camera:
:param now: uct now :param now: uct now
:param full_day:
:param movement:
""" """
last_check_time = EventMergerJob.get_scan_time(camera, now) last_check_time = EventMergerJob.get_scan_time(camera, now)
...@@ -101,9 +103,10 @@ class EventMergerJob: ...@@ -101,9 +103,10 @@ class EventMergerJob:
log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format( log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
camera['device_code'], last_check_time, now, len(alarm_list))) camera['device_code'], last_check_time, now, len(alarm_list)))
if dynaconf.settings.get('EVENT_ON', True): if movement:
events = pre_event.merge_alarm_to_event(alarm_list) events = pre_event.merge_alarm_to_event(alarm_list, full_day)
else: else:
log.warning('摄像头%s关闭移动侦测', camera['device_code'])
events = [{ events = [{
'start_time': last_check_time.astimezone(tz), 'start_time': last_check_time.astimezone(tz),
'end_time': now.astimezone(tz) 'end_time': now.astimezone(tz)
...@@ -111,7 +114,12 @@ class EventMergerJob: ...@@ -111,7 +114,12 @@ class EventMergerJob:
return events return events
def process_camera(self, camera): def process_camera(self, camera, full_day=False, movement=True):
"""
:param camera:
:param full_day: 是否开启全天录制,非开启全天录制的任务是6:00-23:00之间的任务,不建议开启全天录制
:param movement: 是否开启移动侦测,开启移动侦测只会针对移动侦测的视频进行云存储,建议开启
"""
body = { body = {
'camera_code': camera['device_code'], 'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'], 'camera_index': camera['point_index_code'],
...@@ -131,7 +139,7 @@ class EventMergerJob: ...@@ -131,7 +139,7 @@ class EventMergerJob:
call_get_events_func = self.get_alarm_events if camera['video_plan_type'] == 1 \ call_get_events_func = self.get_alarm_events if camera['video_plan_type'] == 1 \
else self.get_camera_local_events else self.get_camera_local_events
for event in call_get_events_func(camera, now): for event in call_get_events_func(camera, now, full_day, movement):
insert_video_info( insert_video_info(
camera['db_table'], camera['device_code'], camera['db_table'], camera['device_code'],
event['start_time'].astimezone(pytz.utc), event['start_time'].astimezone(pytz.utc),
...@@ -149,7 +157,7 @@ class EventMergerJob: ...@@ -149,7 +157,7 @@ class EventMergerJob:
self.send_mq_message(body) self.send_mq_message(body)
log.info(body) log.info(body)
return body['ex'] return body.get('ex', 10)
def run(self): def run(self):
...@@ -157,9 +165,16 @@ class EventMergerJob: ...@@ -157,9 +165,16 @@ class EventMergerJob:
camera_count = 0 camera_count = 0
for camera in get_camera_info(): for camera in get_camera_info():
# TODO 根据摄像头AI配置启动移动侦测回放录制 if not camera['ai_config_support'] or len(camera['ai_config_support']) < 5:
continue
if camera['ai_config_support'][0] == '0':
# 只有开启了云存储的才会启动录制服务
log.warning('摄像头%s未开启云存储功能,不需要进行录制', camera['device_code'])
continue
full_day = True if camera['ai_config_support'][2] == '1' else False
movement = True if camera['ai_config_support'][3] == '1' else False
try: try:
camera_event_duration = self.process_camera(camera) camera_event_duration = self.process_camera(camera, full_day=full_day, movement=movement)
total_video_duration += camera_event_duration total_video_duration += camera_event_duration
if camera_event_duration > 10: if camera_event_duration > 10:
camera_count += 1 camera_count += 1
......
...@@ -38,14 +38,14 @@ class PreEvent(object): ...@@ -38,14 +38,14 @@ class PreEvent(object):
return points return points
def _alarm_point_to_event(self, alarm_list): def _alarm_point_to_event(self, alarm_list, full_day=False):
pre_events = [] pre_events = []
pre_status = -1 pre_status = -1
# 告警消息转换成事件 # 告警消息转换成事件
for alarm_point in alarm_list: for alarm_point in alarm_list:
# 报警时间转换成上海时区 # 报警时间转换成上海时区
alarm_time = dt_parse(alarm_point['time']).astimezone(tz) alarm_time = dt_parse(alarm_point['time']).astimezone(tz)
if alarm_time.hour in self.invalid_hour: if not full_day and alarm_time.hour in self.invalid_hour:
# 过滤无效时间 # 过滤无效时间
continue continue
...@@ -75,8 +75,8 @@ class PreEvent(object): ...@@ -75,8 +75,8 @@ class PreEvent(object):
pre_events[-1]['end_time'] = self.tz_end_time pre_events[-1]['end_time'] = self.tz_end_time
return pre_events return pre_events
def merge_alarm_to_event(self, alarm_list): def merge_alarm_to_event(self, alarm_list, full_day=False):
pre_events = self._alarm_point_to_event(alarm_list) pre_events = self._alarm_point_to_event(alarm_list, full_day)
# 合并时间间隔较短的事件或切分时间过长的事件 # 合并时间间隔较短的事件或切分时间过长的事件
events = [] events = []
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论