| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- #!/usr/bin/env python3
- """Build per-member dashboard JSON + manifest + build_report for a Congress.
- Reads votes.jsonl + roster.json for both chambers, fans out analyze.aggregate
- across a multiprocessing pool, writes atomic per-member JSON files, then a
- manifest.json (picker payload) and build_report.json (run log).
- """
- import argparse
- import datetime as _dt
- import hashlib
- import json
- import multiprocessing as mp
- import os
- import sys
- import time
- import traceback
- from pathlib import Path
- import analyze
# Version of this build pipeline. It is embedded in each member file's
# ``_meta`` block and forms the prefix of the manifest ``version`` string.
PIPELINE_VERSION: str = "1.0.0"

# Revision number of the emitted JSON layout (``_meta.schema_version``).
SCHEMA_VERSION: int = 1

# Per-process state for pool workers. Both stay ``None`` in the parent and
# are filled in by ``_init_worker`` when a worker process starts.
_WORKER_RECORDS = None  # combined House + Senate vote records
_WORKER_META = None  # the ``_meta`` dict attached to every payload
def _init_worker(records, meta):
    """Pool initializer: publish the shared inputs as module globals.

    Called once in each worker process so that ``_worker`` can read the
    vote records and the ``_meta`` block without receiving them with
    every task.

    Args:
        records: the vote records to expose as ``_WORKER_RECORDS``.
        meta: the metadata dict to expose as ``_WORKER_META``.
    """
    # ``globals()`` inside a function is the defining module's namespace,
    # so this rebinds the two module-level names in one step.
    globals().update(_WORKER_RECORDS=records, _WORKER_META=meta)
def _worker(task):
    """Build the dashboard payload for a single roster entry.

    Runs inside a pool process and reads the inputs installed by
    ``_init_worker`` (``_WORKER_RECORDS`` and ``_WORKER_META``).

    Args:
        task: a ``(member_id, member)`` pair, where ``member`` is the
            roster dict for that id.

    Returns:
        The payload dict for the member on success. If anything raises
        while building it, a dict with only ``"_error"`` (exception summary
        plus formatted traceback) and ``"id"`` is returned instead, so one
        bad member does not abort the whole pool.
    """
    member_id, member = task
    try:
        chamber_key = (member.get("chamber") or "").lower()
        sits_in_house = chamber_key == "house"

        # Only the votes of the member's own chamber are aggregated.
        chamber_votes = [
            rec for rec in _WORKER_RECORDS if rec.get("chamber") == chamber_key
        ]
        metrics = analyze.aggregate(
            chamber_votes, member_id, member.get("party"), chamber_key
        )

        # Territorial delegates (AS/DC/GU/MP/PR/VI) sit in the House but
        # cannot vote on final passage — only on Committee-of-the-Whole
        # amendments — so their participation is structurally low.
        is_delegate = sits_in_house and (member.get("state") or "") in (
            "AS", "DC", "GU", "MP", "PR", "VI",
        )

        # Keys are inserted in a fixed order because that order is what
        # ends up in the serialized member file.
        payload = {
            "id": member_id,
            "name": member.get("full_name") or member.get("name"),
        }
        for field in ("party", "state", "chamber", "served_from", "served_to"):
            payload[field] = member.get(field)
        payload["served_partial"] = bool(member.get("served_partial", False))
        payload["is_delegate"] = is_delegate
        for field in (
            "congress_term",
            "death_year",
            "current_member",
            "replaces",
            "replaced_by",
        ):
            payload[field] = member.get(field)
        payload["metrics"] = metrics
        payload["_meta"] = _WORKER_META

        # Only House members get a ``district`` key.
        if sits_in_house:
            payload["district"] = member.get("district")
        return payload
    except Exception as e:
        return {"_error": f"{type(e).__name__}: {e}\n{traceback.format_exc()}", "id": member_id}
def _load_jsonl(path):
    """Read a JSON Lines file and return its records as a list.

    Each non-blank line is parsed as one JSON document. Blank and
    whitespace-only lines are skipped.

    Args:
        path: location of the UTF-8 encoded ``.jsonl`` file.

    Returns:
        A list of the decoded values, in file order.

    Raises:
        OSError: if the file cannot be opened or read.
        json.JSONDecodeError: if a line is not valid JSON. The message is
            prefixed with ``path:lineno`` (1-based) so the offending line
            can be located in a large file.
    """
    records = []
    with open(path, "r", encoding="utf-8") as fh:
        for lineno, raw in enumerate(fh, start=1):
            text = raw.strip()
            if not text:
                continue
            try:
                records.append(json.loads(text))
            except json.JSONDecodeError as exc:
                # Re-raise the same exception type so existing handlers
                # still match, but say where in the file it happened.
                raise json.JSONDecodeError(
                    f"{path}:{lineno}: {exc.msg}", exc.doc, exc.pos
                ) from exc
    return records
def _atomic_write_json(path: Path, obj):
    """Serialize ``obj`` to ``path`` as compact JSON without exposing partial output.

    The data is first written to a sibling file named ``<path>.tmp`` and
    then moved over ``path`` with ``os.replace``, so readers see either
    the previous file or the complete new one.

    Args:
        path: destination file; must be a ``pathlib.Path``.
        obj: JSON-serializable value to write.

    Raises:
        TypeError / ValueError: if ``obj`` cannot be serialized.
        OSError: if the temp file cannot be written or moved into place.

    On any failure the temporary file is removed and ``path`` is left as
    it was before the call.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    committed = False
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, separators=(",", ":"))
        os.replace(tmp, path)
        committed = True
    finally:
        if not committed:
            # Best-effort cleanup of the half-written sibling. Errors here
            # are swallowed so the original exception is the one that
            # propagates to the caller.
            try:
                os.unlink(tmp)
            except OSError:
                pass
def _chamber_letter(chamber):
    """Return the one-letter chamber code used in the manifest.

    ``"H"`` when ``chamber`` equals ``"house"`` ignoring case; ``"S"`` for
    every other value, including ``None`` and the empty string.
    """
    normalized = chamber.lower() if chamber else ""
    if normalized == "house":
        return "H"
    return "S"
# Counter fields copied from a member's metrics into the manifest "k" summary,
# in the order they are emitted.
_MANIFEST_COUNTER_KEYS = (
    "total",
    "voting",
    "yeas",
    "nays",
    "voted_with_gop",
    "voted_with_dem",
    "voted_against_gop",
    "voted_against_dem",
    "lone_wolf",
)


def _utc_stamp(moment):
    """Format an aware UTC datetime as ISO-8601 with a trailing ``Z``."""
    return moment.isoformat().replace("+00:00", "Z")


def _manifest_entry(result):
    """Condense one successful worker payload into its compact manifest entry."""
    mid = result["id"]
    metrics = result.get("metrics") or {}
    entry = {
        "id": mid,
        "n": result.get("name") or mid,
        "p": result.get("party"),
        "s": result.get("state"),
        "c": _chamber_letter(result.get("chamber")),
    }
    # Optional keys are only emitted when they carry information, which
    # keeps the picker payload small.
    if entry["c"] == "H" and result.get("district") is not None:
        entry["d"] = result["district"]
    if result.get("served_partial"):
        entry["sp"] = True
    if result.get("is_delegate"):
        entry["dl"] = True
    if result.get("replaces"):
        entry["rs"] = result["replaces"]
    if result.get("replaced_by"):
        entry["rb"] = result["replaced_by"]
    term = result.get("congress_term") or {}
    if term.get("startYear"):
        entry["sy"] = term["startYear"]
    if term.get("endYear"):
        entry["ey"] = term["endYear"]
    if result.get("death_year"):
        entry["dy"] = result["death_year"]
    # Mark "unseated" — member appears in vote data (in manifest) but
    # never actually cast a Yea/Nay; not a delegate, did not die.
    # Likely a member-elect who resigned or declined the seat before
    # serving (e.g. Gaetz, 119th).
    if (
        (metrics.get("voting") or 0) == 0
        and not result.get("is_delegate")
        and not result.get("served_partial")
        and not result.get("death_year")
    ):
        entry["un"] = True
    entry["k"] = {key: metrics.get(key, 0) for key in _MANIFEST_COUNTER_KEYS}
    return entry


def main(argv=None):
    """Build every member file, the manifest and the build report for one Congress.

    Reads ``data/<congress>/{house,senate}/{votes.jsonl,roster.json}``,
    runs ``_worker`` over the merged roster in a process pool, writes one
    ``members/<id>.json`` per successful member, then ``manifest.json``
    and ``build_report.json`` under ``data/<congress>/``.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
            Recognised options are ``--congress N`` (default 119) and
            ``-v/--verbose``.

    Returns:
        ``2`` if any input file is missing (nothing is written),
        ``1`` if at least one member failed to build, otherwise ``0``.
    """
    ap = argparse.ArgumentParser(description="Build per-member dashboard JSON.")
    ap.add_argument("--congress", type=int, default=119)
    ap.add_argument("-v", "--verbose", action="store_true")
    args = ap.parse_args(argv)

    started = _dt.datetime.now(_dt.timezone.utc)
    t0 = time.monotonic()

    data_root = Path("data") / str(args.congress)
    house_votes_p = data_root / "house" / "votes.jsonl"
    senate_votes_p = data_root / "senate" / "votes.jsonl"
    house_roster_p = data_root / "house" / "roster.json"
    senate_roster_p = data_root / "senate" / "roster.json"
    for p in (house_votes_p, senate_votes_p, house_roster_p, senate_roster_p):
        if not p.exists():
            print(f"build_members: missing input {p}", file=sys.stderr)
            return 2

    house_recs = _load_jsonl(house_votes_p)
    senate_recs = _load_jsonl(senate_votes_p)
    records = house_recs + senate_recs
    with open(house_roster_p, "r", encoding="utf-8") as f:
        house_roster = json.load(f)
    with open(senate_roster_p, "r", encoding="utf-8") as f:
        senate_roster = json.load(f)

    # Merge the rosters; on an id collision the Senate entry wins, exactly
    # as before, but the collision is now recorded instead of being silent.
    roster = {}
    roster.update(house_roster)
    senate_map = {}
    senate_map.update(senate_roster)
    shared_ids = [mid for mid in senate_map if mid in roster]
    roster.update(senate_map)
    warnings = [
        f"roster id {mid!r} appears in both chambers; "
        "the Senate entry replaced the House entry"
        for mid in shared_ids
    ]
    if args.verbose:
        for message in warnings:
            print(f"WARN {message}", file=sys.stderr)

    # Hash of the analyze module's source, stored so outputs can be tied
    # to the exact classifier code that produced them.
    analyze_path = Path(analyze.__file__)
    classifier_hash = hashlib.sha256(analyze_path.read_bytes()).hexdigest()
    snapshot_date = started.date().isoformat()
    meta = {
        "schema_version": SCHEMA_VERSION,
        "pipeline_version": PIPELINE_VERSION,
        "classifier_hash": classifier_hash,
        "data_snapshot_date": snapshot_date,
        "source_xml_count": {"house": len(house_recs), "senate": len(senate_recs)},
    }

    # Start from an empty output directory so members dropped from the
    # roster do not linger. Temp files left by an interrupted earlier run
    # are cleared too.
    out_dir = data_root / "members"
    if out_dir.exists():
        for pattern in ("*.json", "*.json.tmp"):
            for stale in out_dir.glob(pattern):
                stale.unlink()
    out_dir.mkdir(parents=True, exist_ok=True)

    tasks = list(roster.items())
    n_procs = min(8, os.cpu_count() or 4)
    succeeded = 0
    failures = []
    members_meta = []
    with mp.Pool(processes=n_procs, initializer=_init_worker, initargs=(records, meta)) as pool:
        for result in pool.imap_unordered(_worker, tasks, chunksize=8):
            if "_error" in result:
                failures.append({"id": result.get("id"), "error": result["_error"]})
                if args.verbose:
                    print(f"FAIL {result.get('id')}: {result['_error'].splitlines()[0]}", file=sys.stderr)
                continue
            mid = result["id"]
            _atomic_write_json(out_dir / f"{mid}.json", result)
            entry = _manifest_entry(result)
            members_meta.append(entry)
            succeeded += 1
            if args.verbose:
                print(f"OK {mid} ({entry['n']})")

    # Results arrive in completion order, which varies between runs. The
    # id tie-break makes the manifest and report reproducible even when
    # two members share a display name.
    members_meta.sort(key=lambda e: ((e["n"] or "").lower(), str(e["id"])))
    failures.sort(key=lambda item: str(item["id"]))

    manifest = {
        "version": f"{PIPELINE_VERSION}+{snapshot_date}",
        "generated_at": _utc_stamp(started),
        "congress": args.congress,
        "members": members_meta,
    }
    _atomic_write_json(data_root / "manifest.json", manifest)

    finished = _dt.datetime.now(_dt.timezone.utc)
    duration = round(time.monotonic() - t0, 2)
    total = len(tasks)
    report = {
        "started_at": _utc_stamp(started),
        "finished_at": _utc_stamp(finished),
        "duration_seconds": duration,
        "members_total": total,
        "members_succeeded": succeeded,
        "members_failed": len(failures),
        "failures": failures,
        "warnings": warnings,
        "source": {
            "house_votes": len(house_recs),
            "senate_votes": len(senate_recs),
            "roster_entries": total,
        },
    }
    _atomic_write_json(data_root / "build_report.json", report)
    print(f"build_members: {succeeded}/{total} OK in {duration}s — manifest at {data_root / 'manifest.json'}")
    return 0 if not failures else 1
# Script entry point: run the build and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|