# 工具函数 import os import re import requests from tqdm import tqdm from config import USER_AGENT, COOKIE, CHUNK_SIZE, DOWNLOAD_DIR def extract_bv_from_url(url): """ 从分享链接中提取BV号 :param url: 视频分享链接 :return: BV号 """ pattern = r"BV[0-9A-Za-z]+" match = re.search(pattern, url) if match: return match.group(0) else: raise ValueError("无法从链接中提取BV号") def get_video_info(bv_id): """ 获取视频信息 :param bv_id: BV号 :return: 视频信息字典 """ url = f"https://api.bilibili.com/x/web-interface/view?bvid={bv_id}" headers = { "User-Agent": USER_AGENT, "Cookie": COOKIE } response = requests.get(url, headers=headers) response.raise_for_status() data = response.json() if data.get("code") != 0: raise Exception(f"获取视频信息失败: {data.get('message')}") return data.get("data", {}) def get_play_urls(bv_id, cid): """ 获取视频播放链接 :param bv_id: BV号 :param cid: 视频cid :return: 包含视频和音频链接的字典 """ from config import API_URL params = { "bvid": bv_id, "cid": cid, "qn": 80, "type": "", "otype": "json", "fourk": 1, "fnval": 16 } headers = { "User-Agent": USER_AGENT, "Cookie": COOKIE, "Referer": "https://www.bilibili.com/", "Origin": "https://www.bilibili.com" } response = requests.get(API_URL, params=params, headers=headers) response.raise_for_status() data = response.json() if data.get("code") != 0: raise Exception(f"获取播放链接失败: {data.get('message')}") play_urls = { "video": [], "audio": [] } video_info = data.get("data", {}).get("dash", {}).get("video", []) audio_info = data.get("data", {}).get("dash", {}).get("audio", []) if not video_info: durl = data.get("data", {}).get("durl", []) for item in durl: play_urls["video"].append({ "url": item.get("url"), "quality": 80, "codec": "h264" }) else: for item in video_info: play_urls["video"].append({ "url": item.get("baseUrl"), "quality": item.get("id"), "codec": item.get("codecs") }) if not audio_info and video_info: for item in video_info: play_urls["audio"].append({ "url": item.get("baseUrl"), "quality": item.get("id"), "codec": item.get("codecs") }) else: for item in audio_info: play_urls["audio"].append({ "url": item.get("baseUrl"), "quality": item.get("id"), "codec": item.get("codecs") }) print(f"获取到视频流数量: {len(play_urls['video'])}") print(f"获取到音频流数量: {len(play_urls['audio'])}") return play_urls def sanitize_filename(filename): """ 清理文件名,移除非法字符 :param filename: 原始文件名 :return: 清理后的文件名 """ illegal_chars = r"[<>:/\\|?*]" return re.sub(illegal_chars, "_", filename) def download_file(url, save_path): """ 下载文件 :param url: 文件下载链接 :param save_path: 保存路径 :return: 保存路径 """ os.makedirs(os.path.dirname(save_path), exist_ok=True) headers = { "User-Agent": USER_AGENT, "Cookie": COOKIE, "Referer": "https://www.bilibili.com/" } response = requests.get(url, headers=headers, stream=True) response.raise_for_status() total_size = int(response.headers.get("content-length", 0)) with open(save_path, "wb") as file, tqdm( desc=os.path.basename(save_path), total=total_size, unit="iB", unit_scale=True, unit_divisor=1024, ) as bar: for chunk in response.iter_content(chunk_size=CHUNK_SIZE): size = file.write(chunk) bar.update(size) return save_path def download_audio_video(video_url, audio_url, save_dir, video_title): """ 下载视频和音频 :param video_url: 视频流链接 :param audio_url: 音频流链接 :param save_dir: 保存目录 :param video_title: 视频标题 :return: 视频和音频保存路径 """ sanitized_title = sanitize_filename(video_title) video_path = os.path.join(save_dir, f"{sanitized_title}_video.mp4") audio_path = os.path.join(save_dir, f"{sanitized_title}_audio.mp4") print(f"正在下载视频: {video_title}") download_file(video_url, video_path) print(f"正在下载音频: {video_title}") download_file(audio_url, audio_path) return video_path, audio_path def merge_audio_video(video_path, audio_path, output_path): """ 合并音视频 :param video_path: 视频文件路径 :param audio_path: 音频文件路径 :param output_path: 输出文件路径 :return: 输出文件路径 """ import subprocess os.makedirs(os.path.dirname(output_path), exist_ok=True) print(f"正在合并音视频: {os.path.basename(output_path)}") # 尝试使用FFmpeg合并音视频 cmd = [ "ffmpeg", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", "-y", output_path ] try: # 尝试执行FFmpeg命令(使用字节模式避免编码问题) result = subprocess.run( cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) print("音视频合并完成") except subprocess.CalledProcessError as e: # FFmpeg失败,尝试使用其他方法 print(f"FFmpeg合并失败: {e.stderr}") print("尝试使用备用方法...") # 备用方法:使用Python的shutil复制文件 # 这里只是一个占位符,实际需要更复杂的实现 # 但至少可以让用户知道问题所在 raise Exception( "音视频合并失败,请确保已安装FFmpeg并添加到系统路径\n" "或尝试安装MoviePy: pip install moviepy" ) except FileNotFoundError: raise Exception( "FFmpeg未找到,请安装FFmpeg并添加到系统路径\n" "下载地址: https://ffmpeg.org/download.html" ) return output_path def cleanup_files(*file_paths): """ 清理临时文件 :param file_paths: 要清理的文件路径 """ for file_path in file_paths: if os.path.exists(file_path): try: os.remove(file_path) print(f"已清理临时文件: {os.path.basename(file_path)}") except Exception as e: print(f"清理临时文件失败: {e}")