Improve error decoding response body error (#5263)
Split Reqwest error into separate error: 1. One for streaming response 2. One for initial connection failing Include request_id where possible. <img width="1791" height="116" alt="image" src="https://github.com/user-attachments/assets/549aa330-acfa-496a-9898-77fa58436316" />
This commit is contained in:
@@ -5,6 +5,8 @@ use crate::client_common::Prompt;
|
|||||||
use crate::client_common::ResponseEvent;
|
use crate::client_common::ResponseEvent;
|
||||||
use crate::client_common::ResponseStream;
|
use crate::client_common::ResponseStream;
|
||||||
use crate::error::CodexErr;
|
use crate::error::CodexErr;
|
||||||
|
use crate::error::ConnectionFailedError;
|
||||||
|
use crate::error::ResponseStreamFailed;
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
use crate::error::RetryLimitReachedError;
|
use crate::error::RetryLimitReachedError;
|
||||||
use crate::error::UnexpectedResponseError;
|
use crate::error::UnexpectedResponseError;
|
||||||
@@ -309,7 +311,12 @@ pub(crate) async fn stream_chat_completions(
|
|||||||
match res {
|
match res {
|
||||||
Ok(resp) if resp.status().is_success() => {
|
Ok(resp) if resp.status().is_success() => {
|
||||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
||||||
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
|
let stream = resp.bytes_stream().map_err(|e| {
|
||||||
|
CodexErr::ResponseStreamFailed(ResponseStreamFailed {
|
||||||
|
source: e,
|
||||||
|
request_id: None,
|
||||||
|
})
|
||||||
|
});
|
||||||
tokio::spawn(process_chat_sse(
|
tokio::spawn(process_chat_sse(
|
||||||
stream,
|
stream,
|
||||||
tx_event,
|
tx_event,
|
||||||
@@ -349,7 +356,9 @@ pub(crate) async fn stream_chat_completions(
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if attempt > max_retries {
|
if attempt > max_retries {
|
||||||
return Err(e.into());
|
return Err(CodexErr::ConnectionFailed(ConnectionFailedError {
|
||||||
|
source: e,
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
let delay = backoff(attempt);
|
let delay = backoff(attempt);
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use crate::AuthManager;
|
use crate::AuthManager;
|
||||||
use crate::auth::CodexAuth;
|
use crate::auth::CodexAuth;
|
||||||
|
use crate::error::ConnectionFailedError;
|
||||||
|
use crate::error::ResponseStreamFailed;
|
||||||
use crate::error::RetryLimitReachedError;
|
use crate::error::RetryLimitReachedError;
|
||||||
use crate::error::UnexpectedResponseError;
|
use crate::error::UnexpectedResponseError;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
@@ -351,7 +353,12 @@ impl ModelClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// spawn task to process SSE
|
// spawn task to process SSE
|
||||||
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
|
let stream = resp.bytes_stream().map_err(move |e| {
|
||||||
|
CodexErr::ResponseStreamFailed(ResponseStreamFailed {
|
||||||
|
source: e,
|
||||||
|
request_id: request_id.clone(),
|
||||||
|
})
|
||||||
|
});
|
||||||
tokio::spawn(process_sse(
|
tokio::spawn(process_sse(
|
||||||
stream,
|
stream,
|
||||||
tx_event,
|
tx_event,
|
||||||
@@ -431,7 +438,9 @@ impl ModelClient {
|
|||||||
request_id,
|
request_id,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Err(e) => Err(StreamAttemptError::RetryableTransportError(e.into())),
|
Err(e) => Err(StreamAttemptError::RetryableTransportError(
|
||||||
|
CodexErr::ConnectionFailed(ConnectionFailedError { source: e }),
|
||||||
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,6 +91,12 @@ pub enum CodexErr {
|
|||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
UsageLimitReached(UsageLimitReachedError),
|
UsageLimitReached(UsageLimitReachedError),
|
||||||
|
|
||||||
|
#[error("{0}")]
|
||||||
|
ResponseStreamFailed(ResponseStreamFailed),
|
||||||
|
|
||||||
|
#[error("{0}")]
|
||||||
|
ConnectionFailed(ConnectionFailedError),
|
||||||
|
|
||||||
#[error(
|
#[error(
|
||||||
"To use Codex with your ChatGPT plan, upgrade to Plus: https://openai.com/chatgpt/pricing."
|
"To use Codex with your ChatGPT plan, upgrade to Plus: https://openai.com/chatgpt/pricing."
|
||||||
)]
|
)]
|
||||||
@@ -126,9 +132,6 @@ pub enum CodexErr {
|
|||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Io(#[from] io::Error),
|
Io(#[from] io::Error),
|
||||||
|
|
||||||
#[error(transparent)]
|
|
||||||
Reqwest(#[from] reqwest::Error),
|
|
||||||
|
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Json(#[from] serde_json::Error),
|
Json(#[from] serde_json::Error),
|
||||||
|
|
||||||
@@ -147,6 +150,37 @@ pub enum CodexErr {
|
|||||||
EnvVar(EnvVarError),
|
EnvVar(EnvVarError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectionFailedError {
|
||||||
|
pub source: reqwest::Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for ConnectionFailedError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "Connection failed: {}", self.source)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ResponseStreamFailed {
|
||||||
|
pub source: reqwest::Error,
|
||||||
|
pub request_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for ResponseStreamFailed {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"Error while reading the server response: {}{}",
|
||||||
|
self.source,
|
||||||
|
self.request_id
|
||||||
|
.as_ref()
|
||||||
|
.map(|id| format!(", request id: {id}"))
|
||||||
|
.unwrap_or_default()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct UnexpectedResponseError {
|
pub struct UnexpectedResponseError {
|
||||||
pub status: StatusCode,
|
pub status: StatusCode,
|
||||||
|
|||||||
Reference in New Issue
Block a user