85 lines
2.7 KiB
Python
85 lines
2.7 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
#coding=utf-8
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import oss2
|
|||
|
|
from oss2 import Auth
|
|||
|
|
|
|||
|
|
def upload_file_to_oss(local_file_path, bucket_name, object_name, access_key_id, access_key_secret, endpoint='https://oss-cn-beijing.aliyuncs.com'):
|
|||
|
|
"""
|
|||
|
|
上传文件到OSS
|
|||
|
|
:param local_file_path: 本地文件路径
|
|||
|
|
:param bucket_name: OSS桶名称
|
|||
|
|
:param object_name: OSS对象名称
|
|||
|
|
:param access_key_id: 阿里云AccessKey ID
|
|||
|
|
:param access_key_secret: 阿里云AccessKey Secret
|
|||
|
|
:param endpoint: OSS端点,默认为北京区域
|
|||
|
|
:return: OSS文件URL
|
|||
|
|
"""
|
|||
|
|
print(f"开始上传文件到OSS: {local_file_path}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 创建OSS客户端
|
|||
|
|
auth = Auth(access_key_id, access_key_secret)
|
|||
|
|
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
|||
|
|
|
|||
|
|
# 上传文件
|
|||
|
|
bucket.put_object_from_file(object_name, local_file_path)
|
|||
|
|
print(f"文件上传成功: {object_name}")
|
|||
|
|
|
|||
|
|
# 生成可访问的URL
|
|||
|
|
# 注意:这里生成的是临时URL,有效期为3600秒
|
|||
|
|
# 如果需要永久URL,需要在OSS桶中设置文件为公共读
|
|||
|
|
url = bucket.sign_url('GET', object_name, 3600)
|
|||
|
|
print(f"生成OSS文件URL: {url}")
|
|||
|
|
|
|||
|
|
return url
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"文件上传失败: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def make_bucket_public(bucket_name, access_key_id, access_key_secret, endpoint='https://oss-cn-beijing.aliyuncs.com'):
|
|||
|
|
"""
|
|||
|
|
设置OSS桶为公共读
|
|||
|
|
:param bucket_name: OSS桶名称
|
|||
|
|
:param access_key_id: 阿里云AccessKey ID
|
|||
|
|
:param access_key_secret: 阿里云AccessKey Secret
|
|||
|
|
:param endpoint: OSS端点,默认为北京区域
|
|||
|
|
:return: 是否设置成功
|
|||
|
|
"""
|
|||
|
|
print(f"开始设置OSS桶为公共读: {bucket_name}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 创建OSS客户端
|
|||
|
|
auth = Auth(access_key_id, access_key_secret)
|
|||
|
|
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
|||
|
|
|
|||
|
|
# 设置桶的访问策略为公共读
|
|||
|
|
policy = """
|
|||
|
|
{
|
|||
|
|
"Version": "1",
|
|||
|
|
"Statement": [
|
|||
|
|
{
|
|||
|
|
"Action": ["oss:GetObject"],
|
|||
|
|
"Effect": "Allow",
|
|||
|
|
"Principal": ["*"],
|
|||
|
|
"Resource": ["acs:oss:*:*:{bucket_name}/*"],
|
|||
|
|
"Condition": {}
|
|||
|
|
}
|
|||
|
|
]
|
|||
|
|
}
|
|||
|
|
""".format(bucket_name=bucket_name)
|
|||
|
|
|
|||
|
|
bucket.put_bucket_policy(policy)
|
|||
|
|
print(f"OSS桶设置为公共读成功: {bucket_name}")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"OSS桶设置失败: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
return False
|