Compare commits
4 Commits
rust-v0.1.
...
0841ba05a8
| Author | SHA1 | Date | |
|---|---|---|---|
| 0841ba05a8 | |||
| 44dc7a3bed | |||
| a3ced1f246 | |||
| 401b0b3935 |
@@ -645,7 +645,14 @@ async fn process_chat_sse<S>(
|
||||
return;
|
||||
}
|
||||
Ok(None) => {
|
||||
// Stream closed gracefully – emit Completed with dummy id.
|
||||
// Stream closed gracefully – emit any pending items first, then Completed
|
||||
debug!("Stream closed gracefully (Ok(None)), emitting pending items");
|
||||
if let Some(item) = assistant_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
if let Some(item) = reasoning_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::Completed {
|
||||
response_id: String::new(),
|
||||
@@ -841,6 +848,7 @@ async fn process_chat_sse<S>(
|
||||
|
||||
// Emit end-of-turn when finish_reason signals completion.
|
||||
if let Some(finish_reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
|
||||
debug!("Received finish_reason: {}", finish_reason);
|
||||
match finish_reason {
|
||||
"tool_calls" if fn_call_state.active => {
|
||||
// First, flush the terminal raw reasoning so UIs can finalize
|
||||
@@ -859,9 +867,9 @@ async fn process_chat_sse<S>(
|
||||
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
"stop" => {
|
||||
// Regular turn without tool-call. Emit the final assistant message
|
||||
// as a single OutputItemDone so non-delta consumers see the result.
|
||||
"stop" | "length" => {
|
||||
// Regular turn without tool-call, or hit max_tokens limit.
|
||||
// Emit the final assistant message as a single OutputItemDone so non-delta consumers see the result.
|
||||
if let Some(item) = assistant_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
@@ -870,7 +878,16 @@ async fn process_chat_sse<S>(
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
_ => {
|
||||
// Unknown finish_reason - still emit pending items to avoid hanging
|
||||
debug!("Unknown finish_reason: {}, emitting pending items", finish_reason);
|
||||
if let Some(item) = assistant_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
if let Some(item) = reasoning_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Emit Completed regardless of reason so the agent can advance.
|
||||
@@ -888,6 +905,22 @@ async fn process_chat_sse<S>(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stream ended without finish_reason - this can happen when the stream closes abruptly
|
||||
debug!("Stream ended without finish_reason, emitting final items and Completed event");
|
||||
if let Some(item) = assistant_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
if let Some(item) = reasoning_item.take() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
// Send Completed event so llmx knows the turn is done
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::Completed {
|
||||
response_id: String::new(),
|
||||
token_usage: token_usage.clone(),
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Optional client-side aggregation helper
|
||||
|
||||
@@ -54,7 +54,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
|
||||
Some(UserMessageItem::new(&content))
|
||||
}
|
||||
|
||||
fn parse_agent_message(id: Option<&String>, message: &[ContentItem]) -> AgentMessageItem {
|
||||
fn parse_agent_message(id: Option<&String>, message: &[ContentItem]) -> Option<AgentMessageItem> {
|
||||
let mut content: Vec<AgentMessageContent> = Vec::new();
|
||||
for content_item in message.iter() {
|
||||
match content_item {
|
||||
@@ -69,18 +69,23 @@ fn parse_agent_message(id: Option<&String>, message: &[ContentItem]) -> AgentMes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If the message has no content, return None to signal turn completion
|
||||
// This happens when the API ends a turn with an empty assistant message (e.g., after tool calls)
|
||||
if content.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let id = id.cloned().unwrap_or_else(|| Uuid::new_v4().to_string());
|
||||
AgentMessageItem { id, content }
|
||||
Some(AgentMessageItem { id, content })
|
||||
}
|
||||
|
||||
pub fn parse_turn_item(item: &ResponseItem) -> Option<TurnItem> {
|
||||
match item {
|
||||
ResponseItem::Message { role, content, id } => match role.as_str() {
|
||||
"user" => parse_user_message(content).map(TurnItem::UserMessage),
|
||||
"assistant" => Some(TurnItem::AgentMessage(parse_agent_message(
|
||||
id.as_ref(),
|
||||
content,
|
||||
))),
|
||||
"assistant" => parse_agent_message(id.as_ref(), content)
|
||||
.map(TurnItem::AgentMessage),
|
||||
"system" => None,
|
||||
_ => None,
|
||||
},
|
||||
|
||||
@@ -294,7 +294,7 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> {
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
},
|
||||
),
|
||||
@@ -335,7 +335,7 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> {
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: true,
|
||||
},
|
||||
),
|
||||
@@ -381,7 +381,7 @@ pub fn create_oss_provider_with_base_url(base_url: &str) -> ModelProviderInfo {
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
}
|
||||
}
|
||||
@@ -422,6 +422,7 @@ base_url = "http://localhost:11434/v1"
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
|
||||
@@ -452,6 +453,7 @@ query_params = { api-version = "2025-04-01-preview" }
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
|
||||
@@ -485,6 +487,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
|
||||
@@ -508,6 +511,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
}
|
||||
}
|
||||
@@ -541,6 +545,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
max_tokens: None,
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
assert!(named_provider.is_azure_responses_endpoint());
|
||||
|
||||
Reference in New Issue
Block a user