From 94356c499203886f7bc5ba7853c0ce995ac1b756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Lesimple?= Date: Sun, 19 Apr 2026 08:25:16 +0000 Subject: [PATCH] init: daily vulnerability watch automation scripts used by the cron vuln-watch workflow from the master branch. --- .gitignore | 4 + scripts/daily_vuln_watch_prompt.md | 155 ++++++++++++ scripts/vuln_watch/__init__.py | 0 scripts/vuln_watch/fetch_and_diff.py | 350 +++++++++++++++++++++++++++ scripts/vuln_watch/merge_state.py | 208 ++++++++++++++++ scripts/vuln_watch/sources.py | 59 +++++ scripts/vuln_watch/state.py | 128 ++++++++++ 7 files changed, 904 insertions(+) create mode 100644 .gitignore create mode 100644 scripts/daily_vuln_watch_prompt.md create mode 100644 scripts/vuln_watch/__init__.py create mode 100644 scripts/vuln_watch/fetch_and_diff.py create mode 100644 scripts/vuln_watch/merge_state.py create mode 100644 scripts/vuln_watch/sources.py create mode 100644 scripts/vuln_watch/state.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aae3ad3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ diff --git a/scripts/daily_vuln_watch_prompt.md b/scripts/daily_vuln_watch_prompt.md new file mode 100644 index 0000000..e8eda04 --- /dev/null +++ b/scripts/daily_vuln_watch_prompt.md @@ -0,0 +1,155 @@ +# Daily transient-execution vulnerability scan — classification step + +You are a scheduled agent running inside a GitHub Actions job. A preceding +workflow step has already fetched all configured sources, applied HTTP +conditional caching, deduped against prior state, and written the pre-filtered +list of new items to `new_items.json`. Your only job is to classify each item. 
+ +## What counts as "relevant" + +spectre-meltdown-checker detects, reports, and suggests mitigations for CPU +vulnerabilities such as: Spectre v1/v2/v4, Meltdown, Foreshadow/L1TF, MDS +(ZombieLoad/RIDL/Fallout), TAA, SRBDS, iTLB Multihit, Zenbleed, Downfall (GDS), +Retbleed, Inception, SRSO, BHI, RFDS, Reptar, FP-DSS, and any similar +microarchitectural side-channel or speculative-execution issue on x86 +(Intel/AMD) or ARM CPUs. It also surfaces related hardware mitigation features +(SMAP/SMEP/UMIP/IBPB/eIBRS/STIBP…) when they gate the remediation for a tracked +CVE. + +It does **not** track generic software CVEs, GPU driver bugs, networking +stacks, filesystem bugs, userspace crypto issues, or unrelated kernel +subsystems. + +## Inputs + +- `new_items.json` — shape: + + ```json + { + "scan_date": "2026-04-18T14:24:43+00:00", + "window_cutoff": "2026-04-17T13:24:43+00:00", + "per_source": { + "phoronix": {"status": 200, "new": 2, "total_in_feed": 75}, + "oss-sec": {"status": 304, "new": 0} + }, + "items": [ + { + "source": "phoronix", + "stable_id": "CVE-2026-1234", + "title": "...", + "permalink": "https://...", + "guid": "...", + "published_at": "2026-04-18T05:00:00+00:00", + "extracted_cves": ["CVE-2026-1234"], + "vendor_ids": [], + "snippet": "first 400 chars of description, tags stripped" + } + ] + } + ``` + + `items` is already: (a) within the time window, (b) not known to prior + state under any of its alt-IDs. If `items` is empty, your only job is to + write the three stub output files with `(no new items in this window)`. + +- `./checker/` is a checkout of the **`test`** branch of this repo (the + development branch where coded-but-unreleased CVE checks live). This is + the source of truth for whether a CVE is already covered. Grep this + directory — not the working directory root, which only holds the + vuln-watch scripts and has no checker code. 
+ +## Classification rules + +For each item in `items`, pick exactly one bucket: + +- **toimplement** — a clearly-identified new transient-execution / CPU + side-channel vulnerability in scope, **and not already covered by this + repo**. To verify the second half: grep `./checker/` for each entry of + `extracted_cves` *and* for any codename in the title (e.g., "FP-DSS", + "Inception"). If either matches existing code, demote to `tocheck`. +- **tocheck** — plausibly in-scope but ambiguous: mitigation-only feature + (LASS, IBT, APIC-virt, etc.); item seemingly already implemented but worth + confirming scope; unclear applicability (e.g. embedded-only ARM SKU); + CVE-ID pending; contradictory info across sources. +- **unrelated** — everything else. + +Tie-breakers: prefer `tocheck` over `unrelated` when uncertain. Prefer +`tocheck` over `toimplement` when the CVE ID is still "reserved" / "pending" — +false positives in `toimplement` waste human time more than false positives +in `tocheck`. + +`WebFetch` is available for resolving `tocheck` ambiguity. Budget: **3 +follow-ups per run total**. Do not use it for items you already plan to file +as `unrelated` or `toimplement`. + +## Outputs + +Compute `TODAY` = the `YYYY-MM-DD` prefix of `scan_date`. Write three files at +the repo root, overwriting if present: + +- `watch_${TODAY}_toimplement.md` +- `watch_${TODAY}_tocheck.md` +- `watch_${TODAY}_unrelated.md` + +Each file uses level-2 headers per source short-name, then one bullet per +item: the stable ID, the permalink, and 1–2 sentences of context. + +```markdown +## oss-sec +- **CVE-2026-1234** — https://www.openwall.com/lists/oss-security/2026/04/18/3 + New Intel transient-execution bug "Foo"; affects Redwood Cove cores. + Not yet covered (grepped CVE-2026-1234 and "Foo" — no matches). +``` + +If a bucket has no items, write `(no new items in this window)`. 
+ +Append the following block to the **tocheck** file (creating it if +otherwise empty): + +```markdown +## Run summary +- scan_date: <scan_date> +- per-source counts (from per_source): ... +- fetch failures (status != 200/304): ... +- total classified this run: toimplement=<n>, tocheck=<n>, unrelated=<n> +``` + +## `classifications.json` — required side-channel for the merge step + +Also write `classifications.json` at the repo root. It is a JSON array, one +record per item in `new_items.json.items`: + +```json +[ + { + "stable_id": "CVE-2026-1234", + "canonical_id": "CVE-2026-1234", + "bucket": "toimplement", + "extracted_cves": ["CVE-2026-1234"], + "sources": ["phoronix"], + "urls": ["https://www.phoronix.com/news/..."] + } +] +``` + +Rules: + +- One record per input item. Same `stable_id` as in `new_items.json`. +- `canonical_id`: prefer the first `extracted_cves` entry if any; otherwise + the item's `stable_id`. **Use the same `canonical_id` for multiple items + that are really the same CVE from different sources** — the merge step + will collapse them into one entry and add alias rows automatically. +- `sources` / `urls`: arrays; default to the item's own single source and + permalink if you didn't enrich further. +- If `new_items.json.items` is empty, write `[]`. + +## Guardrails + +- Do NOT modify any repo source code. Only write the four output files. +- Do NOT create commits, branches, or PRs. +- Do NOT call tools that post externally (Slack, GitHub comments, issues, …). +- Do NOT re-fetch the RSS/HTML sources — that was the prior step's job. + `WebFetch` is only for drilling into a specific advisory/article URL to + resolve a `tocheck` ambiguity (budget 3). +- If total runtime exceeds 10 minutes, finish what you have, write partial + outputs (+ a note in the tocheck run summary), and exit cleanly. 
# scripts/vuln_watch/fetch_and_diff.py -- constants and definitions.
#
# Fetch all configured sources, dedup against state/seen.json and emit
# new_items.json.  Per-source HTTP cache metadata (etag, last_modified, hwm_*)
# is written back into state/seen.json; state.seen / state.aliases are left
# alone, that is the merge step's job once the new items are classified.
#
# Usage:
#     SCAN_DATE=2026-04-18T14:24:43Z python -m scripts.vuln_watch.fetch_and_diff

CVE_RE = re.compile(r"CVE-\d{4}-\d{4,7}")
DEFAULT_WINDOW_HOURS = 25
MAX_ITEMS_PER_FEED = 200
SNIPPET_MAX = 400
NEW_ITEMS_PATH = pathlib.Path("new_items.json")


def parse_iso(ts: str | None) -> datetime.datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing ``Z`` is accepted.  Timestamps without an offset are assumed to
    be UTC so the result can always be compared with the (aware) feed dates.
    Returns None for empty or unparsable input.
    """
    if not ts:
        return None
    try:
        parsed = datetime.datetime.fromisoformat(ts.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Mixing naive and aware datetimes raises TypeError on comparison.
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed


def now_from_scan_date(scan_date: str) -> datetime.datetime:
    """Return the scan time: the parsed ``scan_date``, else the current UTC time."""
    parsed = parse_iso(scan_date)
    if parsed is None:
        return datetime.datetime.now(datetime.timezone.utc)
    return parsed


def _gunzip_body(body: bytes, content_encoding: str | None) -> bytes:
    """Decompress ``body`` when the response declares gzip encoding.

    Falls back to the raw bytes when the payload is not valid gzip (server
    lied about the encoding, or the stream is truncated/corrupt).
    """
    import zlib

    if (content_encoding or "").strip().lower() != "gzip":
        return body
    try:
        return gzip.decompress(body)
    except (OSError, EOFError, zlib.error):
        # BadGzipFile is an OSError; truncation raises EOFError and a damaged
        # deflate stream raises zlib.error.
        return body


def conditional_get(
    url: str,
    etag: str | None,
    last_modified: str | None,
    user_agent: str | None = None,
) -> tuple[int | str, bytes | None, str | None, str | None]:
    """Perform a conditional GET.

    ``user_agent`` falls back to the module-level USER_AGENT when empty.

    Returns (status, body, new_etag, new_last_modified), where status is:
      - the HTTP status (200) with the decoded body on success
      - 304 with body=None when unchanged
      - an int HTTP error code with body=None on HTTP errors
      - a ``network:<ExceptionName>`` string on network/transport failures
    On every non-success path the caller's etag / last_modified are returned
    unchanged.
    """
    import http.client

    req = urllib.request.Request(url, headers={
        "User-Agent": user_agent or USER_AGENT,
        # AMD's CDN stalls on non-gzip clients; asking for gzip speeds up
        # every source and is strictly beneficial (we decompress locally).
        "Accept-Encoding": "gzip",
    })
    if etag:
        req.add_header("If-None-Match", etag)
    if last_modified:
        req.add_header("If-Modified-Since", last_modified)
    try:
        with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as resp:
            body = _gunzip_body(resp.read(), resp.headers.get("Content-Encoding"))
            return (
                resp.status,
                body,
                resp.headers.get("ETag"),
                resp.headers.get("Last-Modified"),
            )
    except urllib.error.HTTPError as e:
        # urllib raises for 304 as well as for real errors.
        return (e.code, None, etag, last_modified)
    except (urllib.error.URLError, http.client.HTTPException, TimeoutError, OSError) as e:
        # HTTPException covers IncompleteRead raised while reading the body.
        return (f"network:{type(e).__name__}", None, etag, last_modified)


def extract_cves(text: str) -> list[str]:
    """Return the CVE IDs found in ``text``, deduplicated, in order of appearance."""
    return list(dict.fromkeys(CVE_RE.findall(text or "")))


def extract_vendor_ids(text: str, patterns: Iterable[str]) -> list[str]:
    """Return the advisory IDs matched by any of ``patterns``, deduplicated.

    The full match is used (not capture groups), consistently with
    parse_html_body, so a pattern may contain groups.
    """
    haystack = text or ""
    found: dict[str, None] = {}
    for pattern in patterns:
        for match in re.finditer(pattern, haystack):
            if match.group(0):
                found.setdefault(match.group(0))
    return list(found)


def pick_stable_id(vendor_ids: list[str], cves: list[str], guid: str, link: str) -> str:
    """Pick canonical-ish stable ID: vendor advisory -> CVE -> guid -> permalink.

    CVE is preferred over guid/URL so that the same CVE seen via different
    feeds collapses on its stable_id alone (in addition to the alias map).
    """
    if vendor_ids:
        return vendor_ids[0]
    if cves:
        return cves[0]
    return guid or link


def clean_snippet(s: str) -> str:
    """Strip markup tags from ``s`` and collapse runs of whitespace."""
    without_tags = re.sub(r"<[^>]+>", " ", s or "")
    return re.sub(r"\s+", " ", without_tags).strip()


def _struct_time_to_iso(st: Any) -> str | None:
    """Convert a time-struct-like sequence to a UTC ISO string (None if unusable)."""
    if not st:
        return None
    try:
        return datetime.datetime(*st[:6], tzinfo=datetime.timezone.utc).isoformat()
    except (TypeError, ValueError):
        return None


def parse_feed_body(src: Source, body: bytes) -> list[dict[str, Any]]:
    """Parse an RSS/Atom body into item dicts (at most MAX_ITEMS_PER_FEED)."""
    parsed = feedparser.parse(body)
    items: list[dict[str, Any]] = []
    for entry in parsed.entries[:MAX_ITEMS_PER_FEED]:
        link = (entry.get("link") or "").strip()
        guid = (entry.get("id") or entry.get("guid") or "").strip()
        title = (entry.get("title") or "").strip()
        summary = entry.get("summary") or ""
        published_at = (
            _struct_time_to_iso(entry.get("published_parsed"))
            or _struct_time_to_iso(entry.get("updated_parsed"))
        )
        blob = f"{title}\n{summary}"
        cves = extract_cves(blob)
        vendor_ids = extract_vendor_ids(blob, src.advisory_id_patterns)
        # Fall back to the title: an empty stable_id is skipped by the merge
        # step, so such an entry would otherwise be reported on every run.
        stable_id = pick_stable_id(vendor_ids, cves, guid, link) or title
        if not stable_id:
            continue  # nothing to key this entry on
        items.append({
            "source": src.name,
            "stable_id": stable_id,
            "title": title,
            "permalink": link,
            "guid": guid,
            "published_at": published_at,
            "extracted_cves": cves,
            "vendor_ids": vendor_ids,
            "snippet": clean_snippet(summary)[:SNIPPET_MAX],
        })
    return items


def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
    """Scrape a non-RSS page for advisory IDs.

    One item is emitted per distinct ID.  Its permalink is the first ``href``
    found within 400 characters either side of the match (resolved against
    the source's display URL), or that base URL when there is none.
    """
    text = body.decode("utf-8", errors="replace")
    items: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    permalink_base = src.display_url or src.url
    for pat in src.advisory_id_patterns:
        for m in re.finditer(pat, text):
            advisory_id = m.group(0)
            if advisory_id in seen_ids:
                continue
            seen_ids.add(advisory_id)
            window = text[max(0, m.start() - 400): m.end() + 400]
            href_match = re.search(r'href="([^"#]+)"', window)
            if href_match:
                permalink = urllib.parse.urljoin(permalink_base, href_match.group(1))
            else:
                permalink = permalink_base
            cves = extract_cves(window)
            is_cve = advisory_id.startswith("CVE-")
            if is_cve:
                # Keep a deterministic order with the advisory's own ID first:
                # downstream code treats extracted_cves[0] as the canonical ID.
                cves = [advisory_id, *(c for c in cves if c != advisory_id)]
            items.append({
                "source": src.name,
                "stable_id": advisory_id,
                "title": advisory_id,
                "permalink": permalink,
                "guid": "",
                "published_at": None,  # HTML pages often lack reliable dates
                "extracted_cves": cves,
                "vendor_ids": [] if is_cve else [advisory_id],
                "snippet": clean_snippet(window)[:SNIPPET_MAX],
            })
    return items


def parse_body(src: Source, body: bytes) -> list[dict[str, Any]]:
    """Dispatch to the feed or HTML parser according to ``src.kind``."""
    if src.kind in ("rss", "atom"):
        return parse_feed_body(src, body)
    return parse_html_body(src, body)


def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetime.datetime:
    """Return the oldest publication time still considered "new".

    The default window is DEFAULT_WINDOW_HOURS before ``scan_now``; it is
    widened to one hour before ``last_run`` when the previous run is older.
    """
    base = scan_now - datetime.timedelta(hours=DEFAULT_WINDOW_HOURS)
    previous = parse_iso(last_run)
    if previous is None:
        return base
    return min(base, previous - datetime.timedelta(hours=1))


def candidate_ids(item: dict[str, Any]) -> list[str]:
    """All identifiers under which this item might already be known."""
    candidates = (
        *(item.get("extracted_cves") or []),
        *(item.get("vendor_ids") or []),
        item.get("stable_id"),
        item.get("guid"),
        item.get("permalink"),
    )
    return list(dict.fromkeys(c for c in candidates if c))


def main() -> int:
    """Fetch every source, write new_items.json and update the cache metadata."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--scan-date", default=os.environ.get("SCAN_DATE", ""))
    ap.add_argument("--output", type=pathlib.Path, default=NEW_ITEMS_PATH)
    args = ap.parse_args()

    scan_now = now_from_scan_date(args.scan_date)
    scan_date_iso = scan_now.isoformat()
    data = state.load()
    cutoff = compute_cutoff(scan_now, data.get("last_run"))

    per_source: dict[str, dict[str, Any]] = {}
    all_new: list[dict[str, Any]] = []

    for src in SOURCES:
        meta = dict(data["sources"].get(src.name, {}))
        status, body, etag, last_modified = conditional_get(
            src.url, meta.get("etag"), meta.get("last_modified"),
            user_agent=src.user_agent or USER_AGENT,
        )
        meta["last_fetched_at"] = scan_date_iso
        meta["last_status"] = status
        data["sources"][src.name] = meta

        if status == 304:
            per_source[src.name] = {"status": 304, "new": 0}
            continue
        if not isinstance(status, int) or not 200 <= status < 300 or body is None:
            # Transport failures, 4xx/5xx and stray 3xx errors: report the
            # real status so it is counted as a fetch failure below.
            per_source[src.name] = {"status": status, "new": 0}
            continue

        # Refresh cache headers only on a successful fetch.
        if etag:
            meta["etag"] = etag
        if last_modified:
            meta["last_modified"] = last_modified

        items = parse_body(src, body)

        new: list[dict[str, Any]] = []
        hwm_pub = meta.get("hwm_published_at")
        hwm_id = meta.get("hwm_id")
        for it in items:
            published = parse_iso(it.get("published_at"))
            if published is not None and published < cutoff:
                continue  # outside the time window (undated items are kept)
            if state.lookup(data, candidate_ids(it)) is not None:
                continue  # already known under one of its IDs
            new.append(it)
            pub = it.get("published_at")
            if pub and (not hwm_pub or pub > hwm_pub):
                hwm_pub = pub
                hwm_id = it.get("stable_id")

        if new:
            meta["hwm_published_at"] = hwm_pub
            meta["hwm_id"] = hwm_id

        per_source[src.name] = {"status": status, "new": len(new), "total_in_feed": len(items)}
        all_new.extend(new)

    # Persist updated HTTP cache metadata regardless of whether Claude runs.
    state.save(data)

    out = {
        "scan_date": scan_date_iso,
        "window_cutoff": cutoff.isoformat(),
        "per_source": per_source,
        "items": all_new,
    }
    args.output.write_text(json.dumps(out, indent=2, sort_keys=True) + "\n")

    # GitHub Actions step outputs
    gh_out = os.environ.get("GITHUB_OUTPUT")
    if gh_out:
        failures = [
            s for s, v in per_source.items()
            if not (isinstance(v["status"], int) and v["status"] in (200, 304))
        ]
        with open(gh_out, "a") as f:
            f.write(f"new_count={len(all_new)}\n")
            f.write(f"fetch_failures_count={len(failures)}\n")

    print(f"Scan date: {scan_date_iso}")
    print(f"Cutoff: {cutoff.isoformat()}")
    print(f"New items: {len(all_new)}")
    for s, v in per_source.items():
        print(f" {s:14s} status={str(v['status']):>16} new={v['new']}")

    return 0


if __name__ == "__main__":
    sys.exit(main())
# scripts/vuln_watch/merge_state.py -- constants and definitions.
#
# Merge Claude's classifications.json into state/seen.json.
#
# Inputs:
#   state/seen.json       (already has updated .sources from fetch_and_diff)
#   classifications.json  (written by the Claude step; list of records)
#   new_items.json        (fallback source of per-item metadata, if Claude
#                          omitted urls/sources in a record)
#
# Each classification record has shape:
#   {
#     "stable_id": "...",      # required (the key used in new_items.json)
#     "canonical_id": "...",   # optional; defaults to first extracted_cves,
#                              # else stable_id
#     "bucket": "toimplement|tocheck|unrelated",
#     "extracted_cves": [...], # optional
#     "sources": [...],        # optional
#     "urls": [...]            # optional
#   }
#
# Behavior:
#   - Upsert seen[canonical_id], union sources/urls, promote bucket strength.
#   - Every alternative ID of an item (stable_id, CVEs, vendor IDs, guid,
#     permalink) that differs from canonical_id becomes an alias of it.
#   - Update last_run to SCAN_DATE.
#   - Prune entries older than RETENTION_DAYS before writing.
#   - Write the three daily watch_*.md files as stubs if they do not exist
#     (i.e. when the classify step was skipped).

RETENTION_DAYS = 180
NEW_ITEMS_PATH = pathlib.Path("new_items.json")
CLASSIFICATIONS_PATH = pathlib.Path("classifications.json")


def _load_json(path: pathlib.Path, default: Any) -> Any:
    """Load JSON from ``path``; return ``default`` if it is missing or malformed.

    The classifications file is produced by an LLM step, so a syntax error
    there must not abort the merge (and with it the last_run update).
    """
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError as exc:
        print(f"warning: {path} is not valid JSON ({exc}); ignoring", file=sys.stderr)
        return default


def _as_list(value: Any) -> list[str]:
    """Coerce a record field to a list of strings (a lone string becomes [str])."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [v for v in value if isinstance(v, str)]
    return []


def _canonical(record: dict[str, Any], fallback_meta: dict[str, Any] | None) -> str:
    """Return the key under which ``record`` is stored in state["seen"]."""
    if record.get("canonical_id"):
        return record["canonical_id"]
    cves = record.get("extracted_cves") or (fallback_meta or {}).get("extracted_cves") or []
    if cves:
        return cves[0]
    return record["stable_id"]


def _alt_ids(record: dict[str, Any], fallback_meta: dict[str, Any] | None) -> list[str]:
    """Return every non-empty identifier the item is known by (may repeat)."""
    ids: list[str] = [record.get("stable_id", "")]
    ids.extend(_as_list(record.get("extracted_cves")))
    if fallback_meta:
        ids.extend(fallback_meta.get("extracted_cves") or [])
        ids.extend(fallback_meta.get("vendor_ids") or [])
        ids.append(fallback_meta.get("guid") or "")
        ids.append(fallback_meta.get("permalink") or "")
    return [i for i in ids if i]


def _unique(seq: list[str]) -> list[str]:
    """Drop empty values and duplicates from ``seq``, keeping first-seen order."""
    return list(dict.fromkeys(x for x in seq if x))


def _refresh_timestamps(entry: dict[str, Any], scan_date: str) -> None:
    """Mark ``entry`` as seen at ``scan_date``.

    first_seen is back-filled from the previous seen_at *before* seen_at is
    overwritten, so entries created without first_seen keep their real age.
    """
    entry.setdefault("first_seen", entry.get("seen_at") or scan_date)
    entry["seen_at"] = scan_date


def merge(
    data: dict[str, Any],
    classifications: list[dict[str, Any]],
    new_items_by_stable_id: dict[str, dict[str, Any]],
    scan_date: str,
) -> None:
    """Upsert every classification into ``data`` (state) in place.

    Records that are not objects or carry no stable_id are skipped.  Sources
    and URLs are unioned with what is already stored, the bucket is only ever
    promoted, and all alternative IDs are aliased to the canonical key.
    """
    for rec in classifications:
        if not isinstance(rec, dict):
            continue
        stable_id = rec.get("stable_id")
        if not stable_id:
            continue
        meta = new_items_by_stable_id.get(stable_id, {})
        canonical = _canonical(rec, meta)
        bucket = rec.get("bucket") or "unrelated"
        title = (meta.get("title") or "").strip()
        rec_sources = _as_list(rec.get("sources")) + _as_list(meta.get("source"))
        rec_urls = _as_list(rec.get("urls")) + _as_list(meta.get("permalink"))

        existing = data["seen"].get(canonical)
        if existing is None:
            data["seen"][canonical] = {
                "bucket": bucket,
                "first_seen": scan_date,
                "seen_at": scan_date,
                "title": title,
                "sources": _unique(rec_sources),
                "urls": _unique(rec_urls),
            }
        else:
            existing["bucket"] = state.promote_bucket(existing.get("bucket", "unrelated"), bucket)
            _refresh_timestamps(existing, scan_date)
            if not existing.get("title") and title:
                existing["title"] = title
            existing["sources"] = _unique(list(existing.get("sources") or []) + rec_sources)
            existing["urls"] = _unique(list(existing.get("urls") or []) + rec_urls)

        # Aliases: every alt id that is not the canonical key points at it.
        for alt in _alt_ids(rec, meta):
            if alt != canonical:
                data["aliases"][alt] = canonical


def ensure_stub_reports(scan_date: str) -> None:
    """If the Claude step was skipped, write empty stub watch_*.md files so the
    report artifact is consistent across runs."""
    day = scan_date[:10]  # YYYY-MM-DD
    stub = "(no new items in this window)\n"
    for bucket in ("toimplement", "tocheck", "unrelated"):
        report = pathlib.Path(f"watch_{day}_{bucket}.md")
        if not report.exists():
            report.write_text(stub)


def write_snapshots(data: dict[str, Any], scan_date: str) -> None:
    """Write current_toimplement.md and current_tocheck.md -- full backlog
    snapshots reflecting every entry in state under those buckets. A human
    who reads only the latest run's artifact sees the complete picture
    without having to consult prior runs."""
    for bucket in ("toimplement", "tocheck"):
        entries = [
            (cid, rec) for cid, rec in data["seen"].items()
            if rec.get("bucket") == bucket
        ]
        # Oldest first -- long-lingering items stay at the top as a reminder.
        entries.sort(key=lambda kv: kv[1].get("first_seen") or kv[1].get("seen_at") or "")
        out = [
            f"# Current `{bucket}` backlog",
            "",
            f"_Snapshot as of {scan_date}. "
            f"{len(entries)} item(s). Oldest first._",
            "",
        ]
        if not entries:
            out.append("(backlog is empty)")
        for cid, rec in entries:
            title = rec.get("title") or ""
            first_seen = (rec.get("first_seen") or rec.get("seen_at") or "")[:10]
            sources = ", ".join(rec.get("sources") or []) or "(none)"
            out.append(f"- **{cid}**" + (f" — {title}" if title else ""))
            out.append(f" first seen {first_seen} · sources: {sources}")
            out.extend(f" - {u}" for u in rec.get("urls") or [])
            out.append("")
        pathlib.Path(f"current_{bucket}.md").write_text("\n".join(out))


def _resolve_scan_time(raw: str) -> datetime.datetime:
    """Parse ``raw`` as an aware datetime; fall back to now (UTC) if unusable."""
    try:
        parsed = datetime.datetime.fromisoformat(raw.replace("Z", "+00:00")) if raw else None
    except ValueError:
        parsed = None
    if parsed is None:
        return datetime.datetime.now(datetime.timezone.utc)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed


def main() -> int:
    """Merge the classifications into state, prune it and write the reports."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--scan-date", default=os.environ.get("SCAN_DATE", ""))
    ap.add_argument("--classifications", type=pathlib.Path, default=CLASSIFICATIONS_PATH)
    ap.add_argument("--new-items", type=pathlib.Path, default=NEW_ITEMS_PATH)
    args = ap.parse_args()

    # Normalise to the same isoformat() spelling the fetch step writes, so
    # timestamps stored in state all share one format.
    scan_now = _resolve_scan_time(args.scan_date)
    scan_date = scan_now.isoformat()

    data = state.load()
    classifications = _load_json(args.classifications, [])
    new_items_doc = _load_json(args.new_items, {"items": []})
    raw_items = new_items_doc.get("items", []) if isinstance(new_items_doc, dict) else []
    new_items_by_stable_id = {
        it["stable_id"]: it
        for it in raw_items
        if isinstance(it, dict) and it.get("stable_id")
    }

    if not isinstance(classifications, list):
        print(f"warning: {args.classifications} is not a list; ignoring", file=sys.stderr)
        classifications = []

    merge(data, classifications, new_items_by_stable_id, scan_date)
    data["last_run"] = scan_date

    before, after = state.prune(data, RETENTION_DAYS, scan_now)
    state.save(data)
    ensure_stub_reports(scan_date)
    write_snapshots(data, scan_date)

    print(f"Merged {len(classifications)} classifications.")
    print(f"Pruned seen: {before} -> {after} entries (retention={RETENTION_DAYS}d).")
    print(f"Aliases: {len(data['aliases'])}.")
    return 0


if __name__ == "__main__":
    sys.exit(main())


# scripts/vuln_watch/sources.py -- declarative list of sources polled by the
# daily vuln scan.

Kind = Literal["rss", "atom", "html"]


@dataclass(frozen=True)
class Source:
    # Short name; used as the key for per-source state and report sections.
    name: str
    # URL that is actually fetched.
    url: str
    # "rss"/"atom" bodies go through the feed parser, "html" is regex-scraped.
    kind: Kind
    # Regexes matching vendor advisory IDs.  HTML sources are scraped with
    # them; for feeds they are applied to each entry's title and summary.
    advisory_id_patterns: tuple[str, ...] = ()
    # Human-facing URL to use as permalink fallback when `url` points at a
    # non-browsable endpoint (e.g. a JS data file). Empty = use `url`.
    display_url: str = ""
    # Per-source UA override. AMD's CDN drops connections when the UA string
    # contains a parenthesized URL, while Intel/ARM's WAF rejects UAs that
    # don't identify themselves -- so we can't use one UA everywhere.
    # Empty = use the module-level USER_AGENT.
    user_agent: str = ""


SOURCES: tuple[Source, ...] = (
    Source("phoronix", "https://www.phoronix.com/rss.php", "rss"),
    Source("oss-sec", "https://seclists.org/rss/oss-sec.rss", "rss"),
    Source("lwn", "https://lwn.net/headlines/newrss", "rss"),
    Source("project-zero", "https://googleprojectzero.blogspot.com/feeds/posts/default", "atom"),
    Source("vusec", "https://www.vusec.net/feed/", "rss"),
    Source("comsec-eth", "https://comsec.ethz.ch/category/news/feed/", "rss"),
    # api.msrc.microsoft.com/update-guide/rss is the real RSS endpoint; the
    # msrc.microsoft.com/... URL returns the SPA shell (2.7 KB) instead.
    Source("msrc", "https://api.msrc.microsoft.com/update-guide/rss", "rss"),
    Source("cisa", "https://www.cisa.gov/cybersecurity-advisories/all.xml", "rss"),
    Source("cert-cc", "https://www.kb.cert.org/vuls/atomfeed/", "atom"),
    Source(
        "intel-psirt",
        "https://www.intel.com/content/www/us/en/security-center/default.html",
        "html",
        (r"INTEL-SA-\d+",),
    ),
    Source(
        "amd-psirt",
        "https://www.amd.com/en/resources/product-security.html",
        "html",
        (r"AMD-SB-\d+",),
        user_agent="spectre-meltdown-checker/vuln-watch",
    ),
    Source(
        "arm-spec",
        "https://developer.arm.com/Arm%20Security%20Center/Speculative%20Processor%20Vulnerability",
        "html",
        (r"CVE-\d{4}-\d{4,7}",),
    ),
    # transient.fail renders its attack table from tree.js client-side; we
    # pull the JS file directly (CVE regex works on its JSON-ish body).
    Source(
        "transient-fail",
        "https://transient.fail/tree.js",
        "html",
        (r"CVE-\d{4}-\d{4,7}",),
        display_url="https://transient.fail/",
    ),
)

# Identify ourselves honestly. Akamai/Cloudflare WAFs fronting intel.com,
# developer.arm.com, and cisa.gov return 403 when the UA claims "Mozilla"
# but TLS/HTTP fingerprint doesn't match a real browser -- an honest bot UA
# passes those rules cleanly.
USER_AGENT = (
    "spectre-meltdown-checker/vuln-watch "
    "(+https://github.com/speed47/spectre-meltdown-checker)"
)
# Timeout for each HTTP request (passed to urlopen, i.e. seconds).
REQUEST_TIMEOUT = 30


# scripts/vuln_watch/state.py -- load/save/migrate/lookup helpers for
# state/seen.json.
#
# Schema v2 (angle brackets denote placeholders):
#   {
#     "schema_version": 2,
#     "last_run": "<iso8601>" | null,
#     "sources": {
#       "<source name>": {
#         "etag": "...",
#         "last_modified": "...",
#         "hwm_id": "...",
#         "hwm_published_at": "<iso8601>",
#         "last_fetched_at": "<iso8601>",
#         "last_status": 200 | 304 | <http error code> | "<network error>"
#       }
#     },
#     "seen": {
#       "<canonical id>": {
#         "bucket": "toimplement|tocheck|unrelated",
#         "first_seen": "<iso8601>",   # written by the merge step
#         "seen_at": "<iso8601>",
#         "title": "...",              # written by the merge step
#         "sources": ["<source name>", ...],
#         "urls": ["<url>", ...]
#       }
#     },
#     "aliases": { "<alternative id>": "<canonical id>" }
#   }

STATE_PATH = pathlib.Path("state/seen.json")
SCHEMA_VERSION = 2


def empty() -> dict[str, Any]:
    """Return a fresh, empty state document at the current schema version."""
    return {
        "schema_version": SCHEMA_VERSION,
        "last_run": None,
        "sources": {},
        "seen": {},
        "aliases": {},
    }


def load(path: pathlib.Path = STATE_PATH) -> dict[str, Any]:
    """Read the state file, migrating older schemas; empty state if missing."""
    if not path.exists():
        return empty()
    return _migrate(json.loads(path.read_text(encoding="utf-8")))


def save(data: dict[str, Any], path: pathlib.Path = STATE_PATH) -> None:
    """Write ``data`` to ``path`` as sorted, indented JSON.

    The file is written to a sibling temp file and renamed into place so an
    interrupted run cannot leave a truncated state file behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2, sort_keys=True) + "\n"
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(payload, encoding="utf-8")
    tmp.replace(path)


def _migrate(data: dict[str, Any]) -> dict[str, Any]:
    """Bring any older schema up to SCHEMA_VERSION.

    Raises ValueError for a schema newer than this code understands, rather
    than rewriting (and thereby truncating) it as if it were v1.
    """
    version = data.get("schema_version")
    if version == SCHEMA_VERSION:
        data.setdefault("last_run", None)
        data.setdefault("sources", {})
        data.setdefault("aliases", {})
        data.setdefault("seen", {})
        return data
    if isinstance(version, int) and version > SCHEMA_VERSION:
        raise ValueError(f"unsupported state schema_version {version} (max {SCHEMA_VERSION})")

    # v1 shape: {"last_run": ..., "seen": {<key>: {bucket, seen_at, source, cve?}}}
    migrated_seen: dict[str, Any] = {}
    aliases: dict[str, str] = {}
    for key, entry in (data.get("seen") or {}).items():
        if not isinstance(entry, dict):
            continue  # malformed v1 entry; nothing to carry over
        migrated_seen[key] = {
            "bucket": entry.get("bucket", "unrelated"),
            "seen_at": entry.get("seen_at"),
            "sources": [entry["source"]] if entry.get("source") else [],
            "urls": [key] if isinstance(key, str) and key.startswith("http") else [],
        }
        # If a v1 entry had a CVE that differs from the key, alias the CVE -> key.
        cve = entry.get("cve")
        if cve and cve != key:
            aliases[cve] = key

    return {
        "schema_version": SCHEMA_VERSION,
        "last_run": data.get("last_run"),
        "sources": {},
        "seen": migrated_seen,
        "aliases": aliases,
    }


def lookup(data: dict[str, Any], candidate_ids: list[str]) -> str | None:
    """Return the canonical key if any candidate is already known, else None."""
    seen = data["seen"]
    aliases = data["aliases"]
    for cid in candidate_ids:
        if not cid:
            continue
        if cid in seen:
            return cid
        canonical = aliases.get(cid)
        if canonical and canonical in seen:
            return canonical
    return None


_BUCKET_STRENGTH = {"unrelated": 0, "tocheck": 1, "toimplement": 2}


def promote_bucket(current: str, incoming: str) -> str:
    """Return whichever of two buckets represents the 'stronger' classification.

    Unknown bucket names rank like "unrelated"; ties keep ``current``.
    """
    if _BUCKET_STRENGTH.get(incoming, 0) > _BUCKET_STRENGTH.get(current, 0):
        return incoming
    return current


def _parse_timestamp(value: Any) -> datetime.datetime | None:
    """Parse an ISO-8601 string into an aware datetime (naive = UTC), else None."""
    if not isinstance(value, str) or not value:
        return None
    try:
        parsed = datetime.datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed


def prune(data: dict[str, Any], days: int, now: datetime.datetime) -> tuple[int, int]:
    """Drop seen entries older than `days`, and aliases pointing at dropped keys.

    Ages are compared as datetimes, so "Z", "+00:00" and other UTC offsets
    are all handled.  Entries whose seen_at is missing or unparsable are
    kept.  Returns (entries before, entries after).
    """
    if now.tzinfo is None:
        now = now.replace(tzinfo=datetime.timezone.utc)
    cutoff = now - datetime.timedelta(days=days)
    before = len(data["seen"])
    kept: dict[str, Any] = {}
    for key, entry in data["seen"].items():
        seen_at = _parse_timestamp(entry.get("seen_at"))
        if seen_at is None or seen_at >= cutoff:
            kept[key] = entry
    data["seen"] = kept
    data["aliases"] = {k: v for k, v in data["aliases"].items() if v in kept}
    return before, len(kept)