"""Azur Lane maintenance-announcement scraper.

Fetches the maintenance announcements from the official Azur Lane article
collection on Bilibili and prints the downtime window of the latest one.

Defaults
    --mid              233114659 (UID of the official Azur Lane Bilibili account)
    --collection-name  《碧蓝航线》维护公告
    --cache            announcement cache file; defaults to
                       maintenance/_cache.jsonl next to this script
    --state            collection state file; defaults to
                       maintenance/state.json next to this script

Commands
    fetch   Pull the article collection and write it to the cache.
    parse   Read the cache, parse the latest announcement and print the
            downtime window to stdout as plain text, for example
            ``2026-06-20 14:00~18:00``. When parsing fails an empty line is
            printed and the exit code is 2.
    run     End-to-end check: always re-fetch the announcements and parse
            them without reading the cache (the cache and state files are
            still rewritten by the fetch).

Examples
    .\\toolkit\\python.exe dev_tools/bilibili_maintenance.py fetch --force
    .\\toolkit\\python.exe dev_tools/bilibili_maintenance.py parse
    .\\toolkit\\python.exe dev_tools/bilibili_maintenance.py run --verbose
"""

from __future__ import annotations

import argparse
import json
import logging
import re
import sys
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional

try:
    import requests
except ImportError:
    # ``parse`` only reads the local cache, so the module stays importable
    # without the HTTP client; network paths raise FetchError instead.
    requests = None

SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_MID = 233114659
DEFAULT_CACHE = SCRIPT_DIR / 'maintenance' / '_cache.jsonl'
DEFAULT_STATE = SCRIPT_DIR / 'maintenance' / 'state.json'
DEFAULT_COLLECTION_NAME = '《碧蓝航线》维护公告'
ARTICLE_LISTS_API = 'https://api.bilibili.com/x/article/up/lists'
ARTICLE_COLLECTION_API = 'https://api.bilibili.com/x/article/list/web/articles'
CN_TZ = timezone(timedelta(hours=8))

WS = r'[\s ]*'
# Accept both the ASCII and the fullwidth colon (U+FF1A) between hour and minute.
_COLON = r'[:\uff1a]'
# Range separators: ASCII tilde, fullwidth tilde (U+FF5E), wave dash (U+301C),
# "至", "到", em dash and hyphen.
_RANGE_SEP = r'[~\uff5e\u301c至到—\-]+'
WINDOW_RE = re.compile(
    r'司令部将于' + WS
    + r'(\d{1,2})月(\d{1,2})日' + WS
    + r'(\d{1,2})' + _COLON + r'(\d{2})' + WS
    + _RANGE_SEP + WS
    + r'(\d{1,2})' + _COLON + r'(\d{2})',
)
DURATION_RE = re.compile(r'为期' + WS + r'(\d+)' + WS + r'个?小时')
TITLE_TIME_RE = re.compile(
    r'(\d{1,2})月(\d{1,2})日' + WS + r'(\d{1,2})' + _COLON + r'(\d{2})'
)

logger = logging.getLogger('bilibili_maintenance')


class FetchError(RuntimeError):
    """Raised when the API reports an error or the expected data is missing."""


# Exception types the CLI commands report as a failed fetch (exit code 1).
_FETCH_FAILURES = (
    (FetchError,) if requests is None
    else (FetchError, requests.RequestException)
)


@dataclass
class MaintenanceInfo:
    """Parsed downtime: ``window`` is ``YYYY-MM-DD HH:MM~HH:MM``."""

    window: str
    duration_hours: int


@dataclass
class CollectionMeta:
    """Identity and change markers of one article collection."""

    collection_id: int
    collection_name: str
    articles_count: int
    update_time: int


def configure_stdio() -> None:
    """Switch stdout to UTF-8 when the stream supports reconfiguration."""
    if hasattr(sys.stdout, 'reconfigure'):
        sys.stdout.reconfigure(encoding='utf-8')


def session_for(mid: int) -> requests.Session:
    """Return a session with a browser User-Agent and a Referer for ``mid``.

    Raises:
        FetchError: the ``requests`` package is not installed.
    """
    if requests is None:
        raise FetchError('the "requests" package is required for network access')
    s = requests.Session()
    s.headers.update({
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
        ),
        'Referer': f'https://space.bilibili.com/{mid}/article',
    })
    return s


def get_json(session: requests.Session, url: str, params: dict, timeout: float) -> dict:
    """GET ``url`` and return the decoded JSON payload.

    Raises:
        FetchError: the payload is not an object or its ``code`` is not 0
            (a missing ``code`` counts as an error).
        requests.RequestException: transport or HTTP status failure.
    """
    response = session.get(url, params=params, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    if not isinstance(payload, dict):
        raise FetchError('unexpected response: JSON payload is not an object')
    if payload.get('code') != 0:
        raise FetchError(payload.get('message') or payload.get('msg') or 'unknown error')
    return payload


def find_collection(session: requests.Session, mid: int, name: str,
                    timeout: float) -> tuple[CollectionMeta, dict]:
    """Look up the collection called ``name`` among the lists of ``mid``.

    Returns:
        The collection metadata and the raw lists payload.

    Raises:
        FetchError: no collection has that name, or the entry is malformed.
    """
    payload = get_json(session, ARTICLE_LISTS_API, {'mid': str(mid), 'sort': 0}, timeout)
    data = payload.get('data')
    lists = data.get('lists') if isinstance(data, dict) else None
    for item in lists or []:
        if not isinstance(item, dict) or item.get('name') != name:
            continue
        try:
            meta = CollectionMeta(
                collection_id=int(item['id']),
                collection_name=item['name'],
                articles_count=int(item['articles_count']),
                update_time=int(item['update_time']),
            )
        except (KeyError, TypeError, ValueError) as exc:
            raise FetchError(f'malformed collection entry: {exc!r}') from exc
        return meta, payload
    raise FetchError(f'collection not found: mid={mid} name={name!r}')


def normalize_article(article: dict, meta: CollectionMeta) -> dict:
    """Convert one API article entry into the record stored in the cache."""
    cvid = str(article['id'])
    pub_ts = int(article['publish_time'])
    return {
        'pub_ts': pub_ts,
        'pub_time': datetime.fromtimestamp(pub_ts, tz=CN_TZ).isoformat(),
        'title': article['title'],
        'text': article['summary'],
        'url': f'https://www.bilibili.com/read/cv{cvid}',
        'collection_id': meta.collection_id,
        'collection_name': meta.collection_name,
    }


def load_state(path: Path) -> dict:
    """Load the state file; a missing or unreadable file yields ``{}``."""
    if not path.exists():
        return {}
    try:
        state = json.loads(path.read_text(encoding='utf-8'))
    except ValueError as exc:
        # An empty state only forces a re-fetch, which rewrites the file.
        logger.warning('ignoring unreadable state file %s: %s', path, exc)
        return {}
    return state if isinstance(state, dict) else {}


def load_jsonl(path: Path) -> list[dict]:
    """Load cached records, one JSON document per non-blank line.

    A missing file yields an empty list; malformed lines are skipped with a
    warning so one bad line does not abort parsing.
    """
    if not path.exists():
        return []
    records = []
    lines = path.read_text(encoding='utf-8').splitlines()
    for lineno, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except ValueError as exc:
            logger.warning('skipping malformed cache line %s:%d: %s', path, lineno, exc)
    return records


def persist_fetch(cache_path: Path, state_path: Path, records: list[dict],
                  state: dict) -> None:
    """Write the cache (JSONL) and state (JSON) files.

    Both files are written to ``*.tmp`` siblings first and then moved into
    place, so a failed write leaves the previous files untouched.
    """
    # The two files may live in different directories; create both parents.
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    state_path.parent.mkdir(parents=True, exist_ok=True)
    cache_content = ''.join(
        json.dumps(record, ensure_ascii=False) + '\n' for record in records
    )
    state_content = json.dumps(state, ensure_ascii=False, indent=2)
    cache_tmp = cache_path.with_suffix(cache_path.suffix + '.tmp')
    state_tmp = state_path.with_suffix(state_path.suffix + '.tmp')
    cache_tmp.write_text(cache_content, encoding='utf-8')
    state_tmp.write_text(state_content, encoding='utf-8')
    cache_tmp.replace(cache_path)
    state_tmp.replace(state_path)


def clean_text(text: str) -> str:
    """Drop ``[图片]`` markers, map U+3000 to a plain space and strip the ends."""
    return text.replace('[图片]', '').replace('\u3000', ' ').strip()


def infer_year(published: datetime, month: int, day: int) -> int:
    """Guess the year of a month/day mentioned in an article.

    Starts from the publication year and shifts it when the date would lie
    more than 60 days before or more than 300 days after publication.

    Raises:
        ValueError: ``month``/``day`` is not a valid date in the publication year.
    """
    year = published.year
    delta = (datetime(year, month, day, tzinfo=CN_TZ).date() - published.date()).days
    if delta < -60:
        year += 1
    elif delta > 300:
        year -= 1
    return year


def _build_window_dt(published: datetime, sm: int, sd: int, sh: int, smin: int,
                     eh: int, emin: int) -> tuple[datetime, datetime]:
    """Return aware start/end datetimes for a window starting on ``sm``/``sd``.

    An end time earlier than the start time is taken to be on the next day.
    """
    year = infer_year(published, sm, sd)
    start = datetime(year, sm, sd, sh, smin, tzinfo=CN_TZ)
    end = datetime(year, sm, sd, eh, emin, tzinfo=CN_TZ)
    if end < start:
        # e.g. 23:00~01:00: the announcement only names the start date.
        end += timedelta(days=1)
    return start, end


def _hours_between(start: datetime, end: datetime) -> int:
    """Return the span between two datetimes rounded to whole hours."""
    return round((end - start).total_seconds() / 3600)


def _format_window(start: datetime, end: datetime) -> str:
    """Render a window as ``YYYY-MM-DD HH:MM~HH:MM`` using the start date."""
    return f'{start:%Y-%m-%d} {start:%H:%M}~{end:%H:%M}'


def parse_maintenance(text: str, pub_ts: int, title: str) -> Optional[MaintenanceInfo]:
    """Extract the downtime window from an announcement.

    The body is searched for an explicit window first. If none is found but
    a duration is, the start time is taken from ``title`` and the end is
    start + duration.

    Args:
        text: announcement body.
        pub_ts: publication time as a Unix timestamp.
        title: announcement title, used only for the fallback.

    Returns:
        The parsed window, or ``None`` when nothing usable was found or the
        matched numbers do not form a valid date/time.
    """
    published = datetime.fromtimestamp(pub_ts, tz=CN_TZ)
    body = clean_text(text)
    window_match = WINDOW_RE.search(body)
    duration_match = DURATION_RE.search(body)
    try:
        if window_match:
            sm, sd, sh, smin, eh, emin = (int(g) for g in window_match.groups())
            start, end = _build_window_dt(published, sm, sd, sh, smin, eh, emin)
            hours = (
                int(duration_match.group(1)) if duration_match
                else _hours_between(start, end)
            )
            return MaintenanceInfo(window=_format_window(start, end), duration_hours=hours)
        if duration_match:
            title_match = TITLE_TIME_RE.search(title)
            if title_match:
                sm, sd, sh, smin = (int(g) for g in title_match.groups())
                hours = int(duration_match.group(1))
                start, _ = _build_window_dt(published, sm, sd, sh, smin, sh, smin)
                end = start + timedelta(hours=hours)
                logger.warning('WINDOW_RE not matched; synthesized window from TITLE_TIME_RE and DURATION_RE (%02d-%02d %02d:%02d, duration %d h).', sm, sd, sh, smin, hours)
                return MaintenanceInfo(window=_format_window(start, end), duration_hours=hours)
    except (ValueError, OverflowError) as exc:
        # The regexes accept any 1-2 digit numbers, e.g. "2月30日" or "25:00".
        logger.warning('Matched date/time is not valid: %s', exc)
        return None
    logger.warning('Failed to parse maintenance window from article text.')
    return None


def latest_record(records: list[dict]) -> dict:
    """Return the record with the greatest ``pub_ts``."""
    return max(records, key=lambda r: r['pub_ts'])


def parse_records(records: list[dict]) -> Optional[MaintenanceInfo]:
    """Parse the most recently published record; ``None`` if there is none."""
    if not records:
        return None
    record = latest_record(records)
    return parse_maintenance(record['text'], record['pub_ts'], record['title'])


def fetch_articles(cache_path: Path, state_path: Path, mid: int, collection_name: str,
                   timeout: float, force: bool, verbose: bool) -> tuple[list[dict], bool]:
    """Fetch the collection's articles and refresh the cache and state files.

    The download is skipped when ``force`` is false, the stored article
    count and update time equal the remote ones, and the cache file exists.

    Returns:
        ``(records, updated)``; ``updated`` is ``False`` when the cached
        records were reused.

    Raises:
        FetchError: API error, unknown collection or malformed payload.
        requests.RequestException: transport or HTTP status failure.
    """
    state = load_state(state_path)
    session = session_for(mid)
    meta, lists_payload = find_collection(session, mid, collection_name, timeout)
    if verbose:
        logger.info('%s', json.dumps(lists_payload, ensure_ascii=False, indent=2))

    unchanged = (
        state.get('articles_count') == meta.articles_count
        and state.get('update_time') == meta.update_time
    )
    # Without the cache file there is nothing to reuse, so fetch again.
    if not force and unchanged and cache_path.exists():
        logger.info('skip: count=%s update_time=%s', meta.articles_count, meta.update_time)
        return load_jsonl(cache_path), False

    payload = get_json(session, ARTICLE_COLLECTION_API, {'id': str(meta.collection_id)}, timeout)
    data = payload.get('data')
    articles = data.get('articles') if isinstance(data, dict) else None
    if articles is None:
        raise FetchError('unexpected response: article list is missing')
    try:
        records = [normalize_article(article, meta) for article in articles]
    except (KeyError, TypeError, ValueError) as exc:
        raise FetchError(f'malformed article entry: {exc!r}') from exc

    persist_fetch(cache_path, state_path, records, {
        'last_run_at': datetime.now(tz=CN_TZ).isoformat(timespec='seconds'),
        'collection_id': meta.collection_id,
        'articles_count': meta.articles_count,
        'update_time': meta.update_time,
    })
    logger.info('updated: count=%s update_time=%s records=%s',
                meta.articles_count, meta.update_time, len(records))
    return records, True


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser."""
    parser = argparse.ArgumentParser(prog='bilibili_maintenance.py', description='碧蓝航线停服维护公告爬取工具')
    parser.add_argument('command', choices=['fetch', 'parse', 'run'], help='fetch: 拉取最新停服维护公告 | parse: 基于缓存解析停服时间 | run: 拉取并解析最新公告')
    parser.add_argument('--cache', type=Path, default=DEFAULT_CACHE, help=f'公告缓存文件路径(默认 {DEFAULT_CACHE})')
    parser.add_argument('--state', type=Path, default=DEFAULT_STATE, help=f'合集状态文件路径(默认 {DEFAULT_STATE})')
    parser.add_argument('--mid', type=int, default=DEFAULT_MID, help=f'B站UP主UID(默认 {DEFAULT_MID})')
    parser.add_argument('--collection-name', default=DEFAULT_COLLECTION_NAME, help=f'专栏合集名称,须与B站合集页标题一致(默认 {DEFAULT_COLLECTION_NAME!r})')
    parser.add_argument('--timeout', type=float, default=30.0, help='网络请求超时秒数(默认 30)')
    parser.add_argument('--force', action='store_true', help='强制重新拉取并刷新缓存数据')
    parser.add_argument('--verbose', action='store_true', help='启用调试日志,输出 API 原始响应')
    return parser


def _emit(info: Optional[MaintenanceInfo]) -> int:
    """Print the window and return 0, or print an empty line and return 2."""
    if info:
        print(info.window)
        return 0
    print()
    return 2


def cmd_fetch(args: argparse.Namespace) -> int:
    """Run ``fetch``; returns 1 when the fetch fails, otherwise 0."""
    try:
        fetch_articles(args.cache, args.state, args.mid, args.collection_name,
                       args.timeout, args.force, args.verbose)
    except _FETCH_FAILURES as exc:
        logger.error('fetch failed: %s', exc)
        return 1
    return 0


def cmd_parse(args: argparse.Namespace) -> int:
    """Run ``parse`` against the cache file; returns 0 or 2."""
    return _emit(parse_records(load_jsonl(args.cache)))


def cmd_run(args: argparse.Namespace) -> int:
    """Run a forced fetch followed by a parse; returns 0, 1 or 2."""
    try:
        records, _ = fetch_articles(args.cache, args.state, args.mid, args.collection_name,
                                    args.timeout, force=True, verbose=args.verbose)
    except _FETCH_FAILURES as exc:
        logger.error('fetch failed: %s', exc)
        return 1
    return _emit(parse_records(records))


def main() -> int:
    """Parse the command line, dispatch the command and return its exit code."""
    configure_stdio()
    logging.basicConfig(level=logging.WARNING, format='%(levelname)s: %(message)s')
    args = build_parser().parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.INFO)
    handlers = {'fetch': cmd_fetch, 'parse': cmd_parse, 'run': cmd_run}
    handler = handlers.get(args.command)
    return handler(args) if handler else 1


if __name__ == '__main__':
    raise SystemExit(main())