85 lines
2.7 KiB
Python
85 lines
2.7 KiB
Python
#!/usr/bin/env python
|
||
#coding=utf-8
|
||
|
||
import os
|
||
import oss2
|
||
from oss2 import Auth
|
||
|
||
def upload_file_to_oss(local_file_path, bucket_name, object_name, access_key_id, access_key_secret, endpoint='https://oss-cn-beijing.aliyuncs.com'):
|
||
"""
|
||
上传文件到OSS
|
||
:param local_file_path: 本地文件路径
|
||
:param bucket_name: OSS桶名称
|
||
:param object_name: OSS对象名称
|
||
:param access_key_id: 阿里云AccessKey ID
|
||
:param access_key_secret: 阿里云AccessKey Secret
|
||
:param endpoint: OSS端点,默认为北京区域
|
||
:return: OSS文件URL
|
||
"""
|
||
print(f"开始上传文件到OSS: {local_file_path}")
|
||
|
||
try:
|
||
# 创建OSS客户端
|
||
auth = Auth(access_key_id, access_key_secret)
|
||
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
||
|
||
# 上传文件
|
||
bucket.put_object_from_file(object_name, local_file_path)
|
||
print(f"文件上传成功: {object_name}")
|
||
|
||
# 生成可访问的URL
|
||
# 注意:这里生成的是临时URL,有效期为3600秒
|
||
# 如果需要永久URL,需要在OSS桶中设置文件为公共读
|
||
url = bucket.sign_url('GET', object_name, 3600)
|
||
print(f"生成OSS文件URL: {url}")
|
||
|
||
return url
|
||
|
||
except Exception as e:
|
||
print(f"文件上传失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def make_bucket_public(bucket_name, access_key_id, access_key_secret, endpoint='https://oss-cn-beijing.aliyuncs.com'):
|
||
"""
|
||
设置OSS桶为公共读
|
||
:param bucket_name: OSS桶名称
|
||
:param access_key_id: 阿里云AccessKey ID
|
||
:param access_key_secret: 阿里云AccessKey Secret
|
||
:param endpoint: OSS端点,默认为北京区域
|
||
:return: 是否设置成功
|
||
"""
|
||
print(f"开始设置OSS桶为公共读: {bucket_name}")
|
||
|
||
try:
|
||
# 创建OSS客户端
|
||
auth = Auth(access_key_id, access_key_secret)
|
||
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
||
|
||
# 设置桶的访问策略为公共读
|
||
policy = """
|
||
{
|
||
"Version": "1",
|
||
"Statement": [
|
||
{
|
||
"Action": ["oss:GetObject"],
|
||
"Effect": "Allow",
|
||
"Principal": ["*"],
|
||
"Resource": ["acs:oss:*:*:{bucket_name}/*"],
|
||
"Condition": {}
|
||
}
|
||
]
|
||
}
|
||
""".format(bucket_name=bucket_name)
|
||
|
||
bucket.put_bucket_policy(policy)
|
||
print(f"OSS桶设置为公共读成功: {bucket_name}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"OSS桶设置失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|