fix: get responses API working again in Rust (#872)

I inadvertently regressed support for the Responses API when adding
support for the chat completions API in
https://github.com/openai/codex/pull/862. This should get both APIs
working again, but the chat completions codepath seems more complex than
necessary. I'll try to clean that up shortly, but I want to get things
working again ASAP.
This commit is contained in:
Michael Bolin
2025-05-08 22:49:15 -07:00
committed by GitHub
parent e924070cee
commit b940adae8e
2 changed files with 27 additions and 3 deletions

View File

@@ -19,6 +19,7 @@ use tracing::debug;
use tracing::trace;
use tracing::warn;
use crate::chat_completions::AggregateStreamExt;
use crate::chat_completions::stream_chat_completions;
use crate::client_common::Payload;
use crate::client_common::Prompt;
@@ -111,7 +112,31 @@ impl ModelClient {
match self.provider.wire_api {
WireApi::Responses => self.stream_responses(prompt).await,
WireApi::Chat => {
stream_chat_completions(prompt, &self.model, &self.client, &self.provider).await
// Create the raw streaming connection first.
let response_stream =
stream_chat_completions(prompt, &self.model, &self.client, &self.provider)
.await?;
// Wrap it with the aggregation adapter so callers see *only*
// the final assistant message per turn (matching the
// behaviour of the Responses API).
let mut aggregated = response_stream.aggregate();
// Bridge the aggregated stream back into a standard
// `ResponseStream` by forwarding events through a channel.
let (tx, rx) = mpsc::channel::<Result<ResponseEvent>>(16);
tokio::spawn(async move {
use futures::StreamExt;
while let Some(ev) = aggregated.next().await {
// Exit early if receiver hung up.
if tx.send(ev).await.is_err() {
break;
}
}
});
Ok(ResponseStream { rx_event: rx })
}
}
}

View File

@@ -32,7 +32,6 @@ use tracing::trace;
use tracing::warn;
use crate::WireApi;
use crate::chat_completions::AggregateStreamExt;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
@@ -864,7 +863,7 @@ async fn try_run_turn(
sub_id: &str,
prompt: &Prompt,
) -> CodexResult<Vec<ProcessedResponseItem>> {
let mut stream = sess.client.clone().stream(prompt).await?.aggregate();
let mut stream = sess.client.clone().stream(prompt).await?;
// Buffer all the incoming messages from the stream first, then execute them.
// If we execute a function call in the middle of handling the stream, it can time out.