提交 ed09c34e authored 作者: zw.wang's avatar zw.wang

feat: [online] 判定设备离线处理

上级 62b12173
...@@ -199,7 +199,6 @@ class EventMergerJob: ...@@ -199,7 +199,6 @@ class EventMergerJob:
return events return events
def process_camera(self, pipe, camera): def process_camera(self, pipe, camera):
""""""
body = { body = {
'camera_code': camera['device_code'], 'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'], 'camera_index': camera['point_index_code'],
...@@ -207,6 +206,12 @@ class EventMergerJob: ...@@ -207,6 +206,12 @@ class EventMergerJob:
} }
now = datetime.utcnow() - timedelta(minutes=3) now = datetime.utcnow() - timedelta(minutes=3)
camera_code = camera['device_code'] camera_code = camera['device_code']
online_info = api_helper.get_camera_online(camera_code)
# TODO 摄像头在线状态存在延迟
if not online_info.get('online', 0):
log.info('thread_id:%s:%s: camera offline', self.thread_id, camera_code)
return 0
event_duration = self.look_untreated_events(camera, body) event_duration = self.look_untreated_events(camera, body)
if event_duration == 0: if event_duration == 0:
res, last_check_time = self.set_check_time(pipe, camera_code, now) res, last_check_time = self.set_check_time(pipe, camera_code, now)
......
...@@ -11,7 +11,7 @@ from datetime import timedelta, datetime ...@@ -11,7 +11,7 @@ from datetime import timedelta, datetime
from intelab_python_sdk.logger import log_init, log from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from isc_video_record.db import rabbitmq_connect, redis_connect from isc_video_record.db import rabbitmq_connect, redis_connect, influxdb
from isc_video_record.const import PROCESSING_CAMERA_KEY from isc_video_record.const import PROCESSING_CAMERA_KEY
from isc_video_record.utils.api_helper import IntelabApiHelper from isc_video_record.utils.api_helper import IntelabApiHelper
from isc_video_record.utils import aliyun_oss from isc_video_record.utils import aliyun_oss
...@@ -170,7 +170,7 @@ class ProcessMessage: ...@@ -170,7 +170,7 @@ class ProcessMessage:
event['end_time'].astimezone(tz)) event['end_time'].astimezone(tz))
t2 = time.time() t2 = time.time()
video_info, error_log = get_video_duration(record_result['file_name']) video_info, _ = get_video_duration(record_result['file_name'])
log.info('thread_id:%s:%s: time consuming: %s, duration: %s, size: %sM', log.info('thread_id:%s:%s: time consuming: %s, duration: %s, size: %sM',
self.thread_id, event['camera_code'], self.thread_id, event['camera_code'],
round(t2 - t1, 2), time_to_seconds(video_info['duration']), round(t2 - t1, 2), time_to_seconds(video_info['duration']),
...@@ -250,14 +250,12 @@ class ProcessMessage: ...@@ -250,14 +250,12 @@ class ProcessMessage:
retry_count += 1 retry_count += 1
complete_duration = (end_time - start_time).total_seconds() complete_duration = (end_time - start_time).total_seconds()
file_info, _ = self.stream_record(playback_stream['stream_url'], start_time, end_time) file_info, error_log = self.stream_record(playback_stream['stream_url'], start_time, end_time)
file_duration = time_to_seconds(file_info['duration']) file_duration = time_to_seconds(file_info['duration'])
if not os.path.isfile(file_info['file_name']): if not os.path.isfile(file_info['file_name']):
log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count) log.info('当前录制无文件输出:%s, 重试计数: %s', self.body['camera_code'], retry_count)
time.sleep(1) time.sleep(1)
continue elif file_duration < complete_duration - 2:
if file_duration < complete_duration - 2:
# 视频文件时长小于完整时长 # 视频文件时长小于完整时长
new_start_time = start_time + timedelta(seconds=file_duration) new_start_time = start_time + timedelta(seconds=file_duration)
part_file_name = os.path.join(video_path, 'ISC_{}_{}_{}_{}.mp4'.format( part_file_name = os.path.join(video_path, 'ISC_{}_{}_{}_{}.mp4'.format(
...@@ -272,11 +270,14 @@ class ProcessMessage: ...@@ -272,11 +270,14 @@ class ProcessMessage:
part_num += 1 part_num += 1
retry_count = 0 retry_count = 0
else: else:
# 录制完成
part_files_set.add(file_info['file_name']) part_files_set.add(file_info['file_name'])
is_completed = True is_completed = True
start_time = end_time start_time = end_time
break break
self.write_retry_info_to_influx(self.body['camera_code'], error_log)
part_files = sorted(list(part_files_set)) part_files = sorted(list(part_files_set))
if len(part_files) > 1: if len(part_files) > 1:
concat(part_files, file_name, removed=True) concat(part_files, file_name, removed=True)
...@@ -306,8 +307,25 @@ class ProcessMessage: ...@@ -306,8 +307,25 @@ class ProcessMessage:
log.info('thread_id:%s:%s:stream_url: %s', log.info('thread_id:%s:%s:stream_url: %s',
self.thread_id, self.body['camera_code'], stream_url) self.thread_id, self.body['camera_code'], stream_url)
# TODO 多进程处理 # TODO 多进程处理
record_thread(stream_url, file_name, thread_name=self.body['camera_code']) _, error_log = record_thread(stream_url, file_name, thread_name=self.body['camera_code'])
return get_video_duration(file_name) return get_video_duration(file_name), error_log
def write_retry_info_to_influx(self, camera_code, error_log):
try:
influxdb.reconnect()
influxdb.write_points([{
'measurement': 'video_retry_count',
'tags': {
'camera_code': camera_code
},
'fields': {
'value': 1,
'error_log': error_log
},
time: datetime.now()
}], retention_policy='one_week')
except Exception as e:
log.exception(e)
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -64,6 +64,21 @@ class IntelabApiHelper: ...@@ -64,6 +64,21 @@ class IntelabApiHelper:
return events return events
def get_camera_online(self, camera_code):
"""
查询摄像头在线状态
:return :
online - 1 在线,0 离线
collectTime: 数据采集时间
"""
uri = '/python/api/v1/secure/camera/status/by/{}'.format(camera_code)
response = requests.get(self.host + uri, timeout=30)
response.raise_for_status()
res_json = response.json().get('data') or {}
return res_json
if __name__ == '__main__': if __name__ == '__main__':
api_helper = IntelabApiHelper() api_helper = IntelabApiHelper()
......
...@@ -17,7 +17,7 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log'): ...@@ -17,7 +17,7 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log'):
out_file out_file
] ]
log_buffer = '' log_buffer = error_log = ''
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stdin=None, stdin=None,
stderr=subprocess.STDOUT) stderr=subprocess.STDOUT)
...@@ -40,7 +40,8 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log'): ...@@ -40,7 +40,8 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log'):
log_buffer = '' log_buffer = ''
if log_buffer and 'error' in log_buffer: if log_buffer and 'error' in log_buffer:
log.error('%s:error: %s', thread_name, log_buffer.strip()) error_log += log_buffer.strip()
log.warn('%s:error: %s', thread_name, log_buffer.strip())
elif log_buffer: elif log_buffer:
log.debug('%s: %s', thread_name, log_buffer.strip()) log.debug('%s: %s', thread_name, log_buffer.strip())
...@@ -48,7 +49,7 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log'): ...@@ -48,7 +49,7 @@ def record_thread(stream_url, out_file, thread_name='ffmpeg-log'):
return_code = process.poll() return_code = process.poll()
return out_file return out_file, error_log
def get_video_duration(file_name): def get_video_duration(file_name):
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论