build_members.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. #!/usr/bin/env python3
  2. """Build per-member dashboard JSON + manifest + build_report for a Congress.
  3. Reads votes.jsonl + roster.json for both chambers, fans out analyze.aggregate
  4. across a multiprocessing pool, writes atomic per-member JSON files, then a
  5. manifest.json (picker payload) and build_report.json (run log).
  6. """
  7. import argparse
  8. import datetime as _dt
  9. import hashlib
  10. import json
  11. import multiprocessing as mp
  12. import os
  13. import sys
  14. import time
  15. import traceback
  16. from pathlib import Path
  17. import analyze
  18. PIPELINE_VERSION = "1.0.0"
  19. SCHEMA_VERSION = 1
  20. _WORKER_RECORDS = None
  21. _WORKER_META = None
  22. def _init_worker(records, meta):
  23. global _WORKER_RECORDS, _WORKER_META
  24. _WORKER_RECORDS = records
  25. _WORKER_META = meta
  26. def _worker(task):
  27. mid, m = task
  28. try:
  29. chamber = (m.get("chamber") or "").lower()
  30. recs = [r for r in _WORKER_RECORDS if r.get("chamber") == chamber]
  31. metrics = analyze.aggregate(recs, mid, m.get("party"), chamber)
  32. payload = {
  33. "id": mid,
  34. "name": m.get("full_name") or m.get("name"),
  35. "party": m.get("party"),
  36. "state": m.get("state"),
  37. "chamber": m.get("chamber"),
  38. "served_from": m.get("served_from"),
  39. "served_to": m.get("served_to"),
  40. "served_partial": bool(m.get("served_partial", False)),
  41. "metrics": metrics,
  42. "_meta": _WORKER_META,
  43. }
  44. if (m.get("chamber") or "").lower() == "house":
  45. payload["district"] = m.get("district")
  46. return payload
  47. except Exception as e:
  48. return {"_error": f"{type(e).__name__}: {e}\n{traceback.format_exc()}", "id": mid}
  49. def _load_jsonl(path):
  50. out = []
  51. with open(path, "r", encoding="utf-8") as f:
  52. for line in f:
  53. line = line.strip()
  54. if line:
  55. out.append(json.loads(line))
  56. return out
  57. def _atomic_write_json(path: Path, obj):
  58. tmp = path.with_suffix(path.suffix + ".tmp")
  59. with open(tmp, "w", encoding="utf-8") as f:
  60. json.dump(obj, f, separators=(",", ":"))
  61. os.replace(tmp, path)
  62. def _chamber_letter(chamber):
  63. return "H" if (chamber or "").lower() == "house" else "S"
  64. def main(argv=None):
  65. ap = argparse.ArgumentParser(description="Build per-member dashboard JSON.")
  66. ap.add_argument("--congress", type=int, default=119)
  67. ap.add_argument("-v", "--verbose", action="store_true")
  68. args = ap.parse_args(argv)
  69. started = _dt.datetime.now(_dt.timezone.utc)
  70. t0 = time.monotonic()
  71. data_root = Path("data") / str(args.congress)
  72. house_votes_p = data_root / "house" / "votes.jsonl"
  73. senate_votes_p = data_root / "senate" / "votes.jsonl"
  74. house_roster_p = data_root / "house" / "roster.json"
  75. senate_roster_p = data_root / "senate" / "roster.json"
  76. for p in (house_votes_p, senate_votes_p, house_roster_p, senate_roster_p):
  77. if not p.exists():
  78. print(f"build_members: missing input {p}", file=sys.stderr)
  79. return 2
  80. house_recs = _load_jsonl(house_votes_p)
  81. senate_recs = _load_jsonl(senate_votes_p)
  82. records = house_recs + senate_recs
  83. with open(house_roster_p, "r", encoding="utf-8") as f:
  84. house_roster = json.load(f)
  85. with open(senate_roster_p, "r", encoding="utf-8") as f:
  86. senate_roster = json.load(f)
  87. roster = {}
  88. roster.update(house_roster)
  89. roster.update(senate_roster)
  90. analyze_path = Path(analyze.__file__)
  91. classifier_hash = hashlib.sha256(analyze_path.read_bytes()).hexdigest()
  92. snapshot_date = started.date().isoformat()
  93. meta = {
  94. "schema_version": SCHEMA_VERSION,
  95. "pipeline_version": PIPELINE_VERSION,
  96. "classifier_hash": classifier_hash,
  97. "data_snapshot_date": snapshot_date,
  98. "source_xml_count": {"house": len(house_recs), "senate": len(senate_recs)},
  99. }
  100. out_dir = data_root / "members"
  101. if out_dir.exists():
  102. for f in out_dir.glob("*.json"):
  103. f.unlink()
  104. out_dir.mkdir(parents=True, exist_ok=True)
  105. tasks = list(roster.items())
  106. n_procs = min(8, os.cpu_count() or 4)
  107. succeeded = 0
  108. failures = []
  109. members_meta = []
  110. with mp.Pool(processes=n_procs, initializer=_init_worker, initargs=(records, meta)) as pool:
  111. for result in pool.imap_unordered(_worker, tasks, chunksize=8):
  112. if "_error" in result:
  113. failures.append({"id": result.get("id"), "error": result["_error"]})
  114. if args.verbose:
  115. print(f"FAIL {result.get('id')}: {result['_error'].splitlines()[0]}", file=sys.stderr)
  116. continue
  117. mid = result["id"]
  118. _atomic_write_json(out_dir / f"{mid}.json", result)
  119. entry = {
  120. "id": mid,
  121. "n": result.get("name") or mid,
  122. "p": result.get("party"),
  123. "s": result.get("state"),
  124. "c": _chamber_letter(result.get("chamber")),
  125. }
  126. if entry["c"] == "H" and result.get("district") is not None:
  127. entry["d"] = result["district"]
  128. if result.get("served_partial"):
  129. entry["sp"] = True
  130. mx = result.get("metrics") or {}
  131. entry["k"] = {
  132. "total": mx.get("total", 0),
  133. "voting": mx.get("voting", 0),
  134. "yeas": mx.get("yeas", 0),
  135. "nays": mx.get("nays", 0),
  136. "voted_with_gop": mx.get("voted_with_gop", 0),
  137. "voted_with_dem": mx.get("voted_with_dem", 0),
  138. "voted_against_gop": mx.get("voted_against_gop", 0),
  139. "voted_against_dem": mx.get("voted_against_dem", 0),
  140. "lone_wolf": mx.get("lone_wolf", 0),
  141. }
  142. members_meta.append(entry)
  143. succeeded += 1
  144. if args.verbose:
  145. print(f"OK {mid} ({entry['n']})")
  146. members_meta.sort(key=lambda e: (e["n"] or "").lower())
  147. manifest = {
  148. "version": f"{PIPELINE_VERSION}+{snapshot_date}",
  149. "generated_at": started.isoformat().replace("+00:00", "Z"),
  150. "congress": args.congress,
  151. "members": members_meta,
  152. }
  153. _atomic_write_json(data_root / "manifest.json", manifest)
  154. finished = _dt.datetime.now(_dt.timezone.utc)
  155. duration = round(time.monotonic() - t0, 2)
  156. total = len(tasks)
  157. report = {
  158. "started_at": started.isoformat().replace("+00:00", "Z"),
  159. "finished_at": finished.isoformat().replace("+00:00", "Z"),
  160. "duration_seconds": duration,
  161. "members_total": total,
  162. "members_succeeded": succeeded,
  163. "members_failed": len(failures),
  164. "failures": failures,
  165. "warnings": [],
  166. "source": {
  167. "house_votes": len(house_recs),
  168. "senate_votes": len(senate_recs),
  169. "roster_entries": total,
  170. },
  171. }
  172. _atomic_write_json(data_root / "build_report.json", report)
  173. print(f"build_members: {succeeded}/{total} OK in {duration}s — manifest at {data_root / 'manifest.json'}")
  174. return 0 if not failures else 1
  175. if __name__ == "__main__":
  176. sys.exit(main())