74 lines
2.6 KiB
Python
74 lines
2.6 KiB
Python
import oss2
|
|
|
|
from pathlib import Path
|
|
import os
|
|
import uuid
|
|
def flies(file):
|
|
file_urls = []
|
|
for file_url in file:
|
|
url = upload_file(file_url)
|
|
print(url)
|
|
file_urls.append(url)
|
|
return file_urls
|
|
|
|
def upload_file(file_path):
|
|
local_file_path = f'temp/{file_path.name}'
|
|
try:
|
|
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
|
|
with open(local_file_path, 'wb+') as destination:
|
|
for chunk in file_path.chunks():
|
|
destination.write(chunk)
|
|
endpoint = 'https://oss-cn-beijing.aliyuncs.com'
|
|
access_key_id = "LTAI5tRMxrM95Pi8JEEmqRcg"
|
|
access_key_secret = "8vueGCsRVeFyQMcAA7sysO7LSnuJDG"
|
|
|
|
if not access_key_id or not access_key_secret:
|
|
print('❌ 错误: 未找到有效的 OSS 访问密钥,请检查环境变量。')
|
|
return None
|
|
|
|
# 生成唯一 Bucket 名称
|
|
bucket_name = f'oss-bucket-yj'
|
|
print(f"创建 Bucket: {bucket_name}")
|
|
|
|
# 初始化 Bucket 对象
|
|
auth = oss2.Auth(access_key_id, access_key_secret)
|
|
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
|
|
|
try:
|
|
# 1. 创建 Bucket
|
|
bucket.create_bucket(oss2.models.BUCKET_ACL_PUBLIC_READ) # 设置 Bucket 为公共读权限
|
|
print(f'✅ 成功创建 Bucket: {bucket_name}')
|
|
except oss2.exceptions.BucketAlreadyExists:
|
|
print(f'⚠️ Bucket {bucket_name} 已经存在')
|
|
print('提示:请使用不同的 Bucket 名称或使用现有 Bucket')
|
|
except oss2.exceptions.OssError as e:
|
|
print(f'❌ OSS 错误: {e}')
|
|
print(f' 错误码: {e.code}')
|
|
print(f' 请求 ID: {e.request_id}')
|
|
return None
|
|
|
|
# 2. 验证本地文件是否存在
|
|
if not Path(local_file_path).exists():
|
|
print(f'❌ 文件错误: 本地文件 {local_file_path} 不存在')
|
|
return None
|
|
|
|
file_name = os.path.basename(local_file_path)
|
|
oss_object_name = uuid.uuid4().hex[:12] + file_name
|
|
result = bucket.put_object_from_file(oss_object_name, local_file_path)
|
|
|
|
if result.status == 200:
|
|
|
|
# 生成长期可访问的 URL
|
|
public_url = f'https://{bucket_name}.{endpoint.replace("https://", "")}/{oss_object_name}'
|
|
return public_url
|
|
else:
|
|
return None
|
|
except Exception as e:
|
|
print(e)
|
|
finally:
|
|
try:
|
|
# 清理临时文件
|
|
if os.path.exists(local_file_path):
|
|
os.remove(local_file_path)
|
|
except Exception as e:
|
|
pass |