import json
import time
from datetime import datetime, timedelta

import dynaconf
import pytz
from intelab_python_sdk.logger import log

from isc_video_record.db import rabbitmq_connect, redis_connect, mysql
from isc_video_record.const import LAST_CHECK_TIME_KEY
from isc_video_record.pre_event import PreEvent
from isc_video_record.db.mysql import get_camera_info, insert_video_info

# Timestamp layout used both for the Redis checkpoint and for PreEvent bounds.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

tz = pytz.timezone('Asia/Shanghai')


class EventMergerJob:
    """Periodically turn camera alarms into queued recording events.

    For every camera returned by ``get_camera_info`` the job either re-queues
    events that are still waiting (status 3), or builds new events for the
    time window since the camera's last check, stores them through
    ``insert_video_info`` and publishes one message to the RabbitMQ queue
    named by ``self.queue_name``.
    """

    # Seconds between two passes of ``run``; used for both the log line and
    # the actual sleep so the two can never disagree.
    RUN_INTERVAL_SECONDS = 15 * 60

    def __init__(self):
        self.queue_name = 'ISC_RECORD_JOB'
        self.local_service_name = 'cloud-record'

    @staticmethod
    def clean():
        """Delete every per-camera last-check-time key from Redis."""
        pipe = redis_connect()
        try:
            for key in pipe.keys(LAST_CHECK_TIME_KEY.format('*')):
                pipe.delete(key)
        finally:
            # Release the connection even when a Redis call fails.
            pipe.close()

    def start(self):
        """Run forever: one ``run`` pass, then sleep ``RUN_INTERVAL_SECONDS``.

        Exceptions raised by a pass are logged and do not stop the loop.
        """
        while True:
            try:
                self.run()
            except Exception as e:
                log.exception(e)
            next_run_at = time.time() + self.RUN_INTERVAL_SECONDS
            log.info('next run at: %s',
                     time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_run_at)))
            time.sleep(self.RUN_INTERVAL_SECONDS)

    def run(self):
        """Process every camera once.

        A failure on one camera is logged and does not prevent the remaining
        cameras from being processed. A failure of ``get_camera_info`` itself
        propagates to the caller after the Redis connection is closed.
        """
        pipe = redis_connect()
        try:
            for camera in get_camera_info():
                try:
                    self._process_camera(pipe, camera)
                except Exception as e:
                    log.exception(e)
        finally:
            pipe.close()

    def _process_camera(self, pipe, camera):
        """Queue pending events for one camera, or create and queue new ones.

        :param pipe: open Redis connection holding the last-check-time keys.
        :param camera: mapping with at least ``device_code``,
            ``point_index_code``, ``db_table``, ``biz_type`` and
            ``service_type``.
        """
        camera_code = camera['device_code']
        body = {
            'camera_code': camera_code,
            'camera_index': camera['point_index_code'],
            'db_table': camera['db_table'],
        }

        # Events still queued (status=3) take priority over building new ones.
        untreated_events = mysql.get_untreated_events(
            camera['db_table'], camera_code, status=3)
        if untreated_events:
            log.info('当前摄像头%s还存在未处理事件,优先处理', camera_code)
            # 'ex' is the total event duration in seconds plus a 10 s margin.
            body['ex'] = sum(
                (event['end_time'] - event['start_time']).total_seconds()
                for event in untreated_events
            ) + 10
            self.send_mq_message(body)
            return

        # The window ends three minutes in the past (naive UTC).
        now = datetime.utcnow() - timedelta(minutes=3)
        last_check_time_key = LAST_CHECK_TIME_KEY.format(camera_code)
        last_check_time = self._window_start(pipe.get(last_check_time_key), now)

        # Advance the checkpoint first; skip the camera if Redis rejects it.
        if not pipe.set(last_check_time_key, now.strftime(TIME_FORMAT)):
            return

        pre_event = PreEvent(
            last_check_time.strftime(TIME_FORMAT),
            now.strftime(TIME_FORMAT)
        )
        alarm_list = list(pre_event.get_alarm_list(camera['point_index_code']))
        log.info('camera_code: {}, time: {}-{}, alarm_list: {}'.format(
            camera_code, last_check_time, now, len(alarm_list)))

        if dynaconf.settings.get('EVENT_ON', True):
            events = PreEvent.merge_alarm_to_event(alarm_list)
        else:
            # Event merging disabled: record the whole window as one event.
            # NOTE(review): both datetimes are naive UTC, and astimezone() on
            # a naive datetime assumes the host's local timezone, so this is
            # only exact when the host runs in UTC - confirm deployment TZ.
            events = [{
                'start_time': last_check_time.astimezone(tz),
                'end_time': now.astimezone(tz)
            }]

        event_duration = 0
        for event in events:
            insert_video_info(
                camera['db_table'],
                camera['device_code'],
                event['start_time'].astimezone(pytz.utc),
                event['end_time'].astimezone(pytz.utc),
                camera['biz_type'],
                camera['service_type'],
                status=3  # status=3 means "queued"
            )
            event_duration += (event['end_time'] - event['start_time']).total_seconds()

        if len(events) > 0:
            body['ex'] = event_duration + 10
            self.send_mq_message(body)
            log.info(body)

    @staticmethod
    def _window_start(raw_last_check, now):
        """Return the start of the window to scan, given the stored checkpoint.

        :param raw_last_check: value read from Redis (``str``, ``bytes`` or
            a falsy value when the camera has no checkpoint yet).
        :param now: naive datetime marking the end of the window.
        :return: naive datetime, never more than one day before ``now``.
        :raises ValueError: if the stored value does not match ``TIME_FORMAT``.
        """
        if not raw_last_check:
            # No previous check for this device: look back 15 minutes.
            return now - timedelta(minutes=15)

        # A Redis client without decode_responses returns bytes.
        if isinstance(raw_last_check, bytes):
            raw_last_check = raw_last_check.decode('utf-8')
        last_check_time = datetime.strptime(raw_last_check, TIME_FORMAT)

        # Cap the window length at one day.
        oldest_allowed = now - timedelta(days=1)
        if last_check_time < oldest_allowed:
            return oldest_allowed
        return last_check_time

    def send_mq_message(self, body):
        """Publish ``body`` as JSON to the durable queue ``self.queue_name``.

        :param body: JSON-serialisable mapping describing the job.
        """
        connection = rabbitmq_connect()
        try:
            channel = connection.channel()
            channel.queue_declare(self.queue_name, durable=True)
            channel.basic_publish(exchange='',
                                  routing_key=self.queue_name,
                                  body=json.dumps(body, ensure_ascii=False))
        finally:
            # Always release the broker connection, even if publishing fails.
            connection.close()


if __name__ == '__main__':
    from intelab_python_sdk.logger import log_init

    log_init('event_merger', False, '/var/log/event_vss/')
    em = EventMergerJob()
    em.start()