#!/usr/bin/env python3
"""Enrich the 119th-Congress roster via Congress.gov API.

Writes:
    data/{congress}/members_directory.json  — dict keyed by bioguide
    data/{congress}/lis_to_bioguide.json    — Senate LIS -> bioguide crosswalk
    data/{congress}/api_cache/{sha256}.json — raw cached API responses (idempotent)

Reads CONGRESS_GOV_API_KEY from ./.env (never CLI/env-var).
Standard library only. Throttled 350 ms between live requests.
"""
import argparse
import hashlib
import http.client
import json
import os
import re
import sys
import time
import unicodedata
import urllib.error
import urllib.request
from pathlib import Path

ROOT = Path(__file__).resolve().parent
DATA_ROOT = ROOT / "data"
ENV_PATH = ROOT / ".env"
API_BASE = "https://api.congress.gov/v3"
THROTTLE_SEC = 0.35
RETRY_BACKOFFS = (0.5, 1.0, 2.0)

PARTY_MAP = {
    "R": "R", "Republican": "R",
    "D": "D", "Democratic": "D", "Democrat": "D",
    "I": "I", "Independent": "I", "ID": "I",
}

STATE_NAME_TO_CODE = {
    "Alabama": "AL", "Alaska": "AK", "Arizona": "AZ", "Arkansas": "AR",
    "California": "CA", "Colorado": "CO", "Connecticut": "CT",
    "Delaware": "DE", "Florida": "FL", "Georgia": "GA", "Hawaii": "HI",
    "Idaho": "ID", "Illinois": "IL", "Indiana": "IN", "Iowa": "IA",
    "Kansas": "KS", "Kentucky": "KY", "Louisiana": "LA", "Maine": "ME",
    "Maryland": "MD", "Massachusetts": "MA", "Michigan": "MI",
    "Minnesota": "MN", "Mississippi": "MS", "Missouri": "MO",
    "Montana": "MT", "Nebraska": "NE", "Nevada": "NV",
    "New Hampshire": "NH", "New Jersey": "NJ", "New Mexico": "NM",
    "New York": "NY", "North Carolina": "NC", "North Dakota": "ND",
    "Ohio": "OH", "Oklahoma": "OK", "Oregon": "OR", "Pennsylvania": "PA",
    "Rhode Island": "RI", "South Carolina": "SC", "South Dakota": "SD",
    "Tennessee": "TN", "Texas": "TX", "Utah": "UT", "Vermont": "VT",
    "Virginia": "VA", "Washington": "WA", "West Virginia": "WV",
    "Wisconsin": "WI", "Wyoming": "WY",
    "American Samoa": "AS", "District of Columbia": "DC", "Guam": "GU",
    "Northern Mariana Islands": "MP", "Puerto Rico": "PR",
    "Virgin Islands": "VI",
}

LIS_KEY_RE = re.compile(r"(?i)\b(lis|senateid|lis[_-]?id|lis[_-]?member[_-]?id)\b")

# Shape of a bioguide ID (one capital letter + six digits); used to tell
# bioguide-keyed roster entries apart from LIS-keyed ones.
BIOGUIDE_RE = re.compile(r"^[A-Z]\d{6}$")


def _load_api_key():
    """Return CONGRESS_GOV_API_KEY from the .env file at ENV_PATH.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. One
    layer of surrounding quotes is stripped from the value. An entry with an
    empty value is treated as unset. Prints an error and exits with status 2
    when the file is missing or no usable key is found.
    """
    if not ENV_PATH.exists():
        print(f"ERROR: .env not found at {ENV_PATH}", file=sys.stderr)
        sys.exit(2)
    # utf-8-sig transparently drops a BOM, which would otherwise glue itself
    # to the first variable name and make the lookup miss.
    for line in ENV_PATH.read_text(encoding="utf-8-sig").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, _, v = line.partition("=")
        if k.strip() == "CONGRESS_GOV_API_KEY":
            key = v.strip().strip('"').strip("'")
            if key:
                return key
    print("ERROR: CONGRESS_GOV_API_KEY not set in .env", file=sys.stderr)
    sys.exit(2)


def _cache_path(cache_dir, url):
    """Return the cache file path for ``url`` inside ``cache_dir``.

    The file name is the SHA-256 of the URL with its ``api_key`` parameter
    removed, so re-keying doesn't invalidate the cache.
    """
    clean = re.sub(r"([?&])api_key=[^&]*", r"\1", url).rstrip("?&")
    sha = hashlib.sha256(clean.encode("utf-8")).hexdigest()
    return cache_dir / f"{sha}.json"


def _fetch(url, cache_dir, warnings, label="request"):
    """Return the parsed JSON for ``url``, served from cache when possible.

    On a cache miss (or an unreadable cache entry) the URL is requested live,
    with up to ``len(RETRY_BACKOFFS)`` retries. A successful response is
    written to the cache and followed by a ``THROTTLE_SEC`` pause.

    Returns the decoded JSON value, or ``None`` after appending
    ``"{label}: {error}"`` to ``warnings`` when every attempt failed.
    """
    cp = _cache_path(cache_dir, url)
    if cp.exists():
        try:
            return json.loads(cp.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            pass  # unreadable or corrupt cache entry: fall through and re-fetch

    req = urllib.request.Request(url, headers={
        "Accept": "application/json",
        "User-Agent": "polisci-pipeline/1.0 (+enrich_roster.py)",
    })
    last_err = None
    for backoff in (0,) + RETRY_BACKOFFS:
        if backoff:
            time.sleep(backoff)
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                body = resp.read()
            data = json.loads(body)
        except urllib.error.HTTPError as e:
            last_err = e
            # Client errors other than timeout / rate-limit will not change
            # on retry, so don't burn the backoff schedule on them.
            if 400 <= e.code < 500 and e.code not in (408, 429):
                break
            continue
        except (OSError, ValueError, http.client.HTTPException) as e:
            # OSError covers URLError, socket timeouts and connection resets;
            # ValueError covers JSON and byte-decoding failures;
            # HTTPException covers truncated or dropped responses.
            last_err = e
            continue
        cp.parent.mkdir(parents=True, exist_ok=True)
        cp.write_text(json.dumps(data), encoding="utf-8")
        time.sleep(THROTTLE_SEC)
        return data
    warnings.append(f"{label}: {last_err}")
    return None


def _party_letter(member):
    """Return the member's party as "R", "D", "I", or "" when unknown.

    The last ``partyHistory`` entry is preferred; otherwise the top-level
    ``partyName`` is mapped, with any unrecognised non-empty name counted
    as "I".
    """
    pa = member.get("partyHistory") or []
    if pa:
        last = pa[-1]
        for key in ("partyAbbreviation", "partyName"):
            v = last.get(key)
            if v and v in PARTY_MAP:
                return PARTY_MAP[v]
    pn = member.get("partyName") or ""
    return PARTY_MAP.get(pn, "I" if pn else "")


def _state_code(member):
    """Return a two-letter state code for the member's ``state`` field.

    An upper-case two-letter value is passed through; a known full name is
    looked up; anything else falls back to its first two letters upper-cased
    (or "" when the field is empty).
    """
    s = member.get("state") or ""
    if len(s) == 2 and s.isupper():
        return s
    return STATE_NAME_TO_CODE.get(s, s[:2].upper() if s else "")


def _terms_list(member):
    """Return the member's terms as a list.

    Accepts both shapes handled by the callers: ``{"item": [...]}`` and a
    bare list. Anything else yields an empty list.
    """
    terms = member.get("terms")
    if isinstance(terms, dict):
        return terms.get("item") or []
    if isinstance(terms, list):
        return terms
    return []


def _latest_chamber(member):
    """Return the chamber of the term with the greatest ``startYear``, or ""."""
    terms = _terms_list(member)
    if not terms:
        return ""

    def sk(t):
        return t.get("startYear") or 0

    # sorted(...)[-1] rather than max(): on ties the later-listed term wins.
    last = sorted(terms, key=sk)[-1]
    return (last.get("chamber") or "").strip()


def _served_dates(member):
    """Return ``(served_from, served_to)`` as ``YYYY-01-03`` strings.

    ``served_from`` is the earliest term start. ``served_to`` is the latest
    term end, but only when every term that has a start also has an end;
    otherwise it is ``None``. Both are ``None`` when there are no terms.
    """
    terms = _terms_list(member)
    if not terms:
        return None, None
    starts, ends = [], []
    for t in terms:
        sy = t.get("startYear")
        ey = t.get("endYear")
        if sy:
            starts.append(f"{int(sy):04d}-01-03")
        if ey:
            ends.append(f"{int(ey):04d}-01-03")
    served_from = min(starts) if starts else None
    served_to = max(ends) if ends and len(ends) == len(starts) else None
    return served_from, served_to


def _congress_term(member, congress):
    """Find the term for the target Congress; returns dict or None.

    The returned dict carries ``startYear``, ``endYear``, ``district`` (as a
    string, or ``None``) and ``chamber``. It does not carry ``congress``.
    """
    for t in _terms_list(member):
        if t.get("congress") == congress:
            return {
                "startYear": t.get("startYear"),
                "endYear": t.get("endYear"),
                "district": str(t["district"]) if t.get("district") is not None else None,
                "chamber": t.get("chamber"),
            }
    return None


def _scan_for_lis(obj):
    """Recursively scan obj for any key matching LIS pattern; return string value or None.

    A matching key only counts when its value is a string of 3-4 digits with
    an optional leading "S"; the result is always returned with the "S"
    prefix. The search is depth-first through dicts and lists.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if isinstance(k, str) and LIS_KEY_RE.search(k):
                if isinstance(v, str) and re.match(r"^S?\d{3,4}$", v.strip()):
                    val = v.strip()
                    if not val.startswith("S"):
                        val = "S" + val.zfill(3)
                    return val
            found = _scan_for_lis(v)
            if found:
                return found
    elif isinstance(obj, list):
        for it in obj:
            found = _scan_for_lis(it)
            if found:
                return found
    return None


def _normalize_member(m, congress=None):
    """Convert one API member record into a directory entry.

    Returns ``None`` when the record has no ``bioguideId``. ``district`` is
    kept only for entries whose chamber starts with "house". When
    ``congress`` is given and the record has a term for it, that term
    supplies the chamber and district in preference to the record-level
    values.
    """
    bioguide = (m.get("bioguideId") or "").strip()
    if not bioguide:
        return None
    chamber = _latest_chamber(m)
    served_from, served_to = _served_dates(m)
    district = m.get("district")
    if district is not None:
        district = str(district)
    name = m.get("directOrderName") or m.get("name") or ""
    if not name:
        first = m.get("firstName") or ""
        last = m.get("lastName") or ""
        name = (first + " " + last).strip()
    # "Last, First" -> "First Last", unless the API already gave direct order.
    if "," in name and not m.get("directOrderName"):
        parts = [p.strip() for p in name.split(",", 1)]
        if len(parts) == 2:
            name = parts[1] + " " + parts[0]
    photo = ((m.get("depiction") or {}).get("imageUrl")) or None
    # Per-Congress term — most accurate source of district, start/end year for
    # this Congress (matters for mid-term resignations and special-election entrants).
    term = _congress_term(m, congress) if congress is not None else None
    if term and term.get("district") is not None:
        district = term["district"]
    term_chamber = (term or {}).get("chamber") or chamber
    return {
        "bioguide": bioguide,
        "lis": None,
        "full_name": name,
        "party": _party_letter(m),
        "state": _state_code(m),
        "district": district if (term_chamber or "").lower().startswith("house") else None,
        "chamber": term_chamber,
        "served_from": served_from,
        "served_to": served_to,
        "congress_term": term,
        "death_year": m.get("deathYear"),
        "current_member": m.get("currentMember"),
        "photo_url": photo,
        "source": "congress.gov/v3",
    }


def _write_outputs(out_dir, directory, lis_map):
    """Write members_directory.json and lis_to_bioguide.json into out_dir."""
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "members_directory.json").write_text(
        json.dumps(directory, indent=2, sort_keys=True), encoding="utf-8")
    (out_dir / "lis_to_bioguide.json").write_text(
        json.dumps(lis_map, indent=2, sort_keys=True), encoding="utf-8")


def _write_partial(out_dir, directory, lis_map, warnings, note):
    """Write whatever has been collected so far and print ``note`` as a warning.

    ``warnings`` is accepted for signature compatibility and is not used.
    """
    _write_outputs(out_dir, directory, lis_map)
    print(f"enrich_roster: WARNING {note}", file=sys.stderr)


def _member_url(bioguide, api_key):
    """Return the individual-member endpoint URL for ``bioguide``."""
    return f"{API_BASE}/member/{bioguide}?format=json&api_key={api_key}"


def _collect_members(congress, api_key, cache_dir, warnings, directory):
    """Pass 1: paginate the full member list into ``directory``.

    Returns ``None`` on success, or the 1-based number of the page whose
    fetch failed (entries from earlier pages stay in ``directory``).
    """
    url = (f"{API_BASE}/member/congress/{congress}"
           f"?currentMember=false&limit=250&format=json&api_key={api_key}")
    page = 0
    while url:
        page += 1
        print(f"enrich_roster: fetching page {page}", file=sys.stderr)
        data = _fetch(url, cache_dir, warnings, label=f"page {page}")
        if data is None:
            return page
        for m in data.get("members") or []:
            norm = _normalize_member(m, congress)
            if norm:
                directory[norm["bioguide"]] = norm
        nxt = ((data.get("pagination") or {}).get("next")) or None
        # Re-attach the key when the "next" link comes back without one.
        if nxt and "api_key=" not in nxt:
            sep = "&" if "?" in nxt else "?"
            nxt = f"{nxt}{sep}api_key={api_key}"
        url = nxt
    return None


def _resolve_lis_from_api(directory, senators, api_key, cache_dir, warnings, lis_map):
    """Pass 2: look up each senator's detail record and scan it for an LIS id.

    Updates ``directory`` and ``lis_map`` in place and returns the number of
    senators resolved. A failed fetch is recorded once, by ``_fetch``.
    """
    resolved = 0
    for bid in senators:
        data = _fetch(_member_url(bid, api_key), cache_dir, warnings,
                      label=f"member/{bid}")
        if data is None:
            continue
        member = (data.get("member") or {})
        lis = _scan_for_lis(member)
        if lis:
            directory[bid]["lis"] = lis
            lis_map[lis] = bid
            resolved += 1
        else:
            warnings.append(f"member/{bid}: LIS not derivable from API")
    return resolved


def _strip_accents(s):
    """Return ``s`` NFKD-decomposed with combining marks removed."""
    return ''.join(c for c in unicodedata.normalize('NFKD', s or '')
                   if not unicodedata.combining(c))


def _last_name_key(name):
    """Return the lower-cased last whitespace-separated token of ``name``.

    Periods and commas are dropped and accents stripped first; "" when the
    name has no tokens.
    """
    p = re.sub(r'[.,]', '', _strip_accents(name)).split()
    return p[-1].lower() if p else ''


def _resolve_lis_from_roster(directory, senators, vote_sen, lis_map):
    """Fallback: name+state+party match against vote-derived senate roster.

    Why: Congress.gov v3 does not expose LIS reliably; name match is unique
    within a Congress.

    ``vote_sen`` maps roster keys to dicts with ``name``/``state``/``party``.
    Keys shaped like bioguide IDs, and keys already in ``lis_map``, are
    skipped. When the three-field match is not unique, a (last name, state)
    match is tried. Updates ``directory`` and ``lis_map`` in place and
    returns the number of matches made.
    """
    idx = {}
    for bg in senators:
        e = directory[bg]
        key = (_last_name_key(e.get("full_name", "")),
               e.get("state", ""), e.get("party", ""))
        idx.setdefault(key, []).append(bg)

    resolved = 0
    for lis_key, v in vote_sen.items():
        if BIOGUIDE_RE.match(lis_key):
            continue
        if lis_key in lis_map:
            continue
        k = (_last_name_key(v.get("name", "")), v.get("state", ""), v.get("party", ""))
        candidates = idx.get(k, [])
        if len(candidates) != 1:
            candidates = [bg for (l, s, _), bs in idx.items() for bg in bs
                          if l == k[0] and s == k[1]]
        if len(candidates) == 1:
            bg = candidates[0]
            directory[bg]["lis"] = lis_key
            lis_map[lis_key] = bg
            resolved += 1
    return resolved


def _rescue_house_members(directory, house_roster, congress, api_key, cache_dir, warnings):
    """Fetch House bioguide IDs present in vote data but absent from ``directory``.

    Catches people who were members-elect (appear in opening-day quorum XML)
    but never seated, e.g. Matt Gaetz in the 119th. Returns the number of
    entries added.
    """
    missing = [bg for bg in house_roster
               if BIOGUIDE_RE.match(bg) and bg not in directory]
    if missing:
        print(f"enrich_roster: rescuing {len(missing)} House bioguide(s) missing from bulk directory",
              file=sys.stderr)
    rescued = 0
    for bg in missing:
        data = _fetch(_member_url(bg, api_key), cache_dir, warnings,
                      label=f"member/{bg}")
        if data is None:
            continue
        member = (data.get("member") or {})
        norm = _normalize_member(member, congress)
        if norm:
            directory[bg] = norm
            rescued += 1
    return rescued


def _link_replacements(directory, congress):
    """Pair predecessor and successor by (state, district) among House entries.

    Heuristic: any House seat with more than one member is treated as a
    replacement chain. Members are ordered by their per-Congress term
    ``startYear`` (missing sorts last), then by whether ``served_to`` is
    set (still-serving sorts last). Adjacent members get ``replaced_by`` /
    ``replaces`` set in place. Returns the number of pairs linked.
    """
    seats = {}
    for bg, e in directory.items():
        if not (e.get("chamber") or "").lower().startswith("house"):
            continue
        term = e.get("congress_term") or {}
        # Defensive only: terms built by _congress_term carry no "congress"
        # key, so this guard does not fire for them.
        if term.get("congress") and term["congress"] != congress:
            continue
        state = e.get("state")
        district = term.get("district") or e.get("district")
        if not state or district is None:
            continue
        seats.setdefault((state, str(district)), []).append(bg)

    def sortkey(bg):
        e = directory[bg]
        term = e.get("congress_term") or {}
        start = term.get("startYear") or 9999
        # served_to None => still serving => sort last
        ended = e.get("served_to") is not None
        return (start, 0 if ended else 1)

    pairs = 0
    for bgs in seats.values():
        if len(bgs) < 2:
            continue
        ordered = sorted(bgs, key=sortkey)
        for pred, succ in zip(ordered, ordered[1:]):
            directory[pred]["replaced_by"] = succ
            directory[succ]["replaces"] = pred
            pairs += 1
    return pairs


def _enrich_replacement_chain(directory, congress, api_key, cache_dir, warnings):
    """Refresh entries on either side of a replacement pair from their detail record.

    Per-Congress term + death_year live on the individual /member/{bg}
    response (the bulk listing only carries chamber + startYear). Cached, so
    re-runs are free. Updates ``congress_term``, ``district``, ``death_year``
    and ``current_member`` in place when the detail record provides them.
    """
    targets = sorted(bg for bg, e in directory.items()
                     if e.get("replaces") or e.get("replaced_by"))
    if not targets:
        return
    print(f"enrich_roster: fetching detail for {len(targets)} replacement-chain members",
          file=sys.stderr)
    for bg in targets:
        data = _fetch(_member_url(bg, api_key), cache_dir, warnings,
                      label=f"member-detail/{bg}")
        if data is None:
            continue
        member = (data.get("member") or {})
        term = _congress_term(member, congress)
        if term:
            directory[bg]["congress_term"] = term
            # If individual endpoint reports a per-Congress district, prefer it.
            if term.get("district") is not None:
                directory[bg]["district"] = term["district"]
        if member.get("deathYear") is not None:
            directory[bg]["death_year"] = member.get("deathYear")
        if member.get("currentMember") is not None:
            directory[bg]["current_member"] = member.get("currentMember")


def main():
    """Build the member directory and LIS crosswalk for one Congress.

    Returns 0 both on success and when pagination fails part-way (in which
    case the partial directory is written and later passes are skipped).
    """
    ap = argparse.ArgumentParser(description="Enrich Congress roster via Congress.gov API.")
    ap.add_argument("--congress", type=int, default=119)
    args = ap.parse_args()

    api_key = _load_api_key()
    out_dir = DATA_ROOT / str(args.congress)
    cache_dir = out_dir / "api_cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    warnings = []
    directory = {}

    # Pass 1: paginate full member list
    failed_page = _collect_members(args.congress, api_key, cache_dir, warnings, directory)
    if failed_page is not None:
        _write_partial(out_dir, directory, {}, warnings,
                       f"pagination failed at page {failed_page}; partial directory written")
        print(f"enrich_roster: {len(directory)} members directory written; "
              f"0 senators with LIS resolved; {len(warnings)} warnings")
        return 0

    # Pass 2: LIS lookup for senators
    lis_map = {}
    senators = [b for b, e in directory.items()
                if (e.get("chamber") or "").lower() == "senate"]
    print(f"enrich_roster: resolving LIS for {len(senators)} senators", file=sys.stderr)
    resolved = _resolve_lis_from_api(directory, senators, api_key, cache_dir,
                                     warnings, lis_map)

    sen_roster_path = out_dir / "senate" / "roster.json"
    if sen_roster_path.exists():
        vote_sen = json.loads(sen_roster_path.read_text(encoding="utf-8"))
        resolved += _resolve_lis_from_roster(directory, senators, vote_sen, lis_map)

    rescued = 0
    house_roster_path = out_dir / "house" / "roster.json"
    if house_roster_path.exists():
        house_roster = json.loads(house_roster_path.read_text(encoding="utf-8"))
        rescued = _rescue_house_members(directory, house_roster, args.congress,
                                        api_key, cache_dir, warnings)

    pairs = _link_replacements(directory, args.congress)
    if pairs:
        print(f"enrich_roster: linked {pairs} House predecessor↔successor pair(s)",
              file=sys.stderr)

    _enrich_replacement_chain(directory, args.congress, api_key, cache_dir, warnings)

    _write_outputs(out_dir, directory, lis_map)
    print(f"enrich_roster: {len(directory)} members directory written; "
          f"{resolved} senators with LIS resolved; {rescued} House rescues; "
          f"{pairs} replacements linked; {len(warnings)} warnings")
    for w in warnings[:10]:
        print(f"  warn: {w}", file=sys.stderr)
    return 0


if __name__ == "__main__":
    sys.exit(main())