From ee6e1765fa216df2f054906b68d542adef8ac25b Mon Sep 17 00:00:00 2001
From: Scott Leibrand
Date: Tue, 22 Apr 2025 08:02:10 -0700
Subject: [PATCH] agent-loop: minimal mid-stream 429 retry loop using existing back-off (#506)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

As requested by @tibo-openai at
https://github.com/openai/codex/pull/357#issuecomment-2816554203, this
attempts a more minimal implementation of #357 that preserves as much of
the existing code's exponential back-off logic as possible.

Adds a small retry wrapper around the streaming for‑await loop so that
HTTP 429s that occur *after* the stream has started no longer crash the
CLI.

Highlights
• Re‑uses the existing RATE_LIMIT_RETRY_WAIT_MS constant and 5‑attempt limit.
• Exponential back‑off identical to the initial-request handling.

This comment is probably more useful here in the PR:

// The OpenAI SDK may raise a 429 (rate‑limit) *after* the stream has
// started. Prior logic already retries the initial `responses.create`
// call, but we need to add equivalent resilience for mid‑stream
// failures. We keep the implementation minimal by wrapping the
// existing `for‑await` loop in a small retry loop that re‑creates
// the stream with exponential back‑off.

(For reviewers: a stripped-down sketch of this retry shape follows the
diff below.)
---
 codex-cli/src/utils/agent/agent-loop.ts | 285 ++++++++++++++++--------
 1 file changed, 196 insertions(+), 89 deletions(-)

diff --git a/codex-cli/src/utils/agent/agent-loop.ts b/codex-cli/src/utils/agent/agent-loop.ts
index 3d34dee3..bd6adcd0 100644
--- a/codex-cli/src/utils/agent/agent-loop.ts
+++ b/codex-cli/src/utils/agent/agent-loop.ts
@@ -842,104 +842,211 @@ export class AgentLoop {
       return;
     }
 
-    try {
-      // eslint-disable-next-line no-await-in-loop
-      for await (const event of stream as AsyncIterable<ResponseEvent>) {
-        log(`AgentLoop.run(): response event ${event.type}`);
+    const MAX_STREAM_RETRIES = 5;
+    let streamRetryAttempt = 0;
 
-        // process and surface each item (no-op until we can depend on streaming events)
-        if (event.type === "response.output_item.done") {
-          const item = event.item;
-          // 1) if it's a reasoning item, annotate it
-          type ReasoningItem = { type?: string; duration_ms?: number };
-          const maybeReasoning = item as ReasoningItem;
-          if (maybeReasoning.type === "reasoning") {
-            maybeReasoning.duration_ms = Date.now() - thinkingStart;
-          }
-          if (item.type === "function_call") {
-            // Track outstanding tool call so we can abort later if needed.
-            // The item comes from the streaming response, therefore it has
-            // either `id` (chat) or `call_id` (responses) – we normalise
-            // by reading both.
-            const callId =
-              (item as { call_id?: string; id?: string }).call_id ??
-              (item as { id?: string }).id;
-            if (callId) {
-              this.pendingAborts.add(callId);
+    // eslint-disable-next-line no-constant-condition
+    while (true) {
+      try {
+        // eslint-disable-next-line no-await-in-loop
+        for await (const event of stream as AsyncIterable<ResponseEvent>) {
+          log(`AgentLoop.run(): response event ${event.type}`);
+
+          // process and surface each item (no-op until we can depend on streaming events)
+          if (event.type === "response.output_item.done") {
+            const item = event.item;
+            // 1) if it's a reasoning item, annotate it
+            type ReasoningItem = { type?: string; duration_ms?: number };
+            const maybeReasoning = item as ReasoningItem;
+            if (maybeReasoning.type === "reasoning") {
+              maybeReasoning.duration_ms = Date.now() - thinkingStart;
             }
-          } else {
-            stageItem(item as ResponseItem);
-          }
-        }
-
-        if (event.type === "response.completed") {
-          if (thisGeneration === this.generation && !this.canceled) {
-            for (const item of event.response.output) {
+            if (item.type === "function_call") {
+              // Track outstanding tool call so we can abort later if needed.
+              // The item comes from the streaming response, therefore it has
+              // either `id` (chat) or `call_id` (responses) – we normalise
+              // by reading both.
+              const callId =
+                (item as { call_id?: string; id?: string }).call_id ??
+                (item as { id?: string }).id;
+              if (callId) {
+                this.pendingAborts.add(callId);
+              }
+            } else {
               stageItem(item as ResponseItem);
             }
           }
-          if (event.response.status === "completed") {
-            // TODO: remove this once we can depend on streaming events
-            const newTurnInput = await this.processEventsWithoutStreaming(
-              event.response.output,
-              stageItem,
-            );
-            turnInput = newTurnInput;
+
+          if (event.type === "response.completed") {
+            if (thisGeneration === this.generation && !this.canceled) {
+              for (const item of event.response.output) {
+                stageItem(item as ResponseItem);
+              }
+            }
+            if (event.response.status === "completed") {
+              // TODO: remove this once we can depend on streaming events
+              const newTurnInput = await this.processEventsWithoutStreaming(
+                event.response.output,
+                stageItem,
+              );
+              turnInput = newTurnInput;
+            }
+            lastResponseId = event.response.id;
+            this.onLastResponseId(event.response.id);
           }
-          lastResponseId = event.response.id;
-          this.onLastResponseId(event.response.id);
         }
-      }
-    } catch (err: unknown) {
-      // Gracefully handle an abort triggered via `cancel()` so that the
-      // consumer does not see an unhandled exception.
-      if (err instanceof Error && err.name === "AbortError") {
-        if (!this.canceled) {
-          // It was aborted for some other reason; surface the error.
-          throw err;
+        // Stream finished successfully – leave the retry loop.
+        break;
+      } catch (err: unknown) {
+        const isRateLimitError = (e: unknown): boolean => {
+          if (!e || typeof e !== "object") {
+            return false;
+          }
+          // eslint-disable-next-line @typescript-eslint/no-explicit-any
+          const ex: any = e;
+          return (
+            ex.status === 429 ||
+            ex.code === "rate_limit_exceeded" ||
+            ex.type === "rate_limit_exceeded"
+          );
+        };
+
+        if (
+          isRateLimitError(err) &&
+          streamRetryAttempt < MAX_STREAM_RETRIES
+        ) {
+          streamRetryAttempt += 1;
+
+          const waitMs =
+            RATE_LIMIT_RETRY_WAIT_MS * 2 ** (streamRetryAttempt - 1);
+          log(
+            `OpenAI stream rate‑limited – retry ${streamRetryAttempt}/${MAX_STREAM_RETRIES} in ${waitMs} ms`,
+          );
+
+          // Give the server a breather before retrying.
+          // eslint-disable-next-line no-await-in-loop
+          await new Promise((res) => setTimeout(res, waitMs));
+
+          // Re‑create the stream with the *same* parameters.
+          let reasoning: Reasoning | undefined;
+          if (this.model.startsWith("o")) {
+            reasoning = { effort: "high" };
+            if (this.model === "o3" || this.model === "o4-mini") {
+              reasoning.summary = "auto";
+            }
+          }
+
+          const mergedInstructions = [prefix, this.instructions]
+            .filter(Boolean)
+            .join("\n");
+
+          const responseCall =
+            !this.config.provider ||
+            this.config.provider?.toLowerCase() === "openai"
+              ? (params: ResponseCreateParams) =>
+                  this.oai.responses.create(params)
+              : (params: ResponseCreateParams) =>
+                  responsesCreateViaChatCompletions(
+                    this.oai,
+                    params as ResponseCreateParams & { stream: true },
+                  );
+
+          // eslint-disable-next-line no-await-in-loop
+          stream = await responseCall({
+            model: this.model,
+            instructions: mergedInstructions,
+            previous_response_id: lastResponseId || undefined,
+            input: turnInput,
+            stream: true,
+            parallel_tool_calls: false,
+            reasoning,
+            ...(this.config.flexMode ? { service_tier: "flex" } : {}),
+            tools: [
+              {
+                type: "function",
+                name: "shell",
+                description:
+                  "Runs a shell command, and returns its output.",
+                strict: false,
+                parameters: {
+                  type: "object",
+                  properties: {
+                    command: {
+                      type: "array",
+                      items: { type: "string" },
+                    },
+                    workdir: {
+                      type: "string",
+                      description: "The working directory for the command.",
+                    },
+                    timeout: {
+                      type: "number",
+                      description:
+                        "The maximum time to wait for the command to complete in milliseconds.",
+                    },
+                  },
+                  required: ["command"],
+                  additionalProperties: false,
+                },
+              },
+            ],
+          });
+
+          this.currentStream = stream;
+          // Continue to outer while to consume new stream.
+          continue;
         }
-        this.onLoading(false);
-        return;
+
+        // Gracefully handle an abort triggered via `cancel()` so that the
+        // consumer does not see an unhandled exception.
+        if (err instanceof Error && err.name === "AbortError") {
+          if (!this.canceled) {
+            // It was aborted for some other reason; surface the error.
+            throw err;
+          }
+          this.onLoading(false);
+          return;
+        }
+        // Suppress internal stack on JSON parse failures
+        if (err instanceof SyntaxError) {
+          this.onItem({
+            id: `error-${Date.now()}`,
+            type: "message",
+            role: "system",
+            content: [
+              {
+                type: "input_text",
+                text: "⚠️ Failed to parse streaming response (invalid JSON). Please `/clear` to reset.",
+              },
+            ],
+          });
+          this.onLoading(false);
+          return;
+        }
+        // Handle OpenAI API quota errors
+        if (
+          err instanceof Error &&
+          (err as { code?: string }).code === "insufficient_quota"
+        ) {
+          this.onItem({
+            id: `error-${Date.now()}`,
+            type: "message",
+            role: "system",
+            content: [
+              {
+                type: "input_text",
+                text: "⚠️ Insufficient quota. Please check your billing details and retry.",
+              },
+            ],
+          });
+          this.onLoading(false);
+          return;
+        }
+        throw err;
+      } finally {
+        this.currentStream = null;
+      }
-      }
-      // Suppress internal stack on JSON parse failures
-      if (err instanceof SyntaxError) {
-        this.onItem({
-          id: `error-${Date.now()}`,
-          type: "message",
-          role: "system",
-          content: [
-            {
-              type: "input_text",
-              text: "⚠️ Failed to parse streaming response (invalid JSON). Please `/clear` to reset.",
-            },
-          ],
-        });
-        this.onLoading(false);
-        return;
-      }
-      // Handle OpenAI API quota errors
-      if (
-        err instanceof Error &&
-        (err as { code?: string }).code === "insufficient_quota"
-      ) {
-        this.onItem({
-          id: `error-${Date.now()}`,
-          type: "message",
-          role: "system",
-          content: [
-            {
-              type: "input_text",
-              text: "⚠️ Insufficient quota. Please check your billing details and retry.",
-            },
-          ],
-        });
-        this.onLoading(false);
-        return;
-      }
-      throw err;
-    } finally {
-      this.currentStream = null;
-    }
+    } // end while retry loop
 
     log(
       `Turn inputs (${turnInput.length}) - ${turnInput
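
For reviewers: the stripped-down sketch promised above. This is
illustrative only, not code from this diff – `createStream` and
`handleEvent` are hypothetical stand-ins for re-issuing
`responses.create` with identical parameters and for the event handling
in the for‑await body, and the 500 ms base wait is an assumed value, not
the real RATE_LIMIT_RETRY_WAIT_MS constant from agent-loop.ts.

// Sketch only – assumptions noted above.
const MAX_STREAM_RETRIES = 5;
const RATE_LIMIT_RETRY_WAIT_MS = 500; // assumed base wait, not the real constant

async function consumeWithRetry<T>(
  createStream: () => Promise<AsyncIterable<T>>, // hypothetical stand-in
  handleEvent: (event: T) => void, // hypothetical stand-in
): Promise<void> {
  let stream = await createStream();
  let attempt = 0;
  while (true) {
    try {
      for await (const event of stream) {
        handleEvent(event);
      }
      return; // stream finished cleanly – leave the retry loop
    } catch (err) {
      const status = (err as { status?: number }).status;
      if (status === 429 && attempt < MAX_STREAM_RETRIES) {
        attempt += 1;
        // Exponential back-off, same shape as the initial-request path:
        // 500 ms, 1 s, 2 s, 4 s, 8 s under the assumed base wait.
        const waitMs = RATE_LIMIT_RETRY_WAIT_MS * 2 ** (attempt - 1);
        await new Promise((res) => setTimeout(res, waitMs));
        stream = await createStream(); // re-create with the *same* parameters
        continue;
      }
      throw err; // non-429, or retries exhausted: surface as before
    }
  }
}

The real change also rebuilds the `reasoning` settings, merged
instructions, and the shell tool definition before re-creating the
stream; that is what the long block of `+` lines in the middle of the
diff is doing.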