import os import oss2 import urllib.request from functools import lru_cache from dynaconf import settings from oss2.exceptions import NoSuchKey local_endpoint = 'https://oss-cn-{}-internal.aliyuncs.com' # 局域网配置 config = settings.get('ALI_OSS', {}) region = config.get('region', 'hangzhou') local_endpoint = config.get('endpoint', local_endpoint.format(region)) @lru_cache() def _get_bucket(bucket_name=None, l_region=None): endpoint = 'https://oss-cn-{}.aliyuncs.com'.format(l_region or region) if l_region else local_endpoint bucket_name = bucket_name or config['bucket_name'] bucket = oss2.Bucket(oss2.Auth(config['access_key_id'], config['access_key_secret']), endpoint, bucket_name) return bucket def oss_upload_file(origin_file, local_file): """ 上传文件 """ # 用私有域名替换掉阿里的域名 url_base = config.get('internal', 'https://{}.oss-cn-{}.aliyuncs.com/'.format(config['bucket_name'], region)) bucket = _get_bucket() result = bucket.put_object_from_file(origin_file, local_file) if result.status == 200: origin_file_url = os.path.join(url_base, origin_file) else: origin_file_url = '' return origin_file_url def oss_delete_file(origin_file): """ 删除单个文件 """ res = oss_batch_delete_files([origin_file]) return True if len(res) > 0 else False def oss_batch_delete_files(files_list, bucket_name=None, l_region=None): """ 批量删除云端文件 """ bucket = _get_bucket(bucket_name, l_region) bucket_files = [] for origin_file in files_list: if origin_file.startswith('http://') or origin_file.startswith('https://'): bucket_files.append('/'.join(origin_file.split('/')[3:])) else: bucket_files.append(origin_file) result = bucket.batch_delete_objects(bucket_files) return result.deleted_keys def oss_download_file(origin_file, local_file): """ 下载视频文件 """ old_origin_file = origin_file try: bucket = _get_bucket() if origin_file.startswith('http://') or origin_file.startswith('https://'): origin_file = '/'.join(origin_file.split('/')[3:]) bucket.get_object_to_file(origin_file, local_file) except NoSuchKey: urllib.request.urlretrieve(old_origin_file, local_file) return local_file if __name__ == '__main__': # print(oss_upload_file('test-2.mp4', '/home/wen/Videos/3_C90842327_2020_06_04_13_37_14.mp4')) # print(oss_download_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4', 't.mp4')) # print(oss_delete_file('https://test-qzwjtest.oss-cn-hangzhou.aliyuncs.com/test-2.mp4')) oss_download_file('D00268229_2020-10-23_14-07-13.mp4', '/tmp/v3/videos/D00268229_2020-10-23_14-07-13.mp4')