Added MCP server command to enable authentication using ChatGPT (#2373)
This PR adds two new APIs for the MCP server: 1) loginChatGpt, and 2) cancelLoginChatGpt. The first starts a login server and returns a local URL that allows for browser-based authentication, and the second provides a way to cancel the login attempt. If the login attempt succeeds, a notification (in the form of an event) is sent to a subscriber. I also added a timeout mechanism for the existing login server. The loginChatGpt code path uses a 10-minute timeout by default, so if the user fails to complete the login flow in that timeframe, the login server automatically shuts down. I tested the timeout code by manually setting the timeout to a much lower number and confirming that it works as expected when used e2e.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_core::CodexConversation;
|
||||
use codex_core::ConversationManager;
|
||||
@@ -14,6 +15,7 @@ use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use mcp_types::JSONRPCErrorError;
|
||||
use mcp_types::RequestId;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
@@ -36,6 +38,9 @@ use crate::wire_format::ExecCommandApprovalResponse;
|
||||
use crate::wire_format::InputItem as WireInputItem;
|
||||
use crate::wire_format::InterruptConversationParams;
|
||||
use crate::wire_format::InterruptConversationResponse;
|
||||
use crate::wire_format::LOGIN_CHATGPT_COMPLETE_EVENT;
|
||||
use crate::wire_format::LoginChatGptCompleteNotification;
|
||||
use crate::wire_format::LoginChatGptResponse;
|
||||
use crate::wire_format::NewConversationParams;
|
||||
use crate::wire_format::NewConversationResponse;
|
||||
use crate::wire_format::RemoveConversationListenerParams;
|
||||
@@ -46,6 +51,24 @@ use crate::wire_format::SendUserTurnParams;
|
||||
use crate::wire_format::SendUserTurnResponse;
|
||||
use codex_core::protocol::InputItem as CoreInputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_login::CLIENT_ID;
|
||||
use codex_login::ServerOptions as LoginServerOptions;
|
||||
use codex_login::ShutdownHandle;
|
||||
use codex_login::run_login_server;
|
||||
|
||||
// Duration before a ChatGPT login attempt is abandoned.
|
||||
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
struct ActiveLogin {
|
||||
shutdown_handle: ShutdownHandle,
|
||||
login_id: Uuid,
|
||||
}
|
||||
|
||||
impl ActiveLogin {
|
||||
fn drop(&self) {
|
||||
self.shutdown_handle.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles JSON-RPC messages for Codex conversations.
|
||||
pub(crate) struct CodexMessageProcessor {
|
||||
@@ -53,6 +76,7 @@ pub(crate) struct CodexMessageProcessor {
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
|
||||
active_login: Arc<Mutex<Option<ActiveLogin>>>,
|
||||
}
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
@@ -66,6 +90,7 @@ impl CodexMessageProcessor {
|
||||
outgoing,
|
||||
codex_linux_sandbox_exe,
|
||||
conversation_listeners: HashMap::new(),
|
||||
active_login: Arc::new(Mutex::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +117,134 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::RemoveConversationListener { request_id, params } => {
|
||||
self.remove_conversation_listener(request_id, params).await;
|
||||
}
|
||||
ClientRequest::LoginChatGpt { request_id } => {
|
||||
self.login_chatgpt(request_id).await;
|
||||
}
|
||||
ClientRequest::CancelLoginChatGpt { request_id, params } => {
|
||||
self.cancel_login_chatgpt(request_id, params.login_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn login_chatgpt(&mut self, request_id: RequestId) {
|
||||
let config =
|
||||
match Config::load_with_cli_overrides(Default::default(), ConfigOverrides::default()) {
|
||||
Ok(cfg) => cfg,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("error loading config for login: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let opts = LoginServerOptions {
|
||||
open_browser: false,
|
||||
login_timeout: Some(LOGIN_CHATGPT_TIMEOUT),
|
||||
..LoginServerOptions::new(config.codex_home.clone(), CLIENT_ID.to_string())
|
||||
};
|
||||
|
||||
enum LoginChatGptReply {
|
||||
Response(LoginChatGptResponse),
|
||||
Error(JSONRPCErrorError),
|
||||
}
|
||||
|
||||
let reply = match run_login_server(opts, None) {
|
||||
Ok(server) => {
|
||||
let login_id = Uuid::new_v4();
|
||||
|
||||
// Replace active login if present.
|
||||
{
|
||||
let mut guard = self.active_login.lock().await;
|
||||
if let Some(existing) = guard.take() {
|
||||
existing.drop();
|
||||
}
|
||||
*guard = Some(ActiveLogin {
|
||||
shutdown_handle: server.cancel_handle(),
|
||||
login_id,
|
||||
});
|
||||
}
|
||||
|
||||
let response = LoginChatGptResponse {
|
||||
login_id,
|
||||
auth_url: server.auth_url.clone(),
|
||||
};
|
||||
|
||||
// Spawn background task to monitor completion.
|
||||
let outgoing_clone = self.outgoing.clone();
|
||||
let active_login = self.active_login.clone();
|
||||
tokio::spawn(async move {
|
||||
let result =
|
||||
tokio::task::spawn_blocking(move || server.block_until_done()).await;
|
||||
let (success, error_msg) = match result {
|
||||
Ok(Ok(())) => (true, None),
|
||||
Ok(Err(err)) => (false, Some(format!("Login server error: {err}"))),
|
||||
Err(join_err) => (
|
||||
false,
|
||||
Some(format!("failed to join login server thread: {join_err}")),
|
||||
),
|
||||
};
|
||||
let notification = LoginChatGptCompleteNotification {
|
||||
login_id,
|
||||
success,
|
||||
error: error_msg,
|
||||
};
|
||||
let params = serde_json::to_value(¬ification).ok();
|
||||
outgoing_clone
|
||||
.send_notification(OutgoingNotification {
|
||||
method: LOGIN_CHATGPT_COMPLETE_EVENT.to_string(),
|
||||
params,
|
||||
})
|
||||
.await;
|
||||
|
||||
// Clear the active login if it matches this attempt. It may have been replaced or cancelled.
|
||||
let mut guard = active_login.lock().await;
|
||||
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
|
||||
*guard = None;
|
||||
}
|
||||
});
|
||||
|
||||
LoginChatGptReply::Response(response)
|
||||
}
|
||||
Err(err) => LoginChatGptReply::Error(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to start login server: {err}"),
|
||||
data: None,
|
||||
}),
|
||||
};
|
||||
|
||||
match reply {
|
||||
LoginChatGptReply::Response(resp) => {
|
||||
self.outgoing.send_response(request_id, resp).await
|
||||
}
|
||||
LoginChatGptReply::Error(err) => self.outgoing.send_error(request_id, err).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn cancel_login_chatgpt(&mut self, request_id: RequestId, login_id: Uuid) {
|
||||
let mut guard = self.active_login.lock().await;
|
||||
if guard.as_ref().map(|l| l.login_id) == Some(login_id) {
|
||||
if let Some(active) = guard.take() {
|
||||
active.drop();
|
||||
}
|
||||
drop(guard);
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
crate::wire_format::CancelLoginChatGptResponse {},
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
drop(guard);
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("login id not found: {login_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user