#!/usr/bin/env python3
"""
Download NPR Car Talk podcast episodes by crawling NPR's listing endpoint.

NPR's HTML currently embeds the audio URL in a JSON blob inside a `data-audio`
attribute. This script extracts that JSON and downloads each episode into
`{date}_cartalk_{title}.{ext}` files, skipping files that already exist.
"""
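
# Example invocation (a sketch; the filename "cartalk_download.py" is an
# assumption, save the script under whatever name you prefer):
#
#   python3 cartalk_download.py --output-dir car_talk_episodes --parallel 5 -v
#
# Add --dry-run to print what would be downloaded without writing any files.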

from __future__ import annotations

import argparse
import concurrent.futures as futures
import dataclasses
import html
import json
import logging
import os
import random
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Iterable, Iterator


@dataclasses.dataclass(frozen=True)
class Episode:
    date: str
    audio_url: str
    title: str | None = None

    def filename(self) -> str:
        ext = guess_extension(self.audio_url)
        # keep only alphanumerics and whitespace; runs of whitespace become underscores
        clean_title = re.sub(r"[^a-zA-Z0-9\s]", "", self.title or "")
        clean_title = clean_title.strip()
        clean_title = re.sub(r"\s+", "_", clean_title)
        if clean_title:
            clean_title = "_" + clean_title
        return f"{self.date}_cartalk{clean_title}{ext}"
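
# For illustration (hypothetical episode data, not taken from NPR):
#   Episode(
#       date="2012-06-23",
#       audio_url="https://example.org/episode.mp3",
#       title="Best of: The Mother of All Fixes",
#   ).filename() == "2012-06-23_cartalk_Best_of_The_Mother_of_All_Fixes.mp3"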


ARTICLE_SPLIT_RE = re.compile(r'<article class="item podcast-episode"')
DATE_RE = re.compile(r'<time datetime="([0-9]{4}-[0-9]{2}-[0-9]{2})"')
DATA_AUDIO_RE = re.compile(r"data-audio='([^']+)'")
AUDIO_URL_RE = re.compile(r'"audioUrl":"([^"]+)"')
TITLE_RE = re.compile(r'<h2 class="title"><a [^>]*>([^<]+)</a></h2>')

# fallback: occasionally pages still contain a direct href to the audio file
HREF_AUDIO_RE = re.compile(r'href="(https?://[^"]+\.(?:mp3|m4a)[^"]*)"')
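
# The data-audio payload looks roughly like this (a trimmed, hypothetical
# sample; the real attribute carries more keys):
#
#   data-audio='{"audioUrl":"https:\/\/...\/episode.mp3", ...}'
#
# AUDIO_URL_RE doubles as a fallback for when that JSON fails to parse.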


def guess_extension(url: str) -> str:
    parsed = urllib.parse.urlparse(url)
    path = parsed.path.lower()
    if path.endswith(".m4a"):
        return ".m4a"
    return ".mp3"


def build_request(
    url: str, user_agent: str, timeout_s: float
) -> urllib.request.Request:
    request = urllib.request.Request(url)
    request.add_header("User-Agent", user_agent)
    request.add_header("Accept", "text/html,*/*;q=0.9")
    return request


def fetch_text(url: str, user_agent: str, timeout_s: float, retries: int) -> str:
    """
    Fetch a URL and return decoded text.

    Uses a simple retry loop with jittered backoff for transient errors.
    """
    last_error: Exception | None = None
    for attempt in range(retries + 1):
        if attempt:
            delay = min(10.0, (2**attempt)) + random.random()
            logging.getLogger("cartalk").warning(
                "Retrying page fetch (%s/%s) in %.1fs: %s",
                attempt,
                retries,
                delay,
                url,
            )
            # small jitter keeps multiple workers from syncing retries
            time.sleep(delay)
        try:
            started = time.time()
            request = build_request(url, user_agent, timeout_s)
            with urllib.request.urlopen(request, timeout=timeout_s) as response:
                raw = response.read()
                charset = response.headers.get_content_charset() or "utf-8"
            elapsed = time.time() - started
            logging.getLogger("cartalk").info(
                "Fetched %s (%.1f KB) in %.2fs",
                url,
                len(raw) / 1024.0,
                elapsed,
            )
            return raw.decode(charset, errors="replace")
        except (urllib.error.URLError, TimeoutError) as exc:
            last_error = exc
    assert last_error is not None
    raise last_error
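
# With the default of 3 retries the loop above sleeps roughly 2s, 4s and 8s
# (2**attempt, capped at 10s) plus up to one second of random jitter per retry.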


def iter_episodes(html_text: str) -> Iterator[Episode]:
    """
    Yield episodes found in NPR's "partial/next" HTML.
    """
    # Split the page into per-episode chunks. The delimiter itself is discarded,
    # so the first element is whatever came before the first <article ...>.
    parts = ARTICLE_SPLIT_RE.split(html_text)
    for chunk in parts[1:]:
        date_match = DATE_RE.search(chunk)
        if not date_match:
            continue
        date = date_match.group(1)

        title = None
        title_match = TITLE_RE.search(chunk)
        if title_match:
            title = html.unescape(title_match.group(1)).strip()

        audio_url = extract_audio_url(chunk)
        if not audio_url:
            continue

        yield Episode(date=date, audio_url=audio_url, title=title)


def extract_audio_url(article_html: str) -> str | None:
    data_audio_match = DATA_AUDIO_RE.search(article_html)
    if data_audio_match:
        # data-audio is a JSON string, wrapped in single quotes, with backslash-
        # escaped slashes and occasionally unicode escapes for &.
        raw = html.unescape(data_audio_match.group(1))
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            payload = None

        if isinstance(payload, dict):
            url = payload.get("audioUrl")
            if isinstance(url, str) and url.startswith(("http://", "https://")):
                return url

        # Some variants include the audioUrl key, but the JSON is malformed due
        # to unexpected escaping. Fall back to a direct search.
        audio_url_match = AUDIO_URL_RE.search(raw)
        if audio_url_match:
            return unescape_audio_url(audio_url_match.group(1))

    href_match = HREF_AUDIO_RE.search(article_html)
    if href_match:
        # html.unescape already decodes any entities (e.g. &amp;) in the href
        return html.unescape(href_match.group(1))

    return None
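
# extract_audio_url tries, in order: (1) parse the data-audio JSON and read
# "audioUrl"; (2) regex the audioUrl value straight out of malformed JSON;
# (3) fall back to a plain href ending in .mp3/.m4a anywhere in the article.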


def unescape_audio_url(url: str) -> str:
    # JSON leaves us with literal \/ and possibly \u0026; html.unescape handles
    # any stray HTML entities (such as &amp;) that slipped in.
    url = html.unescape(url)
    url = url.replace("\\/", "/").replace("\\u0026", "&")
    return url
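
# For instance (an invented URL), the raw JSON value
#   https:\/\/example.org\/cartalk\/episode.mp3
# comes back as
#   https://example.org/cartalk/episode.mp3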


def cleanup_partial_downloads(output_dir: Path) -> None:
    """Remove any leftover .part files from previous incomplete runs."""
    logger = logging.getLogger("cartalk")
    part_files = list(output_dir.glob("*.part"))
    for part_file in part_files:
        logger.info("Cleaning up partial download: %s", part_file)
        part_file.unlink()


def download_episode(
    episode: Episode,
    output_dir: Path,
    user_agent: str,
    timeout_s: float,
    retries: int,
    dry_run: bool,
) -> tuple[Episode, str]:
    logger = logging.getLogger("cartalk")
    filename = episode.filename()
    final_path = output_dir / filename
    temp_path = output_dir / f".{filename}.part"

    # Clean up any existing partial download for this episode
    if temp_path.exists():
        logger.info("Removing partial download: %s", temp_path)
        temp_path.unlink()

    if final_path.exists():
        logger.debug("Skip existing: %s", final_path)
        return episode, "skip"

    if dry_run:
        return episode, "dry-run"

    url = unescape_audio_url(episode.audio_url)
    err_msg = ""
    for attempt in range(retries + 1):
        if attempt:
            delay = min(10.0, (2**attempt)) + random.random()
            logger.warning(
                "Retrying download (%s/%s) in %.1fs: %s",
                attempt,
                retries,
                delay,
                filename,
            )
            time.sleep(delay)
        try:
            started = time.time()
            request = build_request(url, user_agent, timeout_s)
            with urllib.request.urlopen(request, timeout=timeout_s) as response:
                status = getattr(response, "status", None)
                if status is not None:
                    logger.info("Downloading %s (HTTP %s)", filename, status)
                else:
                    logger.info("Downloading %s", filename)

                output_dir.mkdir(parents=True, exist_ok=True)
                with open(temp_path, "wb") as file:
                    while True:
                        chunk = response.read(1024 * 256)
                        if not chunk:
                            break
                        file.write(chunk)

            if temp_path.stat().st_size < 1024:
                # Treat suspiciously small responses as failures and retry.
                err_msg = "downloaded file was smaller than 1 KB"
                temp_path.unlink(missing_ok=True)
                continue

            os.replace(temp_path, final_path)
            elapsed = time.time() - started
            size_kb = final_path.stat().st_size / 1024.0
            logger.info("Saved %s (%.1f KB) in %.2fs", final_path, size_kb, elapsed)
            return episode, "ok"
        except (urllib.error.URLError, TimeoutError, OSError) as exc:
            temp_path.unlink(missing_ok=True)
            err_msg = str(exc)
            continue

    logger.error("Failed after retries: %s: %s", filename, err_msg)
    return episode, "fail"
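
# Downloads stream into a hidden ".<name>.part" file and are renamed into place
# with os.replace() only once complete, so an interrupted run never leaves a
# truncated file under the final episode name.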


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Download NPR Car Talk episodes into a local folder.",
    )
    parser.add_argument(
        "--base-url",
        default="https://www.npr.org/get/510208/render/partial/next?start=",
        help="NPR listing endpoint base URL.",
    )
    parser.add_argument("--start", type=int, default=1, help="Start offset.")
    parser.add_argument("--end", type=int, default=1300, help="End offset.")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=24,
        help="Offset step size (NPR uses 24 items per page).",
    )
    parser.add_argument(
        "--parallel",
        type=int,
        default=5,
        help="Number of parallel downloads.",
    )
    parser.add_argument(
        "--output-dir",
        default="car_talk_episodes",
        help="Where audio files are saved.",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=30.0,
        help="Per-request timeout in seconds.",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=3,
        help="Retries for page fetch and downloads.",
    )
    parser.add_argument(
        "--user-agent",
        default="Mozilla/5.0 (compatible; CarTalk-Downloader/1.0)",
        help="User-Agent header for NPR requests.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and print what would be downloaded without writing files.",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Increase verbosity (repeat for more detail).",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Only print errors and final summary.",
    )
    return parser


def iter_page_offsets(start: int, end: int, batch_size: int) -> Iterable[int]:
    if batch_size <= 0:
        raise ValueError("batch_size must be > 0")
    if end < start:
        return range(0)
    return range(start, end + 1, batch_size)
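
# For example, list(iter_page_offsets(1, 100, 24)) == [1, 25, 49, 73, 97]; the
# defaults (start=1, end=1300, batch=24) produce 55 listing-page offsets, each
# appended to --base-url as its ?start= value.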


def configure_logging(verbose: int, quiet: bool) -> None:
    logger = logging.getLogger("cartalk")
    logger.propagate = False

    if quiet:
        level = logging.ERROR
    elif verbose >= 2:
        level = logging.DEBUG
    elif verbose >= 1:
        level = logging.INFO
    else:
        level = logging.WARNING

    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    logger.handlers[:] = [handler]
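
# Verbosity mapping: default is WARNING, -v enables INFO, -vv enables DEBUG,
# and --quiet limits output to ERROR plus the final summary line.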


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()

    configure_logging(args.verbose, args.quiet)
    logger = logging.getLogger("cartalk")

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Clean up any partial downloads from previous runs
    cleanup_partial_downloads(output_dir)

    logger.info(
        "Config: start=%s end=%s batch=%s parallel=%s output_dir=%s",
        args.start,
        args.end,
        args.batch_size,
        args.parallel,
        output_dir,
    )

    started = time.time()
    ok = 0
    skipped = 0
    failed = 0
    all_episodes_seen: set[str] = set()

    for offset in iter_page_offsets(args.start, args.end, args.batch_size):
        url = f"{args.base_url}{offset}"
        logger.info("Fetching listing page (start=%s): %s", offset, url)
        page = fetch_text(url, args.user_agent, args.timeout, args.retries)
        page_episodes = list(iter_episodes(page))
        logger.info("Found %s episodes in page (start=%s)", len(page_episodes), offset)

        # Filter out episodes we've already seen (deduplication by date)
        new_episodes = []
        for episode in page_episodes:
            if episode.date not in all_episodes_seen:
                all_episodes_seen.add(episode.date)
                new_episodes.append(episode)
            else:
                logger.debug("Skipping duplicate episode: %s", episode.date)

        logger.info("New episodes to process: %s", len(new_episodes))

        if args.dry_run:
            for episode in new_episodes:
                title = f" - {episode.title}" if episode.title else ""
                print(f"{episode.date} {episode.audio_url}{title}")
            continue

        # Download episodes from this page immediately
        if new_episodes:
            with futures.ThreadPoolExecutor(
                max_workers=max(1, int(args.parallel))
            ) as pool:
                jobs = [
                    pool.submit(
                        download_episode,
                        episode,
                        output_dir,
                        args.user_agent,
                        args.timeout,
                        args.retries,
                        False,
                    )
                    for episode in new_episodes
                ]
                for job in futures.as_completed(jobs):
                    episode, status = job.result()
                    if status == "ok":
                        ok += 1
                    elif status == "skip":
                        skipped += 1
                    else:
                        failed += 1
                    logger.debug(
                        "Result: %s date=%s url=%s",
                        status,
                        episode.date,
                        episode.audio_url,
                    )
                    if status != "skip" and not args.quiet:
                        print(f"[{status.upper()}] {episode.date} {episode.audio_url}")

    elapsed = time.time() - started
    print(
        f"Done. ok={ok} skipped={skipped} failed={failed} "
        f"output_dir={output_dir} elapsed={elapsed:.1f}s"
    )
    return 1 if failed else 0


if __name__ == "__main__":
    raise SystemExit(main())