diff --git a/scripts/vuln_watch/fetch_and_diff.py b/scripts/vuln_watch/fetch_and_diff.py
index d7ce973..10aee2e 100644
--- a/scripts/vuln_watch/fetch_and_diff.py
+++ b/scripts/vuln_watch/fetch_and_diff.py
@@ -181,9 +181,106 @@ def parse_feed_body(src: Source, body: bytes) -> list[dict[str, Any]]:
return items
-def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
- """Scrape a non-RSS HTML page for advisory IDs and their nearest anchor."""
- text = body.decode("utf-8", errors="replace")
def _parse_intel_psirt(src: Source, text: str) -> list[dict[str, Any]]:
    """Parse Intel's security-center advisory table.

    Each table row carries a title link, an INTEL-SA-NNNNN identifier and
    two human-readable dates (last updated / first published).  We keep the
    *later* of the two dates as ``published_at`` (most recent activity) so
    updates to older advisories also show up in the scan window.

    NOTE(review): the row/title/date tag patterns below were reconstructed
    from a mangled patch -- confirm them against the live page markup.
    """
    items: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    permalink_base = src.display_url or src.url
    for m in re.finditer(r'<tr[^>]*>(.*?)</tr>', text, re.DOTALL):
        row = m.group(1)
        sid = re.search(r'INTEL-SA-\d+', row)
        if not sid:
            continue
        advisory_id = sid.group(0)
        if advisory_id in seen_ids:
            continue  # first occurrence wins; duplicate rows are ignored
        seen_ids.add(advisory_id)
        link_m = re.search(r'href="([^"#]+)"', row)
        permalink = urllib.parse.urljoin(permalink_base, link_m.group(1)) if link_m else permalink_base
        title_m = re.search(r'<a[^>]*>([^<]+)</a>', row)
        title = title_m.group(1).strip() if title_m else advisory_id
        published_at: str | None = None
        # Dates look like "March 10, 2026"; keep the lexicographically
        # greatest ISO timestamp (ISO-8601 strings sort chronologically).
        for ds in re.findall(r'<td[^>]*>\s*([A-Z][a-z]+ \d{1,2}, \d{4})\s*</td>', row):
            try:
                dt = datetime.datetime.strptime(ds, "%B %d, %Y").replace(tzinfo=datetime.timezone.utc)
            except ValueError:
                continue  # tolerate cells that merely look like dates
            iso = dt.isoformat()
            if published_at is None or iso > published_at:
                published_at = iso
        items.append({
            "source": src.name,
            "stable_id": advisory_id,
            "title": title,
            "permalink": permalink,
            "guid": "",
            "published_at": published_at,
            "extracted_cves": extract_cves(row),
            "vendor_ids": [advisory_id],
            "snippet": clean_snippet(row)[:SNIPPET_MAX],
        })
    return items[:MAX_ITEMS_PER_FEED]
+
+
def _parse_amd_psirt(src: Source, text: str) -> list[dict[str, Any]]:
    """Parse AMD's product-security bulletin table.

    Each bulletin row ends with two date cells (Published Date, Last
    Updated Date) whose machine-readable ``data-sort`` attribute
    ("YYYY-MM-DD HHMMSS") is far easier to parse than the human-readable
    text alongside it.  The later of the two timestamps becomes
    ``published_at`` so recently-updated bulletins stay inside the window.

    NOTE(review): the row tag pattern below was reconstructed from a
    mangled patch -- confirm it against the live page markup.
    """
    items: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    permalink_base = src.display_url or src.url
    for m in re.finditer(r'<tr[^>]*>(.*?AMD-SB-\d+.*?)</tr>', text, re.DOTALL):
        row = m.group(1)
        sid = re.search(r'AMD-SB-\d+', row)
        if not sid:
            continue
        advisory_id = sid.group(0)
        if advisory_id in seen_ids:
            continue  # first occurrence wins; duplicate rows are ignored
        seen_ids.add(advisory_id)
        link_m = re.search(r'href="([^"#]+)"', row)
        permalink = urllib.parse.urljoin(permalink_base, link_m.group(1)) if link_m else permalink_base
        title_m = re.search(r'<a[^>]*>([^<]+)</a>', row)
        title = title_m.group(1).strip() if title_m else advisory_id
        published_at: str | None = None
        for (y, mo, d, h, mi, s) in re.findall(
            r'data-sort="(\d{4})-(\d{2})-(\d{2})\s+(\d{2})(\d{2})(\d{2})"', row
        ):
            # Assemble an ISO-8601 UTC timestamp directly from the captured
            # digit groups; string comparison then picks the later one.
            iso = f"{y}-{mo}-{d}T{h}:{mi}:{s}+00:00"
            if published_at is None or iso > published_at:
                published_at = iso
        items.append({
            "source": src.name,
            "stable_id": advisory_id,
            "title": title,
            "permalink": permalink,
            "guid": "",
            "published_at": published_at,
            "extracted_cves": extract_cves(row),
            "vendor_ids": [advisory_id],
            "snippet": clean_snippet(row)[:SNIPPET_MAX],
        })
    return items[:MAX_ITEMS_PER_FEED]
+
+
+def _parse_html_generic(src: Source, text: str) -> list[dict[str, Any]]:
+ """Fallback regex-only extractor for HTML sources with no known table
+ layout (arm-spec, transient-fail's tree.js). Emits `published_at=None`
+ — items pass the window filter as fail-safe, but state.seen dedup
+ prevents re-emission across runs."""
items: list[dict[str, Any]] = []
seen_ids: set[str] = set()
permalink_base = src.display_url or src.url
@@ -209,20 +306,38 @@ def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
"title": advisory_id,
"permalink": permalink,
"guid": "",
- "published_at": None, # HTML pages often lack reliable dates
+ "published_at": None,
"extracted_cves": cves,
"vendor_ids": vendor_ids,
"snippet": clean_snippet(window)[:SNIPPET_MAX],
})
- return items
+ return items[:MAX_ITEMS_PER_FEED]
+
+
# Registry of per-source HTML parsers, keyed by Source.name.  Sources not
# listed here fall back to the generic extractor in parse_html_body.
_HTML_PARSERS = dict([
    ("intel-psirt", _parse_intel_psirt),
    ("amd-psirt", _parse_amd_psirt),
])
+
+
def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
    """Decode *body* and hand it to the HTML parser registered for this
    source; sources without a dedicated parser use the generic
    regex-over-advisory-IDs extractor."""
    decoded = body.decode("utf-8", errors="replace")
    try:
        parser = _HTML_PARSERS[src.name]
    except KeyError:
        parser = _parse_html_generic
    return parser(src, decoded)
def parse_body(src: Source, body: bytes) -> list[dict[str, Any]]:
    """Route raw response bytes to the feed or HTML parser by source kind."""
    if src.kind in ("rss", "atom"):
        return parse_feed_body(src, body)
    return parse_html_body(src, body)
-def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetime.datetime:
- base = scan_now - datetime.timedelta(hours=DEFAULT_WINDOW_HOURS)
+def compute_cutoff(
+ scan_now: datetime.datetime,
+ last_run: str | None,
+ window_hours: float = DEFAULT_WINDOW_HOURS,
+) -> datetime.datetime:
+ base = scan_now - datetime.timedelta(hours=window_hours)
lr = parse_iso(last_run)
if lr is None:
return base
@@ -230,6 +345,23 @@ def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetim
return min(base, widened)
def _resolve_window_hours() -> float:
    """Return the scan window in hours.

    Picks up WINDOW_HOURS from the environment (set by workflow_dispatch);
    falls back to DEFAULT_WINDOW_HOURS for cron runs or local invocations.
    Non-numeric, non-positive, NaN and infinite values are rejected with a
    stderr warning rather than poisoning the downstream timedelta math.
    """
    raw = os.environ.get("WINDOW_HOURS", "").strip()
    if not raw:
        return float(DEFAULT_WINDOW_HOURS)
    try:
        v = float(raw)
        # float() happily accepts "nan" and "inf".  NaN fails every
        # comparison (so a plain `v <= 0` guard would let it through) and
        # infinity is not a usable window either; require finite and > 0.
        if not 0 < v < float("inf"):
            raise ValueError("must be a finite value > 0")
        return v
    except ValueError:
        print(f"warning: ignoring invalid WINDOW_HOURS={raw!r}, using {DEFAULT_WINDOW_HOURS}",
              file=sys.stderr)
        return float(DEFAULT_WINDOW_HOURS)
+
+
def candidate_ids(item: dict[str, Any]) -> list[str]:
"""All identifiers under which this item might already be known."""
seen: set[str] = set()
@@ -255,8 +387,9 @@ def main() -> int:
scan_now = now_from_scan_date(args.scan_date)
scan_date_iso = scan_now.isoformat()
+ window_hours = _resolve_window_hours()
data = state.load()
- cutoff = compute_cutoff(scan_now, data.get("last_run"))
+ cutoff = compute_cutoff(scan_now, data.get("last_run"), window_hours)
per_source: dict[str, dict[str, Any]] = {}
all_new: list[dict[str, Any]] = []
@@ -337,9 +470,10 @@ def main() -> int:
]
f.write(f"fetch_failures_count={len(failures)}\n")
- print(f"Scan date: {scan_date_iso}")
- print(f"Cutoff: {cutoff.isoformat()}")
- print(f"New items: {len(all_new)}")
+ print(f"Scan date: {scan_date_iso}")
+ print(f"Window: {window_hours:g} h")
+ print(f"Cutoff: {cutoff.isoformat()}")
+ print(f"New items: {len(all_new)}")
for s, v in per_source.items():
print(f" {s:14s} status={str(v['status']):>16} new={v['new']}")