from contextlib import contextmanager
from functools import wraps

import mysql.connector
from dynaconf import settings
from intelab_python_sdk.logger import log
from mysql.connector import errorcode
from retrying import retry


class MysqlClient(object):
    """Thin wrapper that opens short-lived MySQL connections from a config mapping."""

    def __init__(self, config):
        """Store the connection keyword arguments.

        :param config: mapping that provides the ``HOST``, ``PORT``, ``USER``,
            ``PASSWORD`` and ``DATABASE`` keys (read with ``config.get``).
        """
        self.config = dict(
            host=config.get('HOST'),
            port=config.get('PORT'),
            user=config.get('USER'),
            password=config.get('PASSWORD'),
            database=config.get('DATABASE'),
            auth_plugin='mysql_native_password'
        )

    @contextmanager
    def mysql_connector(self, cursor_dict=False):
        """Context manager that yields ``(cursor, conn)`` for a new connection.

        The connection is always closed when the ``with`` block exits.

        :param cursor_dict: when true, the cursor is created with
            ``MySQLCursorDict`` instead of the default cursor class.
        :raises mysql.connector.Error: re-raised after being logged.
        """
        conn = None
        try:
            conn = mysql.connector.connect(**self.config)
            if cursor_dict:
                cursor = conn.cursor(
                    cursor_class=mysql.connector.cursor.MySQLCursorDict)
            else:
                cursor = conn.cursor()
            yield cursor, conn
        except mysql.connector.Error as err:
            # NOTE: errors raised inside the caller's ``with`` body are thrown
            # back into this generator at the ``yield``, so they are logged
            # here as well, not only connection failures.
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                log.error(
                    "Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                log.error("Database does not exist")
            else:
                log.error("cannot connect to mysql. err: {}".format(err))
            # Bare ``raise`` re-raises the active exception with its traceback.
            raise
        finally:
            if conn is not None:
                conn.close()


db = MysqlClient(settings.MYSQL)
version = settings.get('version', 'v3')


def query(cursor_dict=False):
    """Decorator factory that runs the wrapped function on a fresh connection.

    The wrapped function is called as ``func(cursor, conn, *args, **kw)``;
    callers of the decorated function pass only ``*args`` and ``**kw``.
    The whole call (connect + function body) is attempted up to 5 times,
    with ``wait_random_min=100`` / ``wait_random_max=1000`` passed to
    ``retrying.retry`` between attempts.

    :param cursor_dict: forwarded to ``MysqlClient.mysql_connector``; when
        true, rows are returned as dicts.
    """
    def in_func(func):
        @wraps(func)
        @retry(stop_max_attempt_number=5,
               wait_random_min=100,
               wait_random_max=1000)
        def connect(*args, **kw):
            with db.mysql_connector(cursor_dict) as (cursor, conn):
                return func(cursor, conn, *args, **kw)
        return connect
    return in_func


@query(cursor_dict=True)
def get_camera_info(cursor, conn):
    """Return the valid cameras joined with their AI config.

    :return: list of row dicts (empty list when nothing matches), ordered by
        ``create_time``.
    """
    sql = '''
        select id as camera_id,
               name as camera_name,
               point_index_code,
               '131331' as event_type,
               concat('common_video_', mod(service_type, 2), '.',
                      'video_data_motion_', mod(biz_type, 8)) as 'db_table',
               ai_config_support,
               device_code,
               service_type,
               biz_type
        from camera_info
        join camera_ai_config cac on camera_info.id = cac.camera_info_id
        where biz_type is not null and is_valid = 1
        order by create_time;
    '''
    cursor.execute(sql)
    # Normalise any falsy result to an empty list, as the original code did.
    return cursor.fetchall() or []


@query()
def insert_video_info(cursor, conn, db_table, device_code, start_time,
                      end_time, biz_type, service_type, status=0,
                      file_name=None, video_url=None, video_resolution=None):
    """Insert one video row into ``db_table`` and commit.

    :param db_table: table name interpolated into the statement text.
        Identifiers cannot be bound as parameters, so this value must come
        from a trusted source.
    :return: ``cursor.lastrowid`` of the inserted row.
    """
    sql = '''
        insert {} (
            device_code, file_name, start_time, end_time, video_url,
            video_resolution, biz_type, service_type, status,
            create_time, update_time)
        value (%(device_code)s, %(file_name)s, %(start_time)s, %(end_time)s,
               %(video_url)s, %(video_resolution)s, %(biz_type)s,
               %(service_type)s, %(status)s, now(), now()
        )
    '''.format(db_table)
    cursor.execute(sql, {
        'device_code': device_code,
        'file_name': file_name,
        'start_time': start_time,
        'end_time': end_time,
        'video_url': video_url,
        'video_resolution': video_resolution,
        'status': status,
        'biz_type': biz_type,
        'service_type': service_type
    })
    video_id = cursor.lastrowid
    conn.commit()
    return video_id


@query()
def update_video_info(cursor, conn, db_table, video_id, status,
                      file_name=None, video_url=None, video_resolution=None,
                      recovered_time=None):
    """Update the status (and optionally the file details) of a video row.

    ``status`` and ``update_time`` are always written. When ``file_name`` is
    truthy, ``file_name``, ``video_url``, ``video_resolution`` and
    ``recovered_time`` are written too and ``expired_time`` is set to 31 days
    from now.

    All values are bound as query parameters rather than formatted into the
    SQL text, so quotes or ``%`` characters in them cannot break or alter the
    statement, and ``None`` is stored as NULL instead of the text ``None``.

    :param db_table: table name interpolated into the statement text; must
        come from a trusted source.
    """
    assignments = ['status = %s', 'update_time = now()']
    params = [status]
    if file_name:
        assignments.extend([
            'file_name = %s',
            'video_url = %s',
            'video_resolution = %s',
            'recovered_time = %s',
            'expired_time = date_add(now(), interval 31 day)',
        ])
        params.extend([file_name, video_url, video_resolution, recovered_time])
    # The ``where id = %s`` placeholder comes last, so its value is appended last.
    params.append(video_id)

    sql = 'update {} set {} where id = %s'.format(
        db_table, ', '.join(assignments))
    cursor.execute(sql, params)
    conn.commit()


@query(cursor_dict=True)
def get_untreated_events(cursor, conn, db_table, camera_code, status=3, **kw):
    """Return rows for one camera created within the last 7 days.

    When a ``retry`` keyword is present (its value is not inspected), rows
    with status 2 or 3 updated within the last 12 hours are selected;
    otherwise rows whose status equals ``status``.

    :param db_table: table name interpolated into the statement text; must
        come from a trusted source.
    :return: list of row dicts ordered by ``create_time``.
    """
    params = [camera_code]
    if 'retry' in kw:
        # Was ``data_sub``, which is not a MySQL function; ``date_sub`` is.
        status_filter = ('status in (2, 3) and '
                         'update_time > date_sub(now(), interval 12 hour)')
    else:
        status_filter = 'status = %s'
        params.append(status)

    sql = '''
        select id as video_id,
               device_code as camera_code,
               start_time,
               end_time,
               status,
               recovered_time,
               video_url,
               file_name
        from {}
        where device_code = %s
          and create_time > date_sub(now(), interval 7 day)
          and {}
        order by create_time
    '''.format(db_table, status_filter)
    cursor.execute(sql, params)
    return cursor.fetchall()