255 lines
7.0 KiB
Python
255 lines
7.0 KiB
Python
|
|
# 工具函数
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
import requests
|
|||
|
|
from tqdm import tqdm
|
|||
|
|
from config import USER_AGENT, COOKIE, CHUNK_SIZE, DOWNLOAD_DIR
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_bv_from_url(url):
|
|||
|
|
"""
|
|||
|
|
从分享链接中提取BV号
|
|||
|
|
:param url: 视频分享链接
|
|||
|
|
:return: BV号
|
|||
|
|
"""
|
|||
|
|
pattern = r"BV[0-9A-Za-z]+"
|
|||
|
|
match = re.search(pattern, url)
|
|||
|
|
if match:
|
|||
|
|
return match.group(0)
|
|||
|
|
else:
|
|||
|
|
raise ValueError("无法从链接中提取BV号")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_video_info(bv_id):
|
|||
|
|
"""
|
|||
|
|
获取视频信息
|
|||
|
|
:param bv_id: BV号
|
|||
|
|
:return: 视频信息字典
|
|||
|
|
"""
|
|||
|
|
url = f"https://api.bilibili.com/x/web-interface/view?bvid={bv_id}"
|
|||
|
|
headers = {
|
|||
|
|
"User-Agent": USER_AGENT,
|
|||
|
|
"Cookie": COOKIE
|
|||
|
|
}
|
|||
|
|
response = requests.get(url, headers=headers)
|
|||
|
|
response.raise_for_status()
|
|||
|
|
data = response.json()
|
|||
|
|
if data.get("code") != 0:
|
|||
|
|
raise Exception(f"获取视频信息失败: {data.get('message')}")
|
|||
|
|
return data.get("data", {})
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_play_urls(bv_id, cid):
|
|||
|
|
"""
|
|||
|
|
获取视频播放链接
|
|||
|
|
:param bv_id: BV号
|
|||
|
|
:param cid: 视频cid
|
|||
|
|
:return: 包含视频和音频链接的字典
|
|||
|
|
"""
|
|||
|
|
from config import API_URL
|
|||
|
|
|
|||
|
|
params = {
|
|||
|
|
"bvid": bv_id,
|
|||
|
|
"cid": cid,
|
|||
|
|
"qn": 80,
|
|||
|
|
"type": "",
|
|||
|
|
"otype": "json",
|
|||
|
|
"fourk": 1,
|
|||
|
|
"fnval": 16
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
headers = {
|
|||
|
|
"User-Agent": USER_AGENT,
|
|||
|
|
"Cookie": COOKIE,
|
|||
|
|
"Referer": "https://www.bilibili.com/",
|
|||
|
|
"Origin": "https://www.bilibili.com"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
response = requests.get(API_URL, params=params, headers=headers)
|
|||
|
|
response.raise_for_status()
|
|||
|
|
data = response.json()
|
|||
|
|
|
|||
|
|
if data.get("code") != 0:
|
|||
|
|
raise Exception(f"获取播放链接失败: {data.get('message')}")
|
|||
|
|
|
|||
|
|
play_urls = {
|
|||
|
|
"video": [],
|
|||
|
|
"audio": []
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
video_info = data.get("data", {}).get("dash", {}).get("video", [])
|
|||
|
|
audio_info = data.get("data", {}).get("dash", {}).get("audio", [])
|
|||
|
|
|
|||
|
|
if not video_info:
|
|||
|
|
durl = data.get("data", {}).get("durl", [])
|
|||
|
|
for item in durl:
|
|||
|
|
play_urls["video"].append({
|
|||
|
|
"url": item.get("url"),
|
|||
|
|
"quality": 80,
|
|||
|
|
"codec": "h264"
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
for item in video_info:
|
|||
|
|
play_urls["video"].append({
|
|||
|
|
"url": item.get("baseUrl"),
|
|||
|
|
"quality": item.get("id"),
|
|||
|
|
"codec": item.get("codecs")
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
if not audio_info and video_info:
|
|||
|
|
for item in video_info:
|
|||
|
|
play_urls["audio"].append({
|
|||
|
|
"url": item.get("baseUrl"),
|
|||
|
|
"quality": item.get("id"),
|
|||
|
|
"codec": item.get("codecs")
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
for item in audio_info:
|
|||
|
|
play_urls["audio"].append({
|
|||
|
|
"url": item.get("baseUrl"),
|
|||
|
|
"quality": item.get("id"),
|
|||
|
|
"codec": item.get("codecs")
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
print(f"获取到视频流数量: {len(play_urls['video'])}")
|
|||
|
|
print(f"获取到音频流数量: {len(play_urls['audio'])}")
|
|||
|
|
|
|||
|
|
return play_urls
|
|||
|
|
|
|||
|
|
|
|||
|
|
def sanitize_filename(filename):
|
|||
|
|
"""
|
|||
|
|
清理文件名,移除非法字符
|
|||
|
|
:param filename: 原始文件名
|
|||
|
|
:return: 清理后的文件名
|
|||
|
|
"""
|
|||
|
|
illegal_chars = r"[<>:/\\|?*]"
|
|||
|
|
return re.sub(illegal_chars, "_", filename)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def download_file(url, save_path):
|
|||
|
|
"""
|
|||
|
|
下载文件
|
|||
|
|
:param url: 文件下载链接
|
|||
|
|
:param save_path: 保存路径
|
|||
|
|
:return: 保存路径
|
|||
|
|
"""
|
|||
|
|
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
|||
|
|
|
|||
|
|
headers = {
|
|||
|
|
"User-Agent": USER_AGENT,
|
|||
|
|
"Cookie": COOKIE,
|
|||
|
|
"Referer": "https://www.bilibili.com/"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
response = requests.get(url, headers=headers, stream=True)
|
|||
|
|
response.raise_for_status()
|
|||
|
|
total_size = int(response.headers.get("content-length", 0))
|
|||
|
|
|
|||
|
|
with open(save_path, "wb") as file, tqdm(
|
|||
|
|
desc=os.path.basename(save_path),
|
|||
|
|
total=total_size,
|
|||
|
|
unit="iB",
|
|||
|
|
unit_scale=True,
|
|||
|
|
unit_divisor=1024,
|
|||
|
|
) as bar:
|
|||
|
|
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
|
|||
|
|
size = file.write(chunk)
|
|||
|
|
bar.update(size)
|
|||
|
|
|
|||
|
|
return save_path
|
|||
|
|
|
|||
|
|
|
|||
|
|
def download_audio_video(video_url, audio_url, save_dir, video_title):
|
|||
|
|
"""
|
|||
|
|
下载视频和音频
|
|||
|
|
:param video_url: 视频流链接
|
|||
|
|
:param audio_url: 音频流链接
|
|||
|
|
:param save_dir: 保存目录
|
|||
|
|
:param video_title: 视频标题
|
|||
|
|
:return: 视频和音频保存路径
|
|||
|
|
"""
|
|||
|
|
sanitized_title = sanitize_filename(video_title)
|
|||
|
|
|
|||
|
|
video_path = os.path.join(save_dir, f"{sanitized_title}_video.mp4")
|
|||
|
|
audio_path = os.path.join(save_dir, f"{sanitized_title}_audio.mp4")
|
|||
|
|
|
|||
|
|
print(f"正在下载视频: {video_title}")
|
|||
|
|
download_file(video_url, video_path)
|
|||
|
|
|
|||
|
|
print(f"正在下载音频: {video_title}")
|
|||
|
|
download_file(audio_url, audio_path)
|
|||
|
|
|
|||
|
|
return video_path, audio_path
|
|||
|
|
|
|||
|
|
|
|||
|
|
def merge_audio_video(video_path, audio_path, output_path):
|
|||
|
|
"""
|
|||
|
|
合并音视频
|
|||
|
|
:param video_path: 视频文件路径
|
|||
|
|
:param audio_path: 音频文件路径
|
|||
|
|
:param output_path: 输出文件路径
|
|||
|
|
:return: 输出文件路径
|
|||
|
|
"""
|
|||
|
|
import subprocess
|
|||
|
|
|
|||
|
|
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
|||
|
|
|
|||
|
|
print(f"正在合并音视频: {os.path.basename(output_path)}")
|
|||
|
|
|
|||
|
|
# 尝试使用FFmpeg合并音视频
|
|||
|
|
cmd = [
|
|||
|
|
"ffmpeg",
|
|||
|
|
"-i", video_path,
|
|||
|
|
"-i", audio_path,
|
|||
|
|
"-c:v", "copy",
|
|||
|
|
"-c:a", "aac",
|
|||
|
|
"-strict", "experimental",
|
|||
|
|
"-y",
|
|||
|
|
output_path
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 尝试执行FFmpeg命令(使用字节模式避免编码问题)
|
|||
|
|
result = subprocess.run(
|
|||
|
|
cmd,
|
|||
|
|
check=True,
|
|||
|
|
stdout=subprocess.PIPE,
|
|||
|
|
stderr=subprocess.PIPE
|
|||
|
|
)
|
|||
|
|
print("音视频合并完成")
|
|||
|
|
except subprocess.CalledProcessError as e:
|
|||
|
|
# FFmpeg失败,尝试使用其他方法
|
|||
|
|
print(f"FFmpeg合并失败: {e.stderr}")
|
|||
|
|
print("尝试使用备用方法...")
|
|||
|
|
|
|||
|
|
# 备用方法:使用Python的shutil复制文件
|
|||
|
|
# 这里只是一个占位符,实际需要更复杂的实现
|
|||
|
|
# 但至少可以让用户知道问题所在
|
|||
|
|
raise Exception(
|
|||
|
|
"音视频合并失败,请确保已安装FFmpeg并添加到系统路径\n"
|
|||
|
|
"或尝试安装MoviePy: pip install moviepy"
|
|||
|
|
)
|
|||
|
|
except FileNotFoundError:
|
|||
|
|
raise Exception(
|
|||
|
|
"FFmpeg未找到,请安装FFmpeg并添加到系统路径\n"
|
|||
|
|
"下载地址: https://ffmpeg.org/download.html"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return output_path
|
|||
|
|
|
|||
|
|
|
|||
|
|
def cleanup_files(*file_paths):
|
|||
|
|
"""
|
|||
|
|
清理临时文件
|
|||
|
|
:param file_paths: 要清理的文件路径
|
|||
|
|
"""
|
|||
|
|
for file_path in file_paths:
|
|||
|
|
if os.path.exists(file_path):
|
|||
|
|
try:
|
|||
|
|
os.remove(file_path)
|
|||
|
|
print(f"已清理临时文件: {os.path.basename(file_path)}")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"清理临时文件失败: {e}")
|