bug: non-openai mode - fix for gemini content: null, fix 429 to throw before stream (#563)

Gemini's API is finicky: it returns a 400 with no error body when you
pass content: null.
Also fixed the rate-limiting issue by throwing outside of the iterator,
so a 429 surfaces before the stream starts instead of mid-iteration.
I think there's a separate issue with the second isRateLimit check in
agent-loop: turnInput has been cleared by that time, so it retries
without the last message.
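The shape of the content fix, as a minimal sketch (buildAssistantMessage is an illustrative helper, not part of the patch): build the assistant history message without a content field and attach the text only when it is non-empty, so a null never reaches Gemini.

import type OpenAI from "openai";

// Illustrative only: mirrors the assistantMessage change in the
// streamResponses diff below. Gemini's OpenAI-compatible endpoint 400s
// (with no error body) on `content: null`, so the field is set only
// when there is actual text.
function buildAssistantMessage(
  textContent: string,
): OpenAI.Chat.Completions.ChatCompletionMessageParam {
  const assistantMessage: OpenAI.Chat.Completions.ChatCompletionMessageParam = {
    role: "assistant",
  };
  if (textContent) {
    assistantMessage.content = textContent;
  }
  return assistantMessage;
}

Omitting the field entirely is safe because content is optional on assistant messages; it is the explicit null that trips Gemini.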
Daniel Nakov, 2025-04-22 20:37:48 -04:00 (committed by GitHub)
parent 20b6ef0de8, commit 23f0887df3
2 changed files with 273 additions and 319 deletions

File 1 of 2

@@ -3,6 +3,7 @@ import type {
   ResponseCreateParams,
   Response,
 } from "openai/resources/responses/responses";
+
 // Define interfaces based on OpenAI API documentation
 type ResponseCreateInput = ResponseCreateParams;
 type ResponseOutput = Response;
@@ -260,31 +261,7 @@ function convertTools(
   }));
 }
 
-// Main function with overloading
-async function responsesCreateViaChatCompletions(
-  openai: OpenAI,
-  input: ResponseCreateInput & { stream: true },
-): Promise<AsyncGenerator<ResponseEvent>>;
-async function responsesCreateViaChatCompletions(
-  openai: OpenAI,
-  input: ResponseCreateInput & { stream?: false },
-): Promise<ResponseOutput>;
-async function responsesCreateViaChatCompletions(
-  openai: OpenAI,
-  input: ResponseCreateInput,
-): Promise<ResponseOutput | AsyncGenerator<ResponseEvent>> {
-  if (input.stream) {
-    return streamResponses(openai, input);
-  } else {
-    return nonStreamResponses(openai, input);
-  }
-}
-
-// Non-streaming implementation
-async function nonStreamResponses(
-  openai: OpenAI,
-  input: ResponseCreateInput,
-): Promise<ResponseOutput> {
+const createCompletion = (openai: OpenAI, input: ResponseCreateInput) => {
   const fullMessages = getFullMessages(input);
   const chatTools = convertTools(input.tools);
   const webSearchOptions = input.tools?.some(
@@ -298,17 +275,55 @@ async function nonStreamResponses(
     messages: fullMessages,
     tools: chatTools,
     web_search_options: webSearchOptions,
-    temperature: input.temperature,
-    top_p: input.top_p,
+    temperature: input.temperature ?? 1.0,
+    top_p: input.top_p ?? 1.0,
     tool_choice: (input.tool_choice === "auto"
       ? "auto"
       : input.tool_choice) as OpenAI.Chat.Completions.ChatCompletionCreateParams["tool_choice"],
+    stream: input.stream || false,
     user: input.user,
     metadata: input.metadata,
   };
+
+  return openai.chat.completions.create(chatInput);
+};
+
+// Main function with overloading
+async function responsesCreateViaChatCompletions(
+  openai: OpenAI,
+  input: ResponseCreateInput & { stream: true },
+): Promise<AsyncGenerator<ResponseEvent>>;
+async function responsesCreateViaChatCompletions(
+  openai: OpenAI,
+  input: ResponseCreateInput & { stream?: false },
+): Promise<ResponseOutput>;
+async function responsesCreateViaChatCompletions(
+  openai: OpenAI,
+  input: ResponseCreateInput,
+): Promise<ResponseOutput | AsyncGenerator<ResponseEvent>> {
+  const completion = await createCompletion(openai, input);
+  if (input.stream) {
+    return streamResponses(
+      input,
+      completion as AsyncIterable<OpenAI.ChatCompletionChunk>,
+    );
+  } else {
+    return nonStreamResponses(
+      input,
+      completion as unknown as OpenAI.Chat.Completions.ChatCompletion,
+    );
+  }
+}
+
+// Non-streaming implementation
+async function nonStreamResponses(
+  input: ResponseCreateInput,
+  completion: OpenAI.Chat.Completions.ChatCompletion,
+): Promise<ResponseOutput> {
+  const fullMessages = getFullMessages(input);
 
   try {
-    const chatResponse = await openai.chat.completions.create(chatInput);
+    const chatResponse = completion;
     if (!("choices" in chatResponse) || chatResponse.choices.length === 0) {
       throw new Error("No choices in chat completion response");
     }
@@ -429,56 +444,211 @@ async function nonStreamResponses(
 // Streaming implementation
 async function* streamResponses(
-  openai: OpenAI,
   input: ResponseCreateInput,
+  completion: AsyncIterable<OpenAI.ChatCompletionChunk>,
 ): AsyncGenerator<ResponseEvent> {
   const fullMessages = getFullMessages(input);
-  const chatTools = convertTools(input.tools);
-  const webSearchOptions = input.tools?.some(
-    (tool) => tool.type === "function" && tool.name === "web_search",
-  )
-    ? {}
-    : undefined;
-  const chatInput: OpenAI.Chat.Completions.ChatCompletionCreateParams = {
+  const responseId = generateId("resp");
+  const outputItemId = generateId("msg");
+  let textContentAdded = false;
+  let textContent = "";
+  const toolCalls = new Map<number, ToolCallData>();
+  let usage: UsageData | null = null;
+  const finalOutputItem: Array<ResponseContentOutput> = [];
+  // Initial response
+  const initialResponse: Partial<ResponseOutput> = {
+    id: responseId,
+    object: "response" as const,
+    created_at: Math.floor(Date.now() / 1000),
+    status: "in_progress" as const,
     model: input.model,
-    messages: fullMessages,
-    tools: chatTools,
-    web_search_options: webSearchOptions,
+    output: [],
+    error: null,
+    incomplete_details: null,
+    instructions: null,
+    max_output_tokens: null,
+    parallel_tool_calls: true,
+    previous_response_id: input.previous_response_id ?? null,
+    reasoning: null,
+    temperature: input.temperature ?? 1.0,
+    text: { format: { type: "text" } },
+    tool_choice: input.tool_choice ?? "auto",
+    tools: input.tools ?? [],
+    top_p: input.top_p ?? 1.0,
-    tool_choice: (input.tool_choice === "auto"
-      ? "auto"
-      : input.tool_choice) as OpenAI.Chat.Completions.ChatCompletionCreateParams["tool_choice"],
-    stream: true,
-    user: input.user,
-    metadata: input.metadata,
+    truncation: input.truncation ?? "disabled",
+    usage: undefined,
+    user: input.user ?? undefined,
+    metadata: input.metadata ?? {},
+    output_text: "",
   };
+  yield { type: "response.created", response: initialResponse };
+  yield { type: "response.in_progress", response: initialResponse };
+  let isToolCall = false;
+  for await (const chunk of completion as AsyncIterable<OpenAI.ChatCompletionChunk>) {
+    // console.error('\nCHUNK: ', JSON.stringify(chunk));
+    const choice = chunk.choices[0];
+    if (!choice) {
+      continue;
+    }
+    if (
+      !isToolCall &&
+      (("tool_calls" in choice.delta && choice.delta.tool_calls) ||
+        choice.finish_reason === "tool_calls")
+    ) {
+      isToolCall = true;
+    }
-  try {
-    // console.error("chatInput", JSON.stringify(chatInput));
-    const stream = await openai.chat.completions.create(chatInput);
+    if (chunk.usage) {
+      usage = {
+        prompt_tokens: chunk.usage.prompt_tokens,
+        completion_tokens: chunk.usage.completion_tokens,
+        total_tokens: chunk.usage.total_tokens,
+        input_tokens: chunk.usage.prompt_tokens,
+        input_tokens_details: { cached_tokens: 0 },
+        output_tokens: chunk.usage.completion_tokens,
+        output_tokens_details: { reasoning_tokens: 0 },
+      };
+    }
+    if (isToolCall) {
+      for (const tcDelta of choice.delta.tool_calls || []) {
+        const tcIndex = tcDelta.index;
+        const content_index = textContentAdded ? tcIndex + 1 : tcIndex;
-    // Initialize state
-    const responseId = generateId("resp");
-    const outputItemId = generateId("msg");
-    let textContentAdded = false;
-    let textContent = "";
-    const toolCalls = new Map<number, ToolCallData>();
-    let usage: UsageData | null = null;
-    const finalOutputItem: Array<ResponseContentOutput> = [];
-    // Initial response
-    const initialResponse: Partial<ResponseOutput> = {
+        if (!toolCalls.has(tcIndex)) {
+          // New tool call
+          const toolCallId = tcDelta.id || generateId("call");
+          const functionName = tcDelta.function?.name || "";
+          yield {
+            type: "response.output_item.added",
+            item: {
+              type: "function_call",
+              id: outputItemId,
+              status: "in_progress",
+              call_id: toolCallId,
+              name: functionName,
+              arguments: "",
+            },
+            output_index: 0,
+          };
+          toolCalls.set(tcIndex, {
+            id: toolCallId,
+            name: functionName,
+            arguments: "",
+          });
+        }
+        if (tcDelta.function?.arguments) {
+          const current = toolCalls.get(tcIndex);
+          if (current) {
+            current.arguments += tcDelta.function.arguments;
+            yield {
+              type: "response.function_call_arguments.delta",
+              item_id: outputItemId,
+              output_index: 0,
+              content_index,
+              delta: tcDelta.function.arguments,
+            };
+          }
+        }
+      }
+      if (choice.finish_reason === "tool_calls") {
+        for (const [tcIndex, tc] of toolCalls) {
+          const item = {
+            type: "function_call",
+            id: outputItemId,
+            status: "completed",
+            call_id: tc.id,
+            name: tc.name,
+            arguments: tc.arguments,
+          };
+          yield {
+            type: "response.function_call_arguments.done",
+            item_id: outputItemId,
+            output_index: tcIndex,
+            content_index: textContentAdded ? tcIndex + 1 : tcIndex,
+            arguments: tc.arguments,
+          };
+          yield {
+            type: "response.output_item.done",
+            output_index: tcIndex,
+            item,
+          };
+          finalOutputItem.push(item as unknown as ResponseContentOutput);
+        }
+      } else {
+        continue;
+      }
+    } else {
+      if (!textContentAdded) {
+        yield {
+          type: "response.content_part.added",
+          item_id: outputItemId,
+          output_index: 0,
+          content_index: 0,
+          part: { type: "output_text", text: "", annotations: [] },
+        };
+        textContentAdded = true;
+      }
+      if (choice.delta.content?.length) {
+        yield {
+          type: "response.output_text.delta",
+          item_id: outputItemId,
+          output_index: 0,
+          content_index: 0,
+          delta: choice.delta.content,
+        };
+        textContent += choice.delta.content;
+      }
+      if (choice.finish_reason) {
+        yield {
+          type: "response.output_text.done",
+          item_id: outputItemId,
+          output_index: 0,
+          content_index: 0,
+          text: textContent,
+        };
+        yield {
+          type: "response.content_part.done",
+          item_id: outputItemId,
+          output_index: 0,
+          content_index: 0,
+          part: { type: "output_text", text: textContent, annotations: [] },
+        };
+        const item = {
+          type: "message",
+          id: outputItemId,
+          status: "completed",
+          role: "assistant",
+          content: [
+            { type: "output_text", text: textContent, annotations: [] },
+          ],
+        };
+        yield {
+          type: "response.output_item.done",
+          output_index: 0,
+          item,
+        };
+        finalOutputItem.push(item as unknown as ResponseContentOutput);
+      } else {
+        continue;
+      }
+    }
+    // Construct final response
+    const finalResponse: ResponseOutput = {
       id: responseId,
       object: "response" as const,
-      created_at: Math.floor(Date.now() / 1000),
-      status: "in_progress" as const,
-      model: input.model,
-      output: [],
+      created_at: initialResponse.created_at || Math.floor(Date.now() / 1000),
+      status: "completed" as const,
       error: null,
       incomplete_details: null,
       instructions: null,
       max_output_tokens: null,
+      model: chunk.model || input.model,
+      output: finalOutputItem as unknown as ResponseOutput["output"],
       parallel_tool_calls: true,
       previous_response_id: input.previous_response_id ?? null,
       reasoning: null,
@@ -488,243 +658,54 @@ async function* streamResponses(
       tools: input.tools ?? [],
       top_p: input.top_p ?? 1.0,
       truncation: input.truncation ?? "disabled",
-      usage: undefined,
+      usage: usage as ResponseOutput["usage"],
       user: input.user ?? undefined,
       metadata: input.metadata ?? {},
       output_text: "",
-    };
-    yield { type: "response.created", response: initialResponse };
-    yield { type: "response.in_progress", response: initialResponse };
-    let isToolCall = false;
-    for await (const chunk of stream as AsyncIterable<OpenAI.ChatCompletionChunk>) {
-      // console.error('\nCHUNK: ', JSON.stringify(chunk));
-      const choice = chunk.choices[0];
-      if (!choice) {
-        continue;
-      }
-      if (
-        !isToolCall &&
-        (("tool_calls" in choice.delta && choice.delta.tool_calls) ||
-          choice.finish_reason === "tool_calls")
-      ) {
-        isToolCall = true;
-      }
+    } as ResponseOutput;
-      if (chunk.usage) {
-        usage = {
-          prompt_tokens: chunk.usage.prompt_tokens,
-          completion_tokens: chunk.usage.completion_tokens,
-          total_tokens: chunk.usage.total_tokens,
-          input_tokens: chunk.usage.prompt_tokens,
-          input_tokens_details: { cached_tokens: 0 },
-          output_tokens: chunk.usage.completion_tokens,
-          output_tokens_details: { reasoning_tokens: 0 },
-        };
-      }
-      if (isToolCall) {
-        for (const tcDelta of choice.delta.tool_calls || []) {
-          const tcIndex = tcDelta.index;
-          const content_index = textContentAdded ? tcIndex + 1 : tcIndex;
-          if (!toolCalls.has(tcIndex)) {
-            // New tool call
-            const toolCallId = tcDelta.id || generateId("call");
-            const functionName = tcDelta.function?.name || "";
-            yield {
-              type: "response.output_item.added",
-              item: {
-                type: "function_call",
-                id: outputItemId,
-                status: "in_progress",
-                call_id: toolCallId,
-                name: functionName,
-                arguments: "",
-              },
-              output_index: 0,
-            };
-            toolCalls.set(tcIndex, {
-              id: toolCallId,
-              name: functionName,
-              arguments: "",
-            });
-          }
-          if (tcDelta.function?.arguments) {
-            const current = toolCalls.get(tcIndex);
-            if (current) {
-              current.arguments += tcDelta.function.arguments;
-              yield {
-                type: "response.function_call_arguments.delta",
-                item_id: outputItemId,
-                output_index: 0,
-                content_index,
-                delta: tcDelta.function.arguments,
-              };
-            }
-          }
-        }
-        if (choice.finish_reason === "tool_calls") {
-          for (const [tcIndex, tc] of toolCalls) {
-            const item = {
-              type: "function_call",
-              id: outputItemId,
-              status: "completed",
-              call_id: tc.id,
-              name: tc.name,
-              arguments: tc.arguments,
-            };
-            yield {
-              type: "response.function_call_arguments.done",
-              item_id: outputItemId,
-              output_index: tcIndex,
-              content_index: textContentAdded ? tcIndex + 1 : tcIndex,
-              arguments: tc.arguments,
-            };
-            yield {
-              type: "response.output_item.done",
-              output_index: tcIndex,
-              item,
-            };
-            finalOutputItem.push(item as unknown as ResponseContentOutput);
-          }
-        } else {
-          continue;
-        }
-      } else {
-        if (!textContentAdded) {
-          yield {
-            type: "response.content_part.added",
-            item_id: outputItemId,
-            output_index: 0,
-            content_index: 0,
-            part: { type: "output_text", text: "", annotations: [] },
-          };
-          textContentAdded = true;
-        }
-        if (choice.delta.content?.length) {
-          yield {
-            type: "response.output_text.delta",
-            item_id: outputItemId,
-            output_index: 0,
-            content_index: 0,
-            delta: choice.delta.content,
-          };
-          textContent += choice.delta.content;
-        }
-        if (choice.finish_reason) {
-          yield {
-            type: "response.output_text.done",
-            item_id: outputItemId,
-            output_index: 0,
-            content_index: 0,
-            text: textContent,
-          };
-          yield {
-            type: "response.content_part.done",
-            item_id: outputItemId,
-            output_index: 0,
-            content_index: 0,
-            part: { type: "output_text", text: textContent, annotations: [] },
-          };
-          const item = {
-            type: "message",
-            id: outputItemId,
-            status: "completed",
-            role: "assistant",
-            content: [
-              { type: "output_text", text: textContent, annotations: [] },
-            ],
-          };
-          yield {
-            type: "response.output_item.done",
-            output_index: 0,
-            item,
-          };
-          finalOutputItem.push(item as unknown as ResponseContentOutput);
-        } else {
-          continue;
-        }
-      }
-      // Construct final response
-      const finalResponse: ResponseOutput = {
-        id: responseId,
-        object: "response" as const,
-        created_at: initialResponse.created_at || Math.floor(Date.now() / 1000),
-        status: "completed" as const,
-        error: null,
-        incomplete_details: null,
-        instructions: null,
-        max_output_tokens: null,
-        model: chunk.model || input.model,
-        output: finalOutputItem as unknown as ResponseOutput["output"],
-        parallel_tool_calls: true,
-        previous_response_id: input.previous_response_id ?? null,
-        reasoning: null,
-        temperature: input.temperature ?? 1.0,
-        text: { format: { type: "text" } },
-        tool_choice: input.tool_choice ?? "auto",
-        tools: input.tools ?? [],
-        top_p: input.top_p ?? 1.0,
-        truncation: input.truncation ?? "disabled",
-        usage: usage as ResponseOutput["usage"],
-        user: input.user ?? undefined,
-        metadata: input.metadata ?? {},
-        output_text: "",
-      } as ResponseOutput;
-      // Store history
-      const assistantMessage = {
+    // Store history
+    const assistantMessage: OpenAI.Chat.Completions.ChatCompletionMessageParam =
+      {
         role: "assistant" as const,
-        content: textContent || null,
       };
-      // Add tool_calls property if needed
-      if (toolCalls.size > 0) {
-        const toolCallsArray = Array.from(toolCalls.values()).map((tc) => ({
-          id: tc.id,
-          type: "function" as const,
-          function: { name: tc.name, arguments: tc.arguments },
-        }));
-        // Define a more specific type for the assistant message with tool calls
-        type AssistantMessageWithToolCalls =
-          OpenAI.Chat.Completions.ChatCompletionMessageParam & {
-            tool_calls: Array<{
-              id: string;
-              type: "function";
-              function: {
-                name: string;
-                arguments: string;
-              };
-            }>;
-          };
-        // Use type assertion with the defined type
-        (assistantMessage as AssistantMessageWithToolCalls).tool_calls =
-          toolCallsArray;
-      }
-      const newHistory = [...fullMessages, assistantMessage];
-      conversationHistories.set(responseId, {
-        previous_response_id: input.previous_response_id ?? null,
-        messages: newHistory,
-      });
-      yield { type: "response.completed", response: finalResponse };
+    if (textContent) {
+      assistantMessage.content = textContent;
+    }
-  } catch (error) {
-    // console.error('\nERROR: ', JSON.stringify(error));
-    yield {
-      type: "error",
-      code:
-        error instanceof Error && "code" in error
-          ? (error as { code: string }).code
-          : "unknown",
-      message: error instanceof Error ? error.message : String(error),
-      param: null,
-    };
+    // Add tool_calls property if needed
+    if (toolCalls.size > 0) {
+      const toolCallsArray = Array.from(toolCalls.values()).map((tc) => ({
+        id: tc.id,
+        type: "function" as const,
+        function: { name: tc.name, arguments: tc.arguments },
+      }));
+      // Define a more specific type for the assistant message with tool calls
+      type AssistantMessageWithToolCalls =
+        OpenAI.Chat.Completions.ChatCompletionMessageParam & {
+          tool_calls: Array<{
+            id: string;
+            type: "function";
+            function: {
+              name: string;
+              arguments: string;
+            };
+          }>;
+        };
+      // Use type assertion with the defined type
+      (assistantMessage as AssistantMessageWithToolCalls).tool_calls =
+        toolCallsArray;
+    }
+    const newHistory = [...fullMessages, assistantMessage];
+    conversationHistories.set(responseId, {
+      previous_response_id: input.previous_response_id ?? null,
+      messages: newHistory,
+    });
+    yield { type: "response.completed", response: finalResponse };
   }
 }
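The 429 half of the fix is structural rather than local: createCompletion is awaited inside responsesCreateViaChatCompletions before either branch returns, so an HTTP-level failure rejects the returned promise immediately instead of surfacing on the first next() of the generator. A minimal sketch of why that ordering matters (fakeRequest, deferredThrow, upfrontThrow, and demo are illustrative names, not from the patch):

// Illustrative only: a stand-in for the chat-completions request that is
// always rate limited.
async function fakeRequest(): Promise<string> {
  throw new Error("429 Too Many Requests");
}

// Old shape: the request happens inside the generator body, which does not
// run until the consumer calls next(), so the 429 surfaces mid-"stream".
async function* deferredThrow(): AsyncGenerator<string> {
  yield await fakeRequest();
}

// New shape: the request is awaited before a generator is returned, so the
// 429 rejects the promise at call time, where retry logic can see it.
async function upfrontThrow(): Promise<AsyncGenerator<string>> {
  const data = await fakeRequest(); // throws here
  return (async function* () {
    yield data;
  })();
}

async function demo(): Promise<void> {
  const gen = deferredThrow(); // no error yet
  try {
    await gen.next(); // the 429 only appears here, mid-iteration
  } catch (err) {
    console.error("deferred:", (err as Error).message);
  }
  try {
    await upfrontThrow(); // rejects before any iteration starts
  } catch (err) {
    console.error("up-front:", (err as Error).message);
  }
}

void demo();

Throwing up front lets the error reach agent-loop's first isRateLimit check, before turnInput has been cleared.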

File 2 of 2 (tests)

@@ -294,7 +294,7 @@ describe("responsesCreateViaChatCompletions", () => {
       expect(callArgs.messages).toEqual([
         { role: "user", content: "Hello world" },
       ]);
-      expect(callArgs.stream).toBeUndefined();
+      expect(callArgs.stream).toBe(false);
     }
 
     // Verify result format
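The flipped expectation follows from the shared createCompletion helper above: it always forwards an explicit boolean (stream: input.stream || false), so non-streaming calls now see false instead of an omitted flag. A tiny sketch of that normalization (an illustrative vitest case, not from the patch):

import { expect, test } from "vitest";

// Illustrative only: `input.stream || false` turns an omitted stream flag
// into an explicit `false` on the outgoing chat request.
test("stream flag is normalized to an explicit boolean", () => {
  const input: { stream?: boolean } = {};
  const chatInput = { stream: input.stream || false };
  expect(chatInput.stream).toBe(false);
});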
@@ -736,33 +736,6 @@ describe("responsesCreateViaChatCompletions", () => {
     }
   });
 
-  it("should handle errors gracefully", async () => {
-    // Setup mock to throw an error
-    openAiState.createSpy = vi
-      .fn()
-      .mockRejectedValue(new Error("API connection error"));
-
-    const openaiClient = new (await import("openai")).default({
-      apiKey: "test-key",
-    }) as unknown as OpenAI;
-
-    const inputMessage = createTestInput({
-      model: "gpt-4o",
-      userMessage: "Test message",
-      stream: false,
-    });
-
-    // Expect the function to throw an error
-    await expect(
-      responsesModule.responsesCreateViaChatCompletions(
-        openaiClient,
-        inputMessage as unknown as ResponseCreateParamsNonStreaming & {
-          stream?: false | undefined;
-        },
-      ),
-    ).rejects.toThrow("Failed to process chat completion");
-  });
-
   it("handles streaming with tool calls", async () => {
     // Mock a streaming response with tool calls
     const mockStream = createToolCallsStream();