Generate more typescript types and return conversation id with ConversationSummary (#3219)
This PR does multiple things that are necessary for conversation resume to work from the extension. I wanted to make sure everything worked so these changes wound up in one PR: 1. Generate more ts types 2. Resume rollout history files rather than create a new one every time it is resumed so you don't see a duplicate conversation in history for every resume. Chatted with @aibrahim-oai to verify this 3. Return conversation_id in conversation summaries 4. [Cleanup] Use serde and strong types for a lot of the rollout file parsing
This commit is contained in:
@@ -31,7 +31,6 @@ use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::ModelProviderInfo;
|
||||
use crate::apply_patch;
|
||||
@@ -104,6 +103,7 @@ use crate::protocol::TokenUsageInfo;
|
||||
use crate::protocol::TurnDiffEvent;
|
||||
use crate::protocol::WebSearchBeginEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::safety::SafetyCheck;
|
||||
use crate::safety::assess_command_safety;
|
||||
use crate::safety::assess_safety_for_untrusted_command;
|
||||
@@ -362,7 +362,6 @@ impl Session {
|
||||
tx_event: Sender<Event>,
|
||||
initial_history: InitialHistory,
|
||||
) -> anyhow::Result<(Arc<Self>, TurnContext)> {
|
||||
let conversation_id = ConversationId::from(Uuid::new_v4());
|
||||
let ConfigureSession {
|
||||
provider,
|
||||
model,
|
||||
@@ -380,6 +379,20 @@ impl Session {
|
||||
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
|
||||
}
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
let conversation_id = ConversationId::default();
|
||||
(
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(conversation_id, user_instructions.clone()),
|
||||
)
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => (
|
||||
resumed_history.conversation_id,
|
||||
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
|
||||
),
|
||||
};
|
||||
|
||||
// Error messages to dispatch after SessionConfigured is sent.
|
||||
let mut post_session_configured_error_events = Vec::<Event>::new();
|
||||
|
||||
@@ -389,7 +402,7 @@ impl Session {
|
||||
// - spin up MCP connection manager
|
||||
// - perform default shell discovery
|
||||
// - load history metadata
|
||||
let rollout_fut = RolloutRecorder::new(&config, conversation_id, user_instructions.clone());
|
||||
let rollout_fut = RolloutRecorder::new(&config, rollout_params);
|
||||
|
||||
let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
|
||||
let default_shell_fut = shell::default_user_shell();
|
||||
@@ -481,7 +494,10 @@ impl Session {
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = match &initial_history {
|
||||
InitialHistory::New => None,
|
||||
InitialHistory::Resumed(items) => Some(sess.build_initial_messages(items)),
|
||||
InitialHistory::Forked(items) => Some(sess.build_initial_messages(items)),
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
Some(sess.build_initial_messages(&resumed_history.history))
|
||||
}
|
||||
};
|
||||
|
||||
let events = std::iter::once(Event {
|
||||
@@ -530,8 +546,12 @@ impl Session {
|
||||
InitialHistory::New => {
|
||||
self.record_initial_history_new(turn_context).await;
|
||||
}
|
||||
InitialHistory::Resumed(items) => {
|
||||
self.record_initial_history_resumed(items).await;
|
||||
InitialHistory::Forked(items) => {
|
||||
self.record_initial_history_from_items(items).await;
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
self.record_initial_history_from_items(resumed_history.history)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -553,8 +573,8 @@ impl Session {
|
||||
self.record_conversation_items(&conversation_items).await;
|
||||
}
|
||||
|
||||
async fn record_initial_history_resumed(&self, items: Vec<ResponseItem>) {
|
||||
self.record_conversation_items(&items).await;
|
||||
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
|
||||
self.record_conversation_items_internal(&items, false).await;
|
||||
}
|
||||
|
||||
/// build the initial messages vector for SessionConfigured by converting
|
||||
@@ -663,8 +683,14 @@ impl Session {
|
||||
/// Records items to both the rollout and the chat completions/ZDR
|
||||
/// transcript, if enabled.
|
||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||
self.record_conversation_items_internal(items, true).await;
|
||||
}
|
||||
|
||||
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
|
||||
debug!("Recording items for conversation: {items:?}");
|
||||
self.record_state_snapshot(items).await;
|
||||
if persist {
|
||||
self.record_state_snapshot(items).await;
|
||||
}
|
||||
|
||||
self.state.lock_unchecked().history.record_items(items);
|
||||
}
|
||||
|
||||
@@ -1,12 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::CodexAuth;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::codex::Codex;
|
||||
use crate::codex::CodexSpawnOk;
|
||||
use crate::codex::INITIAL_SUBMIT_ID;
|
||||
@@ -18,12 +11,25 @@ use crate::protocol::Event;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct ResumedHistory {
|
||||
pub conversation_id: ConversationId,
|
||||
pub history: Vec<ResponseItem>,
|
||||
pub rollout_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum InitialHistory {
|
||||
New,
|
||||
Resumed(Vec<ResponseItem>),
|
||||
Resumed(ResumedHistory),
|
||||
Forked(Vec<ResponseItem>),
|
||||
}
|
||||
|
||||
/// Represents a newly created Codex conversation, including the first event
|
||||
@@ -77,7 +83,7 @@ impl ConversationManager {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = { Codex::spawn(config, auth_manager, InitialHistory::New).await? };
|
||||
} = Codex::spawn(config, auth_manager, InitialHistory::New).await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
}
|
||||
}
|
||||
@@ -172,7 +178,7 @@ impl ConversationManager {
|
||||
/// and all items that follow them.
|
||||
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
|
||||
if n == 0 {
|
||||
return InitialHistory::Resumed(items);
|
||||
return InitialHistory::Forked(items);
|
||||
}
|
||||
|
||||
// Walk backwards counting only `user` Message items, find cut index.
|
||||
@@ -194,7 +200,7 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
|
||||
// No prefix remains after dropping; start a new conversation.
|
||||
InitialHistory::New
|
||||
} else {
|
||||
InitialHistory::Resumed(items.into_iter().take(cut_index).collect())
|
||||
InitialHistory::Forked(items.into_iter().take(cut_index).collect())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -252,7 +258,7 @@ mod tests {
|
||||
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
|
||||
assert_eq!(
|
||||
truncated,
|
||||
InitialHistory::Resumed(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
|
||||
InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),])
|
||||
);
|
||||
|
||||
let truncated2 = truncate_after_dropping_last_messages(items, 2);
|
||||
|
||||
@@ -62,6 +62,7 @@ pub mod terminal;
|
||||
mod tool_apply_patch;
|
||||
pub mod turn_diff_tracker;
|
||||
pub use rollout::RolloutRecorder;
|
||||
pub use rollout::SessionMeta;
|
||||
pub use rollout::list::ConversationItem;
|
||||
pub use rollout::list::ConversationsPage;
|
||||
pub use rollout::list::Cursor;
|
||||
|
||||
@@ -23,7 +23,6 @@ use std::path::PathBuf;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use std::time::Duration;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncReadExt;
|
||||
@@ -31,6 +30,7 @@ use tokio::io::AsyncReadExt;
|
||||
use crate::config::Config;
|
||||
use crate::config_types::HistoryPersistence;
|
||||
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
#[cfg(unix)]
|
||||
|
||||
@@ -7,6 +7,8 @@ pub(crate) mod policy;
|
||||
pub mod recorder;
|
||||
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use recorder::SessionMeta;
|
||||
pub use recorder::SessionStateSnapshot;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::fs::File;
|
||||
use std::fs::{self};
|
||||
use std::io::Error as IoError;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use serde::Deserialize;
|
||||
@@ -26,6 +27,7 @@ use super::list::get_conversations;
|
||||
use super::policy::is_persisted_response_item;
|
||||
use crate::config::Config;
|
||||
use crate::conversation_manager::InitialHistory;
|
||||
use crate::conversation_manager::ResumedHistory;
|
||||
use crate::git_info::GitInfo;
|
||||
use crate::git_info::collect_git_info;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -72,12 +74,36 @@ pub struct RolloutRecorder {
|
||||
tx: Sender<RolloutCmd>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum RolloutRecorderParams {
|
||||
Create {
|
||||
conversation_id: ConversationId,
|
||||
instructions: Option<String>,
|
||||
},
|
||||
Resume {
|
||||
path: PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
enum RolloutCmd {
|
||||
AddItems(Vec<ResponseItem>),
|
||||
UpdateState(SessionStateSnapshot),
|
||||
Shutdown { ack: oneshot::Sender<()> },
|
||||
}
|
||||
|
||||
impl RolloutRecorderParams {
|
||||
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
|
||||
Self::Create {
|
||||
conversation_id,
|
||||
instructions,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resume(path: PathBuf) -> Self {
|
||||
Self::Resume { path }
|
||||
}
|
||||
}
|
||||
|
||||
impl RolloutRecorder {
|
||||
#[allow(dead_code)]
|
||||
/// List conversations (rollout files) under the provided Codex home directory.
|
||||
@@ -92,24 +118,43 @@ impl RolloutRecorder {
|
||||
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
||||
/// cannot be created or the rollout file cannot be opened we return the
|
||||
/// error so the caller can decide whether to disable persistence.
|
||||
pub async fn new(
|
||||
config: &Config,
|
||||
conversation_id: ConversationId,
|
||||
instructions: Option<String>,
|
||||
) -> std::io::Result<Self> {
|
||||
let LogFileInfo {
|
||||
file,
|
||||
conversation_id: session_id,
|
||||
timestamp,
|
||||
} = create_log_file(config, conversation_id)?;
|
||||
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
|
||||
let (file, meta) = match params {
|
||||
RolloutRecorderParams::Create {
|
||||
conversation_id,
|
||||
instructions,
|
||||
} => {
|
||||
let LogFileInfo {
|
||||
file,
|
||||
conversation_id: session_id,
|
||||
timestamp,
|
||||
} = create_log_file(config, conversation_id)?;
|
||||
|
||||
let timestamp_format: &[FormatItem] = format_description!(
|
||||
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
||||
);
|
||||
let timestamp = timestamp
|
||||
.to_offset(time::UtcOffset::UTC)
|
||||
.format(timestamp_format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
let timestamp_format: &[FormatItem] = format_description!(
|
||||
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
||||
);
|
||||
let timestamp = timestamp
|
||||
.to_offset(time::UtcOffset::UTC)
|
||||
.format(timestamp_format)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
|
||||
(
|
||||
tokio::fs::File::from_std(file),
|
||||
Some(SessionMeta {
|
||||
timestamp,
|
||||
id: session_id,
|
||||
instructions,
|
||||
}),
|
||||
)
|
||||
}
|
||||
RolloutRecorderParams::Resume { path } => (
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(path)
|
||||
.await?,
|
||||
None,
|
||||
),
|
||||
};
|
||||
|
||||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||||
let cwd = config.cwd.clone();
|
||||
@@ -122,16 +167,7 @@ impl RolloutRecorder {
|
||||
// Spawn a Tokio task that owns the file handle and performs async
|
||||
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
||||
// driver instead of blocking the runtime.
|
||||
tokio::task::spawn(rollout_writer(
|
||||
tokio::fs::File::from_std(file),
|
||||
rx,
|
||||
Some(SessionMeta {
|
||||
timestamp,
|
||||
id: session_id,
|
||||
instructions,
|
||||
}),
|
||||
cwd,
|
||||
));
|
||||
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
|
||||
|
||||
Ok(Self { tx })
|
||||
}
|
||||
@@ -164,13 +200,28 @@ impl RolloutRecorder {
|
||||
|
||||
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||
info!("Resuming rollout from {path:?}");
|
||||
tracing::error!("Resuming rollout from {path:?}");
|
||||
let text = tokio::fs::read_to_string(path).await?;
|
||||
let mut lines = text.lines();
|
||||
let _ = lines
|
||||
let first_line = lines
|
||||
.next()
|
||||
.ok_or_else(|| IoError::other("empty session file"))?;
|
||||
let mut items = Vec::new();
|
||||
let conversation_id = match serde_json::from_str::<SessionMeta>(first_line) {
|
||||
Ok(rollout_session_meta) => {
|
||||
tracing::error!(
|
||||
"Parsed conversation ID from rollout file: {:?}",
|
||||
rollout_session_meta.id
|
||||
);
|
||||
Some(rollout_session_meta.id)
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(IoError::other(format!(
|
||||
"failed to parse first line of rollout file as SessionMeta: {e}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let mut items = Vec::new();
|
||||
for line in lines {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
@@ -198,12 +249,24 @@ impl RolloutRecorder {
|
||||
}
|
||||
}
|
||||
|
||||
info!("Resumed rollout successfully from {path:?}");
|
||||
tracing::error!(
|
||||
"Resumed rollout with {} items, conversation ID: {:?}",
|
||||
items.len(),
|
||||
conversation_id
|
||||
);
|
||||
let conversation_id = conversation_id
|
||||
.ok_or_else(|| IoError::other("failed to parse conversation ID from rollout file"))?;
|
||||
|
||||
if items.is_empty() {
|
||||
Ok(InitialHistory::New)
|
||||
} else {
|
||||
Ok(InitialHistory::Resumed(items))
|
||||
return Ok(InitialHistory::New);
|
||||
}
|
||||
|
||||
info!("Resumed rollout successfully from {path:?}");
|
||||
Ok(InitialHistory::Resumed(ResumedHistory {
|
||||
conversation_id,
|
||||
history: items,
|
||||
rollout_path: path.to_path_buf(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> std::io::Result<()> {
|
||||
@@ -331,7 +394,7 @@ impl JsonlWriter {
|
||||
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
|
||||
let mut json = serde_json::to_string(item)?;
|
||||
json.push('\n');
|
||||
let _ = self.file.write_all(json.as_bytes()).await;
|
||||
self.file.write_all(json.as_bytes()).await?;
|
||||
self.file.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user