diff --git a/codex-rs/core/src/exec_command/exec_command_session.rs b/codex-rs/core/src/exec_command/exec_command_session.rs index 3bf647ef..31b3c929 100644 --- a/codex-rs/core/src/exec_command/exec_command_session.rs +++ b/codex-rs/core/src/exec_command/exec_command_session.rs @@ -11,9 +11,6 @@ pub(crate) struct ExecCommandSession { /// Broadcast stream of output chunks read from the PTY. New subscribers /// receive only chunks emitted after they subscribe. output_tx: broadcast::Sender>, - /// Receiver subscribed before the child process starts emitting output so - /// the first caller can consume any early data without races. - initial_output_rx: StdMutex>>>, /// Child killer handle for termination on drop (can signal independently /// of a thread blocked in `.wait()`). @@ -41,25 +38,20 @@ impl ExecCommandSession { writer_handle: JoinHandle<()>, wait_handle: JoinHandle<()>, exit_status: std::sync::Arc, - ) -> Self { - Self { - writer_tx, - output_tx, - initial_output_rx: StdMutex::new(None), - killer: StdMutex::new(Some(killer)), - reader_handle: StdMutex::new(Some(reader_handle)), - writer_handle: StdMutex::new(Some(writer_handle)), - wait_handle: StdMutex::new(Some(wait_handle)), - exit_status, - } - } - - pub(crate) fn set_initial_output_receiver(&self, receiver: broadcast::Receiver>) { - if let Ok(mut guard) = self.initial_output_rx.lock() - && guard.is_none() - { - *guard = Some(receiver); - } + ) -> (Self, broadcast::Receiver>) { + let initial_output_rx = output_tx.subscribe(); + ( + Self { + writer_tx, + output_tx, + killer: StdMutex::new(Some(killer)), + reader_handle: StdMutex::new(Some(reader_handle)), + writer_handle: StdMutex::new(Some(writer_handle)), + wait_handle: StdMutex::new(Some(wait_handle)), + exit_status, + }, + initial_output_rx, + ) } pub(crate) fn writer_sender(&self) -> mpsc::Sender> { @@ -67,13 +59,7 @@ impl ExecCommandSession { } pub(crate) fn output_receiver(&self) -> broadcast::Receiver> { - if let Ok(mut guard) = self.initial_output_rx.lock() - && let Some(receiver) = guard.take() - { - receiver - } else { - self.output_tx.subscribe() - } + self.output_tx.subscribe() } pub(crate) fn has_exited(&self) -> bool { diff --git a/codex-rs/core/src/exec_command/session_manager.rs b/codex-rs/core/src/exec_command/session_manager.rs index 26fb3f3e..61c70f0c 100644 --- a/codex-rs/core/src/exec_command/session_manager.rs +++ b/codex-rs/core/src/exec_command/session_manager.rs @@ -93,18 +93,16 @@ impl SessionManager { .fetch_add(1, std::sync::atomic::Ordering::SeqCst), ); - let (session, mut exit_rx) = - create_exec_command_session(params.clone()) - .await - .map_err(|err| { - format!( - "failed to create exec command session for session id {}: {err}", - session_id.0 - ) - })?; + let (session, mut output_rx, mut exit_rx) = create_exec_command_session(params.clone()) + .await + .map_err(|err| { + format!( + "failed to create exec command session for session id {}: {err}", + session_id.0 + ) + })?; // Insert into session map. - let mut output_rx = session.output_receiver(); self.sessions.lock().await.insert(session_id, session); // Collect output until either timeout expires or process exits. @@ -245,7 +243,11 @@ impl SessionManager { /// Spawn PTY and child process per spawn_exec_command_session logic. async fn create_exec_command_session( params: ExecCommandParams, -) -> anyhow::Result<(ExecCommandSession, oneshot::Receiver)> { +) -> anyhow::Result<( + ExecCommandSession, + tokio::sync::broadcast::Receiver>, + oneshot::Receiver, +)> { let ExecCommandParams { cmd, yield_time_ms: _, @@ -279,8 +281,6 @@ async fn create_exec_command_session( let (writer_tx, mut writer_rx) = mpsc::channel::>(128); // Broadcast for streaming PTY output to readers: subscribers receive from subscription time. let (output_tx, _) = tokio::sync::broadcast::channel::>(256); - let initial_output_rx = output_tx.subscribe(); - // Reader task: drain PTY and forward chunks to output channel. let mut reader = pair.master.try_clone_reader()?; let output_tx_clone = output_tx.clone(); @@ -342,7 +342,7 @@ async fn create_exec_command_session( }); // Create and store the session with channels. - let session = ExecCommandSession::new( + let (session, initial_output_rx) = ExecCommandSession::new( writer_tx, output_tx, killer, @@ -351,8 +351,7 @@ async fn create_exec_command_session( wait_handle, exit_status, ); - session.set_initial_output_receiver(initial_output_rx); - Ok((session, exit_rx)) + Ok((session, initial_output_rx, exit_rx)) } #[cfg(test)] diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index c6980c53..99b19796 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -100,10 +100,13 @@ type OutputBuffer = Arc>; type OutputHandles = (OutputBuffer, Arc); impl ManagedUnifiedExecSession { - fn new(session: ExecCommandSession) -> Self { + fn new( + session: ExecCommandSession, + initial_output_rx: tokio::sync::broadcast::Receiver>, + ) -> Self { let output_buffer = Arc::new(Mutex::new(OutputBufferState::default())); let output_notify = Arc::new(Notify::new()); - let mut receiver = session.output_receiver(); + let mut receiver = initial_output_rx; let buffer_clone = Arc::clone(&output_buffer); let notify_clone = Arc::clone(&output_notify); let output_task = tokio::spawn(async move { @@ -193,8 +196,8 @@ impl UnifiedExecSessionManager { } else { let command = request.input_chunks.to_vec(); let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst); - let session = create_unified_exec_session(&command).await?; - let managed_session = ManagedUnifiedExecSession::new(session); + let (session, initial_output_rx) = create_unified_exec_session(&command).await?; + let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx); let (buffer, notify) = managed_session.output_handles(); writer_tx = managed_session.writer_sender(); output_buffer = buffer; @@ -297,7 +300,13 @@ impl UnifiedExecSessionManager { async fn create_unified_exec_session( command: &[String], -) -> Result { +) -> Result< + ( + ExecCommandSession, + tokio::sync::broadcast::Receiver>, + ), + UnifiedExecError, +> { if command.is_empty() { return Err(UnifiedExecError::MissingCommandLine); } @@ -327,7 +336,6 @@ async fn create_unified_exec_session( let (writer_tx, mut writer_rx) = mpsc::channel::>(128); let (output_tx, _) = tokio::sync::broadcast::channel::>(256); - let initial_output_rx = output_tx.subscribe(); let mut reader = pair .master @@ -381,7 +389,7 @@ async fn create_unified_exec_session( wait_exit_status.store(true, Ordering::SeqCst); }); - let session = ExecCommandSession::new( + let (session, initial_output_rx) = ExecCommandSession::new( writer_tx, output_tx, killer, @@ -390,9 +398,7 @@ async fn create_unified_exec_session( wait_handle, exit_status, ); - session.set_initial_output_receiver(initial_output_rx); - - Ok(session) + Ok((session, initial_output_rx)) } #[cfg(test)]