Initial commit
This commit is contained in:
254
tools/tongyi/utils.py
Normal file
254
tools/tongyi/utils.py
Normal file
@@ -0,0 +1,254 @@
|
||||
# 工具函数
|
||||
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
from config import USER_AGENT, COOKIE, CHUNK_SIZE, DOWNLOAD_DIR
|
||||
|
||||
|
||||
def extract_bv_from_url(url):
|
||||
"""
|
||||
从分享链接中提取BV号
|
||||
:param url: 视频分享链接
|
||||
:return: BV号
|
||||
"""
|
||||
pattern = r"BV[0-9A-Za-z]+"
|
||||
match = re.search(pattern, url)
|
||||
if match:
|
||||
return match.group(0)
|
||||
else:
|
||||
raise ValueError("无法从链接中提取BV号")
|
||||
|
||||
|
||||
def get_video_info(bv_id):
|
||||
"""
|
||||
获取视频信息
|
||||
:param bv_id: BV号
|
||||
:return: 视频信息字典
|
||||
"""
|
||||
url = f"https://api.bilibili.com/x/web-interface/view?bvid={bv_id}"
|
||||
headers = {
|
||||
"User-Agent": USER_AGENT,
|
||||
"Cookie": COOKIE
|
||||
}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
if data.get("code") != 0:
|
||||
raise Exception(f"获取视频信息失败: {data.get('message')}")
|
||||
return data.get("data", {})
|
||||
|
||||
|
||||
def get_play_urls(bv_id, cid):
|
||||
"""
|
||||
获取视频播放链接
|
||||
:param bv_id: BV号
|
||||
:param cid: 视频cid
|
||||
:return: 包含视频和音频链接的字典
|
||||
"""
|
||||
from config import API_URL
|
||||
|
||||
params = {
|
||||
"bvid": bv_id,
|
||||
"cid": cid,
|
||||
"qn": 80,
|
||||
"type": "",
|
||||
"otype": "json",
|
||||
"fourk": 1,
|
||||
"fnval": 16
|
||||
}
|
||||
|
||||
headers = {
|
||||
"User-Agent": USER_AGENT,
|
||||
"Cookie": COOKIE,
|
||||
"Referer": "https://www.bilibili.com/",
|
||||
"Origin": "https://www.bilibili.com"
|
||||
}
|
||||
|
||||
response = requests.get(API_URL, params=params, headers=headers)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
if data.get("code") != 0:
|
||||
raise Exception(f"获取播放链接失败: {data.get('message')}")
|
||||
|
||||
play_urls = {
|
||||
"video": [],
|
||||
"audio": []
|
||||
}
|
||||
|
||||
video_info = data.get("data", {}).get("dash", {}).get("video", [])
|
||||
audio_info = data.get("data", {}).get("dash", {}).get("audio", [])
|
||||
|
||||
if not video_info:
|
||||
durl = data.get("data", {}).get("durl", [])
|
||||
for item in durl:
|
||||
play_urls["video"].append({
|
||||
"url": item.get("url"),
|
||||
"quality": 80,
|
||||
"codec": "h264"
|
||||
})
|
||||
else:
|
||||
for item in video_info:
|
||||
play_urls["video"].append({
|
||||
"url": item.get("baseUrl"),
|
||||
"quality": item.get("id"),
|
||||
"codec": item.get("codecs")
|
||||
})
|
||||
|
||||
if not audio_info and video_info:
|
||||
for item in video_info:
|
||||
play_urls["audio"].append({
|
||||
"url": item.get("baseUrl"),
|
||||
"quality": item.get("id"),
|
||||
"codec": item.get("codecs")
|
||||
})
|
||||
else:
|
||||
for item in audio_info:
|
||||
play_urls["audio"].append({
|
||||
"url": item.get("baseUrl"),
|
||||
"quality": item.get("id"),
|
||||
"codec": item.get("codecs")
|
||||
})
|
||||
|
||||
print(f"获取到视频流数量: {len(play_urls['video'])}")
|
||||
print(f"获取到音频流数量: {len(play_urls['audio'])}")
|
||||
|
||||
return play_urls
|
||||
|
||||
|
||||
def sanitize_filename(filename):
|
||||
"""
|
||||
清理文件名,移除非法字符
|
||||
:param filename: 原始文件名
|
||||
:return: 清理后的文件名
|
||||
"""
|
||||
illegal_chars = r"[<>:/\\|?*]"
|
||||
return re.sub(illegal_chars, "_", filename)
|
||||
|
||||
|
||||
def download_file(url, save_path):
|
||||
"""
|
||||
下载文件
|
||||
:param url: 文件下载链接
|
||||
:param save_path: 保存路径
|
||||
:return: 保存路径
|
||||
"""
|
||||
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
||||
|
||||
headers = {
|
||||
"User-Agent": USER_AGENT,
|
||||
"Cookie": COOKIE,
|
||||
"Referer": "https://www.bilibili.com/"
|
||||
}
|
||||
|
||||
response = requests.get(url, headers=headers, stream=True)
|
||||
response.raise_for_status()
|
||||
total_size = int(response.headers.get("content-length", 0))
|
||||
|
||||
with open(save_path, "wb") as file, tqdm(
|
||||
desc=os.path.basename(save_path),
|
||||
total=total_size,
|
||||
unit="iB",
|
||||
unit_scale=True,
|
||||
unit_divisor=1024,
|
||||
) as bar:
|
||||
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
|
||||
size = file.write(chunk)
|
||||
bar.update(size)
|
||||
|
||||
return save_path
|
||||
|
||||
|
||||
def download_audio_video(video_url, audio_url, save_dir, video_title):
|
||||
"""
|
||||
下载视频和音频
|
||||
:param video_url: 视频流链接
|
||||
:param audio_url: 音频流链接
|
||||
:param save_dir: 保存目录
|
||||
:param video_title: 视频标题
|
||||
:return: 视频和音频保存路径
|
||||
"""
|
||||
sanitized_title = sanitize_filename(video_title)
|
||||
|
||||
video_path = os.path.join(save_dir, f"{sanitized_title}_video.mp4")
|
||||
audio_path = os.path.join(save_dir, f"{sanitized_title}_audio.mp4")
|
||||
|
||||
print(f"正在下载视频: {video_title}")
|
||||
download_file(video_url, video_path)
|
||||
|
||||
print(f"正在下载音频: {video_title}")
|
||||
download_file(audio_url, audio_path)
|
||||
|
||||
return video_path, audio_path
|
||||
|
||||
|
||||
def merge_audio_video(video_path, audio_path, output_path):
|
||||
"""
|
||||
合并音视频
|
||||
:param video_path: 视频文件路径
|
||||
:param audio_path: 音频文件路径
|
||||
:param output_path: 输出文件路径
|
||||
:return: 输出文件路径
|
||||
"""
|
||||
import subprocess
|
||||
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
|
||||
print(f"正在合并音视频: {os.path.basename(output_path)}")
|
||||
|
||||
# 尝试使用FFmpeg合并音视频
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-i", video_path,
|
||||
"-i", audio_path,
|
||||
"-c:v", "copy",
|
||||
"-c:a", "aac",
|
||||
"-strict", "experimental",
|
||||
"-y",
|
||||
output_path
|
||||
]
|
||||
|
||||
try:
|
||||
# 尝试执行FFmpeg命令(使用字节模式避免编码问题)
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
check=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
)
|
||||
print("音视频合并完成")
|
||||
except subprocess.CalledProcessError as e:
|
||||
# FFmpeg失败,尝试使用其他方法
|
||||
print(f"FFmpeg合并失败: {e.stderr}")
|
||||
print("尝试使用备用方法...")
|
||||
|
||||
# 备用方法:使用Python的shutil复制文件
|
||||
# 这里只是一个占位符,实际需要更复杂的实现
|
||||
# 但至少可以让用户知道问题所在
|
||||
raise Exception(
|
||||
"音视频合并失败,请确保已安装FFmpeg并添加到系统路径\n"
|
||||
"或尝试安装MoviePy: pip install moviepy"
|
||||
)
|
||||
except FileNotFoundError:
|
||||
raise Exception(
|
||||
"FFmpeg未找到,请安装FFmpeg并添加到系统路径\n"
|
||||
"下载地址: https://ffmpeg.org/download.html"
|
||||
)
|
||||
|
||||
return output_path
|
||||
|
||||
|
||||
def cleanup_files(*file_paths):
|
||||
"""
|
||||
清理临时文件
|
||||
:param file_paths: 要清理的文件路径
|
||||
"""
|
||||
for file_path in file_paths:
|
||||
if os.path.exists(file_path):
|
||||
try:
|
||||
os.remove(file_path)
|
||||
print(f"已清理临时文件: {os.path.basename(file_path)}")
|
||||
except Exception as e:
|
||||
print(f"清理临时文件失败: {e}")
|
||||
Reference in New Issue
Block a user