build_members.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. #!/usr/bin/env python3
  2. """Build per-member dashboard JSON + manifest + build_report for a Congress.
  3. Reads votes.jsonl + roster.json for both chambers, fans out analyze.aggregate
  4. across a multiprocessing pool, writes atomic per-member JSON files, then a
  5. manifest.json (picker payload) and build_report.json (run log).
  6. """
  7. import argparse
  8. import datetime as _dt
  9. import hashlib
  10. import json
  11. import multiprocessing as mp
  12. import os
  13. import sys
  14. import time
  15. import traceback
  16. from pathlib import Path
  17. import analyze
  # Version of this build pipeline: stored in every member file's ``_meta`` and
  # used as the prefix of the manifest ``version`` string.
  PIPELINE_VERSION = "1.0.0"
  # Version of the emitted JSON layout, stored as ``_meta.schema_version``.
  SCHEMA_VERSION = 1

  # Per-process state for pool workers: assigned by ``_init_worker`` and read by
  # ``_worker``. Both start out as ``None`` until the initializer has run.
  _WORKER_RECORDS = _WORKER_META = None
  def _init_worker(records, meta):
      """Pool initializer: stash the shared inputs in this process's globals.

      Args:
          records: Vote records that ``_worker`` later filters by chamber.
          meta: Build metadata attached to every payload under ``_meta``.
      """
      global _WORKER_META, _WORKER_RECORDS
      _WORKER_META, _WORKER_RECORDS = meta, records
  def _worker(task):
      """Build the dashboard payload for a single roster entry.

      Args:
          task: ``(member_id, member_dict)`` pair taken from the roster.

      Returns:
          The payload dict on success. If anything raises while building it,
          ``{"_error": "<type>: <message>\\n<traceback>", "id": member_id}`` is
          returned instead, so the failure is reported rather than propagated.
      """
      mid, m = task
      try:
          chamber = (m.get("chamber") or "").lower()
          in_house = chamber == "house"
          # Only votes from the member's own chamber are passed to the analyzer.
          chamber_votes = [
              rec for rec in _WORKER_RECORDS if rec.get("chamber") == chamber
          ]
          metrics = analyze.aggregate(chamber_votes, mid, m.get("party"), chamber)

          # Territorial delegates (AS/DC/GU/MP/PR/VI) sit in the House but
          # cannot vote on final passage — only on Committee-of-the-Whole
          # amendments — so their participation is structurally low.
          home = m.get("state") or ""
          is_delegate = in_house and home in ("AS", "DC", "GU", "MP", "PR", "VI")

          payload = {
              "id": mid,
              "name": m.get("full_name") or m.get("name"),
              "party": m.get("party"),
              "state": m.get("state"),
              "chamber": m.get("chamber"),
              "served_from": m.get("served_from"),
              "served_to": m.get("served_to"),
              "served_partial": bool(m.get("served_partial", False)),
              "is_delegate": is_delegate,
              "metrics": metrics,
              "_meta": _WORKER_META,
          }
          # Districts only exist for House members; the key is omitted otherwise.
          if in_house:
              payload["district"] = m.get("district")
          return payload
      except Exception as exc:
          detail = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
          return {"_error": detail, "id": mid}
  def _load_jsonl(path):
      """Parse a JSON Lines file into a list of decoded objects.

      Each line is stripped of surrounding whitespace; lines that are empty
      after stripping are skipped. Errors from ``json.loads`` propagate.

      Args:
          path: Location of the UTF-8 encoded ``.jsonl`` file.

      Returns:
          The decoded values, in file order.
      """
      with open(path, "r", encoding="utf-8") as fh:
          stripped = (raw.strip() for raw in fh)
          return [json.loads(text) for text in stripped if text]
  def _atomic_write_json(path: Path, obj):
      """Write *obj* as compact JSON to *path* via a sibling temp file.

      The data is first written to ``<path>.tmp`` and then moved over *path*
      with ``os.replace``, so *path* is never left holding a half-written
      document. If serialization or the rename fails, the temp file is removed
      and the original exception is re-raised; an existing *path* is untouched.

      Args:
          path: Destination file (``Path`` or path string).
          obj: JSON-serializable value.

      Raises:
          TypeError, ValueError: If *obj* cannot be serialized by ``json.dump``.
          OSError: If the temp file cannot be written or renamed.
      """
      path = Path(path)
      tmp = path.with_suffix(path.suffix + ".tmp")
      try:
          with open(tmp, "w", encoding="utf-8") as f:
              json.dump(obj, f, separators=(",", ":"))
          os.replace(tmp, path)
      except BaseException:
          # Don't leave a partial ``*.tmp`` behind; it may not exist at all if
          # ``open`` itself failed, hence the tolerant unlink.
          try:
              os.unlink(tmp)
          except OSError:
              pass
          raise
  def _chamber_letter(chamber):
      """Return the one-letter chamber code used in the manifest.

      ``"H"`` when *chamber* equals ``"house"`` ignoring case; ``"S"`` for every
      other value, including ``None`` and the empty string.
      """
      if chamber and chamber.lower() == "house":
          return "H"
      return "S"
  # Metric counters copied from each member's ``metrics`` into the manifest row.
  _MANIFEST_COUNTERS = (
      "total",
      "voting",
      "yeas",
      "nays",
      "voted_with_gop",
      "voted_with_dem",
      "voted_against_gop",
      "voted_against_dem",
      "lone_wolf",
  )


  def _iso_z(moment):
      """Format *moment* with ``isoformat()``, spelling a UTC offset as ``Z``."""
      return moment.isoformat().replace("+00:00", "Z")


  def _manifest_entry(result):
      """Condense one successful worker payload into its compact manifest row."""
      mid = result["id"]
      entry = {
          "id": mid,
          "n": result.get("name") or mid,
          "p": result.get("party"),
          "s": result.get("state"),
          "c": _chamber_letter(result.get("chamber")),
      }
      # Optional keys are only emitted when they carry information.
      if entry["c"] == "H" and result.get("district") is not None:
          entry["d"] = result["district"]
      if result.get("served_partial"):
          entry["sp"] = True
      if result.get("is_delegate"):
          entry["dl"] = True
      mx = result.get("metrics") or {}
      entry["k"] = {key: mx.get(key, 0) for key in _MANIFEST_COUNTERS}
      return entry


  def main(argv=None):
      """Build member JSON files, ``manifest.json`` and ``build_report.json``.

      Reads ``data/<congress>/{house,senate}/{votes.jsonl,roster.json}``, runs
      ``_worker`` for every roster entry in a process pool, writes one
      ``members/<id>.json`` per successful member, then the manifest and the
      build report under ``data/<congress>/``.

      Args:
          argv: Argument list to parse; ``None`` makes argparse use ``sys.argv``.

      Returns:
          ``0`` when every member built, ``1`` when at least one member failed,
          ``2`` when any input file is missing (nothing is written in that case).
      """
      ap = argparse.ArgumentParser(description="Build per-member dashboard JSON.")
      ap.add_argument("--congress", type=int, default=119)
      ap.add_argument("-v", "--verbose", action="store_true")
      args = ap.parse_args(argv)

      started = _dt.datetime.now(_dt.timezone.utc)
      t0 = time.monotonic()

      data_root = Path("data") / str(args.congress)
      house_votes_p = data_root / "house" / "votes.jsonl"
      senate_votes_p = data_root / "senate" / "votes.jsonl"
      house_roster_p = data_root / "house" / "roster.json"
      senate_roster_p = data_root / "senate" / "roster.json"

      # Report every missing input in one run instead of stopping at the first.
      inputs = (house_votes_p, senate_votes_p, house_roster_p, senate_roster_p)
      missing = [p for p in inputs if not p.exists()]
      if missing:
          for p in missing:
              print(f"build_members: missing input {p}", file=sys.stderr)
          return 2

      house_recs = _load_jsonl(house_votes_p)
      senate_recs = _load_jsonl(senate_votes_p)
      records = house_recs + senate_recs

      with open(house_roster_p, "r", encoding="utf-8") as f:
          house_roster = dict(json.load(f))
      with open(senate_roster_p, "r", encoding="utf-8") as f:
          senate_roster = dict(json.load(f))

      # Merge order matters: for an id present in both rosters the Senate entry
      # replaces the House one. Surface that in the report instead of dropping
      # the House entry silently.
      warnings = []
      shared_ids = sorted(set(house_roster) & set(senate_roster), key=str)
      if shared_ids:
          message = (
              "roster ids present in both chambers (senate entry used): "
              + ", ".join(str(i) for i in shared_ids)
          )
          warnings.append(message)
          print(f"build_members: warning: {message}", file=sys.stderr)
      roster = {**house_roster, **senate_roster}

      # Hash of the analyze module's source file, recorded in each ``_meta``.
      analyze_path = Path(analyze.__file__)
      classifier_hash = hashlib.sha256(analyze_path.read_bytes()).hexdigest()
      snapshot_date = started.date().isoformat()
      meta = {
          "schema_version": SCHEMA_VERSION,
          "pipeline_version": PIPELINE_VERSION,
          "classifier_hash": classifier_hash,
          "data_snapshot_date": snapshot_date,
          "source_xml_count": {"house": len(house_recs), "senate": len(senate_recs)},
      }

      # Start from an empty members directory so files from a previous run
      # cannot linger for ids that are no longer in the roster.
      out_dir = data_root / "members"
      if out_dir.exists():
          for stale in out_dir.glob("*.json"):
              stale.unlink()
      out_dir.mkdir(parents=True, exist_ok=True)

      tasks = list(roster.items())
      n_procs = min(8, os.cpu_count() or 4)
      succeeded = 0
      failures = []
      members_meta = []

      with mp.Pool(processes=n_procs, initializer=_init_worker, initargs=(records, meta)) as pool:
          for result in pool.imap_unordered(_worker, tasks, chunksize=8):
              if "_error" in result:
                  failures.append({"id": result.get("id"), "error": result["_error"]})
                  if args.verbose:
                      print(f"FAIL {result.get('id')}: {result['_error'].splitlines()[0]}", file=sys.stderr)
                  continue
              mid = result["id"]
              _atomic_write_json(out_dir / f"{mid}.json", result)
              entry = _manifest_entry(result)
              members_meta.append(entry)
              succeeded += 1
              if args.verbose:
                  print(f"OK {mid} ({entry['n']})")

      # Results arrive in completion order; sort by name for a stable manifest.
      members_meta.sort(key=lambda e: (e["n"] or "").lower())
      manifest_path = data_root / "manifest.json"
      manifest = {
          "version": f"{PIPELINE_VERSION}+{snapshot_date}",
          "generated_at": _iso_z(started),
          "congress": args.congress,
          "members": members_meta,
      }
      _atomic_write_json(manifest_path, manifest)

      finished = _dt.datetime.now(_dt.timezone.utc)
      duration = round(time.monotonic() - t0, 2)
      total = len(tasks)
      report = {
          "started_at": _iso_z(started),
          "finished_at": _iso_z(finished),
          "duration_seconds": duration,
          "members_total": total,
          "members_succeeded": succeeded,
          "members_failed": len(failures),
          "failures": failures,
          "warnings": warnings,
          "source": {
              "house_votes": len(house_recs),
              "senate_votes": len(senate_recs),
              "roster_entries": total,
          },
      }
      _atomic_write_json(data_root / "build_report.json", report)

      print(f"build_members: {succeeded}/{total} OK in {duration}s — manifest at {manifest_path}")
      return 0 if not failures else 1
  # Script entry point: run the build and use main()'s return value as the
  # process exit status.
  if __name__ == "__main__":
      raise SystemExit(main())