From 12f545dc45f68181d2096757b6c31b609b420956 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Lesimple?=
Date: Sun, 19 Apr 2026 10:06:07 +0000
Subject: [PATCH] extract dates from intel/amd HTML + honor WINDOW_HOURS env

---
 scripts/vuln_watch/fetch_and_diff.py | 156 +++++++++++++++++++++++++--
 1 file changed, 145 insertions(+), 11 deletions(-)

diff --git a/scripts/vuln_watch/fetch_and_diff.py b/scripts/vuln_watch/fetch_and_diff.py
index d7ce973..10aee2e 100644
--- a/scripts/vuln_watch/fetch_and_diff.py
+++ b/scripts/vuln_watch/fetch_and_diff.py
@@ -181,9 +181,106 @@ def parse_feed_body(src: Source, body: bytes) -> list[dict[str, Any]]:
     return items
 
 
-def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
-    """Scrape a non-RSS HTML page for advisory IDs and their nearest anchor."""
-    text = body.decode("utf-8", errors="replace")
+def _parse_intel_psirt(src: Source, text: str) -> list[dict[str, Any]]:
+    """Intel's security-center page uses a table of rows:
+
+        <tr>
+            <td><a href="...">Title</a></td>
+            <td>INTEL-SA-NNNNN</td>
+            <td>March 10, 2026</td>    <- Last updated
+            <td>March 10, 2026</td>    <- First published
+        </tr>
+
+    We pick the later of the two dates as `published_at` (most recent
+    activity) so updates to older advisories also show up in the window.
+    """
+    items: list[dict[str, Any]] = []
+    seen_ids: set[str] = set()
+    permalink_base = src.display_url or src.url
+    for m in re.finditer(r'<tr[^>]*>(.*?)</tr>', text, re.DOTALL):
+        row = m.group(1)
+        sid = re.search(r'INTEL-SA-\d+', row)
+        if not sid:
+            continue
+        advisory_id = sid.group(0)
+        if advisory_id in seen_ids:
+            continue
+        seen_ids.add(advisory_id)
+        link_m = re.search(r'href="([^"#]+)"', row)
+        permalink = urllib.parse.urljoin(permalink_base, link_m.group(1)) if link_m else permalink_base
+        title_m = re.search(r'<a[^>]*>([^<]+)</a>', row)
+        title = title_m.group(1).strip() if title_m else advisory_id
+        published_at: str | None = None
+        for ds in re.findall(r'<td[^>]*>\s*([A-Z][a-z]+ \d{1,2}, \d{4})\s*</td>', row):
+            try:
+                dt = datetime.datetime.strptime(ds, "%B %d, %Y").replace(tzinfo=datetime.timezone.utc)
+                iso = dt.isoformat()
+                if published_at is None or iso > published_at:
+                    published_at = iso
+            except ValueError:
+                continue
+        items.append({
+            "source": src.name,
+            "stable_id": advisory_id,
+            "title": title,
+            "permalink": permalink,
+            "guid": "",
+            "published_at": published_at,
+            "extracted_cves": extract_cves(row),
+            "vendor_ids": [advisory_id],
+            "snippet": clean_snippet(row)[:SNIPPET_MAX],
+        })
+    return items[:MAX_ITEMS_PER_FEED]
+
+
+def _parse_amd_psirt(src: Source, text: str) -> list[dict[str, Any]]:
+    """AMD's product-security page has a bulletin table where each row ends
+    with two `<td data-sort="YYYY-MM-DD HHMMSS">` cells (Published Date,
+    Last Updated Date). The machine-readable `data-sort` attribute is far
+    easier to parse than the human-readable text alongside it.
+ """ + items: list[dict[str, Any]] = [] + seen_ids: set[str] = set() + permalink_base = src.display_url or src.url + for m in re.finditer(r']*>(.*?AMD-SB-\d+.*?)', text, re.DOTALL): + row = m.group(1) + sid = re.search(r'AMD-SB-\d+', row) + if not sid: + continue + advisory_id = sid.group(0) + if advisory_id in seen_ids: + continue + seen_ids.add(advisory_id) + link_m = re.search(r'href="([^"#]+)"', row) + permalink = urllib.parse.urljoin(permalink_base, link_m.group(1)) if link_m else permalink_base + title_m = re.search(r']*>([^<]+)', row) + title = title_m.group(1).strip() if title_m else advisory_id + published_at: str | None = None + for (y, mo, d, h, mi, s) in re.findall( + r'data-sort="(\d{4})-(\d{2})-(\d{2})\s+(\d{2})(\d{2})(\d{2})"', row + ): + iso = f"{y}-{mo}-{d}T{h}:{mi}:{s}+00:00" + if published_at is None or iso > published_at: + published_at = iso + items.append({ + "source": src.name, + "stable_id": advisory_id, + "title": title, + "permalink": permalink, + "guid": "", + "published_at": published_at, + "extracted_cves": extract_cves(row), + "vendor_ids": [advisory_id], + "snippet": clean_snippet(row)[:SNIPPET_MAX], + }) + return items[:MAX_ITEMS_PER_FEED] + + +def _parse_html_generic(src: Source, text: str) -> list[dict[str, Any]]: + """Fallback regex-only extractor for HTML sources with no known table + layout (arm-spec, transient-fail's tree.js). Emits `published_at=None` + — items pass the window filter as fail-safe, but state.seen dedup + prevents re-emission across runs.""" items: list[dict[str, Any]] = [] seen_ids: set[str] = set() permalink_base = src.display_url or src.url @@ -209,20 +306,38 @@ def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]: "title": advisory_id, "permalink": permalink, "guid": "", - "published_at": None, # HTML pages often lack reliable dates + "published_at": None, "extracted_cves": cves, "vendor_ids": vendor_ids, "snippet": clean_snippet(window)[:SNIPPET_MAX], }) - return items + return items[:MAX_ITEMS_PER_FEED] + + +_HTML_PARSERS = { + "intel-psirt": _parse_intel_psirt, + "amd-psirt": _parse_amd_psirt, +} + + +def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]: + """Dispatch to a per-source HTML parser when one is registered; + fall back to the generic regex-over-advisory-IDs extractor.""" + text = body.decode("utf-8", errors="replace") + parser = _HTML_PARSERS.get(src.name, _parse_html_generic) + return parser(src, text) def parse_body(src: Source, body: bytes) -> list[dict[str, Any]]: return parse_feed_body(src, body) if src.kind in ("rss", "atom") else parse_html_body(src, body) -def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetime.datetime: - base = scan_now - datetime.timedelta(hours=DEFAULT_WINDOW_HOURS) +def compute_cutoff( + scan_now: datetime.datetime, + last_run: str | None, + window_hours: float = DEFAULT_WINDOW_HOURS, +) -> datetime.datetime: + base = scan_now - datetime.timedelta(hours=window_hours) lr = parse_iso(last_run) if lr is None: return base @@ -230,6 +345,23 @@ def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetim return min(base, widened) +def _resolve_window_hours() -> float: + """Pick up WINDOW_HOURS from the environment (set by workflow_dispatch). 
+    Falls back to DEFAULT_WINDOW_HOURS for cron runs or local invocations."""
+    raw = os.environ.get("WINDOW_HOURS", "").strip()
+    if not raw:
+        return float(DEFAULT_WINDOW_HOURS)
+    try:
+        v = float(raw)
+        if v <= 0:
+            raise ValueError("must be > 0")
+        return v
+    except ValueError:
+        print(f"warning: ignoring invalid WINDOW_HOURS={raw!r}, using {DEFAULT_WINDOW_HOURS}",
+              file=sys.stderr)
+        return float(DEFAULT_WINDOW_HOURS)
+
+
 def candidate_ids(item: dict[str, Any]) -> list[str]:
     """All identifiers under which this item might already be known."""
     seen: set[str] = set()
@@ -255,8 +387,9 @@ def main() -> int:
     scan_now = now_from_scan_date(args.scan_date)
     scan_date_iso = scan_now.isoformat()
+    window_hours = _resolve_window_hours()
     data = state.load()
-    cutoff = compute_cutoff(scan_now, data.get("last_run"))
+    cutoff = compute_cutoff(scan_now, data.get("last_run"), window_hours)
 
     per_source: dict[str, dict[str, Any]] = {}
     all_new: list[dict[str, Any]] = []
@@ -337,9 +470,10 @@ def main() -> int:
         ]
         f.write(f"fetch_failures_count={len(failures)}\n")
 
-    print(f"Scan date: {scan_date_iso}")
-    print(f"Cutoff: {cutoff.isoformat()}")
-    print(f"New items: {len(all_new)}")
+    print(f"Scan date:  {scan_date_iso}")
+    print(f"Window:     {window_hours:g} h")
+    print(f"Cutoff:     {cutoff.isoformat()}")
+    print(f"New items:  {len(all_new)}")
     for s, v in per_source.items():
         print(f"  {s:14s} status={str(v['status']):>16} new={v['new']}")
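
A few standalone snippets that exercise the new logic (not part of the
patch; sample data and module paths below are illustrative assumptions,
not code from this repo).

The Intel parser keeps the later of a row's two dates by comparing
ISO-8601 strings, which sort lexicographically, so "last updated" wins
over "first published" when they differ:

    import datetime

    cells = ["January 5, 2026", "March 10, 2026"]  # hypothetical <td> texts
    published_at = None
    for ds in cells:
        dt = datetime.datetime.strptime(ds, "%B %d, %Y").replace(
            tzinfo=datetime.timezone.utc)
        iso = dt.isoformat()
        if published_at is None or iso > published_at:
            published_at = iso
    print(published_at)  # 2026-03-10T00:00:00+00:00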
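
The AMD date extraction, run over a synthetic row fragment (the
"YYYY-MM-DD HHMMSS" data-sort shape is the one the regex in the patch
expects):

    import re

    # Hypothetical bulletin row: published date, then last-updated date.
    row = ('<td data-sort="2026-03-10 134500">March 10, 2026</td>'
           '<td data-sort="2026-04-01 090000">April 1, 2026</td>')
    for (y, mo, d, h, mi, s) in re.findall(
        r'data-sort="(\d{4})-(\d{2})-(\d{2})\s+(\d{2})(\d{2})(\d{2})"', row
    ):
        print(f"{y}-{mo}-{d}T{h}:{mi}:{s}+00:00")
    # 2026-03-10T13:45:00+00:00
    # 2026-04-01T09:00:00+00:00

The later string wins the `published_at` comparison, matching the Intel
parser's behavior of surfacing the most recent activity.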
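
And the WINDOW_HOURS override end to end: a workflow_dispatch run that
exports WINDOW_HOURS=336 looks back two weeks regardless of the default
(the import path is a stand-in for wherever the script actually lives):

    import datetime
    import os

    import fetch_and_diff as fad  # hypothetical import of the script

    os.environ["WINDOW_HOURS"] = "336"  # 14 days
    scan_now = datetime.datetime(2026, 4, 19, 10, 0,
                                 tzinfo=datetime.timezone.utc)
    cutoff = fad.compute_cutoff(scan_now, last_run=None,
                                window_hours=fad._resolve_window_hours())
    print(cutoff.isoformat())  # 2026-04-05T10:00:00+00:00

With last_run=None there is no widening, so the cutoff is exactly
scan_now minus window_hours.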