MCP: add session resume + history listing; (#3185)

# External (non-OpenAI) Pull Request Requirements

Before opening this Pull Request, please read the dedicated
"Contributing" markdown file or your PR may be closed:
https://github.com/openai/codex/blob/main/docs/contributing.md

If your PR conforms to our contribution guidelines, replace this text
with a detailed and high quality description of your changes.
This commit is contained in:
Ahmed Ibrahim
2025-09-04 16:44:18 -07:00
committed by GitHub
parent 7df9e9c664
commit 907d3dd348
5 changed files with 425 additions and 0 deletions

View File

@@ -11,7 +11,9 @@ use crate::outgoing_message::OutgoingNotification;
use codex_core::AuthManager; use codex_core::AuthManager;
use codex_core::CodexConversation; use codex_core::CodexConversation;
use codex_core::ConversationManager; use codex_core::ConversationManager;
use codex_core::Cursor as RolloutCursor;
use codex_core::NewConversation; use codex_core::NewConversation;
use codex_core::RolloutRecorder;
use codex_core::auth::CLIENT_ID; use codex_core::auth::CLIENT_ID;
use codex_core::config::Config; use codex_core::config::Config;
use codex_core::config::ConfigOverrides; use codex_core::config::ConfigOverrides;
@@ -40,6 +42,7 @@ use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::AuthStatusChangeNotification; use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
use codex_protocol::mcp_protocol::ClientRequest; use codex_protocol::mcp_protocol::ClientRequest;
use codex_protocol::mcp_protocol::ConversationId; use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationSummary;
use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD; use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD;
use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse; use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
use codex_protocol::mcp_protocol::ExecCommandApprovalParams; use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
@@ -50,12 +53,15 @@ use codex_protocol::mcp_protocol::GitDiffToRemoteResponse;
use codex_protocol::mcp_protocol::InputItem as WireInputItem; use codex_protocol::mcp_protocol::InputItem as WireInputItem;
use codex_protocol::mcp_protocol::InterruptConversationParams; use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::InterruptConversationResponse; use codex_protocol::mcp_protocol::InterruptConversationResponse;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::ListConversationsResponse;
use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification; use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification;
use codex_protocol::mcp_protocol::LoginChatGptResponse; use codex_protocol::mcp_protocol::LoginChatGptResponse;
use codex_protocol::mcp_protocol::NewConversationParams; use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse; use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams; use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse; use codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::SendUserMessageParams; use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse; use codex_protocol::mcp_protocol::SendUserMessageResponse;
use codex_protocol::mcp_protocol::SendUserTurnParams; use codex_protocol::mcp_protocol::SendUserTurnParams;
@@ -124,6 +130,12 @@ impl CodexMessageProcessor {
// created before processing any subsequent messages. // created before processing any subsequent messages.
self.process_new_conversation(request_id, params).await; self.process_new_conversation(request_id, params).await;
} }
ClientRequest::ListConversations { request_id, params } => {
self.handle_list_conversations(request_id, params).await;
}
ClientRequest::ResumeConversation { request_id, params } => {
self.handle_resume_conversation(request_id, params).await;
}
ClientRequest::SendUserMessage { request_id, params } => { ClientRequest::SendUserMessage { request_id, params } => {
self.send_user_message(request_id, params).await; self.send_user_message(request_id, params).await;
} }
@@ -515,6 +527,128 @@ impl CodexMessageProcessor {
} }
} }
async fn handle_list_conversations(
&self,
request_id: RequestId,
params: ListConversationsParams,
) {
let page_size = params.page_size.unwrap_or(25);
// Decode the optional cursor string to a Cursor via serde (Cursor implements Deserialize from string)
let cursor_obj: Option<RolloutCursor> = match params.cursor {
Some(s) => serde_json::from_str::<RolloutCursor>(&format!("\"{s}\"")).ok(),
None => None,
};
let cursor_ref = cursor_obj.as_ref();
let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
)
.await
{
Ok(p) => p,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
// Build summaries
let mut items: Vec<ConversationSummary> = Vec::new();
for it in page.items.into_iter() {
let (timestamp, preview) = extract_ts_and_preview(&it.head);
items.push(ConversationSummary {
path: it.path,
preview,
timestamp,
});
}
// Encode next_cursor as a plain string
let next_cursor = match page.next_cursor {
Some(c) => match serde_json::to_value(&c) {
Ok(serde_json::Value::String(s)) => Some(s),
_ => None,
},
None => None,
};
let response = ListConversationsResponse { items, next_cursor };
self.outgoing.send_response(request_id, response).await;
}
async fn handle_resume_conversation(
&self,
request_id: RequestId,
params: ResumeConversationParams,
) {
// Derive a Config using the same logic as new conversation, honoring overrides if provided.
let config = match params.overrides {
Some(overrides) => {
derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone())
}
None => Ok(self.config.as_ref().clone()),
};
let config = match config {
Ok(cfg) => cfg,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match self
.conversation_manager
.resume_conversation_from_rollout(
config,
params.path.clone(),
self.auth_manager.clone(),
)
.await
{
Ok(NewConversation {
conversation_id,
session_configured,
..
}) => {
let event = codex_core::protocol::Event {
id: "".to_string(),
msg: codex_core::protocol::EventMsg::SessionConfigured(
session_configured.clone(),
),
};
self.outgoing.send_event_as_notification(&event, None).await;
// Reply with conversation id + model and initial messages (when present)
let response = codex_protocol::mcp_protocol::ResumeConversationResponse {
conversation_id: ConversationId(conversation_id),
model: session_configured.model.clone(),
initial_messages: session_configured.initial_messages.clone(),
};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming conversation: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
let SendUserMessageParams { let SendUserMessageParams {
conversation_id, conversation_id,
@@ -951,3 +1085,38 @@ async fn on_exec_approval_response(
error!("failed to submit ExecApproval: {err}"); error!("failed to submit ExecApproval: {err}");
} }
} }
fn extract_ts_and_preview(head: &[serde_json::Value]) -> (Option<String>, String) {
let ts = head
.first()
.and_then(|v| v.get("timestamp"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let preview = find_first_user_text(head).unwrap_or_default();
(ts, preview)
}
fn find_first_user_text(head: &[serde_json::Value]) -> Option<String> {
use codex_core::protocol::InputMessageKind;
for v in head.iter() {
let t = v.get("type").and_then(|x| x.as_str()).unwrap_or("");
if t != "message" {
continue;
}
if v.get("role").and_then(|x| x.as_str()) != Some("user") {
continue;
}
if let Some(arr) = v.get("content").and_then(|c| c.as_array()) {
for c in arr.iter() {
if let (Some("input_text"), Some(txt)) =
(c.get("type").and_then(|t| t.as_str()), c.get("text"))
&& let Some(s) = txt.as_str()
&& matches!(InputMessageKind::from(("user", s)), InputMessageKind::Plain)
{
return Some(s.to_string());
}
}
}
}
None
}

View File

@@ -16,8 +16,10 @@ use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::CancelLoginChatGptParams; use codex_protocol::mcp_protocol::CancelLoginChatGptParams;
use codex_protocol::mcp_protocol::GetAuthStatusParams; use codex_protocol::mcp_protocol::GetAuthStatusParams;
use codex_protocol::mcp_protocol::InterruptConversationParams; use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::NewConversationParams; use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams; use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::SendUserMessageParams; use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserTurnParams; use codex_protocol::mcp_protocol::SendUserTurnParams;
@@ -245,6 +247,24 @@ impl McpProcess {
self.send_request("getUserSavedConfig", None).await self.send_request("getUserSavedConfig", None).await
} }
/// Send a `listConversations` JSON-RPC request.
pub async fn send_list_conversations_request(
&mut self,
params: ListConversationsParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("listConversations", params).await
}
/// Send a `resumeConversation` JSON-RPC request.
pub async fn send_resume_conversation_request(
&mut self,
params: ResumeConversationParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("resumeConversation", params).await
}
/// Send a `loginChatGpt` JSON-RPC request. /// Send a `loginChatGpt` JSON-RPC request.
pub async fn send_login_chat_gpt_request(&mut self) -> anyhow::Result<i64> { pub async fn send_login_chat_gpt_request(&mut self) -> anyhow::Result<i64> {
self.send_request("loginChatGpt", None).await self.send_request("loginChatGpt", None).await

View File

@@ -0,0 +1,172 @@
use std::fs;
use std::path::Path;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::ListConversationsResponse;
use codex_protocol::mcp_protocol::NewConversationParams; // reused for overrides shape
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::ResumeConversationResponse;
use mcp_test_support::McpProcess;
use mcp_test_support::to_response;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_list_and_resume_conversations() {
// Prepare a temporary CODEX_HOME with a few fake rollout files.
let codex_home = TempDir::new().expect("create temp dir");
create_fake_rollout(
codex_home.path(),
"2025-01-02T12-00-00",
"2025-01-02T12:00:00Z",
"Hello A",
);
create_fake_rollout(
codex_home.path(),
"2025-01-01T13-00-00",
"2025-01-01T13:00:00Z",
"Hello B",
);
create_fake_rollout(
codex_home.path(),
"2025-01-01T12-00-00",
"2025-01-01T12:00:00Z",
"Hello C",
);
let mut mcp = McpProcess::new(codex_home.path())
.await
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
// Request first page with size 2
let req_id = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(2),
cursor: None,
})
.await
.expect("send listConversations");
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await
.expect("listConversations timeout")
.expect("listConversations resp");
let ListConversationsResponse { items, next_cursor } =
to_response::<ListConversationsResponse>(resp).expect("deserialize response");
assert_eq!(items.len(), 2);
// Newest first; preview text should match
assert_eq!(items[0].preview, "Hello A");
assert_eq!(items[1].preview, "Hello B");
assert!(items[0].path.is_absolute());
assert!(next_cursor.is_some());
// Request the next page using the cursor
let req_id2 = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(2),
cursor: next_cursor,
})
.await
.expect("send listConversations page 2");
let resp2: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id2)),
)
.await
.expect("listConversations page 2 timeout")
.expect("listConversations page 2 resp");
let ListConversationsResponse {
items: items2,
next_cursor: next2,
..
} = to_response::<ListConversationsResponse>(resp2).expect("deserialize response");
assert_eq!(items2.len(), 1);
assert_eq!(items2[0].preview, "Hello C");
assert!(next2.is_some());
// Now resume one of the sessions and expect a SessionConfigured notification and response.
let resume_req_id = mcp
.send_resume_conversation_request(ResumeConversationParams {
path: items[0].path.clone(),
overrides: Some(NewConversationParams {
model: Some("o3".to_string()),
..Default::default()
}),
})
.await
.expect("send resumeConversation");
// Expect a codex/event notification with msg.type == session_configured
let notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event"),
)
.await
.expect("session_configured notification timeout")
.expect("session_configured notification");
// Basic shape assertion: ensure event type is session_configured
let msg_type = notification
.params
.as_ref()
.and_then(|p| p.get("msg"))
.and_then(|m| m.get("type"))
.and_then(|t| t.as_str())
.unwrap_or("");
assert_eq!(msg_type, "session_configured");
// Then the response for resumeConversation
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_req_id)),
)
.await
.expect("resumeConversation timeout")
.expect("resumeConversation resp");
let ResumeConversationResponse {
conversation_id, ..
} = to_response::<ResumeConversationResponse>(resume_resp)
.expect("deserialize resumeConversation response");
// conversation id should be a valid UUID
let _ = uuid::Uuid::from_bytes(conversation_id.0.into_bytes());
}
fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str, preview: &str) {
let uuid = Uuid::new_v4();
// sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4];
let month = &filename_ts[5..7];
let day = &filename_ts[8..10];
let dir = codex_home.join("sessions").join(year).join(month).join(day);
fs::create_dir_all(&dir).unwrap_or_else(|e| panic!("create sessions dir: {e}"));
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
// Meta line with timestamp
lines.push(json!({"timestamp": meta_rfc3339}).to_string());
// Minimal user message entry as a persisted response item
lines.push(
json!({
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": preview}]
})
.to_string(),
);
fs::write(file_path, lines.join("\n") + "\n")
.unwrap_or_else(|e| panic!("write rollout file: {e}"));
}

View File

@@ -5,5 +5,6 @@ mod codex_tool;
mod config; mod config;
mod create_conversation; mod create_conversation;
mod interrupt; mod interrupt;
mod list_resume;
mod login; mod login;
mod send_message; mod send_message;

View File

@@ -7,6 +7,7 @@ use crate::config_types::ReasoningSummary;
use crate::config_types::SandboxMode; use crate::config_types::SandboxMode;
use crate::config_types::Verbosity; use crate::config_types::Verbosity;
use crate::protocol::AskForApproval; use crate::protocol::AskForApproval;
use crate::protocol::EventMsg;
use crate::protocol::FileChange; use crate::protocol::FileChange;
use crate::protocol::ReviewDecision; use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy; use crate::protocol::SandboxPolicy;
@@ -54,6 +55,18 @@ pub enum ClientRequest {
request_id: RequestId, request_id: RequestId,
params: NewConversationParams, params: NewConversationParams,
}, },
/// List recorded Codex conversations (rollouts) with optional pagination and search.
ListConversations {
#[serde(rename = "id")]
request_id: RequestId,
params: ListConversationsParams,
},
/// Resume a recorded Codex conversation from a rollout file.
ResumeConversation {
#[serde(rename = "id")]
request_id: RequestId,
params: ResumeConversationParams,
},
SendUserMessage { SendUserMessage {
#[serde(rename = "id")] #[serde(rename = "id")]
request_id: RequestId, request_id: RequestId,
@@ -164,6 +177,56 @@ pub struct NewConversationResponse {
pub model: String, pub model: String,
} }
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResumeConversationResponse {
pub conversation_id: ConversationId,
pub model: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_messages: Option<Vec<EventMsg>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, TS)]
#[serde(rename_all = "camelCase")]
pub struct ListConversationsParams {
/// Optional page size; defaults to a reasonable server-side value.
#[serde(skip_serializing_if = "Option::is_none")]
pub page_size: Option<usize>,
/// Opaque pagination cursor returned by a previous call.
#[serde(skip_serializing_if = "Option::is_none")]
pub cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct ConversationSummary {
pub path: PathBuf,
pub preview: String,
/// RFC3339 timestamp string for the session start, if available.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct ListConversationsResponse {
pub items: Vec<ConversationSummary>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
#[serde(skip_serializing_if = "Option::is_none")]
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct ResumeConversationParams {
/// Absolute path to the rollout JSONL file.
pub path: PathBuf,
/// Optional overrides to apply when spawning the resumed session.
#[serde(skip_serializing_if = "Option::is_none")]
pub overrides: Option<NewConversationParams>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct AddConversationSubscriptionResponse { pub struct AddConversationSubscriptionResponse {