[app-server] feat: v2 Turn APIs (#6216)

Implements:
```
turn/start
turn/interrupt
```

along with their integration tests. These are relatively light wrappers
around the existing core logic; changes to the core itself are minimal (a
rough sketch of how the new methods map onto core ops follows).
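
For orientation, here is a minimal sketch (not the real handler) of how the two v2 methods map onto existing core submissions, based on the handler code in this diff. `CoreOp` is a simplified stand-in for core's `Op`, and only a model override is modeled; the actual handler also forwards cwd, approval policy, sandbox policy, effort, and summary overrides.

```rust
// Simplified stand-in for the core `Op` type; variant shapes are illustrative only.
#[derive(Debug)]
enum CoreOp {
    OverrideTurnContext { model: Option<String> },
    UserInput { items: Vec<String> },
    Interrupt,
}

/// turn/start: apply an optional per-turn context override first, then submit
/// the user input; the submission id of the input becomes the turn id.
fn plan_turn_start(input: Vec<String>, model_override: Option<String>) -> Vec<CoreOp> {
    let mut ops = Vec::new();
    if model_override.is_some() {
        ops.push(CoreOp::OverrideTurnContext { model: model_override });
    }
    ops.push(CoreOp::UserInput { items: input });
    ops
}

/// turn/interrupt: submit a core interrupt; the JSON-RPC reply is deferred
/// until the core reports TurnAborted for that conversation.
fn plan_turn_interrupt() -> Vec<CoreOp> {
    vec![CoreOp::Interrupt]
}

fn main() {
    println!("{:?}", plan_turn_start(vec!["hello".into()], None));
    println!("{:?}", plan_turn_start(vec!["hello".into()], Some("some-model".into())));
    println!("{:?}", plan_turn_interrupt());
}
```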

However, one improvement was made for developer ergonomics:
- `turn/start` replaces both `SendUserMessage` (no turn overrides) and
  `SendUserTurn` (can override model, approval policy, etc.); see the sketch
  below.
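
A hypothetical view of the request shape, assuming the `TurnStartParams` fields visible in this diff (`thread_id`, `input`, plus optional `model`, `cwd`, `approval_policy`, `sandbox_policy`, `effort`, `summary`) mirror the wire format; the types are simplified and the thread id and model name are placeholders.

```rust
use std::path::PathBuf;

// Simplified sketch of TurnStartParams; the real type lives in
// codex_app_server_protocol and carries richer input/override types.
#[derive(Debug, Default)]
struct TurnStartSketch {
    thread_id: String,
    input: Vec<String>,
    model: Option<String>,
    cwd: Option<PathBuf>,
}

fn main() {
    // SendUserMessage-style call: plain input, no per-turn overrides.
    let plain = TurnStartSketch {
        thread_id: "thread-abc".into(), // placeholder thread id
        input: vec!["run the tests".into()],
        ..Default::default()
    };

    // SendUserTurn-style call: same method, with per-turn overrides set.
    let with_overrides = TurnStartSketch {
        thread_id: "thread-abc".into(),
        input: vec!["run the tests".into()],
        model: Some("some-model".into()), // placeholder model name
        cwd: Some(PathBuf::from("/tmp/project")),
    };

    println!("{plain:#?}\n{with_overrides:#?}");
}
```
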
Owen Lin
2025-11-06 08:36:36 -08:00
committed by GitHub
parent 649ce520c4
commit 6582554926
8 changed files with 935 additions and 101 deletions

@@ -13,6 +13,7 @@ use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApplyPatchApprovalResponse;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::ArchiveConversationResponse;
use codex_app_server_protocol::AskForApproval;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::AuthStatusChangeNotification;
use codex_app_server_protocol::CancelLoginAccountParams;
@@ -29,6 +30,8 @@ use codex_app_server_protocol::FeedbackUploadResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::GetAuthStatusResponse;
use codex_app_server_protocol::GetConversationSummaryParams;
use codex_app_server_protocol::GetConversationSummaryResponse;
use codex_app_server_protocol::GetUserAgentResponse;
@@ -45,6 +48,8 @@ use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::LoginApiKeyResponse;
use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::LogoutAccountResponse;
use codex_app_server_protocol::LogoutChatGptResponse;
use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::NewConversationParams;
@@ -54,6 +59,8 @@ use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::ResumeConversationResponse;
use codex_app_server_protocol::SandboxMode;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::SendUserTurnParams;
@@ -66,6 +73,7 @@ use codex_app_server_protocol::SetDefaultModelResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
@@ -73,7 +81,15 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_app_server_protocol::UserSavedConfig;
use codex_backend_client::Client as BackendClient;
use codex_core::AuthManager;
@@ -136,6 +152,9 @@ use tracing::info;
use tracing::warn;
use uuid::Uuid;
type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>;
type PendingInterrupts = Arc<Mutex<HashMap<ConversationId, PendingInterruptQueue>>>;
// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
struct ActiveLogin {
@@ -159,11 +178,17 @@ pub(crate) struct CodexMessageProcessor {
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
active_login: Arc<Mutex<Option<ActiveLogin>>>,
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_interrupts: PendingInterrupts,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
}
#[derive(Clone, Copy, Debug)]
enum ApiVersion {
V1,
V2,
}
impl CodexMessageProcessor {
pub fn new(
auth_manager: Arc<AuthManager>,
@@ -192,7 +217,7 @@ impl CodexMessageProcessor {
ClientRequest::Initialize { .. } => {
panic!("Initialize should be handled in MessageProcessor");
}
// === v2 Thread APIs ===
// === v2 Thread/Turn APIs ===
ClientRequest::ThreadStart { request_id, params } => {
self.thread_start(request_id, params).await;
}
@@ -212,6 +237,12 @@ impl CodexMessageProcessor {
self.send_unimplemented_error(request_id, "thread/compact")
.await;
}
ClientRequest::TurnStart { request_id, params } => {
self.turn_start(request_id, params).await;
}
ClientRequest::TurnInterrupt { request_id, params } => {
self.turn_interrupt(request_id, params).await;
}
ClientRequest::NewConversation { request_id, params } => {
// Do not tokio::spawn() to process new_conversation()
// asynchronously because we need to ensure the conversation is
@@ -731,10 +762,7 @@ impl CodexMessageProcessor {
match self.logout_common().await {
Ok(current_auth_method) => {
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::LogoutChatGptResponse {},
)
.send_response(request_id, LogoutChatGptResponse {})
.await;
let payload = AuthStatusChangeNotification {
@@ -754,10 +782,7 @@ impl CodexMessageProcessor {
match self.logout_common().await {
Ok(current_auth_method) => {
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::LogoutAccountResponse {},
)
.send_response(request_id, LogoutAccountResponse {})
.await;
let payload_v2 = AccountUpdatedNotification {
@@ -773,11 +798,7 @@ impl CodexMessageProcessor {
}
}
async fn get_auth_status(
&self,
request_id: RequestId,
params: codex_app_server_protocol::GetAuthStatusParams,
) {
async fn get_auth_status(&self, request_id: RequestId, params: GetAuthStatusParams) {
let include_token = params.include_token.unwrap_or(false);
let do_refresh = params.refresh_token.unwrap_or(false);
@@ -791,7 +812,7 @@ impl CodexMessageProcessor {
let requires_openai_auth = self.config.model_provider.requires_openai_auth;
let response = if !requires_openai_auth {
codex_app_server_protocol::GetAuthStatusResponse {
GetAuthStatusResponse {
auth_method: None,
auth_token: None,
requires_openai_auth: Some(false),
@@ -811,13 +832,13 @@ impl CodexMessageProcessor {
(None, None)
}
};
codex_app_server_protocol::GetAuthStatusResponse {
GetAuthStatusResponse {
auth_method: reported_auth_method,
auth_token: token_opt,
requires_openai_auth: Some(true),
}
}
None => codex_app_server_protocol::GetAuthStatusResponse {
None => GetAuthStatusResponse {
auth_method: None,
auth_token: None,
requires_openai_auth: Some(true),
@@ -1101,12 +1122,8 @@ impl CodexMessageProcessor {
let overrides = ConfigOverrides {
model: params.model,
cwd: params.cwd.map(PathBuf::from),
approval_policy: params
.approval_policy
.map(codex_app_server_protocol::AskForApproval::to_core),
sandbox_mode: params
.sandbox
.map(codex_app_server_protocol::SandboxMode::to_core),
approval_policy: params.approval_policy.map(AskForApproval::to_core),
sandbox_mode: params.sandbox.map(SandboxMode::to_core),
model_provider: params.model_provider,
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
base_instructions: params.base_instructions,
@@ -1431,60 +1448,22 @@ impl CodexMessageProcessor {
let ListConversationsParams {
page_size,
cursor,
model_providers: model_provider,
model_providers,
} = params;
let page_size = page_size.unwrap_or(25);
let cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let cursor_ref = cursor_obj.as_ref();
let model_provider_filter = match model_provider {
Some(providers) => {
if providers.is_empty() {
None
} else {
Some(providers)
}
}
None => Some(vec![self.config.model_provider_id.clone()]),
};
let model_provider_slice = model_provider_filter.as_deref();
let fallback_provider = self.config.model_provider_id.clone();
let page_size = page_size.unwrap_or(25).max(1);
let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
INTERACTIVE_SESSION_SOURCES,
model_provider_slice,
fallback_provider.as_str(),
)
.await
match self
.list_conversations_common(page_size, cursor, model_providers)
.await
{
Ok(p) => p,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
};
Ok((items, next_cursor)) => {
let response = ListConversationsResponse { items, next_cursor };
self.outgoing.send_response(request_id, response).await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let items = page
.items
.into_iter()
.filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider))
.collect();
// Encode next_cursor as a plain string
let next_cursor = page
.next_cursor
.and_then(|cursor| serde_json::to_value(&cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
let response = ListConversationsResponse { items, next_cursor };
self.outgoing.send_response(request_id, response).await;
}
async fn list_conversations_common(
@@ -1534,6 +1513,7 @@ impl CodexMessageProcessor {
.filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider))
.collect::<Vec<_>>();
// Encode next_cursor as a plain string
let next_cursor = page
.next_cursor
.and_then(|cursor| serde_json::to_value(&cursor).ok())
@@ -1557,8 +1537,8 @@ impl CodexMessageProcessor {
return;
}
let effective_page_size = limit.unwrap_or(total as u32).max(1) as usize;
let effective_page_size = effective_page_size.min(total);
let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
let effective_limit = effective_limit.min(total);
let start = match cursor {
Some(cursor) => match cursor.parse::<usize>() {
Ok(idx) => idx,
@@ -1585,7 +1565,7 @@ impl CodexMessageProcessor {
return;
}
let end = start.saturating_add(effective_page_size).min(total);
let end = start.saturating_add(effective_limit).min(total);
let items = models[start..end].to_vec();
let next_cursor = if end < total {
Some(end.to_string())
@@ -1620,7 +1600,7 @@ impl CodexMessageProcessor {
profile,
cwd,
approval_policy,
sandbox,
sandbox: sandbox_mode,
config: cli_overrides,
base_instructions,
developer_instructions,
@@ -1633,7 +1613,7 @@ impl CodexMessageProcessor {
config_profile: profile,
cwd: cwd.map(PathBuf::from),
approval_policy,
sandbox_mode: sandbox,
sandbox_mode,
model_provider,
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
base_instructions,
@@ -1759,7 +1739,7 @@ impl CodexMessageProcessor {
.map(|msgs| msgs.into_iter().collect());
// Reply with conversation id + model and initial messages (when present)
let response = codex_app_server_protocol::ResumeConversationResponse {
let response = ResumeConversationResponse {
conversation_id,
model: session_configured.model.clone(),
initial_messages,
@@ -2070,7 +2050,151 @@ impl CodexMessageProcessor {
// Record the pending interrupt so we can reply when TurnAborted arrives.
{
let mut map = self.pending_interrupts.lock().await;
map.entry(conversation_id).or_default().push(request_id);
map.entry(conversation_id)
.or_default()
.push((request_id, ApiVersion::V1));
}
// Submit the interrupt; we'll respond upon TurnAborted.
let _ = conversation.submit(Op::Interrupt).await;
}
async fn turn_start(&self, request_id: RequestId, params: TurnStartParams) {
// Resolve conversation id from v2 thread id string.
let conversation_id = match ConversationId::from_string(&params.thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id)
.await
else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("conversation not found: {conversation_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
// Keep a copy of v2 inputs for the notification payload.
let v2_inputs_for_notif = params.input.clone();
// Map v2 input items to core input items.
let mapped_items: Vec<CoreInputItem> = params
.input
.into_iter()
.map(V2UserInput::into_core)
.collect();
let has_any_overrides = params.cwd.is_some()
|| params.approval_policy.is_some()
|| params.sandbox_policy.is_some()
|| params.model.is_some()
|| params.effort.is_some()
|| params.summary.is_some();
// If any overrides are provided, update the session turn context first.
if has_any_overrides {
let _ = conversation
.submit(Op::OverrideTurnContext {
cwd: params.cwd,
approval_policy: params.approval_policy.map(AskForApproval::to_core),
sandbox_policy: params.sandbox_policy.map(|p| p.to_core()),
model: params.model,
effort: params.effort.map(Some),
summary: params.summary,
})
.await;
}
// Start the turn by submitting the user input. Return its submission id as turn_id.
let turn_id = conversation
.submit(Op::UserInput {
items: mapped_items,
})
.await;
match turn_id {
Ok(turn_id) => {
let turn = Turn {
id: turn_id.clone(),
items: vec![ThreadItem::UserMessage {
id: turn_id,
content: v2_inputs_for_notif,
}],
status: TurnStatus::InProgress,
error: None,
};
let response = TurnStartResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
// Emit v2 turn/started notification.
let notif = TurnStartedNotification { turn };
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn turn_interrupt(&mut self, request_id: RequestId, params: TurnInterruptParams) {
let TurnInterruptParams { thread_id, .. } = params;
// Resolve conversation id from v2 thread id string.
let conversation_id = match ConversationId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let Ok(conversation) = self
.conversation_manager
.get_conversation(conversation_id)
.await
else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("conversation not found: {conversation_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
// Record the pending interrupt so we can reply when TurnAborted arrives.
{
let mut map = self.pending_interrupts.lock().await;
map.entry(conversation_id)
.or_default()
.push((request_id, ApiVersion::V2));
}
// Submit the interrupt; we'll respond upon TurnAborted.
@@ -2086,7 +2210,6 @@ impl CodexMessageProcessor {
conversation_id,
experimental_raw_events,
} = params;
match self
.attach_conversation_listener(conversation_id, experimental_raw_events)
.await
@@ -2213,7 +2336,6 @@ impl CodexMessageProcessor {
}
}
});
Ok(subscription_id)
}
@@ -2355,7 +2477,7 @@ async fn apply_bespoke_event_handling(
conversation_id: ConversationId,
conversation: Arc<CodexConversation>,
outgoing: Arc<OutgoingMessageSender>,
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_interrupts: PendingInterrupts,
) {
let Event { id: event_id, msg } = event;
match msg {
@@ -2424,11 +2546,19 @@ async fn apply_bespoke_event_handling(
map.remove(&conversation_id).unwrap_or_default()
};
if !pending.is_empty() {
let response = InterruptConversationResponse {
abort_reason: turn_aborted_event.reason,
};
for rid in pending {
outgoing.send_response(rid, response.clone()).await;
for (rid, ver) in pending {
match ver {
ApiVersion::V1 => {
let response = InterruptConversationResponse {
abort_reason: turn_aborted_event.reason.clone(),
};
outgoing.send_response(rid, response).await;
}
ApiVersion::V2 => {
let response = TurnInterruptResponse {};
outgoing.send_response(rid, response).await;
}
}
}
}
}