extract dates from intel/amd HTML + honor WINDOW_HOURS env

This commit is contained in:
Stéphane Lesimple
2026-04-19 10:06:07 +00:00
parent 94356c4992
commit 12f545dc45

View File

@@ -181,9 +181,106 @@ def parse_feed_body(src: Source, body: bytes) -> list[dict[str, Any]]:
return items
def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
"""Scrape a non-RSS HTML page for advisory IDs and their nearest anchor."""
text = body.decode("utf-8", errors="replace")
def _parse_intel_psirt(src: Source, text: str) -> list[dict[str, Any]]:
"""Intel's security-center page uses a table of <tr class="data"> rows:
<tr class="data" ...>
<td ...><a href="/.../intel-sa-NNNNN.html">Title</a></td>
<td>INTEL-SA-NNNNN</td>
<td>March 10, 2026</td> <- Last updated
<td>March 10, 2026</td> <- First published
</tr>
We pick the later of the two dates as `published_at` (most recent
activity) so updates to older advisories also show up in the window.
"""
items: list[dict[str, Any]] = []
seen_ids: set[str] = set()
permalink_base = src.display_url or src.url
for m in re.finditer(r'<tr class="data"[^>]*>(.*?)</tr>', text, re.DOTALL):
row = m.group(1)
sid = re.search(r'INTEL-SA-\d+', row)
if not sid:
continue
advisory_id = sid.group(0)
if advisory_id in seen_ids:
continue
seen_ids.add(advisory_id)
link_m = re.search(r'href="([^"#]+)"', row)
permalink = urllib.parse.urljoin(permalink_base, link_m.group(1)) if link_m else permalink_base
title_m = re.search(r'<a[^>]*>([^<]+)</a>', row)
title = title_m.group(1).strip() if title_m else advisory_id
published_at: str | None = None
for ds in re.findall(r'<td[^>]*>\s*([A-Z][a-z]+ \d{1,2}, \d{4})\s*</td>', row):
try:
dt = datetime.datetime.strptime(ds, "%B %d, %Y").replace(tzinfo=datetime.timezone.utc)
iso = dt.isoformat()
if published_at is None or iso > published_at:
published_at = iso
except ValueError:
continue
items.append({
"source": src.name,
"stable_id": advisory_id,
"title": title,
"permalink": permalink,
"guid": "",
"published_at": published_at,
"extracted_cves": extract_cves(row),
"vendor_ids": [advisory_id],
"snippet": clean_snippet(row)[:SNIPPET_MAX],
})
return items[:MAX_ITEMS_PER_FEED]
def _parse_amd_psirt(src: Source, text: str) -> list[dict[str, Any]]:
"""AMD's product-security page has a bulletin table where each row ends
with two `<td data-sort="YYYY-MM-DD HHMMSS">` cells (Published Date,
Last Updated Date). The machine-readable `data-sort` attribute is far
easier to parse than the human-readable text alongside it.
"""
items: list[dict[str, Any]] = []
seen_ids: set[str] = set()
permalink_base = src.display_url or src.url
for m in re.finditer(r'<tr[^>]*>(.*?AMD-SB-\d+.*?)</tr>', text, re.DOTALL):
row = m.group(1)
sid = re.search(r'AMD-SB-\d+', row)
if not sid:
continue
advisory_id = sid.group(0)
if advisory_id in seen_ids:
continue
seen_ids.add(advisory_id)
link_m = re.search(r'href="([^"#]+)"', row)
permalink = urllib.parse.urljoin(permalink_base, link_m.group(1)) if link_m else permalink_base
title_m = re.search(r'<a[^>]*>([^<]+)</a>', row)
title = title_m.group(1).strip() if title_m else advisory_id
published_at: str | None = None
for (y, mo, d, h, mi, s) in re.findall(
r'data-sort="(\d{4})-(\d{2})-(\d{2})\s+(\d{2})(\d{2})(\d{2})"', row
):
iso = f"{y}-{mo}-{d}T{h}:{mi}:{s}+00:00"
if published_at is None or iso > published_at:
published_at = iso
items.append({
"source": src.name,
"stable_id": advisory_id,
"title": title,
"permalink": permalink,
"guid": "",
"published_at": published_at,
"extracted_cves": extract_cves(row),
"vendor_ids": [advisory_id],
"snippet": clean_snippet(row)[:SNIPPET_MAX],
})
return items[:MAX_ITEMS_PER_FEED]
def _parse_html_generic(src: Source, text: str) -> list[dict[str, Any]]:
"""Fallback regex-only extractor for HTML sources with no known table
layout (arm-spec, transient-fail's tree.js). Emits `published_at=None`
— items pass the window filter as fail-safe, but state.seen dedup
prevents re-emission across runs."""
items: list[dict[str, Any]] = []
seen_ids: set[str] = set()
permalink_base = src.display_url or src.url
@@ -209,20 +306,38 @@ def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
"title": advisory_id,
"permalink": permalink,
"guid": "",
"published_at": None, # HTML pages often lack reliable dates
"published_at": None,
"extracted_cves": cves,
"vendor_ids": vendor_ids,
"snippet": clean_snippet(window)[:SNIPPET_MAX],
})
return items
return items[:MAX_ITEMS_PER_FEED]
_HTML_PARSERS = {
"intel-psirt": _parse_intel_psirt,
"amd-psirt": _parse_amd_psirt,
}
def parse_html_body(src: Source, body: bytes) -> list[dict[str, Any]]:
"""Dispatch to a per-source HTML parser when one is registered;
fall back to the generic regex-over-advisory-IDs extractor."""
text = body.decode("utf-8", errors="replace")
parser = _HTML_PARSERS.get(src.name, _parse_html_generic)
return parser(src, text)
def parse_body(src: Source, body: bytes) -> list[dict[str, Any]]:
return parse_feed_body(src, body) if src.kind in ("rss", "atom") else parse_html_body(src, body)
def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetime.datetime:
base = scan_now - datetime.timedelta(hours=DEFAULT_WINDOW_HOURS)
def compute_cutoff(
scan_now: datetime.datetime,
last_run: str | None,
window_hours: float = DEFAULT_WINDOW_HOURS,
) -> datetime.datetime:
base = scan_now - datetime.timedelta(hours=window_hours)
lr = parse_iso(last_run)
if lr is None:
return base
@@ -230,6 +345,23 @@ def compute_cutoff(scan_now: datetime.datetime, last_run: str | None) -> datetim
return min(base, widened)
def _resolve_window_hours() -> float:
"""Pick up WINDOW_HOURS from the environment (set by workflow_dispatch).
Falls back to DEFAULT_WINDOW_HOURS for cron runs or local invocations."""
raw = os.environ.get("WINDOW_HOURS", "").strip()
if not raw:
return float(DEFAULT_WINDOW_HOURS)
try:
v = float(raw)
if v <= 0:
raise ValueError("must be > 0")
return v
except ValueError:
print(f"warning: ignoring invalid WINDOW_HOURS={raw!r}, using {DEFAULT_WINDOW_HOURS}",
file=sys.stderr)
return float(DEFAULT_WINDOW_HOURS)
def candidate_ids(item: dict[str, Any]) -> list[str]:
"""All identifiers under which this item might already be known."""
seen: set[str] = set()
@@ -255,8 +387,9 @@ def main() -> int:
scan_now = now_from_scan_date(args.scan_date)
scan_date_iso = scan_now.isoformat()
window_hours = _resolve_window_hours()
data = state.load()
cutoff = compute_cutoff(scan_now, data.get("last_run"))
cutoff = compute_cutoff(scan_now, data.get("last_run"), window_hours)
per_source: dict[str, dict[str, Any]] = {}
all_new: list[dict[str, Any]] = []
@@ -337,9 +470,10 @@ def main() -> int:
]
f.write(f"fetch_failures_count={len(failures)}\n")
print(f"Scan date: {scan_date_iso}")
print(f"Cutoff: {cutoff.isoformat()}")
print(f"New items: {len(all_new)}")
print(f"Scan date: {scan_date_iso}")
print(f"Window: {window_hours:g} h")
print(f"Cutoff: {cutoff.isoformat()}")
print(f"New items: {len(all_new)}")
for s, v in per_source.items():
print(f" {s:14s} status={str(v['status']):>16} new={v['new']}")