fix: don't clear turn input before retries (#611)

The current turn input in the agent loop is being discarded before
consuming the stream events which causes the stream reconnect (after
rate limit failure) to not include the inputs. Since the new stream
includes the previous response ID, it triggers a bad request exception
considering the input doesn't match what OpenAI has stored on the server
side and subsequently a very confusing error message of: `No tool output
found for function call call_xyz`.

This should fix https://github.com/openai/codex/issues/586.

## Testing

I have a personal project that I'm working on that runs multiple Codex
CLIs in parallel and often runs into rate limit errors (as seen in the
OpenAI logs). After making this change, I am no longer experiencing
Codex crashing and it was able to retry and handle everything gracefully
until completion (even though I still see rate limiting in the OpenAI
logs).
This commit is contained in:
Connor Christie
2025-04-24 04:29:36 -07:00
committed by GitHub
parent c75cb507f0
commit 622323a59b

View File

@@ -861,7 +861,6 @@ export class AgentLoop {
throw error;
}
}
turnInput = []; // clear turn input, prepare for function call results
// If the user requested cancellation while we were awaiting the network
// request, abort immediately before we start handling the stream.
@@ -894,6 +893,8 @@ export class AgentLoop {
// eslint-disable-next-line no-constant-condition
while (true) {
try {
let newTurnInput: Array<ResponseInputItem> = [];
// eslint-disable-next-line no-await-in-loop
for await (const event of stream as AsyncIterable<ResponseEvent>) {
log(`AgentLoop.run(): response event ${event.type}`);
@@ -935,7 +936,7 @@ export class AgentLoop {
"requires_action"
) {
// TODO: remove this once we can depend on streaming events
const newTurnInput = await this.processEventsWithoutStreaming(
newTurnInput = await this.processEventsWithoutStreaming(
event.response.output,
stageItem,
);
@@ -970,24 +971,30 @@ export class AgentLoop {
if (delta.length === 0) {
// No new input => end conversation.
turnInput = [];
newTurnInput = [];
} else {
// Resend full transcript *plus* the new delta so the
// stateless backend receives complete context.
turnInput = [...this.transcript, ...delta];
newTurnInput = [...this.transcript, ...delta];
// The prefix ends at the current transcript length
// everything after this index is new for the next
// iteration.
transcriptPrefixLen = this.transcript.length;
}
} else {
turnInput = newTurnInput;
}
}
lastResponseId = event.response.id;
this.onLastResponseId(event.response.id);
}
}
// Set after we have consumed all stream events in case the stream wasn't
// complete or we missed events for whatever reason. That way, we will set
// the next turn to an empty array to prevent an infinite loop.
// And don't update the turn input too early otherwise we won't have the
// current turn inputs available for retries.
turnInput = newTurnInput;
// Stream finished successfully leave the retry loop.
break;
} catch (err: unknown) {