提交 3ec77629 authored 作者: zw.wang's avatar zw.wang

fix: 修复若干问题

上级 b05597fd
import json
from contextlib import contextmanager
from functools import wraps
......@@ -128,7 +129,9 @@ def insert_video_info(cursor, conn, db_table, device_code, start_time, end_time,
@query()
def update_video_info(cursor, conn, db_table, video_id, status,
file_name=None, video_url=None, video_resolution=None, recovered_time=None):
file_name=None, video_url=None,
video_resolution=None, recovered_time=None, retry_info=None,
next_retry_time=None, remark=None):
sql = '''
update {}
set status = %s, update_time = now() {}
......@@ -139,6 +142,14 @@ def update_video_info(cursor, conn, db_table, video_id, status,
sub_set += ', file_name = "{}", video_url="{}", video_resolution="{}", recovered_time="{}",' \
'expired_time=date_add(now(),interval 31 day)'\
.format(file_name, video_url, video_resolution, recovered_time)
if retry_info:
sub_set += ', retry_info=\'{}\''.format(json.dumps(retry_info, ensure_ascii=False))
if next_retry_time:
sub_set += ', next_retry_time="{}"'.format(next_retry_time)
if remark:
sub_set += ', remark="{}"'.format(remark)
log.info(sql.format(db_table, sub_set), status, video_id)
cursor.execute(sql.format(db_table, sub_set), [status, video_id])
conn.commit()
......@@ -147,7 +158,7 @@ def update_video_info(cursor, conn, db_table, video_id, status,
def get_untreated_events(cursor, conn, db_table, camera_code, status=3, **kw):
if 'retry' in kw:
sub_str = 'status in (2, 3) and update_time > date_sub(now(), interval 12 hour)'
sub_str = '(status = 2 and next_retry_time < now()) or status = 3'
else:
sub_str = 'status = {}'.format(status)
......@@ -159,7 +170,8 @@ def get_untreated_events(cursor, conn, db_table, camera_code, status=3, **kw):
status,
recovered_time,
video_url,
file_name
file_name,
retry_info, next_retry_time, remark
from {}
where device_code = %s
and create_time > date_sub(now(), interval 7 day)
......@@ -169,3 +181,18 @@ def get_untreated_events(cursor, conn, db_table, camera_code, status=3, **kw):
cursor.execute(sql, [camera_code])
return cursor.fetchall()
@query(cursor_dict=True)
def get_events_by_time(cursor, conn, db_table, camera_code, start_time, end_time):
sql = '''
select *
from {}
where device_code = %s
and start_time >= %s
and end_time < %s
order by start_time
'''.format(db_table)
cursor.execute(sql, [camera_code, start_time, end_time])
return cursor.fetchall()
......@@ -2,6 +2,8 @@ import pytz
import json
import time
import dynaconf
import tempfile
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
......@@ -16,6 +18,8 @@ from isc_video_record.utils.record_utils import get_time_str
from isc_video_record.const import LAST_CHECK_TIME_KEY, PROCESSING_TOTAL_KEY
from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info, insert_video_info
from isc_video_record.utils.excel_utils import gen_excel
from isc_video_record.utils.aliyun_oss import oss_upload_file
TIMEZONE = 'Asia/Shanghai'
tz = pytz.timezone(TIMEZONE)
......@@ -23,6 +27,7 @@ tz = pytz.timezone(TIMEZONE)
text = """
- 当前处于录制中的摄像头个数为:{}
- 未处理视频总时长:{}
- 未处理视频摄像头个数:{}
"""
......@@ -34,7 +39,8 @@ class EventMergerJob:
schedulers = BackgroundScheduler({
'apscheduler.timezone': TIMEZONE})
schedulers.add_job(self.run_moniter_task, 'cron', minute=0)
schedulers.add_job(self.run_moniter_hour, 'cron', minute=0)
schedulers.add_job(self.run_moniter_daily, 'cron', hour=10, minute=30)
schedulers.start()
@staticmethod
......@@ -44,20 +50,74 @@ class EventMergerJob:
pipe.delete(key)
pipe.close()
def run_moniter_task(self):
def run_moniter_hour(self):
"""
定时检查视频推流服务情况,并发送钉钉报警
"""
influxdb.reconnect()
res = influxdb.query(
'select * from one_week.pre_video_duration order by time desc limit 1')
pre_video_duration = list(res['pre_video_duration'])[0] if res and len(res) > 0 else {'value': 0}
pre_video_duration = list(res['pre_video_duration'])[0] if res and len(res) > 0 else {
'value': 0, 'camera_count': 0}
with redis_connect() as pipe:
send_markdown('isc巡检', text.format(
pipe.get(PROCESSING_TOTAL_KEY) or 0,
get_time_str(pre_video_duration['value'])
get_time_str(pre_video_duration['value']),
pre_video_duration['camera_count']
))
def run_moniter_daily(self):
"""
监控日报
"""
headers = [
'摄像头序列号', '摄像头名称', 'IndexCode',
'总时长', '完成', '失败', '未处理', '未知'
]
columns = [
'device_code', 'camera_name', 'point_index_code',
'total', 'done', 'failed', 'untreated', 'unknown'
]
end_time = datetime.utcnow().astimezone(tz).replace(hour=0, minute=0, second=0)
start_time = end_time - timedelta(days=1)
file_name = 'isc-daily-{}.xlsx'.format(end_time.strftime('%Y%m%d'))
results = []
for camera in get_camera_info():
done = failed = untreated = unknown = total = 0
events = mysql.get_events_by_time(camera['db_table'], camera['device_code'],
start_time, end_time)
for event in events:
duration = (event['end_time'] - event['start_time']).total_seconds()
total += duration
if event['status'] == 1:
done += duration
elif event['status'] == 2:
failed += duration
elif event['status'] == 3:
untreated += duration
else:
unknown += duration
# TODO 在线和离线时间
camera['done'] = get_time_str(done)
camera['failed'] = get_time_str(failed)
camera['untreated'] = get_time_str(untreated)
camera['unknown'] = get_time_str(unknown)
camera['total'] = get_time_str(total)
results.append(camera)
with tempfile.TemporaryDirectory() as tmp:
tmp_path = os.path.join(tmp, file_name)
gen_excel(results, headers, columns, tmp_path, title='isc-record')
url = oss_upload_file('isc/daily/{}'.format(file_name), tmp_path)
send_markdown(
'isc-daily',
('**{}日报** \n\n [摄像头视频时长详情]({}) ').format(end_time.strftime('%Y-%m-%d'), url)
)
def start(self):
while True:
try:
......@@ -70,95 +130,116 @@ class EventMergerJob:
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
time.sleep(15 * 60)
def set_check_time(self, pipe, camera_code, now):
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code)
last_check_time = pipe.get(last_check_time_key)
if not last_check_time:
# 设备无上次事件,取最近的15分钟作为开始时间
last_check_time = now - timedelta(minutes=15)
else:
last_check_time = datetime.strptime(
last_check_time, '%Y-%m-%d %H:%M:%S')
# 调整最大事件长度为1天
if now - last_check_time > timedelta(days=1):
last_check_time = now - timedelta(days=1)
res = pipe.set(last_check_time_key,
now.strftime('%Y-%m-%d %H:%M:%S'))
return res, last_check_time
def look_untreated_events(self, camera, body):
"""
查看当前摄像头是否存在未处理事件
"""
untreated_events = mysql.get_untreated_events(
camera['db_table'], camera['device_code'], retry=True)
if untreated_events:
log.info('当前摄像头%s还存在未处理事件,优先处理', camera['device_code'])
body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds()
for event in untreated_events]) + 10
self.send_mq_message(body)
return body.get('ex', 0)
def get_new_events(self, last_check_time, now, camera):
"""
查询指定时间段内摄像头是否有移动告警消息并合并成事件
"""
pre_event = PreEvent(
last_check_time.strftime('%Y-%m-%d %H:%M:%S'),
now.strftime('%Y-%m-%d %H:%M:%S')
)
alarm_list = list(pre_event.get_alarm_list(
camera['point_index_code']))
log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
camera['device_code'], last_check_time, now, len(alarm_list)))
if dynaconf.settings.get('EVENT_ON', True):
events = PreEvent.merge_alarm_to_event(alarm_list)
else:
events = [{
'start_time': last_check_time.astimezone(tz),
'end_time': now.astimezone(tz)
}]
return events
def process_camera(self, pipe, camera):
""""""
body = {
'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'],
'db_table': camera['db_table'],
}
now = datetime.utcnow() - timedelta(minutes=3)
camera_code = camera['device_code']
event_duration = self.look_untreated_events(camera, body)
if event_duration == 0:
res, last_check_time = self.set_check_time(pipe, camera_code, now)
event_duration = 0
for event in self.get_new_events(last_check_time, now, camera):
insert_video_info(
camera['db_table'], camera['device_code'],
event['start_time'].astimezone(pytz.utc),
event['end_time'].astimezone(pytz.utc),
camera['biz_type'], camera['service_type'],
status=3 # status=3 表示排队中
)
event_duration += (event['end_time'] - event['start_time']).total_seconds()
body['ex'] = event_duration + 10
if event_duration > 0:
self.send_mq_message(body)
log.info(body)
return body['ex']
def run(self):
pipe = redis_connect()
total_video_duration = 0
camera_count = 0
for camera in get_camera_info():
try:
body = {
'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'],
'db_table': camera['db_table'],
}
camera_code = camera['device_code']
untreated_events = mysql.get_untreated_events(
camera['db_table'], camera_code, status=3)
if untreated_events:
log.info('当前摄像头%s还存在未处理事件,优先处理', camera_code)
body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds()
for event in untreated_events]) + 10
total_video_duration += body['ex']
self.send_mq_message(body)
continue
now = datetime.utcnow() - timedelta(minutes=3)
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code)
last_check_time = pipe.get(last_check_time_key)
if not last_check_time:
# 设备无上次事件,取最近的15分钟作为开始时间
last_check_time = now - timedelta(minutes=15)
else:
last_check_time = datetime.strptime(
last_check_time, '%Y-%m-%d %H:%M:%S')
# 调整最大事件长度为1天
if now - last_check_time > timedelta(days=1):
last_check_time = now - timedelta(days=1)
res = pipe.set(last_check_time_key,
now.strftime('%Y-%m-%d %H:%M:%S'))
if not res:
continue
pre_event = PreEvent(
last_check_time.strftime('%Y-%m-%d %H:%M:%S'),
now.strftime('%Y-%m-%d %H:%M:%S')
)
alarm_list = list(pre_event.get_alarm_list(
camera['point_index_code']))
log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
camera_code, last_check_time, now, len(alarm_list)))
camera_event_duration = self.process_camera(pipe, camera)
total_video_duration += camera_event_duration
if camera_event_duration > 0:
camera_count += 1
if dynaconf.settings.get('EVENT_ON', True):
events = PreEvent.merge_alarm_to_event(alarm_list)
else:
events = [{
'start_time': last_check_time.astimezone(tz),
'end_time': now.astimezone(tz)
}]
event_duration = 0
for event in events:
insert_video_info(
camera['db_table'], camera['device_code'],
event['start_time'].astimezone(pytz.utc),
event['end_time'].astimezone(pytz.utc),
camera['biz_type'], camera['service_type'],
status=3 # status=3 表示排队中
)
event_duration += (event['end_time'] -
event['start_time']).total_seconds()
body = {
'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'],
'db_table': camera['db_table'],
'ex': event_duration + 10
}
total_video_duration += body['ex']
if len(events) > 0:
self.send_mq_message(body)
log.info(body)
except Exception as e:
log.exception(e)
send_alarm_to_developer(e)
pipe.close()
self.write_video_duration_to_influx(total_video_duration)
def write_video_duration_to_influx(self, total_video_duration):
try:
self.write_video_duration_to_influx(total_video_duration, camera_count)
except Exception as e:
log.exception(e)
def write_video_duration_to_influx(self, total_video_duration, camera_count):
influxdb.reconnect()
influxdb.write_points([{
'measurement': 'pre_video_duration',
......@@ -166,7 +247,8 @@ class EventMergerJob:
'event_type': '131331'
},
'fields': {
'value': total_video_duration
'value': float(total_video_duration),
'camera_count': camera_count
},
time: datetime.now()
}], retention_policy='one_week')
......
......@@ -8,6 +8,7 @@ import pytz
import dynaconf
from datetime import timedelta, datetime
from redis.exceptions import ConnectionError
from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
......@@ -36,6 +37,7 @@ class StreamRecorder:
@staticmethod
def set_nx(pipe, camera_key, thread_id, ex):
pipe.pipeline()
res = pipe.set(camera_key, thread_id, nx=True, ex=int(ex))
return res
......@@ -110,8 +112,7 @@ class StreamRecorder:
def process_message(self, pipe, camera_key, thread_id, body):
ack = False
events = mysql.get_untreated_events(body['db_table'], body['camera_code'],
retry='status=2')
events = mysql.get_untreated_events(body['db_table'], body['camera_code'], retry=True)
log.info('Thread_id:%s: events count: %s', thread_id, len(events))
pipe.incr(PROCESSING_TOTAL_KEY) # 当前录制进程数+1
......@@ -126,13 +127,14 @@ class StreamRecorder:
break
else:
ack = True
pipe.decr(PROCESSING_TOTAL_KEY) # 本次录制结束进程数-1
try:
pipe.decr(PROCESSING_TOTAL_KEY) # 本次录制结束进程数-1
except ConnectionError:
with redis_connect() as pipe:
pipe.decr(PROCESSING_TOTAL_KEY)
return ack
def recording(self, thread_id, db_table, event):
# TODO 录制失败的视频重试和续录
if event['status'] == 2:
log.info('下载失败重试任务%s', event['file_name'])
t1 = time.time()
record_result = self.recorder(
......@@ -144,8 +146,8 @@ class StreamRecorder:
t2 = time.time()
video_info, error_log = get_video_duration(record_result['file_name'])
log.info('Thread_id:%s: Time consuming: %s, duration: %s, size: %sM',
thread_id,
log.info('Thread_id:%s:%s: Time consuming: %s, duration: %s, size: %sM',
thread_id, event['camera_code'],
round(t2 - t1, 2), time_to_seconds(video_info['duration']),
video_info['size'])
url = ''
......@@ -154,13 +156,38 @@ class StreamRecorder:
url = aliyun_oss.oss_upload_file('isc_record/' + file_name,
record_result['file_name'])
status = 1 if record_result['is_completed'] else 2
# TODO 失败的续录
mysql.update_video_info(db_table, event['video_id'], status,
file_name=file_name, video_url=url, video_resolution=video_info['resolution'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc))
os.remove(record_result['file_name'])
else:
# 无有效视频文件,标记事件状态 status = 2
mysql.update_video_info(db_table, event['video_id'], 2)
now = datetime.now()
if not event.get('retry_info'):
event['retry_info'] = '[]'
event['retry_info'] = json.loads(event['retry_info'])
retry_count = len(event['retry_info'])
next_retry_time = remark = None
event['retry_info'].append({
'retry_count': retry_count + 1,
'retry_time': now.strftime('%Y-%m-%d %H:%M:%S'),
'result': 'failed'
})
if retry_count < 5:
# 无有效视频文件,标记事件状态 status = 2
status = 2
next_retry_time = now + timedelta(hours=3)
remark = (event.get('remark') or 'start') + '+failed'
else:
status = 4 # 状态4: 不再重试!
remark = event['remark'] + '+failed+end.'
mysql.update_video_info(db_table, event['video_id'], status,
retry_info=event['retry_info'],
next_retry_time=next_retry_time,
remark=remark)
log.info('video_info: %s, url: %s, video_id: %s.%s, time: %s',
video_info, url, db_table,
......@@ -180,7 +207,7 @@ class StreamRecorder:
IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time)
)
log.info('thread_id:%s: playback: %s', thread_id, playback_urls)
log.info('thread_id:%s:%s: playback: %s', thread_id, camera_code, playback_urls)
file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format(
camera_code,
start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'),
......@@ -195,15 +222,17 @@ class StreamRecorder:
part_num = retry_count = 0
is_completed = False
part_files_set = set()
while retry_count < 6:
while retry_count < 3:
# 重试六次
retry_count += 1
complete_duration = (end_time - start_time).total_seconds()
file_info, _ = StreamRecorder.stream_record(
playback_stream['stream_url'], start_time, end_time)
playback_stream['stream_url'], start_time, end_time, camera_code)
file_duration = time_to_seconds(file_info['duration'])
if not os.path.isfile(file_info['file_name']):
log.info('当前录制无文件输出:%s, 重试计数: %s', camera_code, retry_count)
time.sleep(1)
continue
if file_duration < complete_duration - 2:
......@@ -231,7 +260,8 @@ class StreamRecorder:
concat(part_files, file_name, removed=True)
elif len(part_files) == 1:
shutil.move(part_files[0], file_name)
log.info('The download is complete, file %s', file_name)
log.info('thread_id:%s:%s: The download is complete, file %s',
thread_id, camera_code, file_name)
return {
'file_name': file_name,
'is_completed': is_completed,
......@@ -240,7 +270,7 @@ class StreamRecorder:
}
@staticmethod
def stream_record(stream, start_time, end_time):
def stream_record(stream, start_time, end_time, camera_code):
start_time = start_time.strftime('%Y%m%dT%H%M%S')
end_time = end_time.strftime('%Y%m%dT%H%M%S')
......@@ -252,8 +282,8 @@ class StreamRecorder:
end_time, stream['extra_args'])
file_name = os.path.join(
video_path, 'rtmp_{}_{}.mp4'.format(start_time, end_time))
log.info('stream_url: %s', stream_url)
record_thread(stream_url, file_name)
log.info('{} stream_url: %s', camera_code, stream_url)
record_thread(stream_url, file_name, thread_name=camera_code)
return get_video_duration(file_name)
......
import openpyxl
def gen_excel(rows, headers, columns, file_name, title='sheet'):
new = openpyxl.Workbook()
sheet = new.active
sheet.title = title
for index, value in enumerate(headers):
_ = sheet.cell(row=1, column=index+1, value=u'%s' % value)
for row, target in enumerate(rows):
for col, key in enumerate(columns):
_ = sheet.cell(row=row+2, column=col + 1,
value=u'%s' % target[key])
newworkbook = new.save(file_name)
return newworkbook
......@@ -4,7 +4,7 @@ import subprocess
from intelab_python_sdk.logger import log
def record_thread(stream_url, out_file):
def record_thread(stream_url, out_file, thread_name='ffmpeg-log'):
cmd = [
'ffmpeg',
......@@ -40,9 +40,9 @@ def record_thread(stream_url, out_file):
log_buffer = ''
if log_buffer and 'error' in log_buffer:
log.error('ffmpeg-log:error: %s', log_buffer.strip())
log.error('%s:error: %s', thread_name, log_buffer.strip())
elif log_buffer:
log.debug('ffmpeg-log: %s', log_buffer.strip())
log.debug('%s: %s', thread_name, log_buffer.strip())
log_buffer = stdout
......
......@@ -17,12 +17,13 @@ requires = [
'mysql-connector',
'retrying',
'oss2',
'apscheduler'
'apscheduler',
'openpyxl'
]
setuptools.setup(
name='isc-video-record',
version='1.0.0a9',
version='1.0.0a13',
description='ISC motion detection playback video stream recording service.',
long_description=long_description,
long_description_content_type='text/markdown',
......
from isc_video_record.db.redis import redis_connect
from isc_video_record.const import PROCESSING_CAMERA_KEY, PROCESSING_TOTAL_KEY
with redis_connect() as pipe:
processing_keys = pipe.keys(PROCESSING_CAMERA_KEY.format('*'))
for key in processing_keys:
print('清除任务key:', key)
pipe.delete(key)
print('正在录制中摄像头数量:', pipe.get(PROCESSING_TOTAL_KEY))
pipe.delete(PROCESSING_TOTAL_KEY)
import time
import threading
from isc_video_record.db.redis import redis_connect
def test_setnx():
with redis_connect() as pipe:
res = pipe.set('b', '1', nx=True, ex=60)
time.sleep(2)
print('设置结果', res)
ths = []
for i in range(100):
t = threading.Thread(target=test_setnx, name='thread_{}'.format(i))
t.start()
ths.append(t)
for t in ths:
t.join()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论