| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- #!/usr/bin/env python3
- """Enrich the 119th-Congress roster via Congress.gov API.
- Writes:
- data/<C>/members_directory.json — dict keyed by bioguide
- data/<C>/lis_to_bioguide.json — Senate LIS -> bioguide crosswalk
- data/<C>/api_cache/<sha>.json — raw cached API responses (idempotent)
- Reads CONGRESS_GOV_API_KEY from ./.env (never CLI/env-var).
- Standard library only. Throttled 350 ms between live requests.
- """
- import argparse, hashlib, json, os, re, sys, time, unicodedata, urllib.error, urllib.request
- from pathlib import Path
# --- Filesystem layout: everything is resolved relative to this script. ---
ROOT = Path(__file__).resolve().parent
DATA_ROOT = ROOT / "data"
ENV_PATH = ROOT / ".env"

# --- Congress.gov API settings. ---
API_BASE = "https://api.congress.gov/v3"
# Pause (seconds) taken after each successful live request.
THROTTLE_SEC = 0.35
# Seconds slept before each retry of a failed request.
RETRY_BACKOFFS = (0.5, 1.0, 2.0)

# Party abbreviation or party name -> single-letter party code.
PARTY_MAP = {
    "R": "R",
    "Republican": "R",
    "D": "D",
    "Democratic": "D",
    "Democrat": "D",
    "I": "I",
    "Independent": "I",
    "ID": "I",
}

# Full state / territory name -> two-letter postal code.
STATE_NAME_TO_CODE = {
    "Alabama": "AL",
    "Alaska": "AK",
    "Arizona": "AZ",
    "Arkansas": "AR",
    "California": "CA",
    "Colorado": "CO",
    "Connecticut": "CT",
    "Delaware": "DE",
    "Florida": "FL",
    "Georgia": "GA",
    "Hawaii": "HI",
    "Idaho": "ID",
    "Illinois": "IL",
    "Indiana": "IN",
    "Iowa": "IA",
    "Kansas": "KS",
    "Kentucky": "KY",
    "Louisiana": "LA",
    "Maine": "ME",
    "Maryland": "MD",
    "Massachusetts": "MA",
    "Michigan": "MI",
    "Minnesota": "MN",
    "Mississippi": "MS",
    "Missouri": "MO",
    "Montana": "MT",
    "Nebraska": "NE",
    "Nevada": "NV",
    "New Hampshire": "NH",
    "New Jersey": "NJ",
    "New Mexico": "NM",
    "New York": "NY",
    "North Carolina": "NC",
    "North Dakota": "ND",
    "Ohio": "OH",
    "Oklahoma": "OK",
    "Oregon": "OR",
    "Pennsylvania": "PA",
    "Rhode Island": "RI",
    "South Carolina": "SC",
    "South Dakota": "SD",
    "Tennessee": "TN",
    "Texas": "TX",
    "Utah": "UT",
    "Vermont": "VT",
    "Virginia": "VA",
    "Washington": "WA",
    "West Virginia": "WV",
    "Wisconsin": "WI",
    "Wyoming": "WY",
    # Non-state jurisdictions.
    "American Samoa": "AS",
    "District of Columbia": "DC",
    "Guam": "GU",
    "Northern Mariana Islands": "MP",
    "Puerto Rico": "PR",
    "Virgin Islands": "VI",
}

# Case-insensitive matcher for dictionary keys that look like an LIS id field.
LIS_KEY_RE = re.compile(r"(?i)\b(lis|senateid|lis[_-]?id|lis[_-]?member[_-]?id)\b")
def _load_api_key():
    """Return the Congress.gov API key stored in the ``.env`` file at ENV_PATH.

    The file is scanned line by line for a ``CONGRESS_GOV_API_KEY=<value>``
    assignment. Blank lines, ``#`` comment lines and lines without ``=`` are
    ignored. An optional leading ``export`` and surrounding single or double
    quotes are stripped.

    Returns:
        The non-empty key string.

    Exits:
        Prints an error to stderr and calls ``sys.exit(2)`` when the file is
        missing or holds no non-empty value for the key.
    """
    if not ENV_PATH.exists():
        print(f"ERROR: .env not found at {ENV_PATH}", file=sys.stderr)
        sys.exit(2)
    # "utf-8-sig" drops a byte-order mark that would otherwise glue itself to
    # the first key name; errors="replace" keeps stray bytes on unrelated
    # lines from aborting the read.
    text = ENV_PATH.read_text(encoding="utf-8-sig", errors="replace")
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        name, _, value = line.partition("=")
        name = name.strip()
        if name.startswith("export "):
            # Accept shell-style "export KEY=value" lines.
            name = name[len("export "):].strip()
        if name != "CONGRESS_GOV_API_KEY":
            continue
        value = value.strip().strip('"').strip("'")
        if value:
            return value
        # An empty assignment is treated as "not set": sending an empty key
        # would only surface later as authentication failures on every call.
    print("ERROR: CONGRESS_GOV_API_KEY not set in .env", file=sys.stderr)
    sys.exit(2)
- def _cache_path(cache_dir, url):
- # Cache key strips api_key so re-keying doesn't invalidate cache.
- clean = re.sub(r"([?&])api_key=[^&]*", r"\1", url).rstrip("?&")
- sha = hashlib.sha256(clean.encode("utf-8")).hexdigest()
- return cache_dir / f"{sha}.json"
def _fetch(url, cache_dir, warnings, label="request"):
    """Return the decoded JSON for *url*, served from the disk cache when possible.

    Args:
        url: Full request URL.
        cache_dir: Directory holding cached responses; the file for *url* is
            chosen by ``_cache_path``.
        warnings: List that receives one ``"<label>: <error>"`` entry when the
            request ultimately fails.
        label: Short tag identifying the request in that warning.

    Returns:
        The parsed JSON payload, or ``None`` when no attempt succeeded.

    A live request is tried once and then once more after each delay in
    RETRY_BACKOFFS. A successful response is written to the cache and followed
    by a THROTTLE_SEC pause.
    """
    import http.client  # function-scope: not among the module-level imports

    cp = _cache_path(cache_dir, url)
    if cp.exists():
        try:
            return json.loads(cp.read_text())
        except (OSError, ValueError):
            # Unreadable or corrupt cache entry: fall through and re-fetch.
            pass

    headers = {
        "Accept": "application/json",
        "User-Agent": "polisci-pipeline/1.0 (+enrich_roster.py)",
    }
    last_err = None
    for backoff in (0,) + RETRY_BACKOFFS:
        if backoff:
            time.sleep(backoff)
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=30) as resp:
                body = resp.read()
            data = json.loads(body)
        except urllib.error.HTTPError as e:
            last_err = e
            # Client errors (bad key, unknown member, ...) will not change on
            # retry; only timeout / rate-limit statuses are worth waiting for.
            if 400 <= e.code < 500 and e.code not in (408, 429):
                break
            continue
        except (OSError, http.client.HTTPException, ValueError) as e:
            # OSError covers URLError, timeouts and connection resets;
            # HTTPException covers truncated or dropped responses raised
            # while reading the body; ValueError covers undecodable JSON.
            last_err = e
            continue
        cp.parent.mkdir(parents=True, exist_ok=True)
        cp.write_text(json.dumps(data))
        time.sleep(THROTTLE_SEC)
        return data
    warnings.append(f"{label}: {last_err}")
    return None
def _party_letter(member):
    """Return the single-letter party code for *member*.

    The last ``partyHistory`` entry is consulted first (``partyAbbreviation``,
    then ``partyName``). If neither value is a PARTY_MAP key, the member's
    top-level ``partyName`` is used: a recognised name maps through
    PARTY_MAP, any other non-empty name yields ``"I"``, and a missing name
    yields ``""``.
    """
    history = member.get("partyHistory") or []
    if history:
        latest = history[-1]
        for field in ("partyAbbreviation", "partyName"):
            label = latest.get(field)
            if not label:
                continue
            if label in PARTY_MAP:
                return PARTY_MAP[label]

    party_name = member.get("partyName") or ""
    if party_name in PARTY_MAP:
        return PARTY_MAP[party_name]
    if party_name:
        return "I"
    return ""
- def _state_code(member):
- s = member.get("state") or ""
- if len(s) == 2 and s.isupper():
- return s
- return STATE_NAME_TO_CODE.get(s, s[:2].upper() if s else "")
- def _latest_chamber(member):
- terms = (member.get("terms") or {}).get("item") or member.get("terms") or []
- if isinstance(terms, dict):
- terms = terms.get("item") or []
- if not terms:
- return ""
- # Sort by startYear if available
- def sk(t): return t.get("startYear") or 0
- last = sorted(terms, key=sk)[-1]
- return (last.get("chamber") or "").strip()
- def _served_dates(member):
- terms = (member.get("terms") or {}).get("item") or member.get("terms") or []
- if isinstance(terms, dict):
- terms = terms.get("item") or []
- if not terms:
- return None, None
- starts = []
- ends = []
- for t in terms:
- sy = t.get("startYear")
- ey = t.get("endYear")
- if sy:
- starts.append(f"{int(sy):04d}-01-03")
- if ey:
- ends.append(f"{int(ey):04d}-01-03")
- served_from = min(starts) if starts else None
- served_to = max(ends) if ends and len(ends) == len(starts) else None
- return served_from, served_to
- def _scan_for_lis(obj):
- """Recursively scan obj for any key matching LIS pattern; return string value or None."""
- if isinstance(obj, dict):
- for k, v in obj.items():
- if isinstance(k, str) and LIS_KEY_RE.search(k):
- if isinstance(v, str) and re.match(r"^S?\d{3,4}$", v.strip()):
- val = v.strip()
- if not val.startswith("S"):
- val = "S" + val.zfill(3)
- return val
- found = _scan_for_lis(v)
- if found:
- return found
- elif isinstance(obj, list):
- for it in obj:
- found = _scan_for_lis(it)
- if found:
- return found
- return None
- def _normalize_member(m):
- bioguide = (m.get("bioguideId") or "").strip()
- if not bioguide:
- return None
- chamber = _latest_chamber(m)
- served_from, served_to = _served_dates(m)
- district = m.get("district")
- if district is not None:
- district = str(district)
- name = m.get("directOrderName") or m.get("name") or ""
- if not name:
- first = m.get("firstName") or ""
- last = m.get("lastName") or ""
- name = (first + " " + last).strip()
- # If name is "Last, First" prefer invertedOrderName? Use as-is otherwise.
- if "," in name and not m.get("directOrderName"):
- parts = [p.strip() for p in name.split(",", 1)]
- if len(parts) == 2:
- name = parts[1] + " " + parts[0]
- photo = ((m.get("depiction") or {}).get("imageUrl")) or None
- return {
- "bioguide": bioguide,
- "lis": None,
- "full_name": name,
- "party": _party_letter(m),
- "state": _state_code(m),
- "district": district if chamber.lower() == "house" else None,
- "chamber": chamber,
- "served_from": served_from,
- "served_to": served_to,
- "photo_url": photo,
- "source": "congress.gov/v3",
- }
- def _write_partial(out_dir, directory, lis_map, warnings, note):
- out_dir.mkdir(parents=True, exist_ok=True)
- (out_dir / "members_directory.json").write_text(
- json.dumps(directory, indent=2, sort_keys=True))
- (out_dir / "lis_to_bioguide.json").write_text(
- json.dumps(lis_map, indent=2, sort_keys=True))
- print(f"enrich_roster: WARNING {note}", file=sys.stderr)
def _resolve_lis_from_roster(directory, senators, lis_map, vote_sen):
    """Link roster keys to senators by matching last name, state and party.

    Args:
        directory: Member directory keyed by bioguide; matched entries get
            their ``"lis"`` field set in place.
        senators: Bioguide ids of the directory entries to match against.
        lis_map: LIS -> bioguide mapping, extended in place.
        vote_sen: Roster dict; each value is expected to carry ``name``,
            ``state`` and ``party``.

    Returns:
        The number of roster keys newly linked.
    """
    def _norm(s):
        # Strip diacritics so accented and plain spellings compare equal.
        return ''.join(c for c in unicodedata.normalize('NFKD', s or '') if not unicodedata.combining(c))

    def _last(n):
        p = re.sub(r'[.,]', '', _norm(n)).split()
        return p[-1].lower() if p else ''

    idx = {}
    for bg in senators:
        e = directory[bg]
        key = (_last(e.get("full_name", "")), e.get("state", ""), e.get("party", ""))
        idx.setdefault(key, []).append(bg)

    matched = 0
    for lis_key, v in vote_sen.items():
        # One letter + six digits: presumably already a bioguide id rather
        # than an LIS id, so there is nothing to map.
        if re.match(r'^[A-Z]\d{6}$', lis_key):
            continue
        if lis_key in lis_map:
            continue
        if not isinstance(v, dict):
            continue
        k = (_last(v.get("name", "")), v.get("state", ""), v.get("party", ""))
        candidates = idx.get(k, [])
        if len(candidates) != 1:
            # Retry ignoring party, in case the two sources label it differently.
            candidates = [bg for (l, s, _), bs in idx.items() for bg in bs
                          if l == k[0] and s == k[1]]
        if len(candidates) == 1:
            bg = candidates[0]
            directory[bg]["lis"] = lis_key
            lis_map[lis_key] = bg
            matched += 1
    return matched


def main():
    """Build the member directory and the LIS crosswalk for one Congress.

    Pass 1 pages through the member list and normalises every record.
    Pass 2 fetches each senator's detail record and scans it for an LIS id.
    A fallback then matches the remaining senators against
    ``senate/roster.json`` (when present) by last name, state and party.

    Returns:
        0 in all cases. A pagination failure writes the partial directory
        instead of aborting.
    """
    ap = argparse.ArgumentParser(description="Enrich Congress roster via Congress.gov API.")
    ap.add_argument("--congress", type=int, default=119)
    args = ap.parse_args()
    api_key = _load_api_key()
    out_dir = DATA_ROOT / str(args.congress)
    cache_dir = out_dir / "api_cache"
    cache_dir.mkdir(parents=True, exist_ok=True)
    warnings = []
    directory = {}

    # Pass 1: paginate full member list
    url = (f"{API_BASE}/member/congress/{args.congress}"
           f"?currentMember=false&limit=250&format=json&api_key={api_key}")
    page = 0
    seen_urls = set()
    while url:
        seen_urls.add(url)
        page += 1
        print(f"enrich_roster: fetching page {page}", file=sys.stderr)
        data = _fetch(url, cache_dir, warnings, label=f"page {page}")
        if data is None:
            _write_partial(out_dir, directory, {}, warnings,
                           f"pagination failed at page {page}; partial directory written")
            print(f"enrich_roster: {len(directory)} members directory written; "
                  f"0 senators with LIS resolved; {len(warnings)} warnings")
            # Show the cause of the failure, as the normal exit path does.
            for w in warnings[:10]:
                print(f" warn: {w}", file=sys.stderr)
            return 0
        for m in data.get("members") or []:
            norm = _normalize_member(m)
            if norm:
                directory[norm["bioguide"]] = norm
        nxt = ((data.get("pagination") or {}).get("next")) or None
        if nxt and "api_key=" not in nxt:
            sep = "&" if "?" in nxt else "?"
            nxt = f"{nxt}{sep}api_key={api_key}"
        if nxt in seen_urls:
            # A repeated link would be answered from the cache forever.
            warnings.append(f"page {page}: pagination link repeats; stopping")
            break
        url = nxt

    # Pass 2: LIS lookup for senators
    lis_map = {}
    senators = [b for b, e in directory.items() if (e.get("chamber") or "").lower() == "senate"]
    print(f"enrich_roster: resolving LIS for {len(senators)} senators", file=sys.stderr)
    resolved = 0
    for bid in senators:
        url = f"{API_BASE}/member/{bid}?format=json&api_key={api_key}"
        data = _fetch(url, cache_dir, warnings, label=f"member/{bid}")
        if data is None:
            # _fetch has already recorded the failure; adding a second entry
            # here would count every failed request twice.
            continue
        member = (data.get("member") or {})
        lis = _scan_for_lis(member)
        if lis:
            directory[bid]["lis"] = lis
            lis_map[lis] = bid
            resolved += 1
        else:
            warnings.append(f"member/{bid}: LIS not derivable from API")

    # Fallback: name+state+party match against vote-derived senate roster.
    # Why: Congress.gov v3 does not expose LIS reliably; name match is unique within a Congress.
    sen_roster_path = out_dir / "senate" / "roster.json"
    if sen_roster_path.exists():
        try:
            vote_sen = json.loads(sen_roster_path.read_text())
        except (OSError, ValueError) as e:
            # A damaged roster must not discard the directory built so far.
            vote_sen = None
            warnings.append(f"senate/roster.json: {e}")
        if isinstance(vote_sen, dict):
            resolved += _resolve_lis_from_roster(directory, senators, lis_map, vote_sen)
        elif vote_sen is not None:
            warnings.append("senate/roster.json: expected a JSON object; name fallback skipped")

    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "members_directory.json").write_text(
        json.dumps(directory, indent=2, sort_keys=True))
    (out_dir / "lis_to_bioguide.json").write_text(
        json.dumps(lis_map, indent=2, sort_keys=True))
    print(f"enrich_roster: {len(directory)} members directory written; "
          f"{resolved} senators with LIS resolved; {len(warnings)} warnings")
    for w in warnings[:10]:
        print(f" warn: {w}", file=sys.stderr)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|