提交 a8dbe4f1 authored 作者: zw.wang's avatar zw.wang

feat: [tasks] 新增定时任务查询昨日录制情况

上级 30b4da73
...@@ -250,3 +250,22 @@ def get_last_video_info(cursor, conn, db_table, camera_code): ...@@ -250,3 +250,22 @@ def get_last_video_info(cursor, conn, db_table, camera_code):
'''.format(db_table) '''.format(db_table)
cursor.execute(sql, [camera_code]) cursor.execute(sql, [camera_code])
return cursor.fetchone() return cursor.fetchone()
@query(cursor_dict=True)
def get_complete_event_by_time(cursor, conn, db_table, start_time, end_time):
sql = '''
select id as video_id,
`device_code`,
`create_time`,
`start_time`,
`end_time`,
`update_time`,
`video_url`
from {}
where `status`= 1
and `update_time`> %s
and `update_time`< %s
'''.format(db_table)
cursor.execute(sql, [start_time, end_time])
return cursor.fetchall()
...@@ -43,14 +43,23 @@ retry_text = """ ...@@ -43,14 +43,23 @@ retry_text = """
++=== \n ++=== \n
""" """
daily_text = """ ## {}日报, 共{}路摄像头 daily_text = """ ## {}-{}汇总如下: \n
### 共{}路摄像头, 其中{}路离线 \n
### 移动侦测事件: \n
● 总视频时长为: {} \n ● 总视频时长为: {} \n
● 录制完成时长: {} \n 目前录制完成时长: {} \n
● 稍后重试时长: {} \n ● 稍后重试时长: {} \n
● 多次失败时长: {} \n ● 多次失败时长: {} \n
● 暂未处理时长: {} \n ● 暂未处理时长: {} \n
[摄像头视频时长详情]({}) [摄像头视频时长详情]({}) \n
--- \n
### 录制详情: \n
● 进程时间累计耗时: {} \n
● 完成视频总时长: {} \n
● 完成视频分布日期: \n
> {}
""" """
...@@ -61,10 +70,10 @@ class Tasks: ...@@ -61,10 +70,10 @@ class Tasks:
schedulers = BlockingScheduler({ schedulers = BlockingScheduler({
'apscheduler.timezone': TIMEZONE}) 'apscheduler.timezone': TIMEZONE})
schedulers.add_job(self.run_moniter_online, 'interval', minutes=3, seconds=10) schedulers.add_job(self.run_monitoring_online, 'interval', minutes=3, seconds=10)
schedulers.add_job(self.run_moniter_hour, 'cron', minute='0,30') schedulers.add_job(self.run_monitoring_hour, 'cron', minute='0,30')
schedulers.add_job(self.clean_expired, 'cron', hour=4, minute=30) schedulers.add_job(self.clean_expired, 'cron', hour=4, minute=30)
schedulers.add_job(self.run_moniter_daily, 'cron', hour=9, minute=30) schedulers.add_job(self.run_monitoring_daily, 'cron', hour=9, minute=30)
schedulers.start() schedulers.start()
@staticmethod @staticmethod
...@@ -98,7 +107,7 @@ class Tasks: ...@@ -98,7 +107,7 @@ class Tasks:
log.info('删除失效日期在%s之前的视频文件共%s时长', expired_time, get_time_str(video_duration)) log.info('删除失效日期在%s之前的视频文件共%s时长', expired_time, get_time_str(video_duration))
@staticmethod @staticmethod
def run_moniter_online(): def run_monitoring_online():
measure_points = [] measure_points = []
for camera in get_camera_info(): for camera in get_camera_info():
online_info = api_helper.get_camera_online(camera['device_code']) online_info = api_helper.get_camera_online(camera['device_code'])
...@@ -129,7 +138,7 @@ class Tasks: ...@@ -129,7 +138,7 @@ class Tasks:
pipe.close() pipe.close()
@staticmethod @staticmethod
def run_moniter_hour(): def run_monitoring_hour():
""" """
定时检查视频推流服务情况,并发送钉钉报警 定时检查视频推流服务情况,并发送钉钉报警
""" """
...@@ -192,20 +201,20 @@ class Tasks: ...@@ -192,20 +201,20 @@ class Tasks:
), mobiles) ), mobiles)
@staticmethod @staticmethod
def run_moniter_daily(): def run_monitoring_daily():
""" """
监控日报 监控日报
""" """
headers = [ headers = [
'区域路径', '区域路径',
'摄像头序列号', '摄像头名称', 'IndexCode', '摄像头序列号', '摄像头名称', 'IndexCode',
'总时长', '完成', '失败', '未处理', '未知', '在线时长', '离线时长', '存储' '总时长', '完成', '失败', '未处理', '未知', '在线时长', '离线时长', '存储', '标签'
] ]
columns = [ columns = [
'region_path_name', 'region_path_name',
'device_code', 'camera_name', 'point_index_code', 'device_code', 'camera_name', 'point_index_code',
'total', 'done', 'failed', 'untreated', 'unknown', 'total', 'done', 'failed', 'untreated', 'unknown',
'online_duration', 'offline_duration', 'db_table' 'online_duration', 'offline_duration', 'db_table', 'status'
] ]
query_str = ''' query_str = '''
...@@ -222,6 +231,7 @@ class Tasks: ...@@ -222,6 +231,7 @@ class Tasks:
start_time = end_time - timedelta(days=1) start_time = end_time - timedelta(days=1)
file_name = 'isc-daily-{}.xlsx'.format(end_time_cst.strftime('%Y%m%d')) file_name = 'isc-daily-{}.xlsx'.format(end_time_cst.strftime('%Y%m%d'))
camera_count = 0 camera_count = 0
offline_camera_count = 0
daily_untreated_duration = 0 daily_untreated_duration = 0
daily_total_duration = daily_done_duration = 0 daily_total_duration = daily_done_duration = 0
...@@ -265,6 +275,16 @@ class Tasks: ...@@ -265,6 +275,16 @@ class Tasks:
camera['online_duration'] = get_time_str(online_seconds) camera['online_duration'] = get_time_str(online_seconds)
camera['offline_duration'] = get_time_str(offline_seconds) camera['offline_duration'] = get_time_str(offline_seconds)
if offline_seconds > 23 * 3600:
status = '离线'
offline_camera_count += 1
elif done >= total - 5:
status = '录制完整'
elif total > 10 and done == 0:
status = '录制失败'
else:
status = '稍后重试'
camera['status'] = status
results.append(camera) results.append(camera)
daily_total_duration += total daily_total_duration += total
...@@ -282,23 +302,41 @@ class Tasks: ...@@ -282,23 +302,41 @@ class Tasks:
send_markdown( send_markdown(
'isc-daily', 'isc-daily',
daily_text.format( daily_text.format(
end_time_cst.strftime('%Y-%m-%d'), (end_time_cst - timedelta(days=1)).strftime('%Y%m%d'), end_time_cst.strftime('%Y%m%d'),
camera_count, camera_count, offline_camera_count,
get_time_str(daily_total_duration), get_time_str(daily_total_duration),
get_time_str(daily_done_duration), get_time_str(daily_done_duration), get_time_str(daily_retry_duration),
get_time_str(daily_retry_duration), get_time_str(daily_failed_duration), get_time_str(daily_untreated_duration),
get_time_str(daily_failed_duration), url, *Tasks.get_consuming_detail(start_time, end_time)
get_time_str(daily_untreated_duration),
url
) )
) )
@staticmethod
def get_consuming_detail(start_time, end_time):
time_consuming = 0
total_duration = 0
group_by_date = {}
for inx in range(8):
db_table = '`common_video_1`.`video_data_motion_{}`'.format(inx)
videos = mysql.get_complete_event_by_time(db_table, start_time, end_time)
for video in videos:
video_date = video['start_time'].strftime('%Y-%m-%d')
video_duration = (video['end_time'] - video['start_time']).total_seconds()
time_consuming += (video['update_time'] - video['create_time']).total_seconds()
total_duration += video_duration
group_by_date[video_date] = group_by_date.get(video_date, 0) + video_duration
return (get_time_str(time_consuming),
get_time_str(total_duration),
' \n\n > '.join(['{}: {}'.format(k, get_time_str(v)) for k, v in group_by_date.items()]))
if __name__ == '__main__': if __name__ == '__main__':
from intelab_python_sdk.logger import log_init from intelab_python_sdk.logger import log_init
log_init(__name__, False) log_init(__name__, False)
t = Tasks() t = Tasks()
# t.start() # t.start()
# t.run_moniter_hour() # t.run_monitoring_hour()
# t.run_moniter_online() # t.run_monitoring_online()
t.clean_expired() # t.clean_expired()
t.run_monitoring_daily()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论