fix: add callback to map before sending request to fix race condition (#3146)
Last week, I thought I found the smoking gun in our flaky integration tests where holding these locks could have led to a potential deadlock: - https://github.com/openai/codex/pull/2876 - https://github.com/openai/codex/pull/2878 Yet even after those PRs went in, we continued to see flakiness in our integration tests! Though with the additional logging added as part of debugging those tests, I now saw things like: ``` read message from stdout: Notification(JSONRPCNotification { jsonrpc: "2.0", method: "codex/event/exec_approval_request", params: Some(Object {"id": String("0"), "msg": Object {"type": String("exec_approval_request"), "call_id": String("call1"), "command": Array [String("python3"), String("-c"), String("print(42)")], "cwd": String("/tmp/.tmpFj2zwi/workdir")}, "conversationId": String("c67b32c5-9475-41bf-8680-f4b4834ebcc6")}) }) notification: Notification(JSONRPCNotification { jsonrpc: "2.0", method: "codex/event/exec_approval_request", params: Some(Object {"id": String("0"), "msg": Object {"type": String("exec_approval_request"), "call_id": String("call1"), "command": Array [String("python3"), String("-c"), String("print(42)")], "cwd": String("/tmp/.tmpFj2zwi/workdir")}, "conversationId": String("c67b32c5-9475-41bf-8680-f4b4834ebcc6")}) }) read message from stdout: Request(JSONRPCRequest { id: Integer(0), jsonrpc: "2.0", method: "execCommandApproval", params: Some(Object {"conversation_id": String("c67b32c5-9475-41bf-8680-f4b4834ebcc6"), "call_id": String("call1"), "command": Array [String("python3"), String("-c"), String("print(42)")], "cwd": String("/tmp/.tmpFj2zwi/workdir")}) }) writing message to stdin: Response(JSONRPCResponse { id: Integer(0), jsonrpc: "2.0", result: Object {"decision": String("approved")} }) in read_stream_until_notification_message(codex/event/task_complete) [mcp stderr] 2025-09-04T00:00:59.738585Z INFO codex_mcp_server::message_processor: <- response: JSONRPCResponse { id: Integer(0), jsonrpc: "2.0", result: Object {"decision": 
String("approved")} } [mcp stderr] 2025-09-04T00:00:59.738740Z DEBUG codex_core::codex: Submission sub=Submission { id: "1", op: ExecApproval { id: "0", decision: Approved } } [mcp stderr] 2025-09-04T00:00:59.738832Z WARN codex_core::codex: No pending approval found for sub_id: 0 ``` That is, a response was sent for a request, but no callback was in place to handle the response! This time, I think I may have found the underlying issue (though the fixes for holding locks for too long may have also been part of it), which is that I found cases where we were sending the request: 234c0a0469/codex-rs/core/src/codex.rs (L597) before inserting the `Sender` into the `pending_approvals` map (which has to wait on acquiring a mutex): 234c0a0469/codex-rs/core/src/codex.rs (L598-L601) so it is possible the request could go out and the client could respond before `pending_approvals` was updated! Note this was happening in both `request_command_approval()` and `request_patch_approval()`, which maps to the sorts of errors we have been seeing when these integration tests have been flaking on us. While here, I am also adding some extra logging that prints if inserting into `pending_approvals` overwrites an entry as opposed to purely inserting one. Today, a conversation can have only one pending request at a time, but as we are planning to support parallel tool calls, this invariant may not continue to hold, in which case we need to revisit this abstraction.
This commit is contained in:
@@ -584,9 +584,19 @@ impl Session {
|
||||
cwd: PathBuf,
|
||||
reason: Option<String>,
|
||||
) -> oneshot::Receiver<ReviewDecision> {
|
||||
// Add the tx_approve callback to the map before sending the request.
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
let prev_entry = {
|
||||
let mut state = self.state.lock_unchecked();
|
||||
state.pending_approvals.insert(sub_id, tx_approve)
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
id: event_id,
|
||||
msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
call_id,
|
||||
command,
|
||||
@@ -595,10 +605,6 @@ impl Session {
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
{
|
||||
let mut state = self.state.lock_unchecked();
|
||||
state.pending_approvals.insert(sub_id, tx_approve);
|
||||
}
|
||||
rx_approve
|
||||
}
|
||||
|
||||
@@ -610,9 +616,19 @@ impl Session {
|
||||
reason: Option<String>,
|
||||
grant_root: Option<PathBuf>,
|
||||
) -> oneshot::Receiver<ReviewDecision> {
|
||||
// Add the tx_approve callback to the map before sending the request.
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
let prev_entry = {
|
||||
let mut state = self.state.lock_unchecked();
|
||||
state.pending_approvals.insert(sub_id, tx_approve)
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
id: event_id,
|
||||
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id,
|
||||
changes: convert_apply_patch_to_protocol(action),
|
||||
@@ -621,10 +637,6 @@ impl Session {
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
{
|
||||
let mut state = self.state.lock_unchecked();
|
||||
state.pending_approvals.insert(sub_id, tx_approve);
|
||||
}
|
||||
rx_approve
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user