2025-12-19 19:16:41 +08:00
|
|
|
|
import oss2
|
|
|
|
|
|
import os
|
|
|
|
|
|
import uuid
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
|
|
def upload_file_to_oss(local_file_path):
|
|
|
|
|
|
# 建议使用环境变量存储敏感信息,这里为了示例方便,假设已通过环境变量设置
|
|
|
|
|
|
endpoint = 'https://oss-cn-beijing.aliyuncs.com'
|
|
|
|
|
|
access_key_id = "LTAI5tRMxrM95Pi8JEEmqRcg"
|
|
|
|
|
|
access_key_secret = "8vueGCsRVeFyQMcAA7sysO7LSnuJDG"
|
|
|
|
|
|
|
|
|
|
|
|
if not access_key_id or not access_key_secret:
|
|
|
|
|
|
print('❌ 错误: 未找到有效的 OSS 访问密钥,请检查环境变量。')
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 生成唯一 Bucket 名称
|
|
|
|
|
|
bucket_name = f'oss-bucket-yj'
|
|
|
|
|
|
print(f"创建 Bucket: {bucket_name}")
|
|
|
|
|
|
|
|
|
|
|
|
# 初始化 Bucket 对象
|
|
|
|
|
|
auth = oss2.Auth(access_key_id, access_key_secret)
|
|
|
|
|
|
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 1. 创建 Bucket
|
|
|
|
|
|
bucket.create_bucket(oss2.models.BUCKET_ACL_PUBLIC_READ) # 设置 Bucket 为公共读权限
|
|
|
|
|
|
print(f'✅ 成功创建 Bucket: {bucket_name}')
|
|
|
|
|
|
except oss2.exceptions.BucketAlreadyExists:
|
|
|
|
|
|
print(f'⚠️ Bucket {bucket_name} 已经存在')
|
|
|
|
|
|
print('提示:请使用不同的 Bucket 名称或使用现有 Bucket')
|
|
|
|
|
|
except oss2.exceptions.OssError as e:
|
|
|
|
|
|
print(f'❌ OSS 错误: {e}')
|
|
|
|
|
|
print(f' 错误码: {e.code}')
|
|
|
|
|
|
print(f' 请求 ID: {e.request_id}')
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 验证本地文件是否存在
|
|
|
|
|
|
if not Path(local_file_path).exists():
|
|
|
|
|
|
print(f'❌ 文件错误: 本地文件 {local_file_path} 不存在')
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
file_name = os.path.basename(local_file_path)
|
|
|
|
|
|
oss_object_name = uuid.uuid4().hex[:12] + file_name
|
|
|
|
|
|
result = bucket.put_object_from_file(oss_object_name, local_file_path)
|
|
|
|
|
|
|
|
|
|
|
|
if result.status == 200:
|
|
|
|
|
|
print(f'✅ 文件上传成功')
|
|
|
|
|
|
print(f' ETag: {result.etag}')
|
|
|
|
|
|
print(f' 文件大小: {result.resp.headers.get("content-length")} bytes')
|
|
|
|
|
|
|
|
|
|
|
|
# 生成长期可访问的 URL
|
|
|
|
|
|
public_url = f'https://{bucket_name}.{endpoint.replace("https://", "")}/{oss_object_name}'
|
|
|
|
|
|
print(f' 文件公共 URL(长期有效): {public_url}')
|
|
|
|
|
|
return public_url
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f'❌ 文件上传失败,状态码: {result.status}')
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 示例调用
|
|
|
|
|
|
local_file = r'C:\Users\27942\Desktop\codes\lm_code\test.py'
|
|
|
|
|
|
signed_url = upload_file_to_oss(local_file)
|
|
|
|
|
|
if signed_url:
|
|
|
|
|
|
print(f"可访问文件的 URL: {signed_url}")
|