提交 6212b98f authored 作者: zw.wang's avatar zw.wang

feat: [merger] 合并事件使用数据库中上次结束时间

上级 78373ef3
...@@ -228,3 +228,17 @@ def deleted_video_by_id(cursor, conn, db_table, video_id): ...@@ -228,3 +228,17 @@ def deleted_video_by_id(cursor, conn, db_table, video_id):
sql = 'delete from {} {}'.format(db_table, _filter) sql = 'delete from {} {}'.format(db_table, _filter)
cursor.execute(sql) cursor.execute(sql)
conn.commit() conn.commit()
@query(cursor_dict=True)
def get_last_video_info(cursor, conn, db_table, camera_code):
sql = '''
select id as video_id, start_time, end_time, status
from {}
where
device_code = %s
and start_time > date_sub(now(), interval 1 day)
order by end_time desc limit 1;
'''.format(db_table)
cursor.execute(sql, [camera_code])
return cursor.fetchone()
...@@ -40,30 +40,20 @@ class EventMergerJob: ...@@ -40,30 +40,20 @@ class EventMergerJob:
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at))) time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
time.sleep(15 * 60) time.sleep(15 * 60)
def get_scan_time(self, pipe, camera, now): def get_scan_time(self, camera, now):
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera['device_code']) last_video_info = mysql.get_last_video_info(
last_check_time = pipe.get(last_check_time_key) camera['db_table'], camera['device_code'])
if not last_check_time: if last_video_info:
# 设备无上次事件,取最近的15分钟作为开始时间 last_check_time = last_video_info['end_time']
last_check_time = now - timedelta(hours=2)
pipe.set(last_check_time_key, last_check_time.strftime('%Y-%m-%d %H:%M:%S'))
else: else:
last_check_time = datetime.strptime( last_check_time = now - timedelta(days=1)
last_check_time, '%Y-%m-%d %H:%M:%S') return last_check_time
if now - last_check_time > timedelta(days=7):
# 设备一直离线超过七天则放弃七天以前的数据
last_check_time = now - timedelta(days=7)
if now - last_check_time > timedelta(days=1):
# 时间跨度超过一天,则分割成两个小时
now = last_check_time + timedelta(hours=2)
return last_check_time, now
def get_camera_local_events(self, pipe, camera, now): def get_camera_local_events(self, pipe, camera, now):
""" """
获取摄像头本地移动侦测事件 获取摄像头本地移动侦测事件
""" """
last_check_time, now = self.get_scan_time(pipe, camera, now) last_check_time = self.get_scan_time(camera, now)
events = [] events = []
try: try:
log.info('查询摄像头%s在%s,%s的本地视频文件', camera['device_code'], log.info('查询摄像头%s在%s,%s的本地视频文件', camera['device_code'],
...@@ -85,7 +75,8 @@ class EventMergerJob: ...@@ -85,7 +75,8 @@ class EventMergerJob:
# TODO 网络请求错误重试 # TODO 网络请求错误重试
log.exception(e) log.exception(e)
else: else:
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera['device_code']) last_check_time_key = LAST_CHECK_TIME_KEY.format(
camera['device_code'])
res = pipe.set(last_check_time_key, res = pipe.set(last_check_time_key,
now.strftime('%Y-%m-%d %H:%M:%S')) now.strftime('%Y-%m-%d %H:%M:%S'))
if not res: if not res:
...@@ -98,7 +89,7 @@ class EventMergerJob: ...@@ -98,7 +89,7 @@ class EventMergerJob:
查询指定时间段内摄像头是否有移动告警消息并合并成事件 查询指定时间段内摄像头是否有移动告警消息并合并成事件
""" """
last_check_time, now = self.get_scan_time(pipe, camera, now) last_check_time = self.get_scan_time(camera, now)
pre_event = PreEvent( pre_event = PreEvent(
last_check_time.strftime('%Y-%m-%d %H:%M:%S'), last_check_time.strftime('%Y-%m-%d %H:%M:%S'),
...@@ -135,15 +126,14 @@ class EventMergerJob: ...@@ -135,15 +126,14 @@ class EventMergerJob:
untreated_events = mysql.get_untreated_events( untreated_events = mysql.get_untreated_events(
camera['db_table'], camera['device_code'], retry=True) camera['db_table'], camera['device_code'], retry=True)
if untreated_events: if untreated_events:
log.info('当前摄像头%s还存在未处理事件,优先处理', camera['device_code']) log.info('当前摄像头%s还存在未处理事件', camera['device_code'])
body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds() body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds()
for event in untreated_events]) + 10 for event in untreated_events]) + 10
self.send_mq_message(body) # self.send_mq_message(body)
event_duration = body.get('ex', 0) event_duration = body.get('ex', 0)
if event_duration == 0: call_get_events_func = self.get_alarm_events if camera['video_plan_type'] == 1 \
call_get_events_func = self.get_alarm_events if camera['video_plan_type'] == 1 else \ else self.get_camera_local_events
self.get_camera_local_events
for event in call_get_events_func(pipe, camera, now): for event in call_get_events_func(pipe, camera, now):
insert_video_info( insert_video_info(
...@@ -153,10 +143,12 @@ class EventMergerJob: ...@@ -153,10 +143,12 @@ class EventMergerJob:
camera['biz_type'], camera['service_type'], camera['biz_type'], camera['service_type'],
status=3 # status=3 表示排队中 status=3 # status=3 表示排队中
) )
event_duration += (event['end_time'] - event['start_time']).total_seconds() event_duration += (event['end_time'] -
event['start_time']).total_seconds()
body['ex'] = event_duration + 10 body['ex'] = event_duration + 10
log.info('camera[%s] event_duration: %s', camera['device_code'], event_duration) log.info('camera[%s] event_duration: %s',
camera['device_code'], event_duration)
if event_duration > 0: if event_duration > 0:
self.send_mq_message(body) self.send_mq_message(body)
log.info(body) log.info(body)
...@@ -185,7 +177,8 @@ class EventMergerJob: ...@@ -185,7 +177,8 @@ class EventMergerJob:
pipe.close() pipe.close()
try: try:
self.write_video_duration_to_influx(total_video_duration, camera_count) self.write_video_duration_to_influx(
total_video_duration, camera_count)
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
......
import json
import requests import requests
import dynaconf import dynaconf
import dateutil.parser import dateutil.parser
from datetime import datetime, timedelta from datetime import datetime, timedelta
from intelab_python_sdk.logger import log
class PlaybackUrlException(Exception): class PlaybackUrlException(Exception):
...@@ -60,6 +62,10 @@ class IntelabApiHelper: ...@@ -60,6 +62,10 @@ class IntelabApiHelper:
for pre_event in pre_events: for pre_event in pre_events:
# 合并时间间隔较短的事件 # 合并时间间隔较短的事件
if 'beginTime' not in pre_event:
log.exception(ValueError(json.dumps(pre_event)))
continue
start_time = dateutil.parser.parse(pre_event['beginTime']) start_time = dateutil.parser.parse(pre_event['beginTime'])
end_time = dateutil.parser.parse(pre_event['endTime']) end_time = dateutil.parser.parse(pre_event['endTime'])
if len(events) > 0 \ if len(events) > 0 \
......
...@@ -24,7 +24,7 @@ requires = [ ...@@ -24,7 +24,7 @@ requires = [
setuptools.setup( setuptools.setup(
name='isc-video-record', name='isc-video-record',
version='1.0.0b3', version='1.0.0b5',
description='ISC motion detection playback video stream recording service.', description='ISC motion detection playback video stream recording service.',
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论