头图来自《mono女孩》
1331 字
7 分钟
曲线救国:用Termux+Python脚本实现qBittorrent RSS自动化追番
TIP
起因
事情是这样的:由于四月新番有好几部我都比较感兴趣,但是我平常是在动漫花园一个一个找种子下载的,这样太麻烦了;于是我搜了一下发现 qBitTorrent 有 RSS 订阅功能,但是不知道为什么,在我的网络环境下使用代理时,总是无法正常下载,各种设置都调了之后还是处于一个基本不可用的状态,所以索性就让 ChatGPT 帮我写了个脚本来使用代理抓取种子再传入 qBittorrent 进行下载。
环境搭建
省流:环境就是我手机上的 Termux
由于我正好有一部闲置的手机,所以我就给它装了个 Termux,然后找根线连到电脑上供电。这样这个手机就相当于一个非常小型的 NAS 了(毕竟它的机身存储只有256G)。
还有一个前提条件,由于我之前在这个手机上装了一个 Alist,所以我可以很方便地用电脑通过局域网访问到这个手机的存储:
正如我在前边的起因里写的那样,由于 qB 在我的网络环境下有点水土不服(电脑端的 qBittorrent 和 我在Termux 里装的 qBittorrent-nox 都是这样),所以只好用脚本去抓取 torrent 再传到 qBittorrent 里了。
脚本内容
省流:下边的两段代码都是 ChatGPT 写的,在主函数里调用的时候修改一下配置参数就可以了
下载器模块:
import requests
import os
import xml.etree.ElementTree as ET
import io
import re
# 获取番剧名、字幕组、集数等信息的模块
def extract_group(title: str) -> str:
match = re.match(r'^[\[\【]([^\]\】]+)[\]\】]', title)
return match.group(1).strip() if match else None
def extract_episode(title: str) -> str:
patterns = [
r'\s*[-\s_]+(\d{2,3})(?:\D|$)', # e.g. " - 01 " or "_12" or "-099"
r'第\s*(\d+)\s*[话集]', # e.g. "第 3 集" or "第12话"
r'\[(\d{2,3})\]', # e.g. "[01]"
]
for pattern in patterns:
match = re.search(pattern, title)
if match:
return match.group(1).zfill(2)
return None
def extract_title(title: str) -> dict:
# 1. 移除字幕组
title = re.sub(r'^[\[\【][^\]\】]+[\]\】]\s*', '', title)
# 2. 移除 ★...★
title = re.sub(r'★.*?★', '', title)
# 3. 移除末尾或标题中的 “无修版”、UNCENSORED 等附加信息
title = re.sub(r'[\(\(][^()\(\)]{0,20}无修版[^()\(\)]{0,20}[\)\)]', '', title, flags=re.IGNORECASE)
title = re.sub(r'[\(\(][^()\(\)]{0,30}uncensored[^()\(\)]{0,30}[\)\)]', '', title, flags=re.IGNORECASE)
# 4. 匹配 [多语言标题]
multi_title_match = re.search(r'[\[\【]([^\[\]【】]+?/[^\[\]【】]+)[\]\】]', title)
if multi_title_match:
raw_titles = multi_title_match.group(1)
parts = [part.strip() for part in re.split(r'[//]', raw_titles)]
else:
clean_title = re.split(r'\[\d{2,3}\]|\[\s*(1080p|720p|HEVC|x264)', title)[0]
clean_title = re.split(r'[-\s_]+(\d{2,3})', clean_title)[0]
parts = [part.strip() for part in re.split(r'[//]', clean_title) if part.strip()]
# 最后移除左右中/英文括号
cleaned_parts = [re.sub(r'^[\[\【\((]*|[\]\】\))]*$', '', p) for p in parts]
return {
"main": cleaned_parts[0] if cleaned_parts else None,
"aliases": cleaned_parts
}
def parse_rss_title(title: str) -> dict:
title_info = extract_title(title)
return {
'anime': title_info["main"],
'aliases': title_info["aliases"],
'episode': extract_episode(title),
'group': extract_group(title),
'original_title': title
}
# 下载器模块
def download_from_rss(rss_url, proxy, qb_url, qb_user, qb_pass, base_download_path, autoTMM=False):
"""
从 RSS 订阅中获取种子信息并添加到 qBittorrent 中
:param rss_url: RSS URL
:param proxy: 代理设置
:param qb_url: qBittorrent Web UI 地址
:param qb_user: qBittorrent 用户名
:param qb_pass: qBittorrent 密码
:param base_download_path: 下载文件夹路径
:param autoTMM: 是否启用自动分类(默认为 False)
"""
proxies = {
'http': proxy,
'https': proxy
} if proxy else None
# 创建会话
session = requests.Session()
login = session.post(f'{qb_url}/api/v2/auth/login', data={
'username': qb_user,
'password': qb_pass
})
if login.text != 'Ok.':
print('❌ 登录失败')
return
try:
# 使用请求抓取 RSS
resp = session.get(rss_url, proxies=proxies)
tree = ET.parse(io.BytesIO(resp.content))
root = tree.getroot()
# 处理命名空间
ns = {'mikan': 'https://mikanani.kas.pub/0.1/'}
# 遍历 RSS 条目
for entry in root.findall('./channel/item'):
title = entry.findtext('title')
enclosure = entry.find('enclosure')
torrent_url = enclosure.attrib['url']
parsed_title = parse_rss_title(title)
# 直接拼接 folder_name,例如 "番剧名 [字幕组]"
anime = parsed_title.get("anime")
group = parsed_title.get("group")
folder_name = f"{anime} [{group}]" if anime and group else None
'''
# 根据标题生成文件夹名
title_parts = title.split('[')
anime_name = title_parts[0].strip()
group = title_parts[1].replace(']', '').strip() if len(title_parts) > 1 else 'UnknownGroup'
folder_name = f"{anime_name} [{group}]"
'''
print(folder_name)
download_path = os.path.join(base_download_path, folder_name)
# 如果文件夹不存在则创建
if not os.path.exists(download_path):
os.makedirs(download_path)
# 获取种子文件并上传到 qBittorrent
torrent_resp = session.get(torrent_url, proxies=proxies, allow_redirects=True)
if torrent_resp.status_code != 200 or b'<!DOCTYPE html' in torrent_resp.content[:100]:
print(f"⚠️ 无法下载种子: {torrent_url}")
continue
torrent_data = torrent_resp.content
files = {
'torrents': ('file.torrent', torrent_data, 'application/x-bittorrent')
}
data = {
'savepath': download_path,
'autoTMM': str(autoTMM).lower() # "false"/"true"
}
add_resp = session.post(f'{qb_url}/api/v2/torrents/add', files=files, data=data)
if add_resp.status_code == 200:
print(f"✅ 成功添加任务: {folder_name}")
else:
print(f"❌ 上传失败: {add_resp.text}")
except Exception as e:
print(f"❌ 错误: {e}")
主函数:
import time
from apscheduler.schedulers.background import BackgroundScheduler
import pytz
from qb_downloader import download_from_rss
from datetime import datetime
# 配置参数
rss_url = 'https://mikanani.kas.pub/RSS/MyBangumi?token=yourtoken'
proxy = 'socks5h://127.0.0.1:10808' # 如果没有代理,可设为 None
qb_url = 'http://127.0.0.1:8080' # qBittorrent Web UI 地址
qb_user = 'admin' # qBittorrent 用户名
qb_pass = 'password' # qBittorrent 密码
base_download_path = '/data/data/com.termux/files/home/Downloads/' # 你的下载目录
# 创建调度器
scheduler = BackgroundScheduler(timezone=pytz.timezone('Asia/Shanghai'))
# 定义RSS更新函数
def update_rss_data():
try:
# 调用下载函数来从 RSS 更新下载内容
download_from_rss(rss_url, proxy, qb_url, qb_user, qb_pass, base_download_path)
print(f"RSS更新成功: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
except Exception as e:
print(f"RSS更新失败: {e}")
# 设置半小时更新一次
scheduler.add_job(func=update_rss_data, trigger='interval', minutes=30)
# 启动调度器
scheduler.start()
# 主逻辑
if __name__ == "__main__":
try:
# 调用一次立即下载任务,首次启动时同步更新
update_rss_data()
# 保持程序运行,定时任务会在后台执行
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
# 退出时停止调度器
scheduler.shutdown()
效果
可以自动解析RSS订阅,每半个小时自动更新并且自动下载种子到qBittorrent中,并自动分类进行下载,自动分的文件夹名的格式是“番剧名 [字幕组]”。
曲线救国:用Termux+Python脚本实现qBittorrent RSS自动化追番
https://rimrose.work/posts/rss_bangumi_download/