diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 6fac42d5..87b60e01 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -145,8 +145,15 @@ impl ConversationManager { self.finalize_spawn(codex, conversation_id).await } - pub async fn remove_conversation(&self, conversation_id: ConversationId) { - self.conversations.write().await.remove(&conversation_id); + /// Removes the conversation from the manager's internal map, though the + /// conversation is stored as `Arc`, it is possible that + /// other references to it exist elsewhere. Returns the conversation if the + /// conversation was found and removed. + pub async fn remove_conversation( + &self, + conversation_id: &ConversationId, + ) -> Option> { + self.conversations.write().await.remove(conversation_id) } /// Fork an existing conversation by dropping the last `drop_last_messages` diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 3b47830c..b8cd7bc5 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -61,7 +61,9 @@ pub mod spawn; pub mod terminal; mod tool_apply_patch; pub mod turn_diff_tracker; +pub use rollout::ARCHIVED_SESSIONS_SUBDIR; pub use rollout::RolloutRecorder; +pub use rollout::SESSIONS_SUBDIR; pub use rollout::SessionMeta; pub use rollout::list::ConversationItem; pub use rollout::list::ConversationsPage; diff --git a/codex-rs/core/src/rollout/mod.rs b/codex-rs/core/src/rollout/mod.rs index 4883517c..a78d7609 100644 --- a/codex-rs/core/src/rollout/mod.rs +++ b/codex-rs/core/src/rollout/mod.rs @@ -1,6 +1,7 @@ //! Rollout module: persistence and discovery of session rollout files. -pub(crate) const SESSIONS_SUBDIR: &str = "sessions"; +pub const SESSIONS_SUBDIR: &str = "sessions"; +pub const ARCHIVED_SESSIONS_SUBDIR: &str = "archived_sessions"; pub mod list; pub(crate) mod policy; diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index 2169a8f2..e6133e17 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -35,6 +35,8 @@ use codex_protocol::mcp_protocol::AddConversationListenerParams; use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse; use codex_protocol::mcp_protocol::ApplyPatchApprovalParams; use codex_protocol::mcp_protocol::ApplyPatchApprovalResponse; +use codex_protocol::mcp_protocol::ArchiveConversationParams; +use codex_protocol::mcp_protocol::ArchiveConversationResponse; use codex_protocol::mcp_protocol::AuthMode; use codex_protocol::mcp_protocol::AuthStatusChangeNotification; use codex_protocol::mcp_protocol::ClientRequest; @@ -73,12 +75,16 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN; use mcp_types::JSONRPCErrorError; use mcp_types::RequestId; use std::collections::HashMap; +use std::ffi::OsStr; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use tokio::select; use tokio::sync::Mutex; use tokio::sync::oneshot; use tracing::error; +use tracing::info; +use tracing::warn; use uuid::Uuid; // Duration before a ChatGPT login attempt is abandoned. @@ -142,6 +148,9 @@ impl CodexMessageProcessor { ClientRequest::ResumeConversation { request_id, params } => { self.handle_resume_conversation(request_id, params).await; } + ClientRequest::ArchiveConversation { request_id, params } => { + self.archive_conversation(request_id, params).await; + } ClientRequest::SendUserMessage { request_id, params } => { self.send_user_message(request_id, params).await; } @@ -670,6 +679,141 @@ impl CodexMessageProcessor { } } + async fn archive_conversation(&self, request_id: RequestId, params: ArchiveConversationParams) { + let ArchiveConversationParams { + conversation_id, + rollout_path, + } = params; + + // Verify that the rollout path is in the sessions directory or else + // a malicious client could specify an arbitrary path. + let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); + let canonical_rollout_path = tokio::fs::canonicalize(&rollout_path).await; + let canonical_rollout_path = if let Ok(path) = canonical_rollout_path + && path.starts_with(&rollout_folder) + { + path + } else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "rollout path `{}` must be in sessions directory", + rollout_path.display() + ), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + let required_suffix = format!("{}.jsonl", conversation_id.0); + let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "rollout path `{}` missing file name", + rollout_path.display() + ), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + if !file_name + .to_string_lossy() + .ends_with(required_suffix.as_str()) + { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "rollout path `{}` does not match conversation id {conversation_id}", + rollout_path.display() + ), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + + let removed_conversation = self + .conversation_manager + .remove_conversation(&conversation_id) + .await; + if let Some(conversation) = removed_conversation { + info!("conversation {conversation_id} was active; shutting down"); + let conversation_clone = conversation.clone(); + let notify = Arc::new(tokio::sync::Notify::new()); + let notify_clone = notify.clone(); + + // Establish the listener for ShutdownComplete before submitting + // Shutdown so it is not missed. + let is_shutdown = tokio::spawn(async move { + loop { + select! { + _ = notify_clone.notified() => { + break; + } + event = conversation_clone.next_event() => { + if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) { + break; + } + } + } + } + }); + + // Request shutdown. + match conversation.submit(Op::Shutdown).await { + Ok(_) => { + // Successfully submitted Shutdown; wait before proceeding. + select! { + _ = is_shutdown => { + // Normal shutdown: proceed with archive. + } + _ = tokio::time::sleep(Duration::from_secs(10)) => { + warn!("conversation {conversation_id} shutdown timed out; proceeding with archive"); + notify.notify_one(); + } + } + } + Err(err) => { + error!("failed to submit Shutdown to conversation {conversation_id}: {err}"); + notify.notify_one(); + // Perhaps we lost a shutdown race, so let's continue to + // clean up the .jsonl file. + } + } + } + + // Move the .jsonl file to the archived sessions subdir. + let result: std::io::Result<()> = async { + let archive_folder = self + .config + .codex_home + .join(codex_core::ARCHIVED_SESSIONS_SUBDIR); + tokio::fs::create_dir_all(&archive_folder).await?; + tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?; + Ok(()) + } + .await; + + match result { + Ok(()) => { + let response = ArchiveConversationResponse {}; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to archive conversation: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { let SendUserMessageParams { conversation_id, diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index cebc332a..64f2cc38 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -13,6 +13,7 @@ use anyhow::Context; use assert_cmd::prelude::*; use codex_mcp_server::CodexToolCallParam; use codex_protocol::mcp_protocol::AddConversationListenerParams; +use codex_protocol::mcp_protocol::ArchiveConversationParams; use codex_protocol::mcp_protocol::CancelLoginChatGptParams; use codex_protocol::mcp_protocol::GetAuthStatusParams; use codex_protocol::mcp_protocol::InterruptConversationParams; @@ -186,6 +187,15 @@ impl McpProcess { self.send_request("newConversation", params).await } + /// Send an `archiveConversation` JSON-RPC request. + pub async fn send_archive_conversation_request( + &mut self, + params: ArchiveConversationParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("archiveConversation", params).await + } + /// Send an `addConversationListener` JSON-RPC request. pub async fn send_add_conversation_listener_request( &mut self, diff --git a/codex-rs/mcp-server/tests/suite/archive_conversation.rs b/codex-rs/mcp-server/tests/suite/archive_conversation.rs new file mode 100644 index 00000000..e54a9989 --- /dev/null +++ b/codex-rs/mcp-server/tests/suite/archive_conversation.rs @@ -0,0 +1,105 @@ +use std::path::Path; + +use codex_core::ARCHIVED_SESSIONS_SUBDIR; +use codex_protocol::mcp_protocol::ArchiveConversationParams; +use codex_protocol::mcp_protocol::ArchiveConversationResponse; +use codex_protocol::mcp_protocol::NewConversationParams; +use codex_protocol::mcp_protocol::NewConversationResponse; +use mcp_test_support::McpProcess; +use mcp_test_support::to_response; +use mcp_types::JSONRPCResponse; +use mcp_types::RequestId; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn archive_conversation_moves_rollout_into_archived_directory() { + let codex_home = TempDir::new().expect("create temp dir"); + create_config_toml(codex_home.path()).expect("write config.toml"); + + let mut mcp = McpProcess::new(codex_home.path()) + .await + .expect("spawn mcp process"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("initialize timeout") + .expect("initialize request"); + + let new_request_id = mcp + .send_new_conversation_request(NewConversationParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await + .expect("send newConversation"); + let new_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_request_id)), + ) + .await + .expect("newConversation timeout") + .expect("newConversation response"); + + let NewConversationResponse { + conversation_id, + rollout_path, + .. + } = to_response::(new_response) + .expect("deserialize newConversation response"); + + assert!( + rollout_path.exists(), + "expected rollout path {} to exist", + rollout_path.display() + ); + + let archive_request_id = mcp + .send_archive_conversation_request(ArchiveConversationParams { + conversation_id, + rollout_path: rollout_path.clone(), + }) + .await + .expect("send archiveConversation"); + let archive_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(archive_request_id)), + ) + .await + .expect("archiveConversation timeout") + .expect("archiveConversation response"); + + let _: ArchiveConversationResponse = + to_response::(archive_response) + .expect("deserialize archiveConversation response"); + + let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR); + let archived_rollout_path = + archived_directory.join(rollout_path.file_name().unwrap_or_else(|| { + panic!("rollout path {} missing file name", rollout_path.display()) + })); + + assert!( + !rollout_path.exists(), + "expected rollout path {} to be moved", + rollout_path.display() + ); + assert!( + archived_rollout_path.exists(), + "expected archived rollout path {} to exist", + archived_rollout_path.display() + ); +} + +fn create_config_toml(codex_home: &Path) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write(config_toml, config_contents()) +} + +fn config_contents() -> &'static str { + r#"model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" +"# +} diff --git a/codex-rs/mcp-server/tests/suite/mod.rs b/codex-rs/mcp-server/tests/suite/mod.rs index 4a9220da..7f89cc33 100644 --- a/codex-rs/mcp-server/tests/suite/mod.rs +++ b/codex-rs/mcp-server/tests/suite/mod.rs @@ -1,4 +1,5 @@ // Aggregates all former standalone integration tests as modules. +mod archive_conversation; mod auth; mod codex_message_processor_flow; mod codex_tool; diff --git a/codex-rs/protocol-ts/src/lib.rs b/codex-rs/protocol-ts/src/lib.rs index d5645147..5d8d4b17 100644 --- a/codex-rs/protocol-ts/src/lib.rs +++ b/codex-rs/protocol-ts/src/lib.rs @@ -39,6 +39,7 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> { codex_protocol::mcp_protocol::ServerNotification::export_all_to(out_dir)?; codex_protocol::mcp_protocol::ListConversationsResponse::export_all_to(out_dir)?; codex_protocol::mcp_protocol::ResumeConversationResponse::export_all_to(out_dir)?; + codex_protocol::mcp_protocol::ArchiveConversationResponse::export_all_to(out_dir)?; generate_index_ts(out_dir)?; diff --git a/codex-rs/protocol/src/mcp_protocol.rs b/codex-rs/protocol/src/mcp_protocol.rs index 70e33403..00391c7d 100644 --- a/codex-rs/protocol/src/mcp_protocol.rs +++ b/codex-rs/protocol/src/mcp_protocol.rs @@ -91,6 +91,11 @@ pub enum ClientRequest { request_id: RequestId, params: ResumeConversationParams, }, + ArchiveConversation { + #[serde(rename = "id")] + request_id: RequestId, + params: ArchiveConversationParams, + }, SendUserMessage { #[serde(rename = "id")] request_id: RequestId, @@ -263,6 +268,18 @@ pub struct AddConversationSubscriptionResponse { pub subscription_id: Uuid, } +/// The [`ConversationId`] must match the `rollout_path`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +#[serde(rename_all = "camelCase")] +pub struct ArchiveConversationParams { + pub conversation_id: ConversationId, + pub rollout_path: PathBuf, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +#[serde(rename_all = "camelCase")] +pub struct ArchiveConversationResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] #[serde(rename_all = "camelCase")] pub struct RemoveConversationSubscriptionResponse {}