agent-loop: minimal mid-stream 429 retry loop using existing back-off (#506)

As requested by @tibo-openai at
https://github.com/openai/codex/pull/357#issuecomment-2816554203, this
attempts a more minimal implementation of #357 that preserves as much as
possible of the existing code's exponential backoff logic.

Adds a small retry wrapper around the streaming for‑await loop so that
HTTP 429s that occur *after* the stream has started no longer crash the
CLI.

Highlights
• Re‑uses the existing RATE_LIMIT_RETRY_WAIT_MS constant and 5‑attempt limit.
• Exponential back‑off identical to the initial-request handling (see the sketch below).
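
For orientation, this is the shape of the change in isolation: a minimal sketch, assuming a hypothetical `createStream` factory and `handleEvent` callback, with a 500 ms base wait standing in for whatever RATE_LIMIT_RETRY_WAIT_MS actually is in the CLI.

```ts
// Sketch only: `createStream`/`handleEvent` are hypothetical stand-ins, and
// the 500 ms base wait is an illustrative assumption, not the CLI's constant.
const RATE_LIMIT_RETRY_WAIT_MS = 500;
const MAX_STREAM_RETRIES = 5;

async function consumeWithRetry<T>(
  createStream: () => Promise<AsyncIterable<T>>,
  handleEvent: (event: T) => void,
): Promise<void> {
  let stream = await createStream();
  let attempt = 0;
  while (true) {
    try {
      for await (const event of stream) {
        handleEvent(event);
      }
      break; // stream completed cleanly; leave the retry loop
    } catch (err) {
      const status = (err as { status?: number }).status;
      if (status !== 429 || attempt >= MAX_STREAM_RETRIES) {
        throw err; // not a rate limit, or out of attempts
      }
      attempt += 1;
      // Same doubling schedule as the initial-request path:
      // 500 ms, 1 s, 2 s, 4 s, 8 s under the assumed base wait.
      const waitMs = RATE_LIMIT_RETRY_WAIT_MS * 2 ** (attempt - 1);
      await new Promise((res) => setTimeout(res, waitMs));
      stream = await createStream(); // re-create with the same parameters
    }
  }
}
```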

This comment is probably more useful here in the PR:
// The OpenAI SDK may raise a 429 (rate‑limit) *after* the stream has
// started. Prior logic already retries the initial `responses.create`
// call, but we need to add equivalent resilience for mid‑stream
// failures. We keep the implementation minimal by wrapping the
// existing `for‑await` loop in a small retry‑for‑loop that re‑creates
// the stream with exponential back‑off.
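
The subtlety the comment points at: with streaming, a 429 can be thrown from the iterator itself, partway through iteration, so a try/catch around only the initial `responses.create` call never sees it. A contrived, self-contained illustration of where the error surfaces:

```ts
// Contrived illustration: the error escapes from the async iterator mid-stream,
// not from stream creation, so only a try around the for-await can catch it.
async function* flakyStream(): AsyncGenerator<string> {
  yield "event-1";
  yield "event-2";
  // Simulate the server rate-limiting us partway through the stream.
  throw Object.assign(new Error("rate limited"), { status: 429 });
}

async function main(): Promise<void> {
  try {
    for await (const event of flakyStream()) {
      console.log("got", event); // prints event-1, event-2...
    }
  } catch (err) {
    // ...then control lands here; this is the spot the retry loop wraps.
    console.error("mid-stream failure:", (err as Error).message);
  }
}

void main();
```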
Scott Leibrand, 2025-04-22 08:02:10 -07:00 (committed by GitHub)
parent 98a22273d9 · commit ee6e1765fa


@@ -842,104 +842,211 @@ export class AgentLoop {
        return;
      }

      const MAX_STREAM_RETRIES = 5;
      let streamRetryAttempt = 0;
      // eslint-disable-next-line no-constant-condition
      while (true) {
        try {
          // eslint-disable-next-line no-await-in-loop
          for await (const event of stream as AsyncIterable<ResponseEvent>) {
            log(`AgentLoop.run(): response event ${event.type}`);

            // process and surface each item (no-op until we can depend on streaming events)
            if (event.type === "response.output_item.done") {
              const item = event.item;
              // 1) if it's a reasoning item, annotate it
              type ReasoningItem = { type?: string; duration_ms?: number };
              const maybeReasoning = item as ReasoningItem;
              if (maybeReasoning.type === "reasoning") {
                maybeReasoning.duration_ms = Date.now() - thinkingStart;
              }
              if (item.type === "function_call") {
                // Track outstanding tool call so we can abort later if needed.
                // The item comes from the streaming response, therefore it has
                // either `id` (chat) or `call_id` (responses); we normalise
                // by reading both.
                const callId =
                  (item as { call_id?: string; id?: string }).call_id ??
                  (item as { id?: string }).id;
                if (callId) {
                  this.pendingAborts.add(callId);
                }
              } else {
                stageItem(item as ResponseItem);
              }
            }

            if (event.type === "response.completed") {
              if (thisGeneration === this.generation && !this.canceled) {
                for (const item of event.response.output) {
                  stageItem(item as ResponseItem);
                }
              }
              if (event.response.status === "completed") {
                // TODO: remove this once we can depend on streaming events
                const newTurnInput = await this.processEventsWithoutStreaming(
                  event.response.output,
                  stageItem,
                );
                turnInput = newTurnInput;
              }
              lastResponseId = event.response.id;
              this.onLastResponseId(event.response.id);
            }
          }

          // Stream finished successfully; leave the retry loop.
          break;
        } catch (err: unknown) {
          const isRateLimitError = (e: unknown): boolean => {
            if (!e || typeof e !== "object") {
              return false;
            }
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            const ex: any = e;
            return (
              ex.status === 429 ||
              ex.code === "rate_limit_exceeded" ||
              ex.type === "rate_limit_exceeded"
            );
          };

          if (
            isRateLimitError(err) &&
            streamRetryAttempt < MAX_STREAM_RETRIES
          ) {
            streamRetryAttempt += 1;

            const waitMs =
              RATE_LIMIT_RETRY_WAIT_MS * 2 ** (streamRetryAttempt - 1);
            log(
              `OpenAI stream rate-limited; retry ${streamRetryAttempt}/${MAX_STREAM_RETRIES} in ${waitMs} ms`,
            );

            // Give the server a breather before retrying.
            // eslint-disable-next-line no-await-in-loop
            await new Promise((res) => setTimeout(res, waitMs));

            // Re-create the stream with the *same* parameters.
            let reasoning: Reasoning | undefined;
            if (this.model.startsWith("o")) {
              reasoning = { effort: "high" };
              if (this.model === "o3" || this.model === "o4-mini") {
                reasoning.summary = "auto";
              }
            }
            const mergedInstructions = [prefix, this.instructions]
              .filter(Boolean)
              .join("\n");
            const responseCall =
              !this.config.provider ||
              this.config.provider?.toLowerCase() === "openai"
                ? (params: ResponseCreateParams) =>
                    this.oai.responses.create(params)
                : (params: ResponseCreateParams) =>
                    responsesCreateViaChatCompletions(
                      this.oai,
                      params as ResponseCreateParams & { stream: true },
                    );

            // eslint-disable-next-line no-await-in-loop
            stream = await responseCall({
              model: this.model,
              instructions: mergedInstructions,
              previous_response_id: lastResponseId || undefined,
              input: turnInput,
              stream: true,
              parallel_tool_calls: false,
              reasoning,
              ...(this.config.flexMode ? { service_tier: "flex" } : {}),
              tools: [
                {
                  type: "function",
                  name: "shell",
                  description: "Runs a shell command, and returns its output.",
                  strict: false,
                  parameters: {
                    type: "object",
                    properties: {
                      command: {
                        type: "array",
                        items: { type: "string" },
                      },
                      workdir: {
                        type: "string",
                        description: "The working directory for the command.",
                      },
                      timeout: {
                        type: "number",
                        description:
                          "The maximum time to wait for the command to complete in milliseconds.",
                      },
                    },
                    required: ["command"],
                    additionalProperties: false,
                  },
                },
              ],
            });
            this.currentStream = stream;

            // Continue to the outer while-loop to consume the new stream.
            continue;
          }

          // Gracefully handle an abort triggered via `cancel()` so that the
          // consumer does not see an unhandled exception.
          if (err instanceof Error && err.name === "AbortError") {
            if (!this.canceled) {
              // It was aborted for some other reason; surface the error.
              throw err;
            }
            this.onLoading(false);
            return;
          }
          // Suppress internal stack on JSON parse failures
          if (err instanceof SyntaxError) {
            this.onItem({
              id: `error-${Date.now()}`,
              type: "message",
              role: "system",
              content: [
                {
                  type: "input_text",
                  text: "⚠️ Failed to parse streaming response (invalid JSON). Please `/clear` to reset.",
                },
              ],
            });
            this.onLoading(false);
            return;
          }
          // Handle OpenAI API quota errors
          if (
            err instanceof Error &&
            (err as { code?: string }).code === "insufficient_quota"
          ) {
            this.onItem({
              id: `error-${Date.now()}`,
              type: "message",
              role: "system",
              content: [
                {
                  type: "input_text",
                  text: "⚠️ Insufficient quota. Please check your billing details and retry.",
                },
              ],
            });
            this.onLoading(false);
            return;
          }
          throw err;
        } finally {
          this.currentStream = null;
        }
      } // end while retry loop

      log(
        `Turn inputs (${turnInput.length}) - ${turnInput