fix: agent loop for disable response storage (#543)

- Fixes post-merge of #506

---------

Co-authored-by: Ilan Bigio <ilan@openai.com>
This commit is contained in:
Fouad Matin
2025-04-22 13:49:10 -07:00
committed by GitHub
parent f99c9080fd
commit a30e79b768
2 changed files with 184 additions and 73 deletions

View File

@@ -7,6 +7,7 @@ import type {
ResponseInputItem,
ResponseItem,
ResponseCreateParams,
FunctionTool,
} from "openai/resources/responses/responses.mjs";
import type { Reasoning } from "openai/resources.mjs";
@@ -68,6 +69,30 @@ type AgentLoopParams = {
onLastResponseId: (lastResponseId: string) => void;
};
// Single shared definition of the `shell` tool that is advertised to the
// model on every request (both the streaming and non-streaming call sites
// reference this constant instead of duplicating the literal inline).
const shellTool: FunctionTool = {
  type: "function",
  name: "shell",
  description: "Runs a shell command, and returns its output.",
  // `strict: false` — the schema is advisory; the model may omit the
  // optional properties (`workdir`, `timeout`) without a validation error.
  strict: false,
  parameters: {
    type: "object",
    properties: {
      // argv-style command vector, e.g. ["ls", "-la"].
      command: { type: "array", items: { type: "string" } },
      workdir: {
        type: "string",
        description: "The working directory for the command.",
      },
      timeout: {
        type: "number",
        description:
          "The maximum time to wait for the command to complete in milliseconds.",
      },
    },
    // Only `command` is mandatory; `workdir` and `timeout` fall back to
    // caller-side defaults when absent.
    required: ["command"],
    additionalProperties: false,
  },
};
export class AgentLoop {
private model: string;
private provider: string;
@@ -109,7 +134,7 @@ export class AgentLoop {
private canceled = false;
/**
* Local conversation transcript used when `disableResponseStorage === false`. Holds
* Local conversation transcript used when `disableResponseStorage === true`. Holds
* all non-system items exchanged so far so we can provide full context on
* every request.
*/
@@ -442,9 +467,13 @@ export class AgentLoop {
// `previous_response_id` when `disableResponseStorage` is enabled. When storage
// is disabled we deliberately ignore the caller-supplied value because
// the backend will not retain any state that could be referenced.
// If the backend stores conversation state (`disableResponseStorage === false`) we
// forward the caller-supplied `previousResponseId` so that the model sees the
// full context. When storage is disabled we *must not* send any ID because the
// server no longer retains the referenced response.
let lastResponseId: string = this.disableResponseStorage
? previousResponseId
: "";
? ""
: previousResponseId;
// If there are unresolved function calls from a previously cancelled run
// we have to emit dummy tool outputs so that the API no longer expects
@@ -473,7 +502,11 @@ export class AgentLoop {
// conversation, so we must include the *entire* transcript (minus system
// messages) on every call.
let turnInput: Array<ResponseInputItem>;
let turnInput: Array<ResponseInputItem> = [];
// Keeps track of how many items in `turnInput` stem from the existing
// transcript so we can avoid re-emitting them to the UI. Only used when
// `disableResponseStorage === true`.
let transcriptPrefixLen = 0;
const stripInternalFields = (
item: ResponseInputItem,
@@ -485,28 +518,47 @@ export class AgentLoop {
// be referenced elsewhere (e.g. UI components).
const clean = { ...item } as Record<string, unknown>;
delete clean["duration_ms"];
// Remove OpenAI-assigned identifiers and transient status so the
// backend does not reject items that were never persisted because we
// use `store: false`.
delete clean["id"];
delete clean["status"];
return clean as unknown as ResponseInputItem;
};
if (this.disableResponseStorage) {
// Remember where the existing transcript ends — everything after this
// index in the upcoming `turnInput` list will be *new* for this turn
// and therefore needs to be surfaced to the UI.
transcriptPrefixLen = this.transcript.length;
// Ensure the transcript is up-to-date with the latest user input so
// that subsequent iterations see a complete history.
const newUserItems: Array<ResponseInputItem> = input.filter((it) => {
if (
(it.type === "message" && it.role !== "system") ||
it.type === "reasoning"
) {
return false;
}
return true;
});
this.transcript.push(...newUserItems);
// `turnInput` is still empty at this point (it will be filled later).
// We need to look at the *input* items the user just supplied.
this.transcript.push(...filterToApiMessages(input));
turnInput = [...this.transcript, ...abortOutputs].map(
stripInternalFields,
);
} else {
turnInput = [...abortOutputs, ...input].map(stripInternalFields);
// When response storage is disabled we have to maintain our own
// running transcript so that the next request still contains the
// full conversational history. We skipped the transcript update in
// the branch above ensure we do it here as well.
if (this.disableResponseStorage) {
const newUserItems: Array<ResponseInputItem> = input.filter((it) => {
if (it.type === "message" && it.role === "system") {
return false;
} else if (it.type === "reasoning") {
return false;
}
return true;
});
this.transcript.push(...newUserItems.map(stripInternalFields));
}
}
this.onLoading(true);
@@ -553,6 +605,24 @@ export class AgentLoop {
// API schema and therefore causes a 400 error when included
// in subsequent requests whose context is sent verbatim.
// Skip items that we have already inserted earlier or that the
// model does not need to see again in the next turn.
// • function_call — superseded by the forthcoming
// function_call_output.
// • reasoning — internal only, never sent back.
// • user messages — we added these to the transcript when
// building the first turnInput; stageItem would add a
// duplicate.
if (
(item as ResponseInputItem).type === "function_call" ||
(item as ResponseInputItem).type === "reasoning" ||
((item as ResponseInputItem).type === "message" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(item as any).role === "user")
) {
return;
}
const clone: ResponseInputItem = {
...(item as unknown as ResponseInputItem),
} as ResponseInputItem;
@@ -578,7 +648,18 @@ export class AgentLoop {
// Only surface the *new* input items to the UI — replaying the entire
// transcript would duplicate messages that have already been shown in
// earlier turns.
const deltaInput = [...abortOutputs, ...input];
// `turnInput` holds the *new* items that will be sent to the API in
// this iteration. Surface exactly these to the UI so that we do not
// re-emit messages from previous turns (which would duplicate user
// prompts) and so that freshly generated `function_call_output`s are
// shown immediately.
// Figure out what subset of `turnInput` constitutes *new* information
// for the UI so that we don't spam the interface with repeats of the
// entire transcript on every iteration when response storage is
// disabled.
const deltaInput = this.disableResponseStorage
? turnInput.slice(transcriptPrefixLen)
: [...turnInput];
for (const item of deltaInput) {
stageItem(item as ResponseItem);
}
@@ -629,31 +710,12 @@ export class AgentLoop {
store: true,
previous_response_id: lastResponseId || undefined,
}),
tools: [
{
type: "function",
name: "shell",
description: "Runs a shell command, and returns its output.",
strict: true,
parameters: {
type: "object",
properties: {
command: { type: "array", items: { type: "string" } },
workdir: {
type: "string",
description: "The working directory for the command.",
},
timeout: {
type: "number",
description:
"The maximum time to wait for the command to complete in milliseconds.",
},
},
required: ["command", "workdir", "timeout"],
additionalProperties: false,
},
},
],
tools: [shellTool],
// Explicitly tell the model it is allowed to pick whatever
// tool it deems appropriate. Omitting this sometimes leads to
// the model ignoring the available tools and responding with
// plain text instead (resulting in a missing toolcall).
tool_choice: "auto",
});
break;
} catch (error) {
@@ -883,13 +945,60 @@ export class AgentLoop {
stageItem(item as ResponseItem);
}
}
if (event.response.status === "completed") {
if (
event.response.status === "completed" ||
(event.response.status as unknown as string) ===
"requires_action"
) {
// TODO: remove this once we can depend on streaming events
const newTurnInput = await this.processEventsWithoutStreaming(
event.response.output,
stageItem,
);
turnInput = newTurnInput;
// When we do not use serverside storage we maintain our
// own transcript so that *future* turns still contain full
// conversational context. However, whether we advance to
// another loop iteration should depend solely on the
// presence of *new* input items (i.e. items that were not
// part of the previous request). Resending the transcript
// by itself would create an infinite request loop because
// `turnInput.length` would never reach zero.
if (this.disableResponseStorage) {
// 1) Append the freshly emitted output to our local
// transcript (minus non-message items the model does
// not need to see again).
const cleaned = filterToApiMessages(
event.response.output.map(stripInternalFields),
);
this.transcript.push(...cleaned);
// 2) Determine the *delta* (newTurnInput) that must be
// sent in the next iteration. If there is none we can
// safely terminate the loop — the transcript alone
// does not constitute new information for the
// assistant to act upon.
const delta = filterToApiMessages(
newTurnInput.map(stripInternalFields),
);
if (delta.length === 0) {
// No new input => end conversation.
turnInput = [];
} else {
// Resend full transcript *plus* the new delta so the
// stateless backend receives complete context.
turnInput = [...this.transcript, ...delta];
// The prefix ends at the current transcript length —
// everything after this index is new for the next
// iteration.
transcriptPrefixLen = this.transcript.length;
}
} else {
turnInput = newTurnInput;
}
}
lastResponseId = event.response.id;
this.onLastResponseId(event.response.id);
@@ -951,45 +1060,27 @@ export class AgentLoop {
params as ResponseCreateParams & { stream: true },
);
log(
"agentLoop.run(): responseCall(1): turnInput: " +
JSON.stringify(turnInput),
);
// eslint-disable-next-line no-await-in-loop
stream = await responseCall({
model: this.model,
instructions: mergedInstructions,
previous_response_id: lastResponseId || undefined,
input: turnInput,
stream: true,
parallel_tool_calls: false,
reasoning,
...(this.config.flexMode ? { service_tier: "flex" } : {}),
tools: [
{
type: "function",
name: "shell",
description:
"Runs a shell command, and returns its output.",
strict: false,
parameters: {
type: "object",
properties: {
command: {
type: "array",
items: { type: "string" },
},
workdir: {
type: "string",
description: "The working directory for the command.",
},
timeout: {
type: "number",
description:
"The maximum time to wait for the command to complete in milliseconds.",
},
},
required: ["command"],
additionalProperties: false,
},
},
],
...(this.disableResponseStorage
? { store: false }
: {
store: true,
previous_response_id: lastResponseId || undefined,
}),
tools: [shellTool],
tool_choice: "auto",
});
this.currentStream = stream;
@@ -1393,3 +1484,17 @@ You MUST adhere to the following criteria when executing the task:
- When your task involves writing or modifying files:
- Do NOT tell the user to "save the file" or "copy the code into a file" if you already created or modified the file using \`apply_patch\`. Instead, reference the file as already saved.
- Do NOT show the full contents of large files you have already written, unless the user explicitly asks for them.`;
/**
 * Removes items that must never be echoed back to the API as input:
 * system-role messages and internal `reasoning` entries. All other item
 * kinds (user/assistant messages, function calls and outputs, …) are kept
 * in their original order.
 */
function filterToApiMessages(
  items: Array<ResponseInputItem>,
): Array<ResponseInputItem> {
  const isInternalOnly = (entry: ResponseInputItem): boolean =>
    entry.type === "reasoning" ||
    (entry.type === "message" && entry.role === "system");
  return items.filter((entry) => !isInternalOnly(entry));
}