Add turn started/completed events and correct exit code on error (#4309)

Adds new event for session completed that includes usage. Also ensures
we return 1 on failures.
```
{
  "type": "session.created",
  "session_id": "019987a7-93e7-7b20-9e05-e90060e411ea"
}
{
  "type": "turn.started"
}
...
{
  "type": "turn.completed",
  "usage": {
    "input_tokens": 78913,
    "cached_input_tokens": 65280,
    "output_tokens": 1099
  }
}
```
This commit is contained in:
pakrym-oai
2025-09-26 16:21:50 -07:00
committed by GitHub
parent 55801700de
commit cc1b21e47f
7 changed files with 192 additions and 23 deletions

View File

@@ -1,4 +1,3 @@
<h1 align="center">OpenAI Codex CLI</h1>
<p align="center"><code>npm i -g @openai/codex</code><br />or <code>brew install codex</code></p>
@@ -102,4 +101,3 @@ Codex CLI supports a rich set of configuration options, with preferences stored
## License
This repository is licensed under the [Apache-2.0 License](LICENSE).

View File

@@ -8,6 +8,10 @@ use ts_rs::TS;
pub enum ConversationEvent {
#[serde(rename = "session.created")]
SessionCreated(SessionCreatedEvent),
#[serde(rename = "turn.started")]
TurnStarted(TurnStartedEvent),
#[serde(rename = "turn.completed")]
TurnCompleted(TurnCompletedEvent),
#[serde(rename = "item.started")]
ItemStarted(ItemStartedEvent),
#[serde(rename = "item.updated")]
@@ -23,6 +27,22 @@ pub struct SessionCreatedEvent {
pub session_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)]
pub struct TurnStartedEvent {}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct TurnCompletedEvent {
pub usage: Usage,
}
/// Minimal usage summary for a turn.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)]
pub struct Usage {
pub input_tokens: u64,
pub cached_input_tokens: u64,
pub output_tokens: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct ItemStartedEvent {
pub item: ConversationItem,

View File

@@ -23,6 +23,9 @@ use crate::exec_events::ReasoningItem;
use crate::exec_events::SessionCreatedEvent;
use crate::exec_events::TodoItem;
use crate::exec_events::TodoListItem;
use crate::exec_events::TurnCompletedEvent;
use crate::exec_events::TurnStartedEvent;
use crate::exec_events::Usage;
use codex_core::config::Config;
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
@@ -37,6 +40,7 @@ use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use tracing::error;
use tracing::warn;
@@ -48,6 +52,7 @@ pub struct ExperimentalEventProcessorWithJsonOutput {
running_patch_applies: HashMap<String, PatchApplyBeginEvent>,
// Tracks the todo list for the current turn (at most one per turn).
running_todo_list: Option<RunningTodoList>,
last_total_token_usage: Option<codex_core::protocol::TokenUsage>,
}
#[derive(Debug, Clone)]
@@ -70,6 +75,7 @@ impl ExperimentalEventProcessorWithJsonOutput {
running_commands: HashMap::new(),
running_patch_applies: HashMap::new(),
running_todo_list: None,
last_total_token_usage: None,
}
}
@@ -82,6 +88,14 @@ impl ExperimentalEventProcessorWithJsonOutput {
EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev),
EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
EventMsg::TokenCount(ev) => {
if let Some(info) = &ev.info {
self.last_total_token_usage = Some(info.total_token_usage.clone());
}
Vec::new()
}
EventMsg::TaskStarted(ev) => self.handle_task_started(ev),
EventMsg::TaskComplete(_) => self.handle_task_complete(),
EventMsg::Error(ev) => vec![ConversationEvent::Error(ConversationErrorEvent {
message: ev.message.clone(),
})],
@@ -89,7 +103,6 @@ impl ExperimentalEventProcessorWithJsonOutput {
message: ev.message.clone(),
})],
EventMsg::PlanUpdate(ev) => self.handle_plan_update(ev),
EventMsg::TaskComplete(_) => self.handle_task_complete(),
_ => Vec::new(),
}
}
@@ -283,7 +296,23 @@ impl ExperimentalEventProcessorWithJsonOutput {
vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })]
}
fn handle_task_started(&self, _: &TaskStartedEvent) -> Vec<ConversationEvent> {
vec![ConversationEvent::TurnStarted(TurnStartedEvent {})]
}
fn handle_task_complete(&mut self) -> Vec<ConversationEvent> {
let usage = if let Some(u) = &self.last_total_token_usage {
Usage {
input_tokens: u.input_tokens,
cached_input_tokens: u.cached_input_tokens,
output_tokens: u.output_tokens,
}
} else {
Usage::default()
};
let mut items = Vec::new();
if let Some(running) = self.running_todo_list.take() {
let item = ConversationItem {
id: running.item_id,
@@ -291,11 +320,16 @@ impl ExperimentalEventProcessorWithJsonOutput {
items: running.items,
}),
};
return vec![ConversationEvent::ItemCompleted(ItemCompletedEvent {
items.push(ConversationEvent::ItemCompleted(ItemCompletedEvent {
item,
})];
}));
}
Vec::new()
items.push(ConversationEvent::TurnCompleted(TurnCompletedEvent {
usage,
}));
items
}
}

View File

@@ -331,7 +331,13 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
info!("Sent prompt with event ID: {initial_prompt_task_id}");
// Run the loop until the task is complete.
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
match shutdown {
CodexStatus::Running => continue,
@@ -343,6 +349,9 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
}
}
}
if error_seen {
std::process::exit(1);
}
Ok(())
}

View File

@@ -24,6 +24,9 @@ use codex_exec::exec_events::ReasoningItem;
use codex_exec::exec_events::SessionCreatedEvent;
use codex_exec::exec_events::TodoItem as ExecTodoItem;
use codex_exec::exec_events::TodoListItem as ExecTodoListItem;
use codex_exec::exec_events::TurnCompletedEvent;
use codex_exec::exec_events::TurnStartedEvent;
use codex_exec::exec_events::Usage;
use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
@@ -65,6 +68,22 @@ fn session_configured_produces_session_created_event() {
);
}
#[test]
fn task_started_produces_turn_started_event() {
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let out = ep.collect_conversation_events(&event(
"t1",
EventMsg::TaskStarted(codex_core::protocol::TaskStartedEvent {
model_context_window: Some(32_000),
}),
));
assert_eq!(
out,
vec![ConversationEvent::TurnStarted(TurnStartedEvent {})]
);
}
#[test]
fn plan_update_emits_todo_list_started_updated_and_completed() {
use codex_core::plan_tool::PlanItemArg;
@@ -161,7 +180,8 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
let out_complete = ep.collect_conversation_events(&complete);
assert_eq!(
out_complete,
vec![ConversationEvent::ItemCompleted(ItemCompletedEvent {
vec![
ConversationEvent::ItemCompleted(ItemCompletedEvent {
item: ConversationItem {
id: "item_0".to_string(),
details: ConversationItemDetails::TodoList(ExecTodoListItem {
@@ -177,7 +197,11 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
],
}),
},
})]
}),
ConversationEvent::TurnCompleted(TurnCompletedEvent {
usage: Usage::default(),
}),
]
);
}
@@ -585,3 +609,52 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() {
other => panic!("unexpected event: {other:?}"),
}
}
#[test]
fn task_complete_produces_turn_completed_with_usage() {
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// First, feed a TokenCount event with known totals.
let usage = codex_core::protocol::TokenUsage {
input_tokens: 1200,
cached_input_tokens: 200,
output_tokens: 345,
reasoning_output_tokens: 0,
total_tokens: 0,
};
let info = codex_core::protocol::TokenUsageInfo {
total_token_usage: usage.clone(),
last_token_usage: usage,
model_context_window: None,
};
let token_count_event = event(
"e1",
EventMsg::TokenCount(codex_core::protocol::TokenCountEvent {
info: Some(info),
rate_limits: None,
}),
);
assert!(
ep.collect_conversation_events(&token_count_event)
.is_empty()
);
// Then TaskComplete should produce turn.completed with the captured usage.
let complete_event = event(
"e2",
EventMsg::TaskComplete(codex_core::protocol::TaskCompleteEvent {
last_agent_message: Some("done".to_string()),
}),
);
let out = ep.collect_conversation_events(&complete_event);
assert_eq!(
out,
vec![ConversationEvent::TurnCompleted(TurnCompletedEvent {
usage: Usage {
input_tokens: 1200,
cached_input_tokens: 200,
output_tokens: 345,
},
})]
);
}

View File

@@ -3,3 +3,4 @@ mod apply_patch;
mod output_schema;
mod resume;
mod sandbox;
mod server_error_exit;

View File

@@ -0,0 +1,34 @@
#![cfg(not(target_os = "windows"))]
#![allow(clippy::expect_used, clippy::unwrap_used)]
use core_test_support::responses;
use core_test_support::test_codex_exec::test_codex_exec;
use wiremock::matchers::any;
/// Verify that when the server reports an error, `codex-exec` exits with a
/// non-zero status code so automation can detect failures.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exits_non_zero_when_server_reports_error() -> anyhow::Result<()> {
let test = test_codex_exec();
// Mock a simple Responses API SSE stream that immediately reports a
// `response.failed` event with an error message.
let server = responses::start_mock_server().await;
let body = responses::sse(vec![serde_json::json!({
"type": "response.failed",
"response": {
"id": "resp_err_1",
"error": {"code": "rate_limit_exceeded", "message": "synthetic server error"}
}
})]);
responses::mount_sse_once(&server, any(), body).await;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("tell me something")
.arg("--experimental-json")
.assert()
.code(1);
Ok(())
}