diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index bd2eeb94..1b8e4c95 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -391,3 +391,116 @@ async fn stream_from_fixture(path: impl AsRef) -> Result { tokio::spawn(process_sse(stream, tx_event)); Ok(ResponseStream { rx_event }) } + +#[cfg(test)] +mod tests { + #![allow(clippy::expect_used, clippy::unwrap_used)] + use super::*; + use serde_json::json; + + async fn run_sse(events: Vec) -> Vec { + let mut body = String::new(); + for e in events { + let kind = e + .get("type") + .and_then(|v| v.as_str()) + .expect("fixture event missing type"); + if e.as_object().map(|o| o.len() == 1).unwrap_or(false) { + body.push_str(&format!("event: {kind}\n\n")); + } else { + body.push_str(&format!("event: {kind}\ndata: {e}\n\n")); + } + } + let (tx, mut rx) = mpsc::channel::>(8); + let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io); + tokio::spawn(process_sse(stream, tx)); + let mut out = Vec::new(); + while let Some(ev) = rx.recv().await { + out.push(ev.expect("channel closed")); + } + out + } + + /// Verifies that the SSE adapter emits the expected [`ResponseEvent`] for + /// a variety of `type` values from the Responses API. The test is written + /// table-driven style to keep additions for new event kinds trivial. + /// + /// Each `Case` supplies an input event, a predicate that must match the + /// *first* `ResponseEvent` produced by the adapter, and the total number + /// of events expected after appending a synthetic `response.completed` + /// marker that terminates the stream. + #[tokio::test] + async fn table_driven_event_kinds() { + struct TestCase { + name: &'static str, + event: serde_json::Value, + expect_first: fn(&ResponseEvent) -> bool, + expected_len: usize, + } + + fn is_created(ev: &ResponseEvent) -> bool { + matches!(ev, ResponseEvent::Created) + } + + fn is_output(ev: &ResponseEvent) -> bool { + matches!(ev, ResponseEvent::OutputItemDone(_)) + } + + fn is_completed(ev: &ResponseEvent) -> bool { + matches!(ev, ResponseEvent::Completed { .. }) + } + + let completed = json!({ + "type": "response.completed", + "response": { + "id": "c", + "usage": { + "input_tokens": 0, + "input_tokens_details": null, + "output_tokens": 0, + "output_tokens_details": null, + "total_tokens": 0 + }, + "output": [] + } + }); + + let cases = vec![ + TestCase { + name: "created", + event: json!({"type": "response.created", "response": {}}), + expect_first: is_created, + expected_len: 2, + }, + TestCase { + name: "output_item.done", + event: json!({ + "type": "response.output_item.done", + "item": { + "type": "message", + "role": "assistant", + "content": [ + {"type": "output_text", "text": "hi"} + ] + } + }), + expect_first: is_output, + expected_len: 2, + }, + TestCase { + name: "unknown", + event: json!({"type": "response.new_tool_event"}), + expect_first: is_completed, + expected_len: 1, + }, + ]; + + for case in cases { + let mut evs = vec![case.event]; + evs.push(completed.clone()); + let out = run_sse(evs).await; + assert_eq!(out.len(), case.expected_len, "case {}", case.name); + assert!((case.expect_first)(&out[0]), "case {}", case.name); + } + } +} diff --git a/codex-rs/core/tests/fixtures/completed_template.json b/codex-rs/core/tests/fixtures/completed_template.json new file mode 100644 index 00000000..1774dc5e --- /dev/null +++ b/codex-rs/core/tests/fixtures/completed_template.json @@ -0,0 +1,16 @@ +[ + { + "type": "response.completed", + "response": { + "id": "__ID__", + "usage": { + "input_tokens": 0, + "input_tokens_details": null, + "output_tokens": 0, + "output_tokens_details": null, + "total_tokens": 0 + }, + "output": [] + } + } +] diff --git a/codex-rs/core/tests/fixtures/incomplete_sse.json b/codex-rs/core/tests/fixtures/incomplete_sse.json new file mode 100644 index 00000000..2876bbfd --- /dev/null +++ b/codex-rs/core/tests/fixtures/incomplete_sse.json @@ -0,0 +1,3 @@ +[ + {"type": "response.output_item.done"} +] diff --git a/codex-rs/core/tests/previous_response_id.rs b/codex-rs/core/tests/previous_response_id.rs index 10d6e8bf..e64271a0 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -11,6 +11,7 @@ mod test_support; use serde_json::Value; use tempfile::TempDir; use test_support::load_default_config_for_test; +use test_support::load_sse_fixture_with_id; use tokio::time::timeout; use wiremock::Match; use wiremock::Mock; @@ -42,12 +43,9 @@ impl Match for HasPrevId { } } -/// Build minimal SSE stream with completed marker. +/// Build minimal SSE stream with completed marker using the JSON fixture. fn sse_completed(id: &str) -> String { - format!( - "event: response.completed\n\ -data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{id}\",\"output\":[]}}}}\n\n\n" - ) + load_sse_fixture_with_id("tests/fixtures/completed_template.json", id) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index ece34ba2..da2736aa 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -12,6 +12,8 @@ use codex_core::protocol::Op; mod test_support; use tempfile::TempDir; use test_support::load_default_config_for_test; +use test_support::load_sse_fixture; +use test_support::load_sse_fixture_with_id; use tokio::time::timeout; use wiremock::Mock; use wiremock::MockServer; @@ -22,15 +24,11 @@ use wiremock::matchers::method; use wiremock::matchers::path; fn sse_incomplete() -> String { - // Only a single line; missing the completed event. - "event: response.output_item.done\n\n".to_string() + load_sse_fixture("tests/fixtures/incomplete_sse.json") } fn sse_completed(id: &str) -> String { - format!( - "event: response.completed\n\ -data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{id}\",\"output\":[]}}}}\n\n\n" - ) + load_sse_fixture_with_id("tests/fixtures/completed_template.json", id) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/core/tests/test_support.rs b/codex-rs/core/tests/test_support.rs index 532e3986..5dbe6371 100644 --- a/codex-rs/core/tests/test_support.rs +++ b/codex-rs/core/tests/test_support.rs @@ -21,3 +21,56 @@ pub fn load_default_config_for_test(codex_home: &TempDir) -> Config { ) .expect("defaults for test should always succeed") } + +/// Builds an SSE stream body from a JSON fixture. +/// +/// The fixture must contain an array of objects where each object represents a +/// single SSE event with at least a `type` field matching the `event:` value. +/// Additional fields become the JSON payload for the `data:` line. An object +/// with only a `type` field results in an event with no `data:` section. This +/// makes it trivial to extend the fixtures as OpenAI adds new event kinds or +/// fields. +pub fn load_sse_fixture(path: impl AsRef) -> String { + let events: Vec = + serde_json::from_reader(std::fs::File::open(path).expect("read fixture")) + .expect("parse JSON fixture"); + events + .into_iter() + .map(|e| { + let kind = e + .get("type") + .and_then(|v| v.as_str()) + .expect("fixture event missing type"); + if e.as_object().map(|o| o.len() == 1).unwrap_or(false) { + format!("event: {kind}\n\n") + } else { + format!("event: {kind}\ndata: {e}\n\n") + } + }) + .collect() +} + +/// Same as [`load_sse_fixture`], but replaces the placeholder `__ID__` in the +/// fixture template with the supplied identifier before parsing. This lets a +/// single JSON template be reused by multiple tests that each need a unique +/// `response_id`. +pub fn load_sse_fixture_with_id(path: impl AsRef, id: &str) -> String { + let raw = std::fs::read_to_string(path).expect("read fixture template"); + let replaced = raw.replace("__ID__", id); + let events: Vec = + serde_json::from_str(&replaced).expect("parse JSON fixture"); + events + .into_iter() + .map(|e| { + let kind = e + .get("type") + .and_then(|v| v.as_str()) + .expect("fixture event missing type"); + if e.as_object().map(|o| o.len() == 1).unwrap_or(false) { + format!("event: {kind}\n\n") + } else { + format!("event: {kind}\ndata: {e}\n\n") + } + }) + .collect() +}