提交 80b4e640 authored 作者: zw.wang's avatar zw.wang

feat: 异常告警钉钉推送和异常重试

上级 6bc25db8
LAST_CHECK_TIME_KEY = 'hk_isc:recorder:camera:{}' LAST_CHECK_TIME_KEY = 'hk_isc:recorder:camera:{}'
PROCESSING_CAMERA_KEY = 'hk_isc:processing:camera:{}' PROCESSING_CAMERA_KEY = 'hk_isc:processing:camera:{}'
PROCESSING_TOTAL_KEY = 'hk_isc:recording:processing:total'
...@@ -144,18 +144,28 @@ def update_video_info(cursor, conn, db_table, video_id, status, ...@@ -144,18 +144,28 @@ def update_video_info(cursor, conn, db_table, video_id, status,
@query(cursor_dict=True) @query(cursor_dict=True)
def get_untreated_events(cursor, conn, db_table, camera_code, status=3): def get_untreated_events(cursor, conn, db_table, camera_code, status=3, **kw):
if 'retry' in kw:
sub_str = 'status in (2, 3) and update_time > data_sub(now(), interval 12 hour)'
else:
sub_str = 'status = {}'.format(status)
sql = ''' sql = '''
select select
id as video_id, id as video_id,
device_code as camera_code, device_code as camera_code,
start_time, end_time, status start_time, end_time,
status,
recovered_time,
video_url,
file_name
from {} from {}
where device_code = %s where device_code = %s
and create_time > date_sub(create_time, interval 7 day) and create_time > date_sub(now(), interval 7 day)
and status = %s and {}
order by create_time order by create_time
'''.format(db_table) '''.format(db_table, sub_str)
cursor.execute(sql, [camera_code, status]) cursor.execute(sql, [camera_code])
return cursor.fetchall() return cursor.fetchall()
...@@ -8,6 +8,7 @@ from datetime import datetime, timedelta ...@@ -8,6 +8,7 @@ from datetime import datetime, timedelta
from intelab_python_sdk.logger import log from intelab_python_sdk.logger import log
from isc_video_record.db import rabbitmq_connect, redis_connect, mysql from isc_video_record.db import rabbitmq_connect, redis_connect, mysql
from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.const import LAST_CHECK_TIME_KEY from isc_video_record.const import LAST_CHECK_TIME_KEY
from isc_video_record.pre_event import PreEvent from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info, insert_video_info from isc_video_record.db.mysql import get_camera_info, insert_video_info
...@@ -34,6 +35,7 @@ class EventMergerJob: ...@@ -34,6 +35,7 @@ class EventMergerJob:
self.run() self.run()
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
send_alarm_to_developer('isc-merger', e)
next_run_at = time.time() + 15 * 16 next_run_at = time.time() + 15 * 16
log.info('next run at: %s', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at))) log.info('next run at: %s', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
time.sleep(15 * 60) time.sleep(15 * 60)
......
...@@ -12,10 +12,11 @@ from intelab_python_sdk.logger import log_init, log ...@@ -12,10 +12,11 @@ from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from isc_video_record.db import rabbitmq_connect, redis_connect from isc_video_record.db import rabbitmq_connect, redis_connect
from isc_video_record.const import PROCESSING_CAMERA_KEY from isc_video_record.const import PROCESSING_CAMERA_KEY, PROCESSING_TOTAL_KEY
from isc_video_record.utils.api_helper import IntelabApiHelper from isc_video_record.utils.api_helper import IntelabApiHelper
from isc_video_record.utils import aliyun_oss from isc_video_record.utils import aliyun_oss
from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds
from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.db import mysql from isc_video_record.db import mysql
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
...@@ -66,6 +67,7 @@ class StreamRecorder: ...@@ -66,6 +67,7 @@ class StreamRecorder:
log.info('Thread id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code']) log.info('Thread id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code'])
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
send_alarm_to_developer('recorder_{}'.format(thread_id), e)
finally: finally:
if pipe.get(camera_key) == str(thread_id): if pipe.get(camera_key) == str(thread_id):
pipe.delete(camera_key) pipe.delete(camera_key)
...@@ -108,22 +110,29 @@ class StreamRecorder: ...@@ -108,22 +110,29 @@ class StreamRecorder:
def process_message(self, pipe, camera_key, thread_id, body): def process_message(self, pipe, camera_key, thread_id, body):
ack = False ack = False
events = mysql.get_untreated_events(body['db_table'], body['camera_code']) events = mysql.get_untreated_events(body['db_table'], body['camera_code'],
retry='status=2')
log.info('Thread_id:%s: events count: %s', thread_id, len(events)) log.info('Thread_id:%s: events count: %s', thread_id, len(events))
pipe.incr(PROCESSING_TOTAL_KEY) # 当前录制进程数+1
for inx, event in enumerate(events): for inx, event in enumerate(events):
log.info(event) log.info(event)
if pipe.keys(camera_key): if pipe.keys(camera_key):
if pipe.get(camera_key) == str(thread_id): if pipe.get(camera_key) == str(thread_id):
# 判定当前分布式锁是本线程设置的
self.recording(thread_id, body['db_table'], event) self.recording(thread_id, body['db_table'], event)
continue continue
if not self.set_nx(pipe, camera_key, thread_id, body['ex']): if not self.set_nx(pipe, camera_key, thread_id, body['ex']):
break break
else: else:
ack = True ack = True
pipe.decr(PROCESSING_TOTAL_KEY) # 本次录制结束进程数-1
return ack return ack
def recording(self, thread_id, db_table, event): def recording(self, thread_id, db_table, event):
# TODO 录制失败的视频重试和续录
if event['status'] == 2:
log.info('下载失败重试任务%s', event['file_name'])
t1 = time.time() t1 = time.time()
record_result = self.recorder( record_result = self.recorder(
......
from dynaconf import settings
from threading import Thread
from intelab_python_sdk.dingtalk import DingTalkMessage
def send_alarm_to_developer(service_name, e):
dingtalk_config = settings.get('DINGTALK')
dingtalk_mobiles = dingtalk_config.get('MOBILES')
dingtalk = DingTalkMessage(dingtalk_config.get('WEBHOOK'),
dingtalk_config.get('SECRET'))
import traceback
content = '告警::\n{}服务模块出错:\n {}'.format(
service_name, traceback.format_exc())
t = Thread(target=dingtalk.send_text, args=(
content, dingtalk_mobiles, False
))
t.start()
...@@ -21,7 +21,7 @@ requires = [ ...@@ -21,7 +21,7 @@ requires = [
setuptools.setup( setuptools.setup(
name='isc-video-record', name='isc-video-record',
version='1.0.0a6', version='1.0.0a7',
description='ISC motion detection playback video stream recording service.', description='ISC motion detection playback video stream recording service.',
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论