255 lines
7.0 KiB
Python
255 lines
7.0 KiB
Python
# 工具函数
|
||
|
||
import os
|
||
import re
|
||
import requests
|
||
from tqdm import tqdm
|
||
from config import USER_AGENT, COOKIE, CHUNK_SIZE, DOWNLOAD_DIR
|
||
|
||
|
||
def extract_bv_from_url(url):
|
||
"""
|
||
从分享链接中提取BV号
|
||
:param url: 视频分享链接
|
||
:return: BV号
|
||
"""
|
||
pattern = r"BV[0-9A-Za-z]+"
|
||
match = re.search(pattern, url)
|
||
if match:
|
||
return match.group(0)
|
||
else:
|
||
raise ValueError("无法从链接中提取BV号")
|
||
|
||
|
||
def get_video_info(bv_id):
|
||
"""
|
||
获取视频信息
|
||
:param bv_id: BV号
|
||
:return: 视频信息字典
|
||
"""
|
||
url = f"https://api.bilibili.com/x/web-interface/view?bvid={bv_id}"
|
||
headers = {
|
||
"User-Agent": USER_AGENT,
|
||
"Cookie": COOKIE
|
||
}
|
||
response = requests.get(url, headers=headers)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
if data.get("code") != 0:
|
||
raise Exception(f"获取视频信息失败: {data.get('message')}")
|
||
return data.get("data", {})
|
||
|
||
|
||
def get_play_urls(bv_id, cid):
|
||
"""
|
||
获取视频播放链接
|
||
:param bv_id: BV号
|
||
:param cid: 视频cid
|
||
:return: 包含视频和音频链接的字典
|
||
"""
|
||
from config import API_URL
|
||
|
||
params = {
|
||
"bvid": bv_id,
|
||
"cid": cid,
|
||
"qn": 80,
|
||
"type": "",
|
||
"otype": "json",
|
||
"fourk": 1,
|
||
"fnval": 16
|
||
}
|
||
|
||
headers = {
|
||
"User-Agent": USER_AGENT,
|
||
"Cookie": COOKIE,
|
||
"Referer": "https://www.bilibili.com/",
|
||
"Origin": "https://www.bilibili.com"
|
||
}
|
||
|
||
response = requests.get(API_URL, params=params, headers=headers)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
if data.get("code") != 0:
|
||
raise Exception(f"获取播放链接失败: {data.get('message')}")
|
||
|
||
play_urls = {
|
||
"video": [],
|
||
"audio": []
|
||
}
|
||
|
||
video_info = data.get("data", {}).get("dash", {}).get("video", [])
|
||
audio_info = data.get("data", {}).get("dash", {}).get("audio", [])
|
||
|
||
if not video_info:
|
||
durl = data.get("data", {}).get("durl", [])
|
||
for item in durl:
|
||
play_urls["video"].append({
|
||
"url": item.get("url"),
|
||
"quality": 80,
|
||
"codec": "h264"
|
||
})
|
||
else:
|
||
for item in video_info:
|
||
play_urls["video"].append({
|
||
"url": item.get("baseUrl"),
|
||
"quality": item.get("id"),
|
||
"codec": item.get("codecs")
|
||
})
|
||
|
||
if not audio_info and video_info:
|
||
for item in video_info:
|
||
play_urls["audio"].append({
|
||
"url": item.get("baseUrl"),
|
||
"quality": item.get("id"),
|
||
"codec": item.get("codecs")
|
||
})
|
||
else:
|
||
for item in audio_info:
|
||
play_urls["audio"].append({
|
||
"url": item.get("baseUrl"),
|
||
"quality": item.get("id"),
|
||
"codec": item.get("codecs")
|
||
})
|
||
|
||
print(f"获取到视频流数量: {len(play_urls['video'])}")
|
||
print(f"获取到音频流数量: {len(play_urls['audio'])}")
|
||
|
||
return play_urls
|
||
|
||
|
||
def sanitize_filename(filename):
|
||
"""
|
||
清理文件名,移除非法字符
|
||
:param filename: 原始文件名
|
||
:return: 清理后的文件名
|
||
"""
|
||
illegal_chars = r"[<>:/\\|?*]"
|
||
return re.sub(illegal_chars, "_", filename)
|
||
|
||
|
||
def download_file(url, save_path):
|
||
"""
|
||
下载文件
|
||
:param url: 文件下载链接
|
||
:param save_path: 保存路径
|
||
:return: 保存路径
|
||
"""
|
||
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
||
|
||
headers = {
|
||
"User-Agent": USER_AGENT,
|
||
"Cookie": COOKIE,
|
||
"Referer": "https://www.bilibili.com/"
|
||
}
|
||
|
||
response = requests.get(url, headers=headers, stream=True)
|
||
response.raise_for_status()
|
||
total_size = int(response.headers.get("content-length", 0))
|
||
|
||
with open(save_path, "wb") as file, tqdm(
|
||
desc=os.path.basename(save_path),
|
||
total=total_size,
|
||
unit="iB",
|
||
unit_scale=True,
|
||
unit_divisor=1024,
|
||
) as bar:
|
||
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
|
||
size = file.write(chunk)
|
||
bar.update(size)
|
||
|
||
return save_path
|
||
|
||
|
||
def download_audio_video(video_url, audio_url, save_dir, video_title):
|
||
"""
|
||
下载视频和音频
|
||
:param video_url: 视频流链接
|
||
:param audio_url: 音频流链接
|
||
:param save_dir: 保存目录
|
||
:param video_title: 视频标题
|
||
:return: 视频和音频保存路径
|
||
"""
|
||
sanitized_title = sanitize_filename(video_title)
|
||
|
||
video_path = os.path.join(save_dir, f"{sanitized_title}_video.mp4")
|
||
audio_path = os.path.join(save_dir, f"{sanitized_title}_audio.mp4")
|
||
|
||
print(f"正在下载视频: {video_title}")
|
||
download_file(video_url, video_path)
|
||
|
||
print(f"正在下载音频: {video_title}")
|
||
download_file(audio_url, audio_path)
|
||
|
||
return video_path, audio_path
|
||
|
||
|
||
def merge_audio_video(video_path, audio_path, output_path):
|
||
"""
|
||
合并音视频
|
||
:param video_path: 视频文件路径
|
||
:param audio_path: 音频文件路径
|
||
:param output_path: 输出文件路径
|
||
:return: 输出文件路径
|
||
"""
|
||
import subprocess
|
||
|
||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||
|
||
print(f"正在合并音视频: {os.path.basename(output_path)}")
|
||
|
||
# 尝试使用FFmpeg合并音视频
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-i", video_path,
|
||
"-i", audio_path,
|
||
"-c:v", "copy",
|
||
"-c:a", "aac",
|
||
"-strict", "experimental",
|
||
"-y",
|
||
output_path
|
||
]
|
||
|
||
try:
|
||
# 尝试执行FFmpeg命令(使用字节模式避免编码问题)
|
||
result = subprocess.run(
|
||
cmd,
|
||
check=True,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE
|
||
)
|
||
print("音视频合并完成")
|
||
except subprocess.CalledProcessError as e:
|
||
# FFmpeg失败,尝试使用其他方法
|
||
print(f"FFmpeg合并失败: {e.stderr}")
|
||
print("尝试使用备用方法...")
|
||
|
||
# 备用方法:使用Python的shutil复制文件
|
||
# 这里只是一个占位符,实际需要更复杂的实现
|
||
# 但至少可以让用户知道问题所在
|
||
raise Exception(
|
||
"音视频合并失败,请确保已安装FFmpeg并添加到系统路径\n"
|
||
"或尝试安装MoviePy: pip install moviepy"
|
||
)
|
||
except FileNotFoundError:
|
||
raise Exception(
|
||
"FFmpeg未找到,请安装FFmpeg并添加到系统路径\n"
|
||
"下载地址: https://ffmpeg.org/download.html"
|
||
)
|
||
|
||
return output_path
|
||
|
||
|
||
def cleanup_files(*file_paths):
|
||
"""
|
||
清理临时文件
|
||
:param file_paths: 要清理的文件路径
|
||
"""
|
||
for file_path in file_paths:
|
||
if os.path.exists(file_path):
|
||
try:
|
||
os.remove(file_path)
|
||
print(f"已清理临时文件: {os.path.basename(file_path)}")
|
||
except Exception as e:
|
||
print(f"清理临时文件失败: {e}")
|