提交 6d32c885 authored 作者: zw.wang's avatar zw.wang

feat: [merger] 推送钉钉巡检

上级 80b4e640
......@@ -4,16 +4,26 @@ import time
import dynaconf
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from isc_video_record.db import influxdb
from intelab_python_sdk.logger import log
from isc_video_record.db import rabbitmq_connect, redis_connect, mysql
from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.const import LAST_CHECK_TIME_KEY
from isc_video_record.utils.alarm_utils import send_alarm_to_developer, send_markdown
from isc_video_record.utils.record_utils import get_time_str
from isc_video_record.const import LAST_CHECK_TIME_KEY, PROCESSING_TOTAL_KEY
from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info, insert_video_info
tz = pytz.timezone('Asia/Shanghai')
TIMEZONE = 'Asia/Shanghai'
tz = pytz.timezone(TIMEZONE)
text = """
\n- 当前处于录制中的摄像头个数为:{}
\n- 未处理视频总时长:{}
"""
class EventMergerJob:
......@@ -22,6 +32,11 @@ class EventMergerJob:
self.queue_name = 'ISC_RECORD_JOB'
self.local_service_name = 'cloud-record'
schedulers = BackgroundScheduler({
'apscheduler.timezone': TIMEZONE})
schedulers.add_job(self.run_moniter_task, 'cron', minute=0)
schedulers.start()
@staticmethod
def clean():
pipe = redis_connect()
......@@ -29,6 +44,20 @@ class EventMergerJob:
pipe.delete(key)
pipe.close()
def run_moniter_task(self):
"""
定时检查视频推流服务情况,并发送钉钉报警
"""
influxdb.reconnect()
res = influxdb.query(
'select * from one_week.pre_video_duration order by time desc limit 1')
pre_video_duration = list(res['pre_video_duration'])[0] if res and len(res) > 0 else {'value': 0}
with redis_connect() as pipe:
send_markdown('isc巡检', text.format(
pipe.get(PROCESSING_TOTAL_KEY) or 0,
get_time_str(pre_video_duration['value'])
))
def start(self):
while True:
try:
......@@ -36,13 +65,15 @@ class EventMergerJob:
except Exception as e:
log.exception(e)
send_alarm_to_developer('isc-merger', e)
next_run_at = time.time() + 15 * 16
log.info('next run at: %s', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
next_run_at = time.time() + 15 * 60
log.info('next run at: %s',
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
time.sleep(15 * 60)
def run(self):
pipe = redis_connect()
total_video_duration = 0
for camera in get_camera_info():
try:
body = {
......@@ -58,6 +89,7 @@ class EventMergerJob:
log.info('当前摄像头%s还存在未处理事件,优先处理', camera_code)
body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds()
for event in untreated_events]) + 10
total_video_duration += body['ex']
self.send_mq_message(body)
continue
......@@ -68,7 +100,8 @@ class EventMergerJob:
# 设备无上次事件,取最近的15分钟作为开始时间
last_check_time = now - timedelta(minutes=15)
else:
last_check_time = datetime.strptime(last_check_time, '%Y-%m-%d %H:%M:%S')
last_check_time = datetime.strptime(
last_check_time, '%Y-%m-%d %H:%M:%S')
# 调整最大事件长度为1天
if now - last_check_time > timedelta(days=1):
last_check_time = now - timedelta(days=1)
......@@ -82,7 +115,8 @@ class EventMergerJob:
last_check_time.strftime('%Y-%m-%d %H:%M:%S'),
now.strftime('%Y-%m-%d %H:%M:%S')
)
alarm_list = list(pre_event.get_alarm_list(camera['point_index_code']))
alarm_list = list(pre_event.get_alarm_list(
camera['point_index_code']))
log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
camera_code, last_check_time, now, len(alarm_list)))
......@@ -90,18 +124,21 @@ class EventMergerJob:
events = PreEvent.merge_alarm_to_event(alarm_list)
else:
events = [{
'start_time': last_check_time.astimezone(tz), 'end_time': now.astimezone(tz)
'start_time': last_check_time.astimezone(tz),
'end_time': now.astimezone(tz)
}]
event_duration = 0
for event in events:
insert_video_info(
camera['db_table'], camera['device_code'],
event['start_time'].astimezone(pytz.utc), event['end_time'].astimezone(pytz.utc),
event['start_time'].astimezone(pytz.utc),
event['end_time'].astimezone(pytz.utc),
camera['biz_type'], camera['service_type'],
status=3 # status=3 表示排队中
)
event_duration += (event['end_time'] - event['start_time']).total_seconds()
event_duration += (event['end_time'] -
event['start_time']).total_seconds()
body = {
'camera_code': camera['device_code'],
......@@ -109,14 +146,30 @@ class EventMergerJob:
'db_table': camera['db_table'],
'ex': event_duration + 10
}
total_video_duration += body['ex']
if len(events) > 0:
self.send_mq_message(body)
log.info(body)
except Exception as e:
log.exception(e)
send_alarm_to_developer(e)
pipe.close()
self.write_video_duration_to_influx(total_video_duration)
def write_video_duration_to_influx(self, total_video_duration):
influxdb.reconnect()
influxdb.write_points([{
'measurement': 'pre_video_duration',
'tags': {
'event_type': '131331'
},
'fields': {
'value': total_video_duration
},
time: datetime.now()
}], retention_policy='one_week')
def send_mq_message(self, body):
connection = rabbitmq_connect()
......
......@@ -19,7 +19,7 @@ class PreEvent(object):
select *
from "one_week"."event_vss"
where
time > $start_time
time >= $start_time
and time < $end_time
and camera_index = $camera_index
'''
......@@ -41,15 +41,16 @@ class PreEvent(object):
for alarm_point in alarm_list:
# 报警时间转换成上海时区
alarm_time = dt_parse(alarm_point['time']).astimezone(tz)
# 合并告警消息最小单位60s
if len(events) > 0 \
and alarm_time - events[-1]['end_time'] < timedelta(seconds=40) \
and alarm_time - events[-1]['end_time'] < timedelta(seconds=60) \
and events[-1]['end_time'] - events[-1]['start_time'] < timedelta(hours=0.5):
events[-1]['end_time'] = alarm_time + timedelta(seconds=40)
events[-1]['end_time'] = alarm_time + timedelta(seconds=10)
continue
events.append({
'start_time': alarm_time - timedelta(seconds=20),
'end_time': alarm_time + timedelta(seconds=40)
'start_time': alarm_time - timedelta(seconds=10),
'end_time': alarm_time + timedelta(seconds=10)
})
return events
......
......@@ -17,3 +17,10 @@ def send_alarm_to_developer(service_name, e):
content, dingtalk_mobiles, False
))
t.start()
def send_markdown(title, text):
dingtalk_config = settings.get('DINGTALK')
dingtalk = DingTalkMessage(dingtalk_config.get('WEBHOOK'),
dingtalk_config.get('SECRET'))
dingtalk.send_markdown(title, text)
......@@ -16,12 +16,13 @@ requires = [
'redis',
'mysql-connector',
'retrying',
'oss2'
'oss2',
'apscheduler'
]
setuptools.setup(
name='isc-video-record',
version='1.0.0a7',
version='1.0.0a8',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论