#!/usr/bin/env python3
"""Parse cached XML rollcalls into a unified, chamber-agnostic schema.

Outputs per (congress, chamber):
  data/<congress>/<chamber>/votes.jsonl — one JSON object per vote, schema:
    {
      "chamber": "house"|"senate",
      "year": 2025, "session": 1, "num": 47,   # session is null for House
      "date": "2025-01-09",                    # null if the date is unparseable
      "date_raw": "9-Jan-2025",                # date text exactly as in the XML
      "bill": "H R 1234" | "S 5" | "",
      "question": "On Passage",
      "result": "Passed",
      "desc": "Short title",
      "totals": {"R":{"yea":0,"nay":0,"present":0,"nv":0}, "D":{...}, "I":{...}},
      "votes": {"M001184": "Yea", "K000389": "Nay", ...}   # id -> raw vote text
    }
  data/<congress>/<chamber>/roster.json — {id: {name, party, state, chamber}}

Run: python3 parse.py --congress 119 --chamber {house|senate|both}
"""
import argparse
import glob
import json
import os
import re
import sys
import xml.etree.ElementTree as ET

DATA_ROOT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")

MONTHS_3 = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
MONTHS_FULL = ["January", "February", "March", "April", "May", "June",
               "July", "August", "September", "October", "November", "December"]

# Independents who caucus with Democrats — counted as D for party totals.
DEM_CAUCUSING_INDEPENDENTS = {
    # Bernie Sanders (S313 in Senate XML), Angus King (S354), etc.
    "S313", "S354",
}

# Reject characters that should never appear in government XML free-text fields.
# Presence indicates injection, encoding corruption, or upstream tampering.
_BAD_TEXT_RE = re.compile(r"[<>\x00-\x08\x0b\x0c\x0e-\x1f]")

# House party names (as spelled in <totals-by-party>) -> totals key.
# Any other party name is counted under "I".
_HOUSE_PARTY_KEYS = {"Republican": "R", "Democratic": "D"}

# (totals field, XML tag) pairs read from each House <totals-by-party> row.
_HOUSE_TOTAL_TAGS = (
    ("yea", "yea-total"),
    ("nay", "nay-total"),
    ("present", "present-total"),
    ("nv", "not-voting-total"),
)

# Senate <vote_cast> text -> totals field; anything else is counted as "nv".
_SENATE_BUCKETS = {"Yea": "yea", "Nay": "nay", "Present": "present"}


def _safe_text(s, field, source):
    """Return s unchanged if clean; raise ValueError with context if it
    contains angle brackets or ASCII control chars (excluding \\t, \\n, \\r)."""
    if s and _BAD_TEXT_RE.search(s):
        raise ValueError(
            f"Unsafe characters in upstream field '{field}' from {source}: {s!r}")
    return s


def _empty_totals():
    """Return a fresh zeroed totals mapping for the R, D and I buckets."""
    return {key: {"yea": 0, "nay": 0, "present": 0, "nv": 0}
            for key in ("R", "D", "I")}


# ---------- House parsing ----------

def _t(el, tag):
    """Return the stripped text of the first child `tag` of `el`, or ""."""
    sub = el.find(tag)
    if sub is None or not sub.text:
        return ""
    return sub.text.strip()


def _ts(el, tag, source):
    """Like _t but validates the extracted text via _safe_text."""
    return _safe_text(_t(el, tag), tag, source)


def parse_house_vote(path, year, roll):
    """Parse one cached House rollcall XML file.

    Args:
        path: path of the XML file.
        year: year component taken from the cache file name.
        roll: roll-call number taken from the cache file name.

    Returns:
        (record, roster_seen). `record` is the unified vote dict, or None when
        the file cannot be read/parsed or has no <vote-metadata>; `roster_seen`
        maps each legislator's name-id to {name, party, state, chamber}.

    Raises:
        ValueError: a text field fails _safe_text, or a party total is not an
            integer.
    """
    try:
        with open(path, "rb") as f:
            root = ET.fromstring(f.read())
    except Exception:
        # Best effort: an unreadable or malformed cache file is skipped.
        return None, {}
    meta = root.find("vote-metadata")
    if meta is None:
        return None, {}

    # Date: "3-Jan-2025"
    raw = _t(meta, "action-date")
    iso = None
    try:
        d, mo, y = raw.split("-")
        iso = f"{y}-{MONTHS_3.index(mo) + 1:02d}-{int(d):02d}"
    except ValueError:
        # Wrong number of parts, unknown month, or non-numeric day.
        pass

    totals = _empty_totals()
    for pt in meta.findall("vote-totals/totals-by-party"):
        party = (pt.findtext("party", "") or "").strip()
        bucket = totals[_HOUSE_PARTY_KEYS.get(party, "I")]
        # Accumulate rather than assign: several party rows can map to the
        # same key (every non-R/D party lands in "I").
        for field, tag in _HOUSE_TOTAL_TAGS:
            bucket[field] += int(pt.findtext(tag, "0") or 0)

    votes = {}
    roster_seen = {}
    source = f"house:{year}/{roll}"
    for rv in root.iter("recorded-vote"):
        leg = rv.find("legislator")
        if leg is None:
            continue
        bid = leg.get("name-id")
        if not bid:
            continue
        vote_el = rv.find("vote")
        vote = (vote_el.text or "").strip() if vote_el is not None else ""
        votes[bid] = _safe_text(vote, "vote", source)
        # Roster: store name + party + state (last write wins; fine for
        # stable identities).
        name = (leg.get("unaccented-name") or leg.text or "").strip()
        roster_seen[bid] = {
            "name": _safe_text(name, "legislator-name", source),
            "party": leg.get("party") or "",
            "state": leg.get("state") or "",
            "chamber": "house",
        }

    record = {
        "chamber": "house",
        "year": year,
        "session": None,
        "num": roll,
        "date": iso,
        "date_raw": raw,
        "bill": _ts(meta, "legis-num", source),
        "question": _ts(meta, "vote-question", source),
        "result": _ts(meta, "vote-result", source),
        "desc": _ts(meta, "vote-desc", source),
        "totals": totals,
        "votes": votes,
    }
    return record, roster_seen


# ---------- Senate parsing ----------

def parse_senate_vote(path, session, vnum):
    """Parse one cached Senate rollcall XML file.

    Args:
        path: path of the XML file.
        session: session component taken from the cache file name.
        vnum: vote number taken from the cache file name.

    Returns:
        (record, roster_seen). `record` is the unified vote dict, or None when
        the file cannot be read/parsed; `roster_seen` maps each member's
        lis_member_id to {name, party, state, chamber}. Party totals are
        tallied from the individual member votes.

    Raises:
        ValueError: a text field fails _safe_text.
    """
    try:
        with open(path, "rb") as f:
            root = ET.fromstring(f.read())
    except Exception:
        # Best effort: an unreadable or malformed cache file is skipped.
        return None, {}

    raw = _t(root, "vote_date")
    iso = None
    m = re.match(r"\s*(\w+)\s+(\d+),\s+(\d+)", raw)
    if m:
        mon, day, yr = m.groups()
        try:
            iso = f"{yr}-{MONTHS_FULL.index(mon) + 1:02d}-{int(day):02d}"
        except ValueError:
            # Month name not recognised.
            pass
    year_int = int(iso[:4]) if iso else None

    totals = _empty_totals()
    votes = {}
    roster_seen = {}
    source = f"senate:{session}/{vnum}"
    for mem in root.iter("member"):
        lid = (mem.findtext("lis_member_id", "") or "").strip()
        if not lid:
            continue
        party = (mem.findtext("party", "") or "").strip()
        vc = (mem.findtext("vote_cast", "") or "").strip()
        votes[lid] = _safe_text(vc, "vote_cast", source)
        first = (mem.findtext("first_name", "") or "").strip()
        last = (mem.findtext("last_name", "") or "").strip()
        name = (first + " " + last).strip()
        roster_seen[lid] = {
            "name": _safe_text(name, "member-name", source),
            "party": party,
            "state": (mem.findtext("state", "") or "").strip(),
            "chamber": "senate",
        }
        bucket = _SENATE_BUCKETS.get(vc, "nv")
        # Count for party totals — Dem-caucusing Independents go into D.
        if party == "R":
            key = "R"
        elif party == "D" or lid in DEM_CAUCUSING_INDEPENDENTS:
            key = "D"
        else:
            key = "I"
        totals[key][bucket] += 1

    # Compose bill identifier
    dt = root.findtext("document/document_type", "") or ""
    dn = root.findtext("document/document_number", "") or ""
    bill = _safe_text((dt + " " + dn).strip(), "bill", source)

    record = {
        "chamber": "senate",
        "year": year_int,
        "session": session,
        "num": vnum,
        "date": iso,
        "date_raw": raw,
        "bill": bill,
        "question": (_ts(root, "question", source)
                     or _ts(root, "vote_question_text", source)),
        "result": _ts(root, "vote_result", source),
        "desc": (_ts(root, "vote_title", source)
                 or _ts(root, "vote_document_text", source)),
        "totals": totals,
        "votes": votes,
    }
    return record, roster_seen


# ---------- Orchestration ----------

def _numbered_cache_files(cache):
    """Return (a, b, path) for every '<int>_<int>.xml' in `cache`, sorted
    numerically so roll 2 is processed before roll 10."""
    found = []
    for path in glob.glob(os.path.join(cache, "*.xml")):
        base = os.path.splitext(os.path.basename(path))[0]
        try:
            a, b = base.split("_")
            found.append((int(a), int(b), path))
        except ValueError:
            # Not a '<int>_<int>' file name; ignore it.
            continue
    found.sort()
    return found


def _merge_seen(roster, seen):
    """Fold one vote's `seen` legislators into the accumulated `roster`."""
    for mid, info in seen.items():
        # Don't overwrite a roster entry with a less-complete one
        if mid not in roster or not roster[mid].get("name"):
            roster[mid] = info
        else:
            # Update party/state in case of a switch — keep latest
            if info.get("party"):
                roster[mid]["party"] = info["party"]
            if info.get("state"):
                roster[mid]["state"] = info["state"]


def _merge_directory(roster, directory, chamber, bioguide_to_lis):
    """Enrich `roster` in place from `directory` entries of this chamber and
    return how many entries existed only in the directory."""
    directory_only = 0
    for bioguide, entry in directory.items():
        if not isinstance(entry, dict):
            # Malformed entry: nothing usable to merge.
            continue
        ec = (entry.get("chamber") or "").lower()
        entry_chamber = ("senate" if "senate" in ec
                         else ("house" if "house" in ec else ec))
        if entry_chamber != chamber:
            continue
        # Roster key: bioguide for house, LIS for senate.
        if chamber == "house":
            key = bioguide
        else:
            key = bioguide_to_lis.get(bioguide)
        if not key:
            # Without LIS we cannot map to the per-chamber roster key;
            # still add under bioguide so the directory entry isn't lost.
            key = bioguide
        enrichment = {
            "full_name": entry.get("full_name"),
            "district": entry.get("district"),
            "served_from": entry.get("served_from"),
            "served_to": entry.get("served_to"),
            "photo_url": entry.get("photo_url"),
            "bioguide": bioguide,
            "lis": entry.get("lis"),
        }
        present = {k: v for k, v in enrichment.items() if v is not None}
        if key in roster:
            roster[key].update(present)
            roster[key]["served_partial"] = False
        else:
            roster[key] = {
                "name": entry.get("full_name") or "",
                "party": entry.get("party") or "",
                "state": entry.get("state") or "",
                "chamber": chamber,
                "served_partial": True,
                **present,
            }
            directory_only += 1
    return directory_only


def parse_chamber(congress, chamber):
    """Parse every cached rollcall for one chamber and write its outputs.

    Reads '<int>_<int>.xml' files from DATA_ROOT/<congress>/<chamber>/cache in
    numeric order, writes votes.jsonl and roster.json next to the cache
    directory, and enriches the roster from
    DATA_ROOT/<congress>/members_directory.json when that file exists.

    Args:
        congress: congress number (used as a directory name).
        chamber: "house" selects the House parser; anything else selects the
            Senate parser.

    Returns:
        Number of vote records written (0 when there is no cache directory).
    """
    congress_dir = os.path.join(DATA_ROOT, str(congress))
    cache = os.path.join(congress_dir, chamber, "cache")
    out_votes = os.path.join(congress_dir, chamber, "votes.jsonl")
    out_roster = os.path.join(congress_dir, chamber, "roster.json")
    if not os.path.isdir(cache):
        print(f" no cache at {cache}", file=sys.stderr)
        return 0

    parse_one = parse_house_vote if chamber == "house" else parse_senate_vote
    roster = {}
    n = 0
    with open(out_votes, "w", encoding="utf-8") as out:
        for a, b, path in _numbered_cache_files(cache):
            rec, seen = parse_one(path, a, b)
            if not rec:
                continue
            out.write(json.dumps(rec, separators=(",", ":")) + "\n")
            n += 1
            _merge_seen(roster, seen)

    # Merge with members_directory.json if available (Phase 0.5 enrichment).
    vote_count = len(roster)
    directory_path = os.path.join(congress_dir, "members_directory.json")
    lis_xwalk_path = os.path.join(congress_dir, "lis_to_bioguide.json")
    if os.path.isfile(directory_path):
        try:
            with open(directory_path, encoding="utf-8") as f:
                directory = json.load(f)
        except Exception as e:
            print(f" [{chamber}] WARNING: could not load {directory_path}: {e}",
                  file=sys.stderr)
            directory = {}
        if not isinstance(directory, dict):
            print(f" [{chamber}] WARNING: {directory_path} is not a JSON "
                  f"object; ignoring it", file=sys.stderr)
            directory = {}
        # For senate, map bioguide -> lis via reverse of lis_to_bioguide.json.
        bioguide_to_lis = {}
        if chamber == "senate" and os.path.isfile(lis_xwalk_path):
            try:
                with open(lis_xwalk_path, encoding="utf-8") as f:
                    lis_map = json.load(f)
                bioguide_to_lis = {bio: lis for lis, bio in lis_map.items()}
            except Exception as e:
                print(f" [{chamber}] WARNING: could not load {lis_xwalk_path}: {e}",
                      file=sys.stderr)
        directory_only = _merge_directory(
            roster, directory, chamber, bioguide_to_lis)
        total = len(roster)
        print(f" [{chamber}] merged: {vote_count} vote-derived + "
              f"{directory_only} directory-only = {total} total roster entries",
              file=sys.stderr)
    else:
        print(f" [{chamber}] merged: {vote_count} vote-derived + 0 directory-only "
              f"= {vote_count} total roster entries (no members_directory.json)",
              file=sys.stderr)

    with open(out_roster, "w", encoding="utf-8") as f:
        json.dump(roster, f, indent=2, sort_keys=True)
    print(f" [{chamber}] parsed {n} votes, {len(roster)} roster entries → {out_votes}",
          file=sys.stderr)
    return n


def main():
    """Command-line entry point: parse the requested chamber(s), then warn on
    stderr if the enrichment directory is missing or the combined roster has
    fewer than 535 unique members."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--congress", type=int, required=True)
    ap.add_argument("--chamber", choices=["house", "senate", "both"],
                    default="both")
    args = ap.parse_args()
    if args.chamber in ("house", "both"):
        parse_chamber(args.congress, "house")
    if args.chamber in ("senate", "both"):
        parse_chamber(args.congress, "senate")

    # Validation gate: total combined unique-member count across chambers.
    directory_path = os.path.join(DATA_ROOT, str(args.congress),
                                  "members_directory.json")
    if not os.path.isfile(directory_path):
        print(f"WARNING: {directory_path} missing — roster may be incomplete "
              f"(Phase 0.5 enrichment was skipped). Run enrich_roster.py.",
              file=sys.stderr)
    if args.chamber == "both":
        seen = set()
        for ch in ("house", "senate"):
            rp = os.path.join(DATA_ROOT, str(args.congress), ch, "roster.json")
            if not os.path.isfile(rp):
                continue
            try:
                with open(rp, encoding="utf-8") as f:
                    r = json.load(f)
                for k, v in r.items():
                    # Prefer bioguide so one person is counted once.
                    seen.add(v.get("bioguide") or k)
            except Exception as e:
                # Still best effort, but say why the count may be short.
                print(f"WARNING: could not read {rp}: {e}", file=sys.stderr)
        if len(seen) < 535:
            print(f"WARNING: combined roster has only {len(seen)} unique members "
                  f"(<535 expected). Congress.gov API may be down or "
                  f"enrich_roster.py was not run.", file=sys.stderr)


if __name__ == "__main__":
    main()