Files

255 lines
7.0 KiB
Python
Raw Permalink Normal View History

2026-03-08 01:34:54 +08:00
# 工具函数
import os
import re
import requests
from tqdm import tqdm
from config import USER_AGENT, COOKIE, CHUNK_SIZE, DOWNLOAD_DIR
def extract_bv_from_url(url):
"""
从分享链接中提取BV号
:param url: 视频分享链接
:return: BV号
"""
pattern = r"BV[0-9A-Za-z]+"
match = re.search(pattern, url)
if match:
return match.group(0)
else:
raise ValueError("无法从链接中提取BV号")
def get_video_info(bv_id):
"""
获取视频信息
:param bv_id: BV号
:return: 视频信息字典
"""
url = f"https://api.bilibili.com/x/web-interface/view?bvid={bv_id}"
headers = {
"User-Agent": USER_AGENT,
"Cookie": COOKIE
}
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise Exception(f"获取视频信息失败: {data.get('message')}")
return data.get("data", {})
def get_play_urls(bv_id, cid):
"""
获取视频播放链接
:param bv_id: BV号
:param cid: 视频cid
:return: 包含视频和音频链接的字典
"""
from config import API_URL
params = {
"bvid": bv_id,
"cid": cid,
"qn": 80,
"type": "",
"otype": "json",
"fourk": 1,
"fnval": 16
}
headers = {
"User-Agent": USER_AGENT,
"Cookie": COOKIE,
"Referer": "https://www.bilibili.com/",
"Origin": "https://www.bilibili.com"
}
response = requests.get(API_URL, params=params, headers=headers)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise Exception(f"获取播放链接失败: {data.get('message')}")
play_urls = {
"video": [],
"audio": []
}
video_info = data.get("data", {}).get("dash", {}).get("video", [])
audio_info = data.get("data", {}).get("dash", {}).get("audio", [])
if not video_info:
durl = data.get("data", {}).get("durl", [])
for item in durl:
play_urls["video"].append({
"url": item.get("url"),
"quality": 80,
"codec": "h264"
})
else:
for item in video_info:
play_urls["video"].append({
"url": item.get("baseUrl"),
"quality": item.get("id"),
"codec": item.get("codecs")
})
if not audio_info and video_info:
for item in video_info:
play_urls["audio"].append({
"url": item.get("baseUrl"),
"quality": item.get("id"),
"codec": item.get("codecs")
})
else:
for item in audio_info:
play_urls["audio"].append({
"url": item.get("baseUrl"),
"quality": item.get("id"),
"codec": item.get("codecs")
})
print(f"获取到视频流数量: {len(play_urls['video'])}")
print(f"获取到音频流数量: {len(play_urls['audio'])}")
return play_urls
def sanitize_filename(filename):
"""
清理文件名移除非法字符
:param filename: 原始文件名
:return: 清理后的文件名
"""
illegal_chars = r"[<>:/\\|?*]"
return re.sub(illegal_chars, "_", filename)
def download_file(url, save_path):
"""
下载文件
:param url: 文件下载链接
:param save_path: 保存路径
:return: 保存路径
"""
os.makedirs(os.path.dirname(save_path), exist_ok=True)
headers = {
"User-Agent": USER_AGENT,
"Cookie": COOKIE,
"Referer": "https://www.bilibili.com/"
}
response = requests.get(url, headers=headers, stream=True)
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
with open(save_path, "wb") as file, tqdm(
desc=os.path.basename(save_path),
total=total_size,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar:
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
size = file.write(chunk)
bar.update(size)
return save_path
def download_audio_video(video_url, audio_url, save_dir, video_title):
"""
下载视频和音频
:param video_url: 视频流链接
:param audio_url: 音频流链接
:param save_dir: 保存目录
:param video_title: 视频标题
:return: 视频和音频保存路径
"""
sanitized_title = sanitize_filename(video_title)
video_path = os.path.join(save_dir, f"{sanitized_title}_video.mp4")
audio_path = os.path.join(save_dir, f"{sanitized_title}_audio.mp4")
print(f"正在下载视频: {video_title}")
download_file(video_url, video_path)
print(f"正在下载音频: {video_title}")
download_file(audio_url, audio_path)
return video_path, audio_path
def merge_audio_video(video_path, audio_path, output_path):
"""
合并音视频
:param video_path: 视频文件路径
:param audio_path: 音频文件路径
:param output_path: 输出文件路径
:return: 输出文件路径
"""
import subprocess
os.makedirs(os.path.dirname(output_path), exist_ok=True)
print(f"正在合并音视频: {os.path.basename(output_path)}")
# 尝试使用FFmpeg合并音视频
cmd = [
"ffmpeg",
"-i", video_path,
"-i", audio_path,
"-c:v", "copy",
"-c:a", "aac",
"-strict", "experimental",
"-y",
output_path
]
try:
# 尝试执行FFmpeg命令使用字节模式避免编码问题
result = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
print("音视频合并完成")
except subprocess.CalledProcessError as e:
# FFmpeg失败尝试使用其他方法
print(f"FFmpeg合并失败: {e.stderr}")
print("尝试使用备用方法...")
# 备用方法使用Python的shutil复制文件
# 这里只是一个占位符,实际需要更复杂的实现
# 但至少可以让用户知道问题所在
raise Exception(
"音视频合并失败请确保已安装FFmpeg并添加到系统路径\n"
"或尝试安装MoviePy: pip install moviepy"
)
except FileNotFoundError:
raise Exception(
"FFmpeg未找到请安装FFmpeg并添加到系统路径\n"
"下载地址: https://ffmpeg.org/download.html"
)
return output_path
def cleanup_files(*file_paths):
"""
清理临时文件
:param file_paths: 要清理的文件路径
"""
for file_path in file_paths:
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"已清理临时文件: {os.path.basename(file_path)}")
except Exception as e:
print(f"清理临时文件失败: {e}")