提交 74168dce authored 作者: lc.zhou's avatar lc.zhou

ils-video-record:isc合并事件注释添加

上级 b897fa12
...@@ -82,6 +82,7 @@ def query(cursor_dict=False): ...@@ -82,6 +82,7 @@ def query(cursor_dict=False):
@query(cursor_dict=True) @query(cursor_dict=True)
def get_camera_info(cursor, conn, camera_code=None, platform='isc', video_plan_type=None): def get_camera_info(cursor, conn, camera_code=None, platform='isc', video_plan_type=None):
""" """
查询camera_info、camera_ai_config表信息,获取摄像头列表
:param cursor: :param cursor:
:param conn: :param conn:
:param camera_code: :param camera_code:
......
...@@ -92,18 +92,21 @@ class EventMergerJob: ...@@ -92,18 +92,21 @@ class EventMergerJob:
:param full_day: :param full_day:
:param movement: :param movement:
""" """
# 获取当前摄像头最近事件的end_time,如果没有则用前一日的时间
last_check_time = EventMergerJob.get_scan_time(camera, now) last_check_time = EventMergerJob.get_scan_time(camera, now)
# 初始化预事件类(初始化influxdb的连接,以及预事件处理的开始时间和结束时间)
pre_event = PreEvent( pre_event = PreEvent(
last_check_time.strftime('%Y-%m-%d %H:%M:%S'), last_check_time.strftime('%Y-%m-%d %H:%M:%S'),
now.strftime('%Y-%m-%d %H:%M:%S') now.strftime('%Y-%m-%d %H:%M:%S')
) )
# 查询influxdb event_vss获取海康移动侦测回调告警消息点
alarm_list = list(pre_event.get_alarm_list( alarm_list = list(pre_event.get_alarm_list(
camera['point_index_code'])) camera['point_index_code']))
log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format( log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
camera['device_code'], last_check_time, now, len(alarm_list))) camera['device_code'], last_check_time, now, len(alarm_list)))
if movement: if movement:
# 将消息告警点合并成为一条真正的事件
events = pre_event.merge_alarm_to_event(alarm_list, full_day) events = pre_event.merge_alarm_to_event(alarm_list, full_day)
else: else:
log.warning('摄像头%s关闭移动侦测', camera['device_code']) log.warning('摄像头%s关闭移动侦测', camera['device_code'])
...@@ -116,6 +119,7 @@ class EventMergerJob: ...@@ -116,6 +119,7 @@ class EventMergerJob:
def process_camera(self, camera, full_day=False, movement=True): def process_camera(self, camera, full_day=False, movement=True):
""" """
处理摄像头 1:查询未处理摄像头 2:生成新事件 3:发送mq至record服务开始录制
:param camera: :param camera:
:param full_day: 是否开启全天录制,非开启全天录制的任务是6:00-23:00之间的任务,不建议开启全天录制 :param full_day: 是否开启全天录制,非开启全天录制的任务是6:00-23:00之间的任务,不建议开启全天录制
:param movement: 是否开启移动侦测,开启移动侦测只会针对移动侦测的视频进行云存储,建议开启 :param movement: 是否开启移动侦测,开启移动侦测只会针对移动侦测的视频进行云存储,建议开启
...@@ -125,15 +129,18 @@ class EventMergerJob: ...@@ -125,15 +129,18 @@ class EventMergerJob:
'camera_index': camera['point_index_code'], 'camera_index': camera['point_index_code'],
'db_table': camera['db_table'], 'db_table': camera['db_table'],
} }
# 预留三分钟不处理
now = datetime.utcnow() - timedelta(minutes=3) now = datetime.utcnow() - timedelta(minutes=3)
# 查询下载失败且重试时间小于当前时间、下载中、等待下载中的所有回放视频事件
untreated_events = mysql.get_untreated_events( untreated_events = mysql.get_untreated_events(
camera['db_table'], camera['device_code'], retry=True) camera['db_table'], camera['device_code'], retry=True)
if untreated_events: if untreated_events:
log.info('当前摄像头%s还存在未处理事件', camera['device_code']) log.info('当前摄像头%s还存在未处理事件', camera['device_code'])
# 将所有未处理的回放视频事件的时长累加,最后再冗余10秒
body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds() body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds()
for event in untreated_events]) + 10 for event in untreated_events]) + 10
# self.send_mq_message(body) # self.send_mq_message(body)
# 定义一个变量事件总时长,从body中获取
event_duration = body.get('ex', 0) event_duration = body.get('ex', 0)
call_get_events_func = self.get_alarm_events if camera['video_plan_type'] == 1 \ call_get_events_func = self.get_alarm_events if camera['video_plan_type'] == 1 \
...@@ -160,8 +167,9 @@ class EventMergerJob: ...@@ -160,8 +167,9 @@ class EventMergerJob:
return body.get('ex', 10) return body.get('ex', 10)
def run(self): def run(self):
# 总视频持续时长 默认0
total_video_duration = 0 total_video_duration = 0
# 摄像头数量 默认0
camera_count = 0 camera_count = 0
for camera in get_camera_info(): for camera in get_camera_info():
...@@ -171,7 +179,9 @@ class EventMergerJob: ...@@ -171,7 +179,9 @@ class EventMergerJob:
# 只有开启了云存储的才会启动录制服务 # 只有开启了云存储的才会启动录制服务
log.warning('摄像头%s未开启云存储功能,不需要进行录制', camera['device_code']) log.warning('摄像头%s未开启云存储功能,不需要进行录制', camera['device_code'])
continue continue
# 是否支持全天录制
full_day = True if camera['ai_config_support'][2] == '1' else False full_day = True if camera['ai_config_support'][2] == '1' else False
# 是否支持移动侦测
movement = True if camera['ai_config_support'][3] == '1' else False movement = True if camera['ai_config_support'][3] == '1' else False
try: try:
camera_event_duration = self.process_camera(camera, full_day=full_day, movement=movement) camera_event_duration = self.process_camera(camera, full_day=full_day, movement=movement)
......
...@@ -13,6 +13,7 @@ class PreEvent(object): ...@@ -13,6 +13,7 @@ class PreEvent(object):
influxdb.reconnect() influxdb.reconnect()
self.start_time = start_time self.start_time = start_time
self.end_time = end_time self.end_time = end_time
# 无效时间段 23点-次日6点
self.invalid_hour = [0, 1, 2, 3, 4, 5, 6, 23] self.invalid_hour = [0, 1, 2, 3, 4, 5, 6, 23]
self.tz_start_time = dt_parse(self.start_time.replace(' ', 'T') + '.001000Z').astimezone(tz) self.tz_start_time = dt_parse(self.start_time.replace(' ', 'T') + '.001000Z').astimezone(tz)
self.tz_end_time = dt_parse(self.end_time.replace(' ', 'T') + '.001000Z').astimezone(tz) self.tz_end_time = dt_parse(self.end_time.replace(' ', 'T') + '.001000Z').astimezone(tz)
...@@ -48,22 +49,26 @@ class PreEvent(object): ...@@ -48,22 +49,26 @@ class PreEvent(object):
if not full_day and alarm_time.hour in self.invalid_hour: if not full_day and alarm_time.hour in self.invalid_hour:
# 过滤无效时间 # 过滤无效时间
continue continue
# status 1表示事件触发,2表示事件结束
if alarm_point['status'] == 1: if alarm_point['status'] == 1:
if pre_status == 1: if pre_status == 1:
# 如果预处理状态是1 表示上个事件尚未结束,那么直接将当前事件时间作为上一个事件的结束时间
pre_events[-1]['end_time'] = alarm_time pre_events[-1]['end_time'] = alarm_time
else: else:
# 预录5s # 表示新事件开始,直接append并且预录5s
pre_events.append({'start_time': alarm_time - timedelta(seconds=5)}) pre_events.append({'start_time': alarm_time - timedelta(seconds=5)})
else: else:
if pre_status == 1: if pre_status == 1:
# 表示一个事件正常结束
pre_events[-1]['end_time'] = alarm_time + timedelta(seconds=10) pre_events[-1]['end_time'] = alarm_time + timedelta(seconds=10)
elif pre_status == -1: elif pre_status == -1:
# 有可能和上一个消息点被切割了(或者丢掉了),重新生成一条闭合的新事件
pre_events.append({ pre_events.append({
'start_time': self.tz_start_time, 'start_time': self.tz_start_time,
'end_time': alarm_time + timedelta(seconds=10) 'end_time': alarm_time + timedelta(seconds=10)
}) })
else: else:
# 表示两个2的点,暂时无更好的处理方法直接取当前时间点往前推30秒往后推10秒
pre_events.append({ pre_events.append({
'start_time': alarm_time - timedelta(seconds=30), 'start_time': alarm_time - timedelta(seconds=30),
'end_time': alarm_time + timedelta(seconds=10) 'end_time': alarm_time + timedelta(seconds=10)
...@@ -76,6 +81,7 @@ class PreEvent(object): ...@@ -76,6 +81,7 @@ class PreEvent(object):
return pre_events return pre_events
def merge_alarm_to_event(self, alarm_list, full_day=False): def merge_alarm_to_event(self, alarm_list, full_day=False):
# 告警点转变为事件
pre_events = self._alarm_point_to_event(alarm_list, full_day) pre_events = self._alarm_point_to_event(alarm_list, full_day)
# 合并时间间隔较短的事件或切分时间过长的事件 # 合并时间间隔较短的事件或切分时间过长的事件
...@@ -84,6 +90,8 @@ class PreEvent(object): ...@@ -84,6 +90,8 @@ class PreEvent(object):
if len(events) > 0 \ if len(events) > 0 \
and pre_event_item['start_time'] - events[-1]['end_time'] <= timedelta(seconds=30) \ and pre_event_item['start_time'] - events[-1]['end_time'] <= timedelta(seconds=30) \
and events[-1]['end_time'] - events[-1]['start_time'] <= timedelta(hours=0.5): and events[-1]['end_time'] - events[-1]['start_time'] <= timedelta(hours=0.5):
# 上一个事件的结束时间和下一个事件的开始时间 间隔小于30s
# and 上一个事件的时长小于30分钟 合并这两个事件
events[-1]['end_time'] = pre_event_item['end_time'] events[-1]['end_time'] = pre_event_item['end_time']
else: else:
events.append({ events.append({
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论