#!/usr/bin/env python3
"""Build per-member dashboard JSON + manifest + build_report for a Congress.

Reads votes.jsonl + roster.json for both chambers, fans out analyze.aggregate
across a multiprocessing pool, writes atomic per-member JSON files, then a
manifest.json (picker payload) and build_report.json (run log).
"""

import argparse
import datetime as _dt
import hashlib
import json
import multiprocessing as mp
import os
import sys
import time
import traceback
from pathlib import Path

import analyze

PIPELINE_VERSION = "1.0.0"
SCHEMA_VERSION = 1

# Territorial delegates (AS/DC/GU/MP/PR/VI) sit in the House but
# cannot vote on final passage — only on Committee-of-the-Whole
# amendments — so their participation is structurally low.
_DELEGATE_STATES = ("AS", "DC", "GU", "MP", "PR", "VI")

# Per-process state populated by _init_worker in each pool worker.
_WORKER_RECORDS = None
_WORKER_META = None


def _init_worker(records, meta):
    """Pool initializer: stash the shared vote records and build metadata.

    Args:
        records: combined list of vote records for both chambers.
        meta: metadata dict that _worker attaches to every payload as "_meta".
    """
    global _WORKER_RECORDS, _WORKER_META
    _WORKER_RECORDS = records
    _WORKER_META = meta


def _worker(task):
    """Build the dashboard payload for one roster entry.

    Args:
        task: ``(member_id, roster_entry)`` pair; the entry is a dict.

    Returns:
        The payload dict on success. If anything raises while building it, a
        dict with "_error" (exception summary plus traceback) and "id" is
        returned instead, so one bad member does not abort the whole pool.
    """
    mid, m = task
    try:
        chamber = (m.get("chamber") or "").lower()
        is_house = chamber == "house"
        # Only records from the member's own chamber are aggregated.
        recs = [r for r in _WORKER_RECORDS if r.get("chamber") == chamber]
        metrics = analyze.aggregate(recs, mid, m.get("party"), chamber)
        is_delegate = is_house and (m.get("state") or "") in _DELEGATE_STATES
        payload = {
            "id": mid,
            "name": m.get("full_name") or m.get("name"),
            "party": m.get("party"),
            "state": m.get("state"),
            "chamber": m.get("chamber"),
            "served_from": m.get("served_from"),
            "served_to": m.get("served_to"),
            "served_partial": bool(m.get("served_partial", False)),
            "is_delegate": is_delegate,
            "congress_term": m.get("congress_term"),
            "death_year": m.get("death_year"),
            "current_member": m.get("current_member"),
            "replaces": m.get("replaces"),
            "replaced_by": m.get("replaced_by"),
            "metrics": metrics,
            "_meta": _WORKER_META,
        }
        if is_house:
            payload["district"] = m.get("district")
        return payload
    except Exception as e:
        # Deliberately broad: this is the per-task boundary, and the failure
        # is reported back to main() rather than swallowed.
        return {
            "_error": f"{type(e).__name__}: {e}\n{traceback.format_exc()}",
            "id": mid,
        }


def _load_jsonl(path):
    """Parse a JSON Lines file into a list, skipping blank lines.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON. The
            message is prefixed with ``path:lineno`` to locate the bad line.
    """
    out = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                out.append(json.loads(line))
            except json.JSONDecodeError as e:
                # Same exception type as before, with file/line context added.
                raise json.JSONDecodeError(
                    f"{path}:{lineno}: {e.msg}", e.doc, e.pos
                ) from e
    return out


def _atomic_write_json(path: Path, obj):
    """Write *obj* as compact JSON to *path* via a sibling ``.tmp`` file.

    The data is written to ``<path>.tmp`` and then moved over *path* with
    os.replace. If writing or replacing fails, the temp file is removed and
    the original exception is re-raised.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, separators=(",", ":"))
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup so a failed write does not leave debris behind;
        # the original error is what the caller needs to see.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def _chamber_letter(chamber):
    """Return "H" for "house" (case-insensitive), "S" for anything else."""
    return "H" if (chamber or "").lower() == "house" else "S"


def main(argv=None):
    """Run the build for one Congress.

    Args:
        argv: argument list for argparse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 if every member built, 1 if any member failed,
        2 if a required input file is missing.
    """
    ap = argparse.ArgumentParser(description="Build per-member dashboard JSON.")
    ap.add_argument("--congress", type=int, default=119)
    ap.add_argument("-v", "--verbose", action="store_true")
    args = ap.parse_args(argv)

    started = _dt.datetime.now(_dt.timezone.utc)
    t0 = time.monotonic()

    data_root = Path("data") / str(args.congress)
    house_votes_p = data_root / "house" / "votes.jsonl"
    senate_votes_p = data_root / "senate" / "votes.jsonl"
    house_roster_p = data_root / "house" / "roster.json"
    senate_roster_p = data_root / "senate" / "roster.json"

    for p in (house_votes_p, senate_votes_p, house_roster_p, senate_roster_p):
        if not p.exists():
            print(f"build_members: missing input {p}", file=sys.stderr)
            return 2

    house_recs = _load_jsonl(house_votes_p)
    senate_recs = _load_jsonl(senate_votes_p)
    records = house_recs + senate_recs

    with open(house_roster_p, "r", encoding="utf-8") as f:
        house_roster = dict(json.load(f))
    with open(senate_roster_p, "r", encoding="utf-8") as f:
        senate_roster = dict(json.load(f))

    # Senate entries win on id collision (unchanged merge order), but the
    # collision is surfaced in the build report instead of passing silently.
    warnings = []
    for dup in sorted(house_roster.keys() & senate_roster.keys(), key=str):
        msg = (
            f"roster id {dup} appears in both chambers; "
            "senate entry overrides house entry"
        )
        warnings.append(msg)
        print(f"build_members: warning: {msg}", file=sys.stderr)
    roster = {**house_roster, **senate_roster}

    # The hash of analyze's source file is recorded in every payload's _meta.
    analyze_path = Path(analyze.__file__)
    classifier_hash = hashlib.sha256(analyze_path.read_bytes()).hexdigest()
    snapshot_date = started.date().isoformat()

    meta = {
        "schema_version": SCHEMA_VERSION,
        "pipeline_version": PIPELINE_VERSION,
        "classifier_hash": classifier_hash,
        "data_snapshot_date": snapshot_date,
        "source_xml_count": {"house": len(house_recs), "senate": len(senate_recs)},
    }

    out_dir = data_root / "members"
    if out_dir.exists():
        # Purge previous outputs, including temp files from an interrupted run.
        for pattern in ("*.json", "*.json.tmp"):
            for stale in out_dir.glob(pattern):
                stale.unlink()
    out_dir.mkdir(parents=True, exist_ok=True)

    tasks = list(roster.items())
    n_procs = min(8, os.cpu_count() or 4)

    succeeded = 0
    failures = []
    members_meta = []

    with mp.Pool(
        processes=n_procs, initializer=_init_worker, initargs=(records, meta)
    ) as pool:
        for result in pool.imap_unordered(_worker, tasks, chunksize=8):
            if "_error" in result:
                failures.append({"id": result.get("id"), "error": result["_error"]})
                if args.verbose:
                    print(
                        f"FAIL {result.get('id')}: {result['_error'].splitlines()[0]}",
                        file=sys.stderr,
                    )
                continue

            mid = result["id"]
            _atomic_write_json(out_dir / f"{mid}.json", result)

            # Manifest entries use short keys; optional keys are only added
            # when the corresponding value is present.
            entry = {
                "id": mid,
                "n": result.get("name") or mid,
                "p": result.get("party"),
                "s": result.get("state"),
                "c": _chamber_letter(result.get("chamber")),
            }
            if entry["c"] == "H" and result.get("district") is not None:
                entry["d"] = result["district"]
            if result.get("served_partial"):
                entry["sp"] = True
            if result.get("is_delegate"):
                entry["dl"] = True
            if result.get("replaces"):
                entry["rs"] = result["replaces"]
            if result.get("replaced_by"):
                entry["rb"] = result["replaced_by"]
            term = result.get("congress_term") or {}
            if term.get("startYear"):
                entry["sy"] = term["startYear"]
            if term.get("endYear"):
                entry["ey"] = term["endYear"]
            if result.get("death_year"):
                entry["dy"] = result["death_year"]

            mx = result.get("metrics") or {}
            # Mark "unseated" — member appears in vote data (in manifest) but
            # never actually cast a Yea/Nay; not a delegate, did not die.
            # Likely a member-elect who resigned or declined the seat before
            # serving (e.g. Gaetz, 119th).
            if (
                (mx.get("voting") or 0) == 0
                and not result.get("is_delegate")
                and not result.get("served_partial")
                and not result.get("death_year")
            ):
                entry["un"] = True

            entry["k"] = {
                "total": mx.get("total", 0),
                "voting": mx.get("voting", 0),
                "yeas": mx.get("yeas", 0),
                "nays": mx.get("nays", 0),
                "voted_with_gop": mx.get("voted_with_gop", 0),
                "voted_with_dem": mx.get("voted_with_dem", 0),
                "voted_against_gop": mx.get("voted_against_gop", 0),
                "voted_against_dem": mx.get("voted_against_dem", 0),
                "lone_wolf": mx.get("lone_wolf", 0),
            }
            members_meta.append(entry)
            succeeded += 1
            if args.verbose:
                print(f"OK   {mid} ({entry['n']})")

    # Results arrive in arbitrary order from imap_unordered; the id tiebreaker
    # keeps the manifest and report byte-stable across runs on equal names.
    members_meta.sort(key=lambda e: ((e["n"] or "").lower(), str(e["id"])))
    failures.sort(key=lambda item: str(item["id"]))

    manifest = {
        "version": f"{PIPELINE_VERSION}+{snapshot_date}",
        "generated_at": started.isoformat().replace("+00:00", "Z"),
        "congress": args.congress,
        "members": members_meta,
    }
    _atomic_write_json(data_root / "manifest.json", manifest)

    finished = _dt.datetime.now(_dt.timezone.utc)
    duration = round(time.monotonic() - t0, 2)
    total = len(tasks)
    report = {
        "started_at": started.isoformat().replace("+00:00", "Z"),
        "finished_at": finished.isoformat().replace("+00:00", "Z"),
        "duration_seconds": duration,
        "members_total": total,
        "members_succeeded": succeeded,
        "members_failed": len(failures),
        "failures": failures,
        "warnings": warnings,
        "source": {
            "house_votes": len(house_recs),
            "senate_votes": len(senate_recs),
            "roster_entries": total,
        },
    }
    _atomic_write_json(data_root / "build_report.json", report)

    print(
        f"build_members: {succeeded}/{total} OK in {duration}s — "
        f"manifest at {data_root / 'manifest.json'}"
    )
    return 0 if not failures else 1


if __name__ == "__main__":
    sys.exit(main())