提交 dc824437 authored 作者: zw.wang's avatar zw.wang

feat: [tasks] 新增摄像头状态巡检任务

上级 5f0e701f
......@@ -138,8 +138,8 @@ class EventMergerJob:
body['ex'] = event_duration + 10
if event_duration > 0:
self.send_mq_message(body)
log.info(body)
return body['ex']
def run(self):
......@@ -177,7 +177,7 @@ class EventMergerJob:
'value': float(total_video_duration),
'camera_count': camera_count
},
time: datetime.now()
'time': datetime.now()
}], retention_policy='one_week')
def send_mq_message(self, body):
......
......@@ -5,6 +5,8 @@ import pytz
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta
from intelab_python_sdk.logger import log
from isc_video_record.db import redis_connect, mysql, influxdb
from isc_video_record.utils.alarm_utils import send_markdown
from isc_video_record.utils.record_utils import get_time_str
......@@ -12,14 +14,31 @@ from isc_video_record.const import LAST_CHECK_TIME_KEY, PROCESSING_CAMERA_KEY
from isc_video_record.utils.excel_utils import gen_excel
from isc_video_record.utils.aliyun_oss import oss_upload_file
from isc_video_record.db.mysql import get_camera_info
from isc_video_record.utils.api_helper import IntelabApiHelper
api_helper = IntelabApiHelper()
TIMEZONE = 'Asia/Shanghai'
tz = pytz.timezone(TIMEZONE)
text = """
- 当前处于录制中的摄像头个数为:{}
- 未处理视频总时长:{}
- 所属视频摄像头个数:{}
text = """ ## isc视频录制巡检报告
● 当前处于录制中的摄像头个数为:{} \n
● 未处理视频总时长:{} \n
● 当前在线摄像头个数:{} \n
● 当前离线摄像头个数:{} \n
● 摄像头状态巡检时间:{} \n
"""
daily_text = """ ## {}日报, 共{}路摄像头
● 总视频时长为: {} \n
● 录制完成时长: {} \n
● 稍后重试时长: {} \n
● 不再重试时长: {} \n
[摄像头视频时长详情]({})
"""
......@@ -33,10 +52,34 @@ class Tasks:
schedulers = BlockingScheduler({
'apscheduler.timezone': TIMEZONE})
schedulers.add_job(self.run_moniter_online, 'interval', minutes=3, seconds=10)
schedulers.add_job(self.run_moniter_hour, 'cron', minute='0,30')
schedulers.add_job(self.run_moniter_daily, 'cron', hour=10, minute=30)
schedulers.start()
@staticmethod
def run_moniter_online():
measure_points = []
for camera in get_camera_info():
online_info = api_helper.get_camera_online(camera['device_code'])
log.info('camera[%s] online: %s, collect_time: %s', camera['device_code'],
online_info.get('online', 0), online_info.get('collectTime'))
measure_points.append({
'measurement': 'isc_camera_status',
'tags': {
'camera_code': camera['device_code'],
'camera_index': camera['point_index_code']
},
'fields': {
'value': online_info.get('online', 0)
},
'time': online_info['collectTime']
})
if len(measure_points):
influxdb.reconnect()
influxdb.write_points(measure_points, retention_policy='one_week')
@staticmethod
def clean():
pipe = redis_connect()
......@@ -50,16 +93,46 @@ class Tasks:
定时检查视频推流服务情况,并发送钉钉报警
"""
influxdb.reconnect()
# 获取当前录制数量
res = influxdb.query(
'select * from one_week.pre_video_duration order by time desc limit 1')
pre_video_duration = list(res['pre_video_duration'])[0] if res and len(res) > 0 else {
'value': 0, 'camera_count': 0}
# 获取摄像头在线\离线数量
collect_time = datetime.utcnow() - timedelta(minutes=5.1)
query_str = '''
select mode(value), count(value)
from one_week.isc_camera_status
where time >= '{}'
group by value
'''.format(collect_time)
status_res = list(influxdb.query(query_str))
online_count = offline_count = 0
if status_res:
status_res = status_res[0]
for status in status_res:
if status['mode'] == 1:
online_count = status['count']
else:
offline_count = status['count']
collect_time = status['time']
if isinstance(collect_time, str):
collect_time = datetime.strptime(
collect_time, '%Y-%m-%dT%H:%M:%S.%fZ')
mobiles = None
if online_count <= 3:
mobiles = ['15131601294']
with redis_connect() as pipe:
send_markdown('isc巡检', text.format(
len(pipe.keys(PROCESSING_CAMERA_KEY.format('*'))),
get_time_str(pre_video_duration['value']),
pre_video_duration['camera_count']
))
online_count, offline_count,
collect_time.astimezone(tz).strftime('%Y-%m-%d %H:%M:%S')
), mobiles)
@staticmethod
def run_moniter_daily():
......@@ -78,7 +151,9 @@ class Tasks:
start_time = end_time - timedelta(days=1)
file_name = 'isc-daily-{}.xlsx'.format(end_time.strftime('%Y%m%d'))
camera_count = 0
daily_duration = 0
daily_total_duration = daily_done_duration = 0
daily_failed_duration = daily_retry_duration = 0
results = []
for camera in get_camera_info():
......@@ -107,7 +182,10 @@ class Tasks:
camera['total'] = get_time_str(total)
results.append(camera)
daily_duration += total
daily_total_duration += total
daily_done_duration += done
daily_failed_duration += unknown
daily_retry_duration += failed
camera_count += 1
with tempfile.TemporaryDirectory() as tmp:
......@@ -117,13 +195,21 @@ class Tasks:
send_markdown(
'isc-daily',
('**{}日报** \n\n 共{}路摄像头,总视频时长为:{}\n\n [摄像头视频时长详情]({}) ')
.format(end_time.strftime('%Y-%m-%d'), camera_count, get_time_str(daily_duration),
url)
daily_text.format(
end_time.strftime('%Y-%m-%d'),
camera_count,
get_time_str(daily_total_duration),
get_time_str(daily_done_duration),
get_time_str(daily_retry_duration),
get_time_str(daily_failed_duration),
url
)
)
if __name__ == '__main__':
from intelab_python_sdk.logger import log_init
log_init(__name__, False)
t = Tasks()
t.run_moniter_daily()
# t.start()
t.run_moniter_hour()
......@@ -19,8 +19,8 @@ def send_alarm_to_developer(service_name, e):
t.start()
def send_markdown(title, text):
def send_markdown(title, text, mobiles=None):
dingtalk_config = settings.get('DINGTALK')
dingtalk = DingTalkMessage(dingtalk_config.get('WEBHOOK'),
dingtalk_config.get('SECRET'))
dingtalk.send_markdown(title, text)
dingtalk.send_markdown(title, text, mobiles)
......@@ -23,7 +23,7 @@ requires = [
setuptools.setup(
name='isc-video-record',
version='1.0.0a19',
version='1.0.0a20',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论