提交 19ba168f authored 作者: zw.wang's avatar zw.wang

chore: [playback] 通过获取摄像头本地视频报错信息判定下次开始时间

上级 9a80f199
...@@ -14,7 +14,7 @@ from isc_video_record.utils.alarm_utils import send_alarm_to_developer ...@@ -14,7 +14,7 @@ from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.const import LAST_CHECK_TIME_KEY from isc_video_record.const import LAST_CHECK_TIME_KEY
from isc_video_record.pre_event import PreEvent from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info, insert_video_info from isc_video_record.db.mysql import get_camera_info, insert_video_info
from isc_video_record.utils.api_helper import IntelabApiHelper from isc_video_record.utils.api_helper import IntelabApiHelper, PlaybackUrlException
api_helper = IntelabApiHelper() api_helper = IntelabApiHelper()
...@@ -40,9 +40,11 @@ class EventMergerJob: ...@@ -40,9 +40,11 @@ class EventMergerJob:
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at))) time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
time.sleep(15 * 60) time.sleep(15 * 60)
def set_check_time(self, pipe, camera_code, now): def get_camera_local_events(self, pipe, camera, now):
"""
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code) 获取摄像头本地移动侦测事件
"""
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera['device_code'])
last_check_time = pipe.get(last_check_time_key) last_check_time = pipe.get(last_check_time_key)
if not last_check_time: if not last_check_time:
# 设备无上次事件,取最近的15分钟作为开始时间 # 设备无上次事件,取最近的15分钟作为开始时间
...@@ -52,11 +54,32 @@ class EventMergerJob: ...@@ -52,11 +54,32 @@ class EventMergerJob:
last_check_time, '%Y-%m-%d %H:%M:%S') last_check_time, '%Y-%m-%d %H:%M:%S')
# 调整最大事件长度为1天 # 调整最大事件长度为1天
if now - last_check_time > timedelta(days=1): if now - last_check_time > timedelta(days=1):
last_check_time = now - timedelta(days=1) now = last_check_time + timedelta(hours=2)
res = pipe.set(last_check_time_key, events = []
now.strftime('%Y-%m-%d %H:%M:%S')) try:
return res, last_check_time events = api_helper.get_cameras_playback_urls(
camera['device_code'],
IntelabApiHelper.iso_format(last_check_time.astimezone(tz)),
IntelabApiHelper.iso_format(now.astimezone(tz)))
except PlaybackUrlException as e:
log.warning(e)
if 'Internal Error' in e.msg:
log.warning('camera[%s] offline', camera['device_code'])
if 'backend service read timeout' in e.msg:
# TODO 需要重新处理事件
now = last_check_time + timedelta(hours=2)
log.warning('查询%s事件跨度较大.', camera['device_code'])
return self.get_camera_local_events(pipe, camera, now)
except Exception as e:
# TODO 网络请求错误重试
log.exception(e)
else:
res = pipe.set(last_check_time_key,
now.strftime('%Y-%m-%d %H:%M:%S'))
if not res:
events = []
return events
def look_untreated_events(self, camera, body): def look_untreated_events(self, camera, body):
""" """
...@@ -71,16 +94,6 @@ class EventMergerJob: ...@@ -71,16 +94,6 @@ class EventMergerJob:
self.send_mq_message(body) self.send_mq_message(body)
return body.get('ex', 0) return body.get('ex', 0)
def get_local_events(self, last_check_time, now, camera):
"""
查询sd卡上移动侦测视频
"""
events = api_helper.get_cameras_playback_urls(
camera['device_code'],
IntelabApiHelper.iso_format(last_check_time.astimezone(tz)),
IntelabApiHelper.iso_format(now.astimezone(tz)))
return events
def get_new_events(self, last_check_time, now, camera): def get_new_events(self, last_check_time, now, camera):
""" """
摄像头录制计划为全天录制时 摄像头录制计划为全天录制时
...@@ -112,20 +125,10 @@ class EventMergerJob: ...@@ -112,20 +125,10 @@ class EventMergerJob:
'db_table': camera['db_table'], 'db_table': camera['db_table'],
} }
now = datetime.utcnow() - timedelta(minutes=3) now = datetime.utcnow() - timedelta(minutes=3)
camera_code = camera['device_code']
online_info = api_helper.get_camera_online(camera_code)
# TODO 摄像头在线状态存在延迟
if not online_info.get('online', 0):
log.info('camera[%s] offline', camera_code)
return 0
event_duration = self.look_untreated_events(camera, body) event_duration = self.look_untreated_events(camera, body)
if event_duration == 0: if event_duration == 0:
res, last_check_time = self.set_check_time(pipe, camera_code, now) for event in self.get_camera_local_events(pipe, camera, now):
event_duration = 0
for event in self.get_local_events(last_check_time, now, camera):
insert_video_info( insert_video_info(
camera['db_table'], camera['device_code'], camera['db_table'], camera['device_code'],
event['start_time'].astimezone(pytz.utc), event['start_time'].astimezone(pytz.utc),
...@@ -136,6 +139,7 @@ class EventMergerJob: ...@@ -136,6 +139,7 @@ class EventMergerJob:
event_duration += (event['end_time'] - event['start_time']).total_seconds() event_duration += (event['end_time'] - event['start_time']).total_seconds()
body['ex'] = event_duration + 10 body['ex'] = event_duration + 10
log.info('camera[%s] event_duration: %s', camera['device_code'], event_duration)
if event_duration > 0: if event_duration > 0:
self.send_mq_message(body) self.send_mq_message(body)
log.info(body) log.info(body)
......
...@@ -242,6 +242,7 @@ class ProcessMessage: ...@@ -242,6 +242,7 @@ class ProcessMessage:
# 可以只通过一个回放流地址取到其他时间段的流 # 可以只通过一个回放流地址取到其他时间段的流
playback_stream = playback_urls[0] playback_stream = playback_urls[0]
else: else:
# 取流失败,直接返回
return {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time} return {'file_name': file_name, 'is_completed': False, 'recovered_time': start_time}
part_num = retry_count = 0 part_num = retry_count = 0
......
...@@ -4,6 +4,27 @@ import dateutil.parser ...@@ -4,6 +4,27 @@ import dateutil.parser
from datetime import datetime, timedelta from datetime import datetime, timedelta
class PlaybackUrlException(Exception):
def __init__(self, response):
if not isinstance(response, dict):
response = response.get_json()
if 'message' in response:
self.msg = response.get('message')
else:
self.msg = response.get('msg')
self.code = response.get('code')
def __str__(self):
return self.msg
def to_dict(self):
return {
'msg': self.msg,
'code': self.code
}
class IntelabApiHelper: class IntelabApiHelper:
def __init__(self): def __init__(self):
config = dynaconf.settings.get('COMMON') config = dynaconf.settings.get('COMMON')
...@@ -24,12 +45,16 @@ class IntelabApiHelper: ...@@ -24,12 +45,16 @@ class IntelabApiHelper:
uri = '/python/api/v1/secure/camera/playback/urls/{}'.format(camera_code) uri = '/python/api/v1/secure/camera/playback/urls/{}'.format(camera_code)
expired_time = datetime.now() + timedelta(minutes=5) expired_time = datetime.now() + timedelta(minutes=5)
response = requests.get(self.host + uri, params=params, timeout=30) response = requests.get(self.host + uri, params=params, timeout=30)
response.raise_for_status() response.raise_for_status()
res_json = response.json()
res_json = response.json().get('data') or {} if res_json.get('code', '0') != '0':
# TODO 数据为空的异常处理 raise PlaybackUrlException(res_json)
data = res_json.get('data')
pre_events = data.get('list') or []
pre_events = res_json.get('list') or []
events = [] events = []
# TODO 单个事件时长较长,需要分切处理 # TODO 单个事件时长较长,需要分切处理
...@@ -47,7 +72,7 @@ class IntelabApiHelper: ...@@ -47,7 +72,7 @@ class IntelabApiHelper:
'end_time': end_time, 'end_time': end_time,
'expired_time': expired_time, 'expired_time': expired_time,
'stream_url': { 'stream_url': {
'url': res_json.get('url'), 'url': data.get('url'),
'extra_args': 'playBackMode=1', 'extra_args': 'playBackMode=1',
'protocol': protocol 'protocol': protocol
} }
...@@ -73,6 +98,13 @@ class IntelabApiHelper: ...@@ -73,6 +98,13 @@ class IntelabApiHelper:
if __name__ == '__main__': if __name__ == '__main__':
api_helper = IntelabApiHelper() api_helper = IntelabApiHelper()
try:
print(api_helper.get_cameras_playback_urls('F97310278',
'2021-05-25T09:14:35.000+08:00',
'2021-05-26T10:15:41.000+08:00'))
except Exception as e:
print(e)
print(api_helper.get_cameras_playback_urls('D86639983', print(api_helper.get_cameras_playback_urls('D86639983',
'2021-05-06T10:14:35.000+08:00', '2021-05-01T10:14:35.000+08:00',
'2021-05-06T10:15:41.000+08:00')) '2021-05-01T10:20:41.000+08:00'))
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论