fix: eliminate ServerOptions.login_timeout and have caller use tokio::time::timeout() instead (#2395)

https://github.com/openai/codex/pull/2373 introduced
`ServerOptions.login_timeout` and `spawn_timeout_watcher()` to use an
extra thread to manage the timeout for the login server. Now that we
have asyncified the login stack, we can use `tokio::time::timeout()`
from "outside" the login library to manage the timeout rather than
having to a commit to a specific "timeout" concept from within.

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2395).
* #2399
* #2398
* #2396
* __->__ #2395
* #2394
* #2393
* #2389
This commit is contained in:
Michael Bolin
2025-08-18 17:49:13 -07:00
committed by GitHub
parent 5b1989f4d7
commit 7f21634165
3 changed files with 48 additions and 89 deletions

View File

@@ -3,11 +3,7 @@ use std::io::{self};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use crate::AuthDotJson;
use crate::get_auth_file;
@@ -32,7 +28,6 @@ pub struct ServerOptions {
pub port: u16,
pub open_browser: bool,
pub force_state: Option<String>,
pub login_timeout: Option<Duration>,
}
impl ServerOptions {
@@ -44,7 +39,6 @@ impl ServerOptions {
port: DEFAULT_PORT,
open_browser: true,
force_state: None,
login_timeout: None,
}
}
}
@@ -126,24 +120,8 @@ pub fn run_login_server(
if opts.open_browser {
let _ = webbrowser::open(&auth_url);
}
let shutdown_notify: Arc<tokio::sync::Notify> =
shutdown_flag.unwrap_or_else(|| Arc::new(tokio::sync::Notify::new()));
let shutdown_notify_clone = shutdown_notify.clone();
let timeout_flag = Arc::new(AtomicBool::new(false));
// Channel used to signal completion to timeout watcher.
let (done_tx, done_rx) = mpsc::channel::<()>();
if let Some(timeout) = opts.login_timeout {
spawn_timeout_watcher(
done_rx,
timeout,
shutdown_notify.clone(),
timeout_flag.clone(),
server.clone(),
);
}
// Map blocking reads from server.recv() to an async channel.
let (tx, mut rx) = tokio::sync::mpsc::channel::<Request>(16);
let _server_handle = {
let server = server.clone();
@@ -158,59 +136,52 @@ pub fn run_login_server(
})
};
let server_for_task = server.clone();
let server_handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_notify.notified() => {
let _ = done_tx.send(());
if timeout_flag.load(Ordering::SeqCst) {
return Err(io::Error::other("Login timed out"));
} else {
let shutdown_notify = shutdown_flag.unwrap_or_else(|| Arc::new(tokio::sync::Notify::new()));
let server_handle = {
let shutdown_notify = shutdown_notify.clone();
let server = server.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_notify.notified() => {
return Err(io::Error::other("Login was not completed"));
}
}
maybe_req = rx.recv() => {
let Some(req) = maybe_req else {
let _ = done_tx.send(());
if timeout_flag.load(Ordering::SeqCst) {
return Err(io::Error::other("Login timed out"));
} else {
maybe_req = rx.recv() => {
let Some(req) = maybe_req else {
return Err(io::Error::other("Login was not completed"));
}
};
};
let url_raw = req.url().to_string();
let response =
process_request(&url_raw, &opts, &redirect_uri, &pkce, actual_port, &state).await;
let url_raw = req.url().to_string();
let response =
process_request(&url_raw, &opts, &redirect_uri, &pkce, actual_port, &state).await;
let is_login_complete = matches!(response, HandledRequest::ResponseAndExit(_));
match response {
HandledRequest::Response(r) | HandledRequest::ResponseAndExit(r) => {
let _ = tokio::task::spawn_blocking(move || req.respond(r)).await;
let is_login_complete = matches!(response, HandledRequest::ResponseAndExit(_));
match response {
HandledRequest::Response(r) | HandledRequest::ResponseAndExit(r) => {
let _ = tokio::task::spawn_blocking(move || req.respond(r)).await;
}
HandledRequest::RedirectWithHeader(header) => {
let redirect = Response::empty(302).with_header(header);
let _ = tokio::task::spawn_blocking(move || req.respond(redirect)).await;
}
}
HandledRequest::RedirectWithHeader(header) => {
let redirect = Response::empty(302).with_header(header);
let _ = tokio::task::spawn_blocking(move || req.respond(redirect)).await;
}
}
if is_login_complete {
shutdown_notify.notify_waiters();
let _ = done_tx.send(());
server_for_task.unblock();
return Ok(());
if is_login_complete {
shutdown_notify.notify_waiters();
server.unblock();
return Ok(());
}
}
}
}
}
});
})
};
Ok(LoginServer {
auth_url: auth_url.clone(),
auth_url,
actual_port,
server_handle,
shutdown_flag: shutdown_notify_clone,
shutdown_flag: shutdown_notify,
server,
})
}
@@ -319,26 +290,6 @@ async fn process_request(
}
}
/// Spawns a detached thread that waits for either a completion signal on `done_rx`
/// or the specified `timeout` to elapse. If the timeout elapses first it marks
/// the `shutdown_flag`, records `timeout_flag`, and unblocks the HTTP server so
/// that the main server loop can exit promptly.
fn spawn_timeout_watcher(
done_rx: mpsc::Receiver<()>,
timeout: Duration,
shutdown_notify: Arc<tokio::sync::Notify>,
timeout_flag: Arc<AtomicBool>,
server: Arc<Server>,
) {
thread::spawn(move || {
if done_rx.recv_timeout(timeout).is_err() {
timeout_flag.store(true, Ordering::SeqCst);
shutdown_notify.notify_waiters();
server.unblock();
}
});
}
fn build_authorize_url(
issuer: &str,
client_id: &str,

View File

@@ -100,7 +100,6 @@ async fn end_to_end_login_flow_persists_auth_json() {
port: 0,
open_browser: false,
force_state: Some(state),
login_timeout: None,
};
let server = run_login_server(opts, None).unwrap();
let login_port = server.actual_port;
@@ -159,7 +158,6 @@ async fn creates_missing_codex_home_dir() {
port: 0,
open_browser: false,
force_state: Some(state),
login_timeout: None,
};
let server = run_login_server(opts, None).unwrap();
let login_port = server.actual_port;

View File

@@ -146,7 +146,6 @@ impl CodexMessageProcessor {
let opts = LoginServerOptions {
open_browser: false,
login_timeout: Some(LOGIN_CHATGPT_TIMEOUT),
..LoginServerOptions::new(config.codex_home.clone(), CLIENT_ID.to_string())
};
@@ -158,6 +157,7 @@ impl CodexMessageProcessor {
let reply = match run_login_server(opts, None) {
Ok(server) => {
let login_id = Uuid::new_v4();
let shutdown_handle = server.cancel_handle();
// Replace active login if present.
{
@@ -166,7 +166,7 @@ impl CodexMessageProcessor {
existing.drop();
}
*guard = Some(ActiveLogin {
shutdown_handle: server.cancel_handle(),
shutdown_handle: shutdown_handle.clone(),
login_id,
});
}
@@ -180,9 +180,19 @@ impl CodexMessageProcessor {
let outgoing_clone = self.outgoing.clone();
let active_login = self.active_login.clone();
tokio::spawn(async move {
let (success, error_msg) = match server.block_until_done().await {
Ok(()) => (true, None),
Err(err) => (false, Some(format!("Login server error: {err}"))),
let (success, error_msg) = match tokio::time::timeout(
LOGIN_CHATGPT_TIMEOUT,
server.block_until_done(),
)
.await
{
Ok(Ok(())) => (true, None),
Ok(Err(err)) => (false, Some(format!("Login server error: {err}"))),
Err(_elapsed) => {
// Timeout: cancel server and report
shutdown_handle.cancel();
(false, Some("Login timed out".to_string()))
}
};
let notification = LoginChatGptCompleteNotification {
login_id,