From bd4fa8550781537b40bf5efd92f6d12140ad49b9 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Thu, 4 Sep 2025 07:38:28 -0700 Subject: [PATCH] fix: add callback to map before sending request to fix race condition (#3146) Last week, I thought I found the smoking gun in our flaky integration tests where holding these locks could have led to potential deadlock: - https://github.com/openai/codex/pull/2876 - https://github.com/openai/codex/pull/2878 Yet even after those PRs went in, we continued to see flakiness in our integration tests! Though with the additional logging added as part of debugging those tests, I now saw things like: ``` read message from stdout: Notification(JSONRPCNotification { jsonrpc: "2.0", method: "codex/event/exec_approval_request", params: Some(Object {"id": String("0"), "msg": Object {"type": String("exec_approval_request"), "call_id": String("call1"), "command": Array [String("python3"), String("-c"), String("print(42)")], "cwd": String("/tmp/.tmpFj2zwi/workdir")}, "conversationId": String("c67b32c5-9475-41bf-8680-f4b4834ebcc6")}) }) notification: Notification(JSONRPCNotification { jsonrpc: "2.0", method: "codex/event/exec_approval_request", params: Some(Object {"id": String("0"), "msg": Object {"type": String("exec_approval_request"), "call_id": String("call1"), "command": Array [String("python3"), String("-c"), String("print(42)")], "cwd": String("/tmp/.tmpFj2zwi/workdir")}, "conversationId": String("c67b32c5-9475-41bf-8680-f4b4834ebcc6")}) }) read message from stdout: Request(JSONRPCRequest { id: Integer(0), jsonrpc: "2.0", method: "execCommandApproval", params: Some(Object {"conversation_id": String("c67b32c5-9475-41bf-8680-f4b4834ebcc6"), "call_id": String("call1"), "command": Array [String("python3"), String("-c"), String("print(42)")], "cwd": String("/tmp/.tmpFj2zwi/workdir")}) }) writing message to stdin: Response(JSONRPCResponse { id: Integer(0), jsonrpc: "2.0", result: Object {"decision": String("approved")} }) in 
read_stream_until_notification_message(codex/event/task_complete) [mcp stderr] 2025-09-04T00:00:59.738585Z INFO codex_mcp_server::message_processor: <- response: JSONRPCResponse { id: Integer(0), jsonrpc: "2.0", result: Object {"decision": String("approved")} } [mcp stderr] 2025-09-04T00:00:59.738740Z DEBUG codex_core::codex: Submission sub=Submission { id: "1", op: ExecApproval { id: "0", decision: Approved } } [mcp stderr] 2025-09-04T00:00:59.738832Z WARN codex_core::codex: No pending approval found for sub_id: 0 ``` That is, a response was sent for a request, but no callback was in place to handle the response! This time, I think I may have found the underlying issue (though the fixes for holding locks for too long may have also been part of it), which is that I found cases where we were sending the request: https://github.com/openai/codex/blob/234c0a0469db222f05df08d00ae5032312f77427/codex-rs/core/src/codex.rs#L597 before inserting the `Sender` into the `pending_approvals` map (which has to wait on acquiring a mutex): https://github.com/openai/codex/blob/234c0a0469db222f05df08d00ae5032312f77427/codex-rs/core/src/codex.rs#L598-L601 so it is possible the request could go out and the client could respond before `pending_approvals` was updated! Note this was happening in both `request_command_approval()` and `request_patch_approval()`, which maps to the sorts of errors we have been seeing when these integration tests have been flaking on us. While here, I am also adding some extra logging that prints if inserting into `pending_approvals` overwrites an entry as opposed to purely inserting one. Today, a conversation can have only one pending request at a time, but as we are planning to support parallel tool calls, this invariant may not continue to hold, in which case we need to revisit this abstraction. 
--- codex-rs/core/src/codex.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 287bfe8f..2028c424 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -584,9 +584,19 @@ impl Session { cwd: PathBuf, reason: Option, ) -> oneshot::Receiver { + // Add the tx_approve callback to the map before sending the request. let (tx_approve, rx_approve) = oneshot::channel(); + let event_id = sub_id.clone(); + let prev_entry = { + let mut state = self.state.lock_unchecked(); + state.pending_approvals.insert(sub_id, tx_approve) + }; + if prev_entry.is_some() { + warn!("Overwriting existing pending approval for sub_id: {event_id}"); + } + let event = Event { - id: sub_id.clone(), + id: event_id, msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { call_id, command, @@ -595,10 +605,6 @@ impl Session { }), }; let _ = self.tx_event.send(event).await; - { - let mut state = self.state.lock_unchecked(); - state.pending_approvals.insert(sub_id, tx_approve); - } rx_approve } @@ -610,9 +616,19 @@ impl Session { reason: Option, grant_root: Option, ) -> oneshot::Receiver { + // Add the tx_approve callback to the map before sending the request. let (tx_approve, rx_approve) = oneshot::channel(); + let event_id = sub_id.clone(); + let prev_entry = { + let mut state = self.state.lock_unchecked(); + state.pending_approvals.insert(sub_id, tx_approve) + }; + if prev_entry.is_some() { + warn!("Overwriting existing pending approval for sub_id: {event_id}"); + } + let event = Event { - id: sub_id.clone(), + id: event_id, msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, changes: convert_apply_patch_to_protocol(action), @@ -621,10 +637,6 @@ impl Session { }), }; let _ = self.tx_event.send(event).await; - { - let mut state = self.state.lock_unchecked(); - state.pending_approvals.insert(sub_id, tx_approve); - } rx_approve }