"""Script that spawns a local webserver for retrieving an OpenAI API key. - Listens on 127.0.0.1:1455 - Opens http://localhost:1455/auth/callback in the browser - If the user successfully navigates the auth flow, $CODEX_HOME/auth.json will be written with the API key. - User will be redirected to http://localhost:1455/success upon success. The script should exit with a non-zero code if the user fails to navigate the auth flow. """ from __future__ import annotations import argparse import base64 import datetime import errno import hashlib import http.server import json import os import secrets import sys import threading import urllib.parse import urllib.request import webbrowser from dataclasses import dataclass # Required port for OAuth client. REQUIRED_PORT = 1455 URL_BASE = f"http://localhost:{REQUIRED_PORT}" DEFAULT_ISSUER = "https://auth.openai.com" DEFAULT_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" EXIT_CODE_WHEN_ADDRESS_ALREADY_IN_USE = 13 @dataclass class TokenData: id_token: str access_token: str refresh_token: str @dataclass class AuthBundle: """Aggregates authentication data produced after successful OAuth flow.""" api_key: str token_data: TokenData last_refresh: str def main() -> None: parser = argparse.ArgumentParser(description="Retrieve API key via local HTTP flow") parser.add_argument( "--no-browser", action="store_true", help="Do not automatically open the browser", ) parser.add_argument("--verbose", action="store_true", help="Enable request logging") args = parser.parse_args() codex_home = os.environ.get("CODEX_HOME") if not codex_home: eprint("ERROR: CODEX_HOME environment variable is not set") sys.exit(1) # Spawn server. try: httpd = _ApiKeyHTTPServer( ("127.0.0.1", REQUIRED_PORT), _ApiKeyHTTPHandler, codex_home=codex_home, verbose=args.verbose, ) except OSError as e: eprint(f"ERROR: {e}") if e.errno == errno.EADDRINUSE: # Caller might want to handle this case specially. sys.exit(EXIT_CODE_WHEN_ADDRESS_ALREADY_IN_USE) else: sys.exit(1) auth_url = httpd.auth_url() with httpd: eprint(f"Starting local login server on {URL_BASE}") if not args.no_browser: try: webbrowser.open(auth_url, new=1, autoraise=True) except Exception as e: eprint(f"Failed to open browser: {e}") eprint( f"If your browser did not open, navigate to this URL to authenticate:\n\n{auth_url}" ) # Run the server in the main thread until `shutdown()` is called by the # request handler. try: httpd.serve_forever() except KeyboardInterrupt: eprint("\nKeyboard interrupt received, exiting.") # Server has been shut down by the request handler. Exit with the code # it set (0 on success, non-zero on failure). sys.exit(httpd.exit_code) class _ApiKeyHTTPHandler(http.server.BaseHTTPRequestHandler): """A minimal request handler that captures an *api key* from query/post.""" # We store the result in the server instance itself. server: "_ApiKeyHTTPServer" # type: ignore[override] - helpful annotation def do_GET(self) -> None: # noqa: N802 – required by BaseHTTPRequestHandler path = urllib.parse.urlparse(self.path).path if path == "/success": # Serve confirmation page then gracefully shut down the server so # the main thread can exit with the previously captured exit code. self._send_html(LOGIN_SUCCESS_HTML) # Ensure the data is flushed to the client before we stop. try: self.wfile.flush() except Exception as e: eprint(f"Failed to flush response: {e}") self.request_shutdown() elif path == "/auth/callback": query = urllib.parse.urlparse(self.path).query params = urllib.parse.parse_qs(query) # Validate state ------------------------------------------------- if params.get("state", [None])[0] != self.server.state: self.send_error(400, "State parameter mismatch") return # Standard OAuth flow ----------------------------------------- code = params.get("code", [None])[0] if not code: self.send_error(400, "Missing authorization code") return try: auth_bundle, success_url = self._exchange_code_for_api_key(code) except Exception as exc: # noqa: BLE001 – propagate to client self.send_error(500, f"Token exchange failed: {exc}") return # Persist API key along with additional token metadata. if _write_auth_file( auth=auth_bundle, codex_home=self.server.codex_home, ): self.server.exit_code = 0 self._send_redirect(success_url) else: self.send_error(500, "Unable to persist auth file") else: self.send_error(404, "Endpoint not supported") def do_POST(self) -> None: # noqa: N802 – required by BaseHTTPRequestHandler self.send_error(404, "Endpoint not supported") def send_error(self, code, message=None, explain=None) -> None: """Send an error response and stop the server. We avoid calling `sys.exit()` directly from the request-handling thread so that the response has a chance to be written to the socket. Instead we shut the server down; the main thread will then exit with the appropriate status code. """ super().send_error(code, message, explain) try: self.wfile.flush() except Exception as e: eprint(f"Failed to flush response: {e}") self.request_shutdown() def _send_redirect(self, url: str) -> None: self.send_response(302) self.send_header("Location", url) self.end_headers() def _send_html(self, body: str) -> None: encoded = body.encode() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(encoded))) self.end_headers() self.wfile.write(encoded) # Silence logging for cleanliness unless --verbose flag is used. def log_message(self, fmt: str, *args): # type: ignore[override] if getattr(self.server, "verbose", False): # type: ignore[attr-defined] super().log_message(fmt, *args) def _exchange_code_for_api_key(self, code: str) -> tuple[AuthBundle, str]: """Perform token + token-exchange to obtain an OpenAI API key. Returns (AuthBundle, success_url). """ token_endpoint = f"{self.server.issuer}/oauth/token" # 1. Authorization-code -> (id_token, access_token, refresh_token) data = urllib.parse.urlencode( { "grant_type": "authorization_code", "code": code, "redirect_uri": self.server.redirect_uri, "client_id": self.server.client_id, "code_verifier": self.server.pkce.code_verifier, } ).encode() token_data: TokenData with urllib.request.urlopen( urllib.request.Request( token_endpoint, data=data, method="POST", headers={"Content-Type": "application/x-www-form-urlencoded"}, ) ) as resp: payload = json.loads(resp.read().decode()) token_data = TokenData( id_token=payload["id_token"], access_token=payload["access_token"], refresh_token=payload["refresh_token"], ) id_token_parts = token_data.id_token.split(".") if len(id_token_parts) != 3: raise ValueError("Invalid ID token") access_token_parts = token_data.access_token.split(".") if len(access_token_parts) != 3: raise ValueError("Invalid access token") id_token_claims = json.loads( base64.urlsafe_b64decode(id_token_parts[1] + "==").decode("utf-8") ) access_token_claims = json.loads( base64.urlsafe_b64decode(access_token_parts[1] + "==").decode("utf-8") ) token_claims = id_token_claims.get("https://api.openai.com/auth", {}) access_claims = access_token_claims.get("https://api.openai.com/auth", {}) org_id = token_claims.get("organization_id") if not org_id: raise ValueError("Missing organization in id_token claims") project_id = token_claims.get("project_id") if not project_id: raise ValueError("Missing project in id_token claims") random_id = secrets.token_hex(6) # 2. Token exchange to obtain API key today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d") exchange_data = urllib.parse.urlencode( { "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", "client_id": self.server.client_id, "requested_token": "openai-api-key", "subject_token": token_data.id_token, "subject_token_type": "urn:ietf:params:oauth:token-type:id_token", "name": f"Codex CLI [auto-generated] ({today}) [{random_id}]", } ).encode() exchanged_access_token: str with urllib.request.urlopen( urllib.request.Request( token_endpoint, data=exchange_data, method="POST", headers={"Content-Type": "application/x-www-form-urlencoded"}, ) ) as resp: exchange_payload = json.loads(resp.read().decode()) exchanged_access_token = exchange_payload["access_token"] # Determine whether the organization still requires additional # setup (e.g., adding a payment method) based on the ID-token # claim provided by the auth service. completed_onboarding = token_claims.get("completed_platform_onboarding") == True chatgpt_plan_type = access_claims.get("chatgpt_plan_type") is_org_owner = token_claims.get("is_org_owner") == True needs_setup = not completed_onboarding and is_org_owner # Build the success URL on the same host/port as the callback and # include the required query parameters for the front-end page. success_url_query = { "id_token": token_data.id_token, "needs_setup": "true" if needs_setup else "false", "org_id": org_id, "project_id": project_id, "plan_type": chatgpt_plan_type, "platform_url": ( "https://platform.openai.com" if self.server.issuer == "https://auth.openai.com" else "https://platform.api.openai.org" ), } success_url = f"{URL_BASE}/success?{urllib.parse.urlencode(success_url_query)}" # TODO(mbolin): Port maybeRedeemCredits() to Python and call it here. # Persist refresh_token/id_token for future use (redeem credits etc.) last_refresh_str = ( datetime.datetime.now(datetime.timezone.utc) .isoformat() .replace("+00:00", "Z") ) auth_bundle = AuthBundle( api_key=exchanged_access_token, token_data=token_data, last_refresh=last_refresh_str, ) return (auth_bundle, success_url) def request_shutdown(self) -> None: # shutdown() must be invoked from another thread to avoid # deadlocking the serve_forever() loop, which is running in this # same thread. A short-lived helper thread does the trick. threading.Thread(target=self.server.shutdown, daemon=True).start() def _write_auth_file(*, auth: AuthBundle, codex_home: str) -> bool: """Persist *api_key* to $CODEX_HOME/auth.json. Returns True on success, False otherwise. Any error is printed to *stderr* so that the Rust layer can surface the problem. """ if not os.path.isdir(codex_home): try: os.makedirs(codex_home, exist_ok=True) except Exception as exc: # pragma: no cover – unlikely eprint(f"ERROR: unable to create CODEX_HOME directory: {exc}") return False auth_path = os.path.join(codex_home, "auth.json") auth_json_contents = { "OPENAI_API_KEY": auth.api_key, "tokens": { "id_token": auth.token_data.id_token, "access_token": auth.token_data.access_token, "refresh_token": auth.token_data.refresh_token, }, "last_refresh": auth.last_refresh, } try: with open(auth_path, "w", encoding="utf-8") as fp: if hasattr(os, "fchmod"): # POSIX-safe os.fchmod(fp.fileno(), 0o600) json.dump(auth_json_contents, fp, indent=2) except Exception as exc: # pragma: no cover – permissions/filesystem eprint(f"ERROR: unable to write auth file: {exc}") return False return True @dataclass class PkceCodes: code_verifier: str code_challenge: str class _ApiKeyHTTPServer(http.server.HTTPServer): """HTTPServer with shutdown helper & self-contained OAuth configuration.""" def __init__( self, server_address: tuple[str, int], request_handler_class: type[http.server.BaseHTTPRequestHandler], *, codex_home: str, verbose: bool = False, ) -> None: super().__init__(server_address, request_handler_class, bind_and_activate=True) self.exit_code = 1 self.codex_home = codex_home self.verbose: bool = verbose self.issuer: str = DEFAULT_ISSUER self.client_id: str = DEFAULT_CLIENT_ID port = server_address[1] self.redirect_uri: str = f"http://localhost:{port}/auth/callback" self.pkce: PkceCodes = _generate_pkce() self.state: str = secrets.token_hex(32) def auth_url(self) -> str: """Return fully-formed OpenID authorization URL.""" params = { "response_type": "code", "client_id": self.client_id, "redirect_uri": self.redirect_uri, "scope": "openid profile email offline_access", "code_challenge": self.pkce.code_challenge, "code_challenge_method": "S256", "id_token_add_organizations": "true", "state": self.state, } return f"{self.issuer}/oauth/authorize?" + urllib.parse.urlencode(params) def _generate_pkce() -> PkceCodes: """Generate PKCE *code_verifier* and *code_challenge* (S256).""" code_verifier = secrets.token_hex(64) digest = hashlib.sha256(code_verifier.encode()).digest() code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode() return PkceCodes(code_verifier, code_challenge) def eprint(*args, **kwargs) -> None: print(*args, file=sys.stderr, **kwargs) LOGIN_SUCCESS_HTML = """ Sign into Codex CLI
Signed in to Codex CLI
""" # Unconditionally call `main()` instead of gating it behind # `if __name__ == "__main__"` because this script is either: # # - invoked as a string passed to `python3 -c` # - run via `python3 login_with_chatgpt.py` for testing as part of local # development main()