from contextlib import contextmanager
from functools import wraps

import mysql.connector
from dynaconf import settings
from intelab_python_sdk.logger import log
from mysql.connector import errorcode
from retrying import retry


class MysqlClient(object):
    """Open short-lived MySQL connections from a settings mapping."""

    def __init__(self, config):
        """Store the connection parameters used by every new connection.

        :param config: mapping-like object exposing ``get``; the keys
            ``HOST``, ``PORT``, ``USER``, ``PASSWORD`` and ``DATABASE`` are
            read (a missing key yields ``None``).
        """
        self.config = dict(
            host=config.get('HOST'),
            port=config.get('PORT'),
            user=config.get('USER'),
            password=config.get('PASSWORD'),
            database=config.get('DATABASE'),
            auth_plugin='mysql_native_password'
        )

    @contextmanager
    def mysql_connector(self, cursor_dict=False):
        """Context manager yielding ``(cursor, conn)`` for a new connection.

        The connection is always closed on exit. Nothing is committed here;
        callers that modify data must call ``conn.commit()`` themselves.

        :param cursor_dict: when truthy, the cursor returns rows as dicts
            instead of tuples.
        :raises mysql.connector.Error: re-raised after being logged.
        """
        conn = None
        try:
            conn = mysql.connector.connect(**self.config)
            # Use the ``dictionary`` flag rather than passing
            # ``cursor_class=MySQLCursorDict``: the flag lets the connection
            # pick the dict cursor that matches its implementation, whereas
            # the pure-Python class is rejected by C-extension connections.
            cursor = conn.cursor(dictionary=bool(cursor_dict))
            yield cursor, conn
        except mysql.connector.Error as err:
            # NOTE: the ``yield`` sits inside this ``try``, so connector
            # errors raised by the caller's ``with`` body are logged here too.
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                log.error(
                    "Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                log.error("Database does not exist")
            else:
                log.error("cannot connect to mysql. \nerr: {}".format(err))
            raise
        finally:
            if conn:
                conn.close()


# Module-level client shared by every ``@query``-decorated function.
db = MysqlClient(settings.MYSQL)
version = settings.get('version', 'v3')


def query(cursor_dict=False):
    """Decorator factory that runs a function against a fresh connection.

    The decorated function receives ``cursor`` and ``conn`` as its first two
    positional arguments, followed by whatever the caller passed; callers
    therefore omit those two arguments. Each call (and each retry) opens a
    new connection through ``db.mysql_connector``.

    The call is retried on any exception, up to 5 attempts, with a random
    wait between attempts (``wait_random_min=100``, ``wait_random_max=1000``).

    :param cursor_dict: when truthy, query results are returned as dicts.
    """
    def in_func(func):
        @wraps(func)
        @retry(stop_max_attempt_number=5, wait_random_min=100,
               wait_random_max=1000)
        def connect(*args, **kw):
            with db.mysql_connector(cursor_dict) as (cursor, conn):
                return func(cursor, conn, *args, **kw)
        return connect
    return in_func


@query(cursor_dict=True)
def get_camera_sn(cursor, conn):
    """Return every row of ``camera.video_config`` as a list of dicts.

    Each dict has the keys ``camera_sn``, ``biz_type`` and ``camera_name``.
    ``cursor`` and ``conn`` are injected by ``@query``.
    """
    sql = '''
        select device_number camera_sn, biz_type, name camera_name
        from camera.video_config
    '''
    # where device_number = 'E57379052'
    cursor.execute(sql)
    return cursor.fetchall()


@query(cursor_dict=True)
def get_camera_video_url(cursor, conn, table, camera_sn, date):
    """Return the video rows of one camera for one date as a list of dicts.

    ``cursor`` and ``conn`` are injected by ``@query``.

    :param table: name of the table to read; it is interpolated into the
        SQL text, so it must come from trusted code, never from user input.
    :param camera_sn: value matched against ``device_serial``.
    :param date: value matched against ``date``.
    """
    sql = '''
        select video_url, id video_id, status, start_date, end_date,
               file_name
        from {}
        where date = %s and device_serial = %s
    '''.format(table)
    cursor.execute(sql, [date, camera_sn])
    return cursor.fetchall()


@query()
def set_video_data_status(cursor, conn, table, video_id):
    """Set ``status = 0`` on the row with the given id and commit.

    ``cursor`` and ``conn`` are injected by ``@query``.

    :param table: name of the table to update; it is interpolated into the
        SQL text, so it must come from trusted code, never from user input.
    :param video_id: value matched against ``id``.
    """
    sql = '''
        update {}
        set status = 0
        where id = %s
    '''.format(table)
    cursor.execute(sql, [video_id])
    conn.commit()