Add feedback upload request handling (#5682)

This commit is contained in:
Ahmed Ibrahim
2025-10-26 22:53:39 -07:00
committed by GitHub
parent a55b0c4bcc
commit f178805252
22 changed files with 194 additions and 117 deletions

View File

@@ -20,7 +20,6 @@ use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
@@ -598,6 +597,19 @@ impl Session {
self.tx_event.clone()
}
/// Ensure all rollout writes are durably flushed.
pub(crate) async fn flush_rollout(&self) {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder: {e}");
}
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -612,6 +624,8 @@ impl Session {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context);
self.record_conversation_items(&turn_context, &items).await;
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -628,6 +642,8 @@ impl Session {
if persist && !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
}
}
}
@@ -1401,33 +1417,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
sess.send_event_raw(event).await;
break;
}
Op::GetPath => {
let sub_id = sub.id.clone();
// Flush rollout writes before returning the path so readers observe a consistent file.
let (path, rec_opt) = {
let guard = sess.services.rollout.lock().await;
match guard.as_ref() {
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
None => {
error!("rollout recorder not found");
continue;
}
}
};
if let Some(rec) = rec_opt
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder before GetHistory: {e}");
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationPath(ConversationPathResponseEvent {
conversation_id: sess.conversation_id,
path,
}),
};
sess.send_event_raw(event).await;
}
Op::Review { review_request } => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
@@ -2231,6 +2221,8 @@ pub(crate) async fn exit_review_mode(
}],
)
.await;
// Make the recorded review note visible immediately for readers.
session.flush_rollout().await;
}
fn mcp_init_error_display(

View File

@@ -3,16 +3,21 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use std::path::PathBuf;
pub struct CodexConversation {
codex: Codex,
rollout_path: PathBuf,
}
/// Conduit for the bidirectional stream of messages that compose a conversation
/// in Codex.
impl CodexConversation {
pub(crate) fn new(codex: Codex) -> Self {
Self { codex }
pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self {
Self {
codex,
rollout_path,
}
}
pub async fn submit(&self, op: Op) -> CodexResult<String> {
@@ -27,4 +32,8 @@ impl CodexConversation {
pub async fn next_event(&self) -> CodexResult<Event> {
self.codex.next_event().await
}
pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
}

View File

@@ -98,7 +98,10 @@ impl ConversationManager {
}
};
let conversation = Arc::new(CodexConversation::new(codex));
let conversation = Arc::new(CodexConversation::new(
codex,
session_configured.rollout_path.clone(),
));
self.conversations
.write()
.await

View File

@@ -72,7 +72,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::ConversationPath(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_) => false,
}

View File

@@ -267,10 +267,6 @@ impl RolloutRecorder {
}))
}
pub(crate) fn get_rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {

View File

@@ -18,7 +18,6 @@ use codex_core::built_in_model_providers;
use codex_core::codex::compact::SUMMARIZATION_PROMPT;
use codex_core::config::Config;
use codex_core::config::OPENAI_DEFAULT_MODEL;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
@@ -61,7 +60,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
user_turn(&base, "AFTER_COMPACT").await;
let base_path = fetch_conversation_path(&base, "base conversation").await;
let base_path = fetch_conversation_path(&base).await;
assert!(
base_path.exists(),
"compact+resume test expects base path {base_path:?} to exist",
@@ -69,7 +68,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
let resumed = resume_conversation(&manager, &config, base_path).await;
user_turn(&resumed, "AFTER_RESUME").await;
let resumed_path = fetch_conversation_path(&resumed, "resumed conversation").await;
let resumed_path = fetch_conversation_path(&resumed).await;
assert!(
resumed_path.exists(),
"compact+resume test expects resumed path {resumed_path:?} to exist",
@@ -518,7 +517,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
user_turn(&base, "AFTER_COMPACT").await;
let base_path = fetch_conversation_path(&base, "base conversation").await;
let base_path = fetch_conversation_path(&base).await;
assert!(
base_path.exists(),
"second compact test expects base path {base_path:?} to exist",
@@ -526,7 +525,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
let resumed = resume_conversation(&manager, &config, base_path).await;
user_turn(&resumed, "AFTER_RESUME").await;
let resumed_path = fetch_conversation_path(&resumed, "resumed conversation").await;
let resumed_path = fetch_conversation_path(&resumed).await;
assert!(
resumed_path.exists(),
"second compact test expects resumed path {resumed_path:?} to exist",
@@ -537,7 +536,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
compact_conversation(&forked).await;
user_turn(&forked, "AFTER_COMPACT_2").await;
let forked_path = fetch_conversation_path(&forked, "forked conversation").await;
let forked_path = fetch_conversation_path(&forked).await;
assert!(
forked_path.exists(),
"second compact test expects forked path {forked_path:?} to exist",
@@ -792,22 +791,8 @@ async fn compact_conversation(conversation: &Arc<CodexConversation>) {
wait_for_event(conversation, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}
async fn fetch_conversation_path(
conversation: &Arc<CodexConversation>,
context: &str,
) -> std::path::PathBuf {
conversation
.submit(Op::GetPath)
.await
.expect("request conversation path");
match wait_for_event(conversation, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
})
.await
{
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path,
_ => panic!("expected ConversationPath event for {context}"),
}
async fn fetch_conversation_path(conversation: &Arc<CodexConversation>) -> std::path::PathBuf {
conversation.rollout_path()
}
async fn resume_conversation(

View File

@@ -4,7 +4,6 @@ use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::parse_turn_item;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
@@ -79,13 +78,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
}
// Request history from the base conversation to obtain rollout path.
codex.submit(Op::GetPath).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let base_path = match &base_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event"),
};
let base_path = codex.rollout_path();
// GetHistory flushes before returning the path; no wait needed.
@@ -140,15 +133,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
.await
.expect("fork 1");
codex_fork1.submit(Op::GetPath).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
})
.await;
let fork1_path = match &fork1_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event after first fork"),
};
let fork1_path = codex_fork1.rollout_path();
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
@@ -166,15 +151,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
.await
.expect("fork 2");
codex_fork2.submit(Op::GetPath).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
})
.await;
let fork2_path = match &fork2_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event after second fork"),
};
let fork2_path = codex_fork2.rollout_path();
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);

View File

@@ -7,7 +7,6 @@ use codex_core::REVIEW_PROMPT;
use codex_core::ResponseItem;
use codex_core::built_in_model_providers;
use codex_core::config::Config;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExitedReviewModeEvent;
@@ -120,13 +119,7 @@ async fn review_op_emits_lifecycle_and_review_output() {
// Also verify that a user message with the header and a formatted finding
// was recorded back in the parent session's rollout.
codex.submit(Op::GetPath).await.unwrap();
let history_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let path = match history_event {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path,
other => panic!("expected ConversationPath event, got {other:?}"),
};
let path = codex.rollout_path();
let text = std::fs::read_to_string(&path).expect("read rollout file");
let mut saw_header = false;
@@ -482,13 +475,7 @@ async fn review_input_isolated_from_parent_history() {
assert_eq!(instructions, REVIEW_PROMPT);
// Also verify that a user interruption note was recorded in the rollout.
codex.submit(Op::GetPath).await.unwrap();
let history_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let path = match history_event {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path,
other => panic!("expected ConversationPath event, got {other:?}"),
};
let path = codex.rollout_path();
let text = std::fs::read_to_string(&path).expect("read rollout file");
let mut saw_interruption_message = false;
for line in text.lines() {