import oss2 from pathlib import Path import os import uuid def flies(file): file_urls = [] for file_url in file: url = upload_file(file_url) print(url) file_urls.append(url) return file_urls def upload_file(file_path): local_file_path = f'temp/{file_path.name}' try: os.makedirs(os.path.dirname(local_file_path), exist_ok=True) with open(local_file_path, 'wb+') as destination: for chunk in file_path.chunks(): destination.write(chunk) endpoint = 'https://oss-cn-beijing.aliyuncs.com' access_key_id = "LTAI5tRMxrM95Pi8JEEmqRcg" access_key_secret = "8vueGCsRVeFyQMcAA7sysO7LSnuJDG" if not access_key_id or not access_key_secret: print('❌ 错误: 未找到有效的 OSS 访问密钥,请检查环境变量。') return None # 生成唯一 Bucket 名称 bucket_name = f'oss-bucket-yj' print(f"创建 Bucket: {bucket_name}") # 初始化 Bucket 对象 auth = oss2.Auth(access_key_id, access_key_secret) bucket = oss2.Bucket(auth, endpoint, bucket_name) try: # 1. 创建 Bucket bucket.create_bucket(oss2.models.BUCKET_ACL_PUBLIC_READ) # 设置 Bucket 为公共读权限 print(f'✅ 成功创建 Bucket: {bucket_name}') except oss2.exceptions.BucketAlreadyExists: print(f'⚠️ Bucket {bucket_name} 已经存在') print('提示:请使用不同的 Bucket 名称或使用现有 Bucket') except oss2.exceptions.OssError as e: print(f'❌ OSS 错误: {e}') print(f' 错误码: {e.code}') print(f' 请求 ID: {e.request_id}') return None # 2. 验证本地文件是否存在 if not Path(local_file_path).exists(): print(f'❌ 文件错误: 本地文件 {local_file_path} 不存在') return None file_name = os.path.basename(local_file_path) oss_object_name = uuid.uuid4().hex[:12] + file_name result = bucket.put_object_from_file(oss_object_name, local_file_path) if result.status == 200: # 生成长期可访问的 URL public_url = f'https://{bucket_name}.{endpoint.replace("https://", "")}/{oss_object_name}' return public_url else: return None except Exception as e: print(e) finally: try: # 清理临时文件 if os.path.exists(local_file_path): os.remove(local_file_path) except Exception as e: pass