Files
XCDesktop/tools/shared/oss_upload.py
2026-03-08 01:34:54 +08:00

85 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
#coding=utf-8
import os
import oss2
from oss2 import Auth
def upload_file_to_oss(local_file_path, bucket_name, object_name, access_key_id, access_key_secret, endpoint='https://oss-cn-beijing.aliyuncs.com'):
"""
上传文件到OSS
:param local_file_path: 本地文件路径
:param bucket_name: OSS桶名称
:param object_name: OSS对象名称
:param access_key_id: 阿里云AccessKey ID
:param access_key_secret: 阿里云AccessKey Secret
:param endpoint: OSS端点默认为北京区域
:return: OSS文件URL
"""
print(f"开始上传文件到OSS: {local_file_path}")
try:
# 创建OSS客户端
auth = Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
# 上传文件
bucket.put_object_from_file(object_name, local_file_path)
print(f"文件上传成功: {object_name}")
# 生成可访问的URL
# 注意这里生成的是临时URL有效期为3600秒
# 如果需要永久URL需要在OSS桶中设置文件为公共读
url = bucket.sign_url('GET', object_name, 3600)
print(f"生成OSS文件URL: {url}")
return url
except Exception as e:
print(f"文件上传失败: {e}")
import traceback
traceback.print_exc()
return None
def make_bucket_public(bucket_name, access_key_id, access_key_secret, endpoint='https://oss-cn-beijing.aliyuncs.com'):
"""
设置OSS桶为公共读
:param bucket_name: OSS桶名称
:param access_key_id: 阿里云AccessKey ID
:param access_key_secret: 阿里云AccessKey Secret
:param endpoint: OSS端点默认为北京区域
:return: 是否设置成功
"""
print(f"开始设置OSS桶为公共读: {bucket_name}")
try:
# 创建OSS客户端
auth = Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
# 设置桶的访问策略为公共读
policy = """
{
"Version": "1",
"Statement": [
{
"Action": ["oss:GetObject"],
"Effect": "Allow",
"Principal": ["*"],
"Resource": ["acs:oss:*:*:{bucket_name}/*"],
"Condition": {}
}
]
}
""".format(bucket_name=bucket_name)
bucket.put_bucket_policy(policy)
print(f"OSS桶设置为公共读成功: {bucket_name}")
return True
except Exception as e:
print(f"OSS桶设置失败: {e}")
import traceback
traceback.print_exc()
return False