提交 45bb863b authored 作者: zw.wang's avatar zw.wang

feat: 多任务并发录制

上级 a8e3cf8c
LAST_CHECK_TIME_KEY = 'hk_isc:recorder:camera:{}' LAST_CHECK_TIME_KEY = 'hk_isc:recorder:camera:{}'
PROCESSING_CAMERA_KEY = 'hk_isc:processing:camera:{}'
...@@ -100,34 +100,62 @@ def get_camera_info(cursor, conn): ...@@ -100,34 +100,62 @@ def get_camera_info(cursor, conn):
@query() @query()
def insert_video_info(cursor, conn, db_table, device_code, file_name, start_time, end_time, def insert_video_info(cursor, conn, db_table, device_code, start_time, end_time,
recovered_time, video_url, video_resolution, biz_type, service_type, status=0): biz_type, service_type, status=0,
file_name=None, video_url=None, video_resolution=None):
sql = ''' sql = '''
insert {} ( insert {} (
device_code, file_name, start_time, end_time, device_code, file_name, start_time, end_time,
recovered_time,
video_url, video_resolution, video_url, video_resolution,
biz_type, service_type, biz_type, service_type,
expired_time, status, update_time, create_time) status, create_time, update_time)
value (%(device_code)s, %(file_name)s, %(start_time)s, %(end_time)s, value (%(device_code)s, %(file_name)s, %(start_time)s, %(end_time)s,
%(recovered_time)s,
%(video_url)s, %(video_resolution)s, %(video_url)s, %(video_resolution)s,
%(biz_type)s, %(service_type)s, %(biz_type)s, %(service_type)s,
date_add(now(),interval 31 day), %(status)s, now(), now() %(status)s, now(), now()
) )
'''.format(db_table) '''.format(db_table)
cursor.execute(sql, { cursor.execute(sql, {
'device_code': device_code, 'device_code': device_code, 'file_name': file_name,
'file_name': file_name, 'start_time': start_time, 'end_time': end_time,
'start_time': start_time, 'video_url': video_url, 'video_resolution': video_resolution,
'end_time': end_time, 'status': status, 'biz_type': biz_type, 'service_type': service_type
'video_url': video_url,
'video_resolution': video_resolution,
'recovered_time': recovered_time,
'status': status,
'biz_type': biz_type,
'service_type': service_type
}) })
video_id = cursor.lastrowid video_id = cursor.lastrowid
conn.commit() conn.commit()
return video_id return video_id
@query()
def update_video_info(cursor, conn, db_table, video_id, status,
file_name=None, video_url=None, video_resolution=None, recovered_time=None):
sql = '''
update {}
set status = %s, update_time = now() {}
where id = %s
'''
sub_set = ''
if file_name:
sub_set += ', file_name = "{}", video_url="{}", video_resolution="{}", recovered_time="{}",' \
'expired_time=date_add(now(),interval 31 day)'\
.format(file_name, video_url, video_resolution, recovered_time)
cursor.execute(sql.format(db_table, sub_set), [status, video_id])
conn.commit()
@query(cursor_dict=True)
def get_untreated_events(cursor, conn, db_table, camera_code, status=3):
sql = '''
select
id as video_id,
device_code as camera_code,
start_time, end_time, status
from {}
where device_code = %s
and create_time > date_sub(create_time, interval 7 day)
and status = %s
order by create_time
'''.format(db_table)
cursor.execute(sql, [camera_code, status])
return cursor.fetchall()
...@@ -7,10 +7,10 @@ from datetime import datetime, timedelta ...@@ -7,10 +7,10 @@ from datetime import datetime, timedelta
from intelab_python_sdk.logger import log from intelab_python_sdk.logger import log
from isc_video_record.db import rabbitmq_connect, redis_connect from isc_video_record.db import rabbitmq_connect, redis_connect, mysql
from isc_video_record.const import LAST_CHECK_TIME_KEY from isc_video_record.const import LAST_CHECK_TIME_KEY
from isc_video_record.pre_event import PreEvent from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info from isc_video_record.db.mysql import get_camera_info, insert_video_info
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
...@@ -43,9 +43,23 @@ class EventMergerJob: ...@@ -43,9 +43,23 @@ class EventMergerJob:
pipe = redis_connect() pipe = redis_connect()
for camera in get_camera_info(): for camera in get_camera_info():
try: try:
body = {
'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'],
'db_table': camera['db_table'],
}
now = datetime.utcnow() - timedelta(minutes=3)
camera_code = camera['device_code'] camera_code = camera['device_code']
untreated_events = mysql.get_untreated_events(
camera['db_table'], camera_code, status=3)
if untreated_events:
log.info('当前摄像头%s还存在未处理事件,优先处理', camera_code)
body['ex'] = sum([(event['end_time'] - event['start_time']).total_seconds()
for event in untreated_events]) + 10
self.send_mq_message(body)
continue
now = datetime.utcnow() - timedelta(minutes=3)
last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code) last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code)
last_check_time = pipe.get(last_check_time_key) last_check_time = pipe.get(last_check_time_key)
if not last_check_time: if not last_check_time:
...@@ -70,12 +84,6 @@ class EventMergerJob: ...@@ -70,12 +84,6 @@ class EventMergerJob:
log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format( log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
camera_code, last_check_time, now, len(alarm_list))) camera_code, last_check_time, now, len(alarm_list)))
connection = None
if len(alarm_list) > 0:
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(self.queue_name, durable=True)
if dynaconf.settings.get('EVENT_ON', True): if dynaconf.settings.get('EVENT_ON', True):
events = PreEvent.merge_alarm_to_event(alarm_list) events = PreEvent.merge_alarm_to_event(alarm_list)
else: else:
...@@ -83,31 +91,44 @@ class EventMergerJob: ...@@ -83,31 +91,44 @@ class EventMergerJob:
'start_time': last_check_time.astimezone(tz), 'end_time': now.astimezone(tz) 'start_time': last_check_time.astimezone(tz), 'end_time': now.astimezone(tz)
}] }]
event_duration = 0
for event in events: for event in events:
insert_video_info(
camera['db_table'], camera['device_code'],
event['start_time'].astimezone(pytz.utc), event['end_time'].astimezone(pytz.utc),
camera['biz_type'], camera['service_type'],
status=3 # status=3 表示排队中
)
event_duration += (event['end_time'] - event['start_time']).total_seconds()
body = { body = {
'camera_index': camera['point_index_code'],
'start_time': event['start_time'].astimezone(pytz.utc).strftime('%Y-%m-%dT%H:%M:%S'),
'end_time': event['end_time'].astimezone(pytz.utc).strftime('%Y-%m-%dT%H:%M:%S'),
'event_type': camera['event_type'],
'camera_code': camera['device_code'], 'camera_code': camera['device_code'],
'camera_index': camera['point_index_code'],
'db_table': camera['db_table'], 'db_table': camera['db_table'],
'biz_type': camera['biz_type'], 'event_duration': event_duration + 10
'service_type': camera['service_type']
} }
log.info(body) if len(events) > 0:
channel.basic_publish(exchange='', routing_key=self.queue_name, self.send_mq_message(body)
body=json.dumps(body, ensure_ascii=False))
if connection: log.info(body)
connection.close()
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
pipe.close() pipe.close()
def send_mq_message(self, body):
connection = rabbitmq_connect()
channel = connection.channel()
channel.queue_declare(self.queue_name, durable=True)
channel.basic_publish(exchange='', routing_key=self.queue_name,
body=json.dumps(body, ensure_ascii=False))
connection.close()
if __name__ == '__main__': if __name__ == '__main__':
from intelab_python_sdk.logger import log_init from intelab_python_sdk.logger import log_init
log_init('event_merger', False, '/var/log/event_vss/') log_init('event_merger', False, '/var/log/event_vss/')
em = EventMergerJob() em = EventMergerJob()
em.start() em.start()
...@@ -10,11 +10,12 @@ from datetime import timedelta, datetime ...@@ -10,11 +10,12 @@ from datetime import timedelta, datetime
from intelab_python_sdk.logger import log_init, log from intelab_python_sdk.logger import log_init, log
from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat from intelab_python_sdk.ffmpeg.ffmpeg_concat import concat
from isc_video_record.db import rabbitmq_connect from isc_video_record.db import rabbitmq_connect, redis_connect
from isc_video_record.const import PROCESSING_CAMERA_KEY
from isc_video_record.utils.api_helper import IntelabApiHelper from isc_video_record.utils.api_helper import IntelabApiHelper
from isc_video_record.utils import aliyun_oss from isc_video_record.utils import aliyun_oss
from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds from isc_video_record.utils.record_utils import record_thread, get_video_duration, time_to_seconds
from isc_video_record.db.mysql import insert_video_info from isc_video_record.db import mysql
tz = pytz.timezone('Asia/Shanghai') tz = pytz.timezone('Asia/Shanghai')
...@@ -31,6 +32,11 @@ class StreamRecorder: ...@@ -31,6 +32,11 @@ class StreamRecorder:
self.channel = self.connection.channel() self.channel = self.connection.channel()
@staticmethod
def set_nx(pipe, camera_key, thread_id, ex):
res = pipe.set(camera_key, thread_id, nx=True, ex=int(ex))
return res
def start(self): def start(self):
log.info('binding to queue {}'.format(self.queue_name)) log.info('binding to queue {}'.format(self.queue_name))
...@@ -42,22 +48,30 @@ class StreamRecorder: ...@@ -42,22 +48,30 @@ class StreamRecorder:
""" """
if ch.is_open: if ch.is_open:
ch.basic_ack(delivery_tag) ch.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def do_work(conn, ch, delivery_tag, body): def do_work(conn, ch, delivery_tag, body):
thread_id = threading.get_ident() thread_id = threading.get_ident()
log.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id, log.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id,
delivery_tag, body) delivery_tag, body)
ack = False
pipe = redis_connect()
camera_key = PROCESSING_CAMERA_KEY.format(body['camera_code'])
try: try:
# TODO 异常报错时不需要ack掉消息 if self.set_nx(pipe, camera_key, thread_id, body['ex']):
self.process_message(body) ack = self.process_message(pipe, camera_key, thread_id, body)
else:
log.info('Thread id:%s: camera_code[%s] processing, end.', thread_id, body['camera_code'])
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
finally:
if pipe.get(camera_key) == str(thread_id):
pipe.delete(camera_key)
cb = functools.partial(ack_message, ch, delivery_tag) cb = functools.partial(ack_message, ch, delivery_tag)
conn.add_callback_threadsafe(cb) conn.add_callback_threadsafe(cb)
log.info('finished processing MQ message. ack=%s', ack)
def on_message(ch, method_frame, _header_frame, body, args): def on_message(ch, method_frame, _header_frame, body, args):
(conn, thrds) = args (conn, thrds) = args
...@@ -72,7 +86,7 @@ class StreamRecorder: ...@@ -72,7 +86,7 @@ class StreamRecorder:
on_message_callback = functools.partial( on_message_callback = functools.partial(
on_message, args=(self.connection, threads)) on_message, args=(self.connection, threads))
self.channel.basic_qos(prefetch_count=1) self.channel.basic_qos(prefetch_count=2)
self.channel.basic_consume(on_message_callback=on_message_callback, self.channel.basic_consume(on_message_callback=on_message_callback,
queue=self.queue_name) queue=self.queue_name)
...@@ -90,17 +104,33 @@ class StreamRecorder: ...@@ -90,17 +104,33 @@ class StreamRecorder:
self.connection.close() self.connection.close()
def process_message(self, body): def process_message(self, pipe, camera_key, thread_id, body):
ack = False
events = mysql.get_untreated_events(body['db_table'], body['camera_code'])
log.info('Thread_id:%s: events count: %s', thread_id, len(events))
for inx, event in enumerate(events):
log.info(event)
if pipe.keys(camera_key):
if pipe.get(camera_key) == str(thread_id):
self.recording(thread_id, body['db_table'], event)
continue
if not self.set_nx(pipe, camera_key, thread_id, body['ex']):
break
else:
ack = True
return ack
def recording(self, thread_id, db_table, event):
t1 = time.time() t1 = time.time()
record_result = self.recorder( record_result = self.recorder(event['camera_code'], event['start_time'], event['end_time'], thread_id)
body['camera_code'],
datetime.strptime(body['start_time'], '%Y-%m-%dT%H:%M:%S').astimezone(tz),
datetime.strptime(body['end_time'], '%Y-%m-%dT%H:%M:%S').astimezone(tz),
)
t2 = time.time() t2 = time.time()
video_info, error_log = get_video_duration(record_result['file_name']) video_info, error_log = get_video_duration(record_result['file_name'])
log.info('Time consuming: %s, duration: %s, size: %sM', log.info('Thread_id:%s: Time consuming: %s, duration: %s, size: %sM',
thread_id,
round(t2 - t1, 2), time_to_seconds(video_info['duration']), round(t2 - t1, 2), time_to_seconds(video_info['duration']),
video_info['size']) video_info['size'])
url = '' url = ''
...@@ -108,32 +138,31 @@ class StreamRecorder: ...@@ -108,32 +138,31 @@ class StreamRecorder:
if record_result['file_name'] and os.path.isfile(record_result['file_name']): if record_result['file_name'] and os.path.isfile(record_result['file_name']):
url = aliyun_oss.oss_upload_file('isc_record/' + file_name, url = aliyun_oss.oss_upload_file('isc_record/' + file_name,
record_result['file_name']) record_result['file_name'])
status = 1 if record_result['is_completed'] else 2
mysql.update_video_info(db_table, event['video_id'], status,
file_name=file_name, video_url=url, video_resolution=video_info['resolution'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc))
os.remove(record_result['file_name']) os.remove(record_result['file_name'])
video_id = insert_video_info(body['db_table'], body['camera_code'], file_name,
body['start_time'], body['end_time'],
recovered_time=record_result['recovered_time'].astimezone(pytz.utc),
video_url=url, video_resolution=video_info['resolution'],
biz_type=body['biz_type'], service_type=body['service_type'],
status=1 if record_result['is_completed'] else 2)
log.info('video_info: %s, url: %s, video_id: %s.%s, time: %s', log.info('video_info: %s, url: %s, video_id: %s.%s, time: %s',
video_info, url, body['db_table'], video_info, url, db_table,
video_id, round(time.time() - t1, 2)) event['video_id'], round(time.time() - t1, 2))
return True return True
@staticmethod @staticmethod
def recorder(camera_code, start_time, end_time): def recorder(camera_code, start_time, end_time, thread_id=1):
""" """
:param camera_code: 摄像头序列号 :param camera_code: 摄像头序列号
:param start_time: 开始时间,上海时区 :param start_time: 开始时间,上海时区
:param end_time: 结束时间,上海时区 :param end_time: 结束时间,上海时区
:param thread_id:
""" """
playback_urls = api_helper.get_cameras_playback_urls( playback_urls = api_helper.get_cameras_playback_urls(
camera_code, camera_code,
IntelabApiHelper.iso_format(start_time), IntelabApiHelper.iso_format(start_time),
IntelabApiHelper.iso_format(end_time) IntelabApiHelper.iso_format(end_time)
) )
log.info('playback: %s', playback_urls) log.info('thread_id:%s: playback: %s', thread_id, playback_urls)
file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format( file_name = os.path.join(video_path, 'ISC_{}_{}_{}.mp4'.format(
camera_code, camera_code,
start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'), start_time.astimezone(pytz.utc).strftime('%Y%m%dT%H%M%S'),
......
...@@ -26,7 +26,7 @@ class IntelabApiHelper: ...@@ -26,7 +26,7 @@ class IntelabApiHelper:
response = requests.get(self.host + uri, params=params, timeout=30) response = requests.get(self.host + uri, params=params, timeout=30)
response.raise_for_status() response.raise_for_status()
res_json = response.json().get('data') res_json = response.json().get('data') or {}
# TODO 数据为空的异常处理 # TODO 数据为空的异常处理
results = [] results = []
for f in res_json.get('list') or []: for f in res_json.get('list') or []:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论