提交 d7afc0e8 authored 作者: zw.wang's avatar zw.wang

feat: [tasks] 定时任务拆分

上级 ed09c34e
......@@ -9,3 +9,12 @@ stopasgroup=true
killasgroup=true
autostart=true
autorestart=true
[program:tasks]
command=isc_video_record tasks -d -l /var/log/event_vss
directory=/root/isc-video-record
user=root
stopasgroup=true
killasgroup=true
autostart=true
autorestart=true
......@@ -6,7 +6,7 @@ def get_parser():
parsers = argparse.ArgumentParser(
description='ISC motion detection playback video stream recording service.'
)
parsers.add_argument('worker', choices=['merger', 'recorder'], type=str)
parsers.add_argument('worker', choices=['merger', 'recorder', 'tasks'], type=str)
parsers.add_argument('-d', '--debug', required=False, help='Enable debug output',
dest='debug', action='store_true', default=False)
......@@ -32,6 +32,10 @@ def command_line_runner():
from isc_video_record.recorder import StreamRecorder
sr = StreamRecorder()
sr.start()
elif args['worker'] == 'tasks':
from isc_video_record.tasks import Tasks
t = Tasks()
t.start()
else:
parser.print_help
......
......@@ -2,24 +2,18 @@ import pytz
import json
import time
import dynaconf
import tempfile
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from isc_video_record.db import influxdb
from intelab_python_sdk.logger import log
from isc_video_record.db import rabbitmq_connect, redis_connect, mysql
from isc_video_record.utils.alarm_utils import send_alarm_to_developer, send_markdown
from isc_video_record.utils.record_utils import get_time_str
from isc_video_record.const import LAST_CHECK_TIME_KEY, PROCESSING_CAMERA_KEY
from isc_video_record.utils.alarm_utils import send_alarm_to_developer
from isc_video_record.const import LAST_CHECK_TIME_KEY
from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info, insert_video_info
from isc_video_record.utils.excel_utils import gen_excel
from isc_video_record.utils.aliyun_oss import oss_upload_file
from isc_video_record.utils.api_helper import IntelabApiHelper
api_helper = IntelabApiHelper()
......@@ -27,12 +21,6 @@ api_helper = IntelabApiHelper()
TIMEZONE = 'Asia/Shanghai'
tz = pytz.timezone(TIMEZONE)
text = """
- 当前处于录制中的摄像头个数为:{}
- 未处理视频总时长:{}
- 所属视频摄像头个数:{}
"""
class EventMergerJob:
......@@ -40,87 +28,6 @@ class EventMergerJob:
self.queue_name = 'ISC_RECORD_JOB'
self.local_service_name = 'cloud-record'
schedulers = BackgroundScheduler({
'apscheduler.timezone': TIMEZONE})
schedulers.add_job(self.run_moniter_hour, 'cron', minute='0,30')
schedulers.add_job(self.run_moniter_daily, 'cron', hour=10, minute=30)
schedulers.start()
@staticmethod
def clean():
pipe = redis_connect()
for key in pipe.keys(LAST_CHECK_TIME_KEY.format('*')):
pipe.delete(key)
pipe.close()
def run_moniter_hour(self):
"""
定时检查视频推流服务情况,并发送钉钉报警
"""
influxdb.reconnect()
res = influxdb.query(
'select * from one_week.pre_video_duration order by time desc limit 1')
pre_video_duration = list(res['pre_video_duration'])[0] if res and len(res) > 0 else {
'value': 0, 'camera_count': 0}
with redis_connect() as pipe:
send_markdown('isc巡检', text.format(
len(pipe.keys(PROCESSING_CAMERA_KEY.format('*'))),
get_time_str(pre_video_duration['value']),
pre_video_duration['camera_count']
))
def run_moniter_daily(self):
"""
监控日报
"""
headers = [
'摄像头序列号', '摄像头名称', 'IndexCode',
'总时长', '完成', '失败', '未处理', '未知'
]
columns = [
'device_code', 'camera_name', 'point_index_code',
'total', 'done', 'failed', 'untreated', 'unknown'
]
end_time = datetime.utcnow().astimezone(tz).replace(hour=0, minute=0, second=0)
start_time = end_time - timedelta(days=1)
file_name = 'isc-daily-{}.xlsx'.format(end_time.strftime('%Y%m%d'))
results = []
for camera in get_camera_info():
done = failed = untreated = unknown = total = 0
events = mysql.get_events_by_time(camera['db_table'], camera['device_code'],
start_time, end_time)
for event in events:
duration = (event['end_time'] - event['start_time']).total_seconds()
total += duration
if event['status'] == 1:
done += duration
elif event['status'] == 2:
failed += duration
elif event['status'] == 3:
untreated += duration
else:
unknown += duration
# TODO 在线和离线时间
camera['done'] = get_time_str(done)
camera['failed'] = get_time_str(failed)
camera['untreated'] = get_time_str(untreated)
camera['unknown'] = get_time_str(unknown)
camera['total'] = get_time_str(total)
results.append(camera)
with tempfile.TemporaryDirectory() as tmp:
tmp_path = os.path.join(tmp, file_name)
gen_excel(results, headers, columns, tmp_path, title='isc-record')
url = oss_upload_file('isc/daily/{}'.format(file_name), tmp_path)
send_markdown(
'isc-daily',
('**{}日报** \n\n [摄像头视频时长详情]({}) ').format(end_time.strftime('%Y-%m-%d'), url)
)
def start(self):
while True:
try:
......
......@@ -42,6 +42,7 @@ class PreEvent(object):
# 报警时间转换成上海时区
alarm_time = dt_parse(alarm_point['time']).astimezone(tz)
# 合并告警消息最小单位60s
# TODO 事件的开始和结束需要依赖于status,status=1触发,status=2结束
if len(events) > 0 \
and alarm_time - events[-1]['end_time'] < timedelta(seconds=60) \
and events[-1]['end_time'] - events[-1]['start_time'] < timedelta(hours=0.5):
......
......@@ -188,6 +188,8 @@ class ProcessMessage:
recovered_time=record_result['recovered_time'].astimezone(pytz.utc))
os.remove(record_result['file_name'])
else:
# TODO 离线事件重试的时间需要调整为3小时一次,但是普通录制失败的视频重试为30分钟
now = datetime.now()
if not event.get('retry_info'):
event['retry_info'] = '[]'
......@@ -204,7 +206,7 @@ class ProcessMessage:
if retry_count < 5:
# 无有效视频文件,标记事件状态 status = 2
status = 2
next_retry_time = now + timedelta(hours=3)
next_retry_time = now + timedelta(minutes=30)
remark = (event.get('remark') or 'start') + '+failed'
else:
status = 4 # 状态4: 不再重试!
......
import tempfile
import os
import pytz
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta
from isc_video_record.db import redis_connect, mysql, influxdb
from isc_video_record.utils.alarm_utils import send_markdown
from isc_video_record.utils.record_utils import get_time_str
from isc_video_record.const import LAST_CHECK_TIME_KEY, PROCESSING_CAMERA_KEY
from isc_video_record.utils.excel_utils import gen_excel
from isc_video_record.utils.aliyun_oss import oss_upload_file
from isc_video_record.db.mysql import get_camera_info
TIMEZONE = 'Asia/Shanghai'
tz = pytz.timezone(TIMEZONE)
text = """
- 当前处于录制中的摄像头个数为:{}
- 未处理视频总时长:{}
- 所属视频摄像头个数:{}
"""
class Tasks:
def __init__(self):
# TODO 定期删除过期视频文件
pass
def start(self):
schedulers = BlockingScheduler({
'apscheduler.timezone': TIMEZONE})
schedulers.add_job(self.run_moniter_hour, 'cron', minute='0,30')
schedulers.add_job(self.run_moniter_daily, 'cron', hour=10, minute=30)
schedulers.start()
@staticmethod
def clean():
pipe = redis_connect()
for key in pipe.keys(LAST_CHECK_TIME_KEY.format('*')):
pipe.delete(key)
pipe.close()
@staticmethod
def run_moniter_hour():
"""
定时检查视频推流服务情况,并发送钉钉报警
"""
influxdb.reconnect()
res = influxdb.query(
'select * from one_week.pre_video_duration order by time desc limit 1')
pre_video_duration = list(res['pre_video_duration'])[0] if res and len(res) > 0 else {
'value': 0, 'camera_count': 0}
with redis_connect() as pipe:
send_markdown('isc巡检', text.format(
len(pipe.keys(PROCESSING_CAMERA_KEY.format('*'))),
get_time_str(pre_video_duration['value']),
pre_video_duration['camera_count']
))
@staticmethod
def run_moniter_daily():
"""
监控日报
"""
headers = [
'摄像头序列号', '摄像头名称', 'IndexCode',
'总时长', '完成', '失败', '未处理', '未知'
]
columns = [
'device_code', 'camera_name', 'point_index_code',
'total', 'done', 'failed', 'untreated', 'unknown'
]
end_time = datetime.utcnow().astimezone(tz).replace(hour=0, minute=0, second=0)
start_time = end_time - timedelta(days=1)
file_name = 'isc-daily-{}.xlsx'.format(end_time.strftime('%Y%m%d'))
camera_count = 0
daily_duration = 0
results = []
for camera in get_camera_info():
done = failed = untreated = unknown = total = 0
events = mysql.get_events_by_time(camera['db_table'], camera['device_code'],
start_time, end_time)
for event in events:
duration = (event['end_time'] - event['start_time']).total_seconds()
total += duration
if event['status'] == 1:
done += duration
elif event['status'] == 2:
failed += duration
elif event['status'] == 3:
untreated += duration
else:
unknown += duration
# TODO 在线和离线时间
camera['done'] = get_time_str(done)
camera['failed'] = get_time_str(failed)
camera['untreated'] = get_time_str(untreated)
camera['unknown'] = get_time_str(unknown)
camera['total'] = get_time_str(total)
results.append(camera)
daily_duration += total
camera_count += 1
with tempfile.TemporaryDirectory() as tmp:
tmp_path = os.path.join(tmp, file_name)
gen_excel(results, headers, columns, tmp_path, title='isc-record')
url = oss_upload_file('isc/daily/{}'.format(file_name), tmp_path)
send_markdown(
'isc-daily',
('**{}日报** \n\n 共{}路摄像头,总视频时长为:{}\n\n [摄像头视频时长详情]({}) ')
.format(end_time.strftime('%Y-%m-%d'), camera_count, get_time_str(daily_duration),
url)
)
if __name__ == '__main__':
t = Tasks()
t.run_moniter_daily()
t.run_moniter_hour()
......@@ -31,24 +31,15 @@ class IntelabApiHelper:
pre_events = res_json.get('list') or []
events = []
if len(pre_events) > 0:
events.append({
'start_time': dateutil.parser.parse(pre_events[0]['beginTime']),
'end_time': dateutil.parser.parse(pre_events[0]['endTime']),
'expired_time': expired_time,
'stream_url': {
'url': res_json.get('url'),
'extra_args': 'playBackMode=1',
'protocol': protocol
}
})
# TODO 单个事件时长较长,需要分切处理
for pre_event in pre_events[1:]:
for pre_event in pre_events:
# 合并时间间隔较短的事件
start_time = dateutil.parser.parse(pre_event['beginTime'])
end_time = dateutil.parser.parse(pre_event['endTime'])
if start_time - events[-1]['end_time'] <= timedelta(seconds=2):
if len(events) > 0 \
and start_time - events[-1]['end_time'] <= timedelta(seconds=2) \
and events[-1]['end_time'] - events[-1]['start_time'] <= timedelta(hours=0.5):
events[-1]['end_time'] = end_time
else:
events.append({
......@@ -70,7 +61,7 @@ class IntelabApiHelper:
:return :
online - 1 在线,0 离线
collectTime: 数据采集时间
collectTime: 集时间
"""
uri = '/python/api/v1/secure/camera/status/by/{}'.format(camera_code)
......
......@@ -23,7 +23,7 @@ requires = [
setuptools.setup(
name='isc-video-record',
version='1.0.0a18',
version='1.0.0a19',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
import pytz
from dynaconf import settings
from datetime import datetime
from isc_video_record.utils.isc_client import HikVisionClient
from isc_video_record.pre_event import PreEvent
tz = pytz.timezone('Asia/Shanghai')
config = settings.get('ISC')
client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
config.get('HOST'), config.get('PORT'))
camera_index = '8b86a98f6c5143219d69c9c0344b156a'
start_time = datetime(2021, 5, 23, 7, 10, 0)
end_time = datetime(2021, 5, 23, 7, 30, 0)
res = client.get_cameras_playback_urls(camera_index,
client.iso_format(start_time.astimezone(tz)),
client.iso_format(end_time.astimezone(tz)))
for event in res:
cur_start_time = max(event['start_time'], start_time.astimezone(tz))
cur_end_time = min(event['end_time'], end_time.astimezone(tz))
print(cur_start_time, cur_end_time)
print('-------')
pre_events = PreEvent(start_time.strftime('%Y-%m-%d %H:%M:%S'),
end_time.strftime('%Y-%m-%d %H:%M:%S'))
for event in pre_events.merge_alarm_to_event(pre_events.get_alarm_list(camera_index)):
print(event['start_time'], event['end_time'])
......@@ -21,17 +21,18 @@ client = HikVisionClient(config.get('KEY'), config.get('SECRET'),
def main():
start_time = datetime(2021, 5, 18, 16, 7, 0).astimezone(tz)
end_time = datetime(2021, 5, 18, 16, 15, 0).astimezone(tz)
res = client.get_cameras_playback_urls('3dd6c37895a54e5084a390e33a7c806a',
start_time = datetime(2021, 5, 23, 10, 13, 0).astimezone(tz)
end_time = datetime(2021, 5, 23, 10, 59, 0).astimezone(tz)
res = client.get_cameras_playback_urls('8b86a98f6c5143219d69c9c0344b156a',
client.iso_format(start_time), client.iso_format(end_time))
print(res)
for event in res:
cur_start_time = max(event['start_time'], start_time)
cur_end_time = min(event['end_time'], end_time)
print(event)
file_info, error_log = stream_record(event['stream_url'], cur_start_time, cur_end_time)
print(file_info)
print(cur_start_time, cur_end_time)
# print(event)
# file_info, error_log = stream_record(event['stream_url'], cur_start_time, cur_end_time)
# print(file_info)
def stream_record(stream, start_time, end_time):
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论