From 7f216341657d2080c22b2618dbac854e2228f9a6 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Mon, 18 Aug 2025 17:49:13 -0700 Subject: [PATCH] fix: eliminate ServerOptions.login_timeout and have caller use tokio::time::timeout() instead (#2395) https://github.com/openai/codex/pull/2373 introduced `ServerOptions.login_timeout` and `spawn_timeout_watcher()` to use an extra thread to manage the timeout for the login server. Now that we have asyncified the login stack, we can use `tokio::time::timeout()` from "outside" the login library to manage the timeout rather than having to a commit to a specific "timeout" concept from within. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2395). * #2399 * #2398 * #2396 * __->__ #2395 * #2394 * #2393 * #2389 --- codex-rs/login/src/server.rs | 115 +++++------------- codex-rs/login/tests/login_server_e2e.rs | 2 - .../mcp-server/src/codex_message_processor.rs | 20 ++- 3 files changed, 48 insertions(+), 89 deletions(-) diff --git a/codex-rs/login/src/server.rs b/codex-rs/login/src/server.rs index 419874e7..f33f1ae4 100644 --- a/codex-rs/login/src/server.rs +++ b/codex-rs/login/src/server.rs @@ -3,11 +3,7 @@ use std::io::{self}; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::mpsc; use std::thread; -use std::time::Duration; use crate::AuthDotJson; use crate::get_auth_file; @@ -32,7 +28,6 @@ pub struct ServerOptions { pub port: u16, pub open_browser: bool, pub force_state: Option, - pub login_timeout: Option, } impl ServerOptions { @@ -44,7 +39,6 @@ impl ServerOptions { port: DEFAULT_PORT, open_browser: true, force_state: None, - login_timeout: None, } } } @@ -126,24 +120,8 @@ pub fn run_login_server( if opts.open_browser { let _ = webbrowser::open(&auth_url); } - let shutdown_notify: Arc = - shutdown_flag.unwrap_or_else(|| Arc::new(tokio::sync::Notify::new())); - let shutdown_notify_clone = shutdown_notify.clone(); - let timeout_flag = Arc::new(AtomicBool::new(false)); - - // Channel used to signal completion to timeout watcher. - let (done_tx, done_rx) = mpsc::channel::<()>(); - - if let Some(timeout) = opts.login_timeout { - spawn_timeout_watcher( - done_rx, - timeout, - shutdown_notify.clone(), - timeout_flag.clone(), - server.clone(), - ); - } + // Map blocking reads from server.recv() to an async channel. let (tx, mut rx) = tokio::sync::mpsc::channel::(16); let _server_handle = { let server = server.clone(); @@ -158,59 +136,52 @@ pub fn run_login_server( }) }; - let server_for_task = server.clone(); - let server_handle = tokio::spawn(async move { - loop { - tokio::select! { - _ = shutdown_notify.notified() => { - let _ = done_tx.send(()); - if timeout_flag.load(Ordering::SeqCst) { - return Err(io::Error::other("Login timed out")); - } else { + let shutdown_notify = shutdown_flag.unwrap_or_else(|| Arc::new(tokio::sync::Notify::new())); + let server_handle = { + let shutdown_notify = shutdown_notify.clone(); + let server = server.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown_notify.notified() => { return Err(io::Error::other("Login was not completed")); } - } - maybe_req = rx.recv() => { - let Some(req) = maybe_req else { - let _ = done_tx.send(()); - if timeout_flag.load(Ordering::SeqCst) { - return Err(io::Error::other("Login timed out")); - } else { + maybe_req = rx.recv() => { + let Some(req) = maybe_req else { return Err(io::Error::other("Login was not completed")); - } - }; + }; - let url_raw = req.url().to_string(); - let response = - process_request(&url_raw, &opts, &redirect_uri, &pkce, actual_port, &state).await; + let url_raw = req.url().to_string(); + let response = + process_request(&url_raw, &opts, &redirect_uri, &pkce, actual_port, &state).await; - let is_login_complete = matches!(response, HandledRequest::ResponseAndExit(_)); - match response { - HandledRequest::Response(r) | HandledRequest::ResponseAndExit(r) => { - let _ = tokio::task::spawn_blocking(move || req.respond(r)).await; + let is_login_complete = matches!(response, HandledRequest::ResponseAndExit(_)); + match response { + HandledRequest::Response(r) | HandledRequest::ResponseAndExit(r) => { + let _ = tokio::task::spawn_blocking(move || req.respond(r)).await; + } + HandledRequest::RedirectWithHeader(header) => { + let redirect = Response::empty(302).with_header(header); + let _ = tokio::task::spawn_blocking(move || req.respond(redirect)).await; + } } - HandledRequest::RedirectWithHeader(header) => { - let redirect = Response::empty(302).with_header(header); - let _ = tokio::task::spawn_blocking(move || req.respond(redirect)).await; - } - } - if is_login_complete { - shutdown_notify.notify_waiters(); - let _ = done_tx.send(()); - server_for_task.unblock(); - return Ok(()); + if is_login_complete { + shutdown_notify.notify_waiters(); + server.unblock(); + return Ok(()); + } } } } - } - }); + }) + }; Ok(LoginServer { - auth_url: auth_url.clone(), + auth_url, actual_port, server_handle, - shutdown_flag: shutdown_notify_clone, + shutdown_flag: shutdown_notify, server, }) } @@ -319,26 +290,6 @@ async fn process_request( } } -/// Spawns a detached thread that waits for either a completion signal on `done_rx` -/// or the specified `timeout` to elapse. If the timeout elapses first it marks -/// the `shutdown_flag`, records `timeout_flag`, and unblocks the HTTP server so -/// that the main server loop can exit promptly. -fn spawn_timeout_watcher( - done_rx: mpsc::Receiver<()>, - timeout: Duration, - shutdown_notify: Arc, - timeout_flag: Arc, - server: Arc, -) { - thread::spawn(move || { - if done_rx.recv_timeout(timeout).is_err() { - timeout_flag.store(true, Ordering::SeqCst); - shutdown_notify.notify_waiters(); - server.unblock(); - } - }); -} - fn build_authorize_url( issuer: &str, client_id: &str, diff --git a/codex-rs/login/tests/login_server_e2e.rs b/codex-rs/login/tests/login_server_e2e.rs index 09a447d5..ef387f57 100644 --- a/codex-rs/login/tests/login_server_e2e.rs +++ b/codex-rs/login/tests/login_server_e2e.rs @@ -100,7 +100,6 @@ async fn end_to_end_login_flow_persists_auth_json() { port: 0, open_browser: false, force_state: Some(state), - login_timeout: None, }; let server = run_login_server(opts, None).unwrap(); let login_port = server.actual_port; @@ -159,7 +158,6 @@ async fn creates_missing_codex_home_dir() { port: 0, open_browser: false, force_state: Some(state), - login_timeout: None, }; let server = run_login_server(opts, None).unwrap(); let login_port = server.actual_port; diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index 7e5da55a..00c8717c 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -146,7 +146,6 @@ impl CodexMessageProcessor { let opts = LoginServerOptions { open_browser: false, - login_timeout: Some(LOGIN_CHATGPT_TIMEOUT), ..LoginServerOptions::new(config.codex_home.clone(), CLIENT_ID.to_string()) }; @@ -158,6 +157,7 @@ impl CodexMessageProcessor { let reply = match run_login_server(opts, None) { Ok(server) => { let login_id = Uuid::new_v4(); + let shutdown_handle = server.cancel_handle(); // Replace active login if present. { @@ -166,7 +166,7 @@ impl CodexMessageProcessor { existing.drop(); } *guard = Some(ActiveLogin { - shutdown_handle: server.cancel_handle(), + shutdown_handle: shutdown_handle.clone(), login_id, }); } @@ -180,9 +180,19 @@ impl CodexMessageProcessor { let outgoing_clone = self.outgoing.clone(); let active_login = self.active_login.clone(); tokio::spawn(async move { - let (success, error_msg) = match server.block_until_done().await { - Ok(()) => (true, None), - Err(err) => (false, Some(format!("Login server error: {err}"))), + let (success, error_msg) = match tokio::time::timeout( + LOGIN_CHATGPT_TIMEOUT, + server.block_until_done(), + ) + .await + { + Ok(Ok(())) => (true, None), + Ok(Err(err)) => (false, Some(format!("Login server error: {err}"))), + Err(_elapsed) => { + // Timeout: cancel server and report + shutdown_handle.cancel(); + (false, Some("Login timed out".to_string())) + } }; let notification = LoginChatGptCompleteNotification { login_id,