Files
XCDesktop/tools/tongyi/utils.py
2026-03-08 01:34:54 +08:00

255 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 工具函数
import os
import re
import requests
from tqdm import tqdm
from config import USER_AGENT, COOKIE, CHUNK_SIZE, DOWNLOAD_DIR
def extract_bv_from_url(url):
    """
    Extract the BV id from a video share link.

    :param url: share link (or any text) that contains a BV id
    :return: the first substring consisting of "BV" followed by one or
        more ASCII letters or digits
    :raises ValueError: if the text contains no such substring
    """
    found = re.search(r"BV[0-9A-Za-z]+", url)
    if found is None:
        raise ValueError("无法从链接中提取BV号")
    return found.group(0)
def get_video_info(bv_id, timeout=10):
    """
    Fetch the metadata of a video from the Bilibili "view" web API.

    :param bv_id: BV id of the video
    :param timeout: seconds to wait for the server, forwarded to
        ``requests.get``; without a timeout a stalled connection would
        block the caller indefinitely
    :return: the ``data`` object of the JSON response (an empty dict when
        the field is missing or null)
    :raises requests.HTTPError: if the HTTP status is an error status
    :raises Exception: if the API answers with a non-zero ``code``
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Cookie": COOKIE,
    }
    # Let requests build (and URL-encode) the query string instead of
    # interpolating the id into the URL by hand.
    response = requests.get(
        "https://api.bilibili.com/x/web-interface/view",
        params={"bvid": bv_id},
        headers=headers,
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    if data.get("code") != 0:
        raise Exception(f"获取视频信息失败: {data.get('message')}")
    # ``data.get("data", {})`` would still hand back None for an explicit
    # JSON null; normalise so callers can always treat the result as a dict.
    return data.get("data") or {}
def _dash_stream_entries(streams):
    """Convert raw DASH stream objects into ``url``/``quality``/``codec`` dicts."""
    return [
        {
            "url": item.get("baseUrl"),
            "quality": item.get("id"),
            "codec": item.get("codecs"),
        }
        for item in streams
    ]


def get_play_urls(bv_id, cid, timeout=10):
    """
    Fetch the playable stream URLs of a video.

    The response's ``dash.video`` / ``dash.audio`` lists are used when
    present. If there is no DASH video list, the ``durl`` list is used for
    video instead. If there is DASH video but no DASH audio list, the video
    entries are reused as the audio entries.

    :param bv_id: BV id of the video
    :param cid: cid of the video
    :param timeout: seconds to wait for the server, forwarded to
        ``requests.get`` so a stalled connection cannot hang the caller
    :return: dict with the keys ``"video"`` and ``"audio"``, each a list of
        dicts with the keys ``"url"``, ``"quality"`` and ``"codec"``
    :raises requests.HTTPError: if the HTTP status is an error status
    :raises Exception: if the API answers with a non-zero ``code``
    """
    from config import API_URL

    params = {
        "bvid": bv_id,
        "cid": cid,
        "qn": 80,
        "type": "",
        "otype": "json",
        "fourk": 1,
        "fnval": 16,
    }
    headers = {
        "User-Agent": USER_AGENT,
        "Cookie": COOKIE,
        "Referer": "https://www.bilibili.com/",
        "Origin": "https://www.bilibili.com",
    }
    response = requests.get(
        API_URL, params=params, headers=headers, timeout=timeout
    )
    response.raise_for_status()
    data = response.json()
    if data.get("code") != 0:
        raise Exception(f"获取播放链接失败: {data.get('message')}")

    # ``or`` guards against fields that are present but null, which the
    # chained ``.get(key, {})`` form would turn into an AttributeError.
    payload = data.get("data") or {}
    dash = payload.get("dash") or {}
    video_info = dash.get("video") or []
    audio_info = dash.get("audio") or []

    if video_info:
        video_streams = _dash_stream_entries(video_info)
    else:
        # No DASH video list: fall back to the "durl" entries.
        video_streams = [
            {"url": item.get("url"), "quality": 80, "codec": "h264"}
            for item in payload.get("durl") or []
        ]

    # Without a separate audio list the video entries double as audio
    # entries; when both lists are empty this yields an empty list.
    audio_streams = _dash_stream_entries(audio_info or video_info)

    play_urls = {
        "video": video_streams,
        "audio": audio_streams,
    }
    print(f"获取到视频流数量: {len(play_urls['video'])}")
    print(f"获取到音频流数量: {len(play_urls['audio'])}")
    return play_urls
def sanitize_filename(filename):
    """
    Replace characters that are not allowed in file names with "_".

    Covers the characters reserved on Windows (``< > : " / \\ | ? *``) and
    the ASCII control characters 0x00-0x1F. All other characters, including
    non-ASCII text, are kept unchanged.

    :param filename: original file name (for example a video title)
    :return: the name with every illegal character replaced by "_"
    """
    # The double quote and control characters are also invalid in Windows
    # file names, so they must be replaced like the other reserved ones.
    illegal_chars = r'[<>:"/\\|?*\x00-\x1f]'
    return re.sub(illegal_chars, "_", filename)
def download_file(url, save_path, timeout=30):
    """
    Download a file to disk while showing a progress bar.

    :param url: download URL
    :param save_path: destination path; its parent directory is created
        when the path has one
    :param timeout: seconds to wait for the server, forwarded to
        ``requests.get`` so a stalled connection cannot hang the caller
    :return: ``save_path``
    :raises requests.HTTPError: if the HTTP status is an error status
    """
    # dirname is "" for a bare file name, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    target_dir = os.path.dirname(save_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    headers = {
        "User-Agent": USER_AGENT,
        "Cookie": COOKIE,
        "Referer": "https://www.bilibili.com/",
    }
    # Use the response as a context manager so the streamed connection is
    # released even when the status check or a write fails.
    with requests.get(
        url, headers=headers, stream=True, timeout=timeout
    ) as response:
        response.raise_for_status()
        total_size = int(response.headers.get("content-length", 0))
        with open(save_path, "wb") as file, tqdm(
            desc=os.path.basename(save_path),
            total=total_size,
            unit="iB",
            unit_scale=True,
            unit_divisor=1024,
        ) as bar:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                size = file.write(chunk)
                bar.update(size)
    return save_path
def download_audio_video(video_url, audio_url, save_dir, video_title):
    """
    Download the video stream and the audio stream of one title.

    The files are written to ``save_dir`` as ``<title>_video.mp4`` and
    ``<title>_audio.mp4``, where ``<title>`` is ``video_title`` passed
    through ``sanitize_filename``. The video is downloaded first.

    :param video_url: URL of the video stream
    :param audio_url: URL of the audio stream
    :param save_dir: directory in which both files are saved
    :param video_title: video title, used for the file names and messages
    :return: tuple ``(video_path, audio_path)``
    """
    safe_title = sanitize_filename(video_title)

    video_path = os.path.join(save_dir, f"{safe_title}_video.mp4")
    print(f"正在下载视频: {video_title}")
    download_file(video_url, video_path)

    audio_path = os.path.join(save_dir, f"{safe_title}_audio.mp4")
    print(f"正在下载音频: {video_title}")
    download_file(audio_url, audio_path)

    return video_path, audio_path
def merge_audio_video(video_path, audio_path, output_path):
    """
    Merge a video file and an audio file into one file with FFmpeg.

    The video stream is copied as-is and the audio stream is encoded as
    AAC; an existing output file is overwritten (``-y``).

    :param video_path: path of the video file
    :param audio_path: path of the audio file
    :param output_path: path of the merged output file; its parent
        directory is created when the path has one
    :return: ``output_path``
    :raises Exception: if the ``ffmpeg`` executable cannot be found or
        exits with a non-zero status (the original error is chained)
    """
    import subprocess

    # dirname is "" for a bare file name, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    print(f"正在合并音视频: {os.path.basename(output_path)}")
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-i", audio_path,
        "-c:v", "copy",
        "-c:a", "aac",
        "-strict", "experimental",
        "-y",
        output_path,
    ]
    try:
        # Capture output as bytes so FFmpeg's console encoding cannot make
        # the call itself fail.
        subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        # Decode leniently so the log shows readable text, not a bytes repr.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace")
        print(f"FFmpeg合并失败: {stderr_text}")
        print("尝试使用备用方法...")
        # No fallback is implemented; fail with an actionable message.
        raise Exception(
            "音视频合并失败请确保已安装FFmpeg并添加到系统路径\n"
            "或尝试安装MoviePy: pip install moviepy"
        ) from e
    except FileNotFoundError as e:
        # Raised by subprocess when the ffmpeg executable is not on PATH.
        raise Exception(
            "FFmpeg未找到请安装FFmpeg并添加到系统路径\n"
            "下载地址: https://ffmpeg.org/download.html"
        ) from e
    print("音视频合并完成")
    return output_path
def cleanup_files(*file_paths):
    """
    Delete temporary files on a best-effort basis.

    Paths that do not exist are skipped silently. A failure to delete one
    file is reported on stdout and does not stop the remaining deletions.

    :param file_paths: paths of the files to delete
    """
    for path in file_paths:
        if not os.path.exists(path):
            continue
        try:
            os.remove(path)
            print(f"已清理临时文件: {os.path.basename(path)}")
        except Exception as err:
            print(f"清理临时文件失败: {err}")