feat: add ArchiveConversation to ClientRequest (#3353)
Adds support for `ArchiveConversation` in the JSON-RPC server that takes a `(ConversationId, PathBuf)` pair and: - verifies the `ConversationId` corresponds to the rollout id at the `PathBuf` - if so, invokes `ConversationManager.remove_conversation(ConversationId)` - if the `CodexConversation` was in memory, send `Shutdown` and wait for `ShutdownComplete` with a timeout - moves the `.jsonl` file to `$CODEX_HOME/archived_sessions` --------- Co-authored-by: Gabriel Peal <gabriel@openai.com>
This commit is contained in:
@@ -145,8 +145,15 @@ impl ConversationManager {
|
|||||||
self.finalize_spawn(codex, conversation_id).await
|
self.finalize_spawn(codex, conversation_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_conversation(&self, conversation_id: ConversationId) {
|
/// Removes the conversation from the manager's internal map, though the
|
||||||
self.conversations.write().await.remove(&conversation_id);
|
/// conversation is stored as `Arc<CodexConversation>`, it is possible that
|
||||||
|
/// other references to it exist elsewhere. Returns the conversation if the
|
||||||
|
/// conversation was found and removed.
|
||||||
|
pub async fn remove_conversation(
|
||||||
|
&self,
|
||||||
|
conversation_id: &ConversationId,
|
||||||
|
) -> Option<Arc<CodexConversation>> {
|
||||||
|
self.conversations.write().await.remove(conversation_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fork an existing conversation by dropping the last `drop_last_messages`
|
/// Fork an existing conversation by dropping the last `drop_last_messages`
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ pub mod spawn;
|
|||||||
pub mod terminal;
|
pub mod terminal;
|
||||||
mod tool_apply_patch;
|
mod tool_apply_patch;
|
||||||
pub mod turn_diff_tracker;
|
pub mod turn_diff_tracker;
|
||||||
|
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
|
||||||
pub use rollout::RolloutRecorder;
|
pub use rollout::RolloutRecorder;
|
||||||
|
pub use rollout::SESSIONS_SUBDIR;
|
||||||
pub use rollout::SessionMeta;
|
pub use rollout::SessionMeta;
|
||||||
pub use rollout::list::ConversationItem;
|
pub use rollout::list::ConversationItem;
|
||||||
pub use rollout::list::ConversationsPage;
|
pub use rollout::list::ConversationsPage;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
//! Rollout module: persistence and discovery of session rollout files.
|
//! Rollout module: persistence and discovery of session rollout files.
|
||||||
|
|
||||||
pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
|
pub const SESSIONS_SUBDIR: &str = "sessions";
|
||||||
|
pub const ARCHIVED_SESSIONS_SUBDIR: &str = "archived_sessions";
|
||||||
|
|
||||||
pub mod list;
|
pub mod list;
|
||||||
pub(crate) mod policy;
|
pub(crate) mod policy;
|
||||||
|
|||||||
@@ -35,6 +35,8 @@ use codex_protocol::mcp_protocol::AddConversationListenerParams;
|
|||||||
use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse;
|
use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse;
|
||||||
use codex_protocol::mcp_protocol::ApplyPatchApprovalParams;
|
use codex_protocol::mcp_protocol::ApplyPatchApprovalParams;
|
||||||
use codex_protocol::mcp_protocol::ApplyPatchApprovalResponse;
|
use codex_protocol::mcp_protocol::ApplyPatchApprovalResponse;
|
||||||
|
use codex_protocol::mcp_protocol::ArchiveConversationParams;
|
||||||
|
use codex_protocol::mcp_protocol::ArchiveConversationResponse;
|
||||||
use codex_protocol::mcp_protocol::AuthMode;
|
use codex_protocol::mcp_protocol::AuthMode;
|
||||||
use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
|
use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
|
||||||
use codex_protocol::mcp_protocol::ClientRequest;
|
use codex_protocol::mcp_protocol::ClientRequest;
|
||||||
@@ -73,12 +75,16 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
|||||||
use mcp_types::JSONRPCErrorError;
|
use mcp_types::JSONRPCErrorError;
|
||||||
use mcp_types::RequestId;
|
use mcp_types::RequestId;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::ffi::OsStr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
use tracing::info;
|
||||||
|
use tracing::warn;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
// Duration before a ChatGPT login attempt is abandoned.
|
// Duration before a ChatGPT login attempt is abandoned.
|
||||||
@@ -142,6 +148,9 @@ impl CodexMessageProcessor {
|
|||||||
ClientRequest::ResumeConversation { request_id, params } => {
|
ClientRequest::ResumeConversation { request_id, params } => {
|
||||||
self.handle_resume_conversation(request_id, params).await;
|
self.handle_resume_conversation(request_id, params).await;
|
||||||
}
|
}
|
||||||
|
ClientRequest::ArchiveConversation { request_id, params } => {
|
||||||
|
self.archive_conversation(request_id, params).await;
|
||||||
|
}
|
||||||
ClientRequest::SendUserMessage { request_id, params } => {
|
ClientRequest::SendUserMessage { request_id, params } => {
|
||||||
self.send_user_message(request_id, params).await;
|
self.send_user_message(request_id, params).await;
|
||||||
}
|
}
|
||||||
@@ -670,6 +679,141 @@ impl CodexMessageProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn archive_conversation(&self, request_id: RequestId, params: ArchiveConversationParams) {
|
||||||
|
let ArchiveConversationParams {
|
||||||
|
conversation_id,
|
||||||
|
rollout_path,
|
||||||
|
} = params;
|
||||||
|
|
||||||
|
// Verify that the rollout path is in the sessions directory or else
|
||||||
|
// a malicious client could specify an arbitrary path.
|
||||||
|
let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
|
||||||
|
let canonical_rollout_path = tokio::fs::canonicalize(&rollout_path).await;
|
||||||
|
let canonical_rollout_path = if let Ok(path) = canonical_rollout_path
|
||||||
|
&& path.starts_with(&rollout_folder)
|
||||||
|
{
|
||||||
|
path
|
||||||
|
} else {
|
||||||
|
let error = JSONRPCErrorError {
|
||||||
|
code: INVALID_REQUEST_ERROR_CODE,
|
||||||
|
message: format!(
|
||||||
|
"rollout path `{}` must be in sessions directory",
|
||||||
|
rollout_path.display()
|
||||||
|
),
|
||||||
|
data: None,
|
||||||
|
};
|
||||||
|
self.outgoing.send_error(request_id, error).await;
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let required_suffix = format!("{}.jsonl", conversation_id.0);
|
||||||
|
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
|
||||||
|
let error = JSONRPCErrorError {
|
||||||
|
code: INVALID_REQUEST_ERROR_CODE,
|
||||||
|
message: format!(
|
||||||
|
"rollout path `{}` missing file name",
|
||||||
|
rollout_path.display()
|
||||||
|
),
|
||||||
|
data: None,
|
||||||
|
};
|
||||||
|
self.outgoing.send_error(request_id, error).await;
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if !file_name
|
||||||
|
.to_string_lossy()
|
||||||
|
.ends_with(required_suffix.as_str())
|
||||||
|
{
|
||||||
|
let error = JSONRPCErrorError {
|
||||||
|
code: INVALID_REQUEST_ERROR_CODE,
|
||||||
|
message: format!(
|
||||||
|
"rollout path `{}` does not match conversation id {conversation_id}",
|
||||||
|
rollout_path.display()
|
||||||
|
),
|
||||||
|
data: None,
|
||||||
|
};
|
||||||
|
self.outgoing.send_error(request_id, error).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let removed_conversation = self
|
||||||
|
.conversation_manager
|
||||||
|
.remove_conversation(&conversation_id)
|
||||||
|
.await;
|
||||||
|
if let Some(conversation) = removed_conversation {
|
||||||
|
info!("conversation {conversation_id} was active; shutting down");
|
||||||
|
let conversation_clone = conversation.clone();
|
||||||
|
let notify = Arc::new(tokio::sync::Notify::new());
|
||||||
|
let notify_clone = notify.clone();
|
||||||
|
|
||||||
|
// Establish the listener for ShutdownComplete before submitting
|
||||||
|
// Shutdown so it is not missed.
|
||||||
|
let is_shutdown = tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
_ = notify_clone.notified() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
event = conversation_clone.next_event() => {
|
||||||
|
if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Request shutdown.
|
||||||
|
match conversation.submit(Op::Shutdown).await {
|
||||||
|
Ok(_) => {
|
||||||
|
// Successfully submitted Shutdown; wait before proceeding.
|
||||||
|
select! {
|
||||||
|
_ = is_shutdown => {
|
||||||
|
// Normal shutdown: proceed with archive.
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(10)) => {
|
||||||
|
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
|
||||||
|
notify.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
|
||||||
|
notify.notify_one();
|
||||||
|
// Perhaps we lost a shutdown race, so let's continue to
|
||||||
|
// clean up the .jsonl file.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move the .jsonl file to the archived sessions subdir.
|
||||||
|
let result: std::io::Result<()> = async {
|
||||||
|
let archive_folder = self
|
||||||
|
.config
|
||||||
|
.codex_home
|
||||||
|
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
|
||||||
|
tokio::fs::create_dir_all(&archive_folder).await?;
|
||||||
|
tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => {
|
||||||
|
let response = ArchiveConversationResponse {};
|
||||||
|
self.outgoing.send_response(request_id, response).await;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
let error = JSONRPCErrorError {
|
||||||
|
code: INTERNAL_ERROR_CODE,
|
||||||
|
message: format!("failed to archive conversation: {err}"),
|
||||||
|
data: None,
|
||||||
|
};
|
||||||
|
self.outgoing.send_error(request_id, error).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
|
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
|
||||||
let SendUserMessageParams {
|
let SendUserMessageParams {
|
||||||
conversation_id,
|
conversation_id,
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ use anyhow::Context;
|
|||||||
use assert_cmd::prelude::*;
|
use assert_cmd::prelude::*;
|
||||||
use codex_mcp_server::CodexToolCallParam;
|
use codex_mcp_server::CodexToolCallParam;
|
||||||
use codex_protocol::mcp_protocol::AddConversationListenerParams;
|
use codex_protocol::mcp_protocol::AddConversationListenerParams;
|
||||||
|
use codex_protocol::mcp_protocol::ArchiveConversationParams;
|
||||||
use codex_protocol::mcp_protocol::CancelLoginChatGptParams;
|
use codex_protocol::mcp_protocol::CancelLoginChatGptParams;
|
||||||
use codex_protocol::mcp_protocol::GetAuthStatusParams;
|
use codex_protocol::mcp_protocol::GetAuthStatusParams;
|
||||||
use codex_protocol::mcp_protocol::InterruptConversationParams;
|
use codex_protocol::mcp_protocol::InterruptConversationParams;
|
||||||
@@ -186,6 +187,15 @@ impl McpProcess {
|
|||||||
self.send_request("newConversation", params).await
|
self.send_request("newConversation", params).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send an `archiveConversation` JSON-RPC request.
|
||||||
|
pub async fn send_archive_conversation_request(
|
||||||
|
&mut self,
|
||||||
|
params: ArchiveConversationParams,
|
||||||
|
) -> anyhow::Result<i64> {
|
||||||
|
let params = Some(serde_json::to_value(params)?);
|
||||||
|
self.send_request("archiveConversation", params).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Send an `addConversationListener` JSON-RPC request.
|
/// Send an `addConversationListener` JSON-RPC request.
|
||||||
pub async fn send_add_conversation_listener_request(
|
pub async fn send_add_conversation_listener_request(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
|||||||
105
codex-rs/mcp-server/tests/suite/archive_conversation.rs
Normal file
105
codex-rs/mcp-server/tests/suite/archive_conversation.rs
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||||
|
use codex_protocol::mcp_protocol::ArchiveConversationParams;
|
||||||
|
use codex_protocol::mcp_protocol::ArchiveConversationResponse;
|
||||||
|
use codex_protocol::mcp_protocol::NewConversationParams;
|
||||||
|
use codex_protocol::mcp_protocol::NewConversationResponse;
|
||||||
|
use mcp_test_support::McpProcess;
|
||||||
|
use mcp_test_support::to_response;
|
||||||
|
use mcp_types::JSONRPCResponse;
|
||||||
|
use mcp_types::RequestId;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn archive_conversation_moves_rollout_into_archived_directory() {
|
||||||
|
let codex_home = TempDir::new().expect("create temp dir");
|
||||||
|
create_config_toml(codex_home.path()).expect("write config.toml");
|
||||||
|
|
||||||
|
let mut mcp = McpProcess::new(codex_home.path())
|
||||||
|
.await
|
||||||
|
.expect("spawn mcp process");
|
||||||
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
|
||||||
|
.await
|
||||||
|
.expect("initialize timeout")
|
||||||
|
.expect("initialize request");
|
||||||
|
|
||||||
|
let new_request_id = mcp
|
||||||
|
.send_new_conversation_request(NewConversationParams {
|
||||||
|
model: Some("mock-model".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("send newConversation");
|
||||||
|
let new_response: JSONRPCResponse = timeout(
|
||||||
|
DEFAULT_READ_TIMEOUT,
|
||||||
|
mcp.read_stream_until_response_message(RequestId::Integer(new_request_id)),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("newConversation timeout")
|
||||||
|
.expect("newConversation response");
|
||||||
|
|
||||||
|
let NewConversationResponse {
|
||||||
|
conversation_id,
|
||||||
|
rollout_path,
|
||||||
|
..
|
||||||
|
} = to_response::<NewConversationResponse>(new_response)
|
||||||
|
.expect("deserialize newConversation response");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
rollout_path.exists(),
|
||||||
|
"expected rollout path {} to exist",
|
||||||
|
rollout_path.display()
|
||||||
|
);
|
||||||
|
|
||||||
|
let archive_request_id = mcp
|
||||||
|
.send_archive_conversation_request(ArchiveConversationParams {
|
||||||
|
conversation_id,
|
||||||
|
rollout_path: rollout_path.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("send archiveConversation");
|
||||||
|
let archive_response: JSONRPCResponse = timeout(
|
||||||
|
DEFAULT_READ_TIMEOUT,
|
||||||
|
mcp.read_stream_until_response_message(RequestId::Integer(archive_request_id)),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("archiveConversation timeout")
|
||||||
|
.expect("archiveConversation response");
|
||||||
|
|
||||||
|
let _: ArchiveConversationResponse =
|
||||||
|
to_response::<ArchiveConversationResponse>(archive_response)
|
||||||
|
.expect("deserialize archiveConversation response");
|
||||||
|
|
||||||
|
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
|
||||||
|
let archived_rollout_path =
|
||||||
|
archived_directory.join(rollout_path.file_name().unwrap_or_else(|| {
|
||||||
|
panic!("rollout path {} missing file name", rollout_path.display())
|
||||||
|
}));
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!rollout_path.exists(),
|
||||||
|
"expected rollout path {} to be moved",
|
||||||
|
rollout_path.display()
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
archived_rollout_path.exists(),
|
||||||
|
"expected archived rollout path {} to exist",
|
||||||
|
archived_rollout_path.display()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||||
|
let config_toml = codex_home.join("config.toml");
|
||||||
|
std::fs::write(config_toml, config_contents())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn config_contents() -> &'static str {
|
||||||
|
r#"model = "mock-model"
|
||||||
|
approval_policy = "never"
|
||||||
|
sandbox_mode = "read-only"
|
||||||
|
"#
|
||||||
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
// Aggregates all former standalone integration tests as modules.
|
// Aggregates all former standalone integration tests as modules.
|
||||||
|
mod archive_conversation;
|
||||||
mod auth;
|
mod auth;
|
||||||
mod codex_message_processor_flow;
|
mod codex_message_processor_flow;
|
||||||
mod codex_tool;
|
mod codex_tool;
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
|
|||||||
codex_protocol::mcp_protocol::ServerNotification::export_all_to(out_dir)?;
|
codex_protocol::mcp_protocol::ServerNotification::export_all_to(out_dir)?;
|
||||||
codex_protocol::mcp_protocol::ListConversationsResponse::export_all_to(out_dir)?;
|
codex_protocol::mcp_protocol::ListConversationsResponse::export_all_to(out_dir)?;
|
||||||
codex_protocol::mcp_protocol::ResumeConversationResponse::export_all_to(out_dir)?;
|
codex_protocol::mcp_protocol::ResumeConversationResponse::export_all_to(out_dir)?;
|
||||||
|
codex_protocol::mcp_protocol::ArchiveConversationResponse::export_all_to(out_dir)?;
|
||||||
|
|
||||||
generate_index_ts(out_dir)?;
|
generate_index_ts(out_dir)?;
|
||||||
|
|
||||||
|
|||||||
@@ -91,6 +91,11 @@ pub enum ClientRequest {
|
|||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
params: ResumeConversationParams,
|
params: ResumeConversationParams,
|
||||||
},
|
},
|
||||||
|
ArchiveConversation {
|
||||||
|
#[serde(rename = "id")]
|
||||||
|
request_id: RequestId,
|
||||||
|
params: ArchiveConversationParams,
|
||||||
|
},
|
||||||
SendUserMessage {
|
SendUserMessage {
|
||||||
#[serde(rename = "id")]
|
#[serde(rename = "id")]
|
||||||
request_id: RequestId,
|
request_id: RequestId,
|
||||||
@@ -263,6 +268,18 @@ pub struct AddConversationSubscriptionResponse {
|
|||||||
pub subscription_id: Uuid,
|
pub subscription_id: Uuid,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The [`ConversationId`] must match the `rollout_path`.
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ArchiveConversationParams {
|
||||||
|
pub conversation_id: ConversationId,
|
||||||
|
pub rollout_path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ArchiveConversationResponse {}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct RemoveConversationSubscriptionResponse {}
|
pub struct RemoveConversationSubscriptionResponse {}
|
||||||
|
|||||||
Reference in New Issue
Block a user