diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 12bbdf71..8d97f82c 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -198,6 +198,30 @@ enum ApiVersion { } impl CodexMessageProcessor { + async fn conversation_from_thread_id( + &self, + thread_id: &str, + ) -> Result<(ConversationId, Arc), JSONRPCErrorError> { + // Resolve conversation id from v2 thread id string. + let conversation_id = + ConversationId::from_string(thread_id).map_err(|err| JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {err}"), + data: None, + })?; + + let conversation = self + .conversation_manager + .get_conversation(conversation_id) + .await + .map_err(|_| JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {conversation_id}"), + data: None, + })?; + + Ok((conversation_id, conversation)) + } pub fn new( auth_manager: Arc, conversation_manager: Arc, @@ -2145,34 +2169,14 @@ impl CodexMessageProcessor { } async fn turn_start(&self, request_id: RequestId, params: TurnStartParams) { - // Resolve conversation id from v2 thread id string. - let conversation_id = match ConversationId::from_string(¶ms.thread_id) { - Ok(id) => id, - Err(err) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("invalid thread id: {err}"), - data: None, - }; + let (_, conversation) = match self.conversation_from_thread_id(¶ms.thread_id).await { + Ok(v) => v, + Err(error) => { self.outgoing.send_error(request_id, error).await; return; } }; - let Ok(conversation) = self - .conversation_manager - .get_conversation(conversation_id) - .await - else { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("conversation not found: {conversation_id}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - }; - // Keep a copy of v2 inputs for the notification payload. let v2_inputs_for_notif = params.input.clone(); @@ -2246,33 +2250,14 @@ impl CodexMessageProcessor { async fn turn_interrupt(&mut self, request_id: RequestId, params: TurnInterruptParams) { let TurnInterruptParams { thread_id, .. } = params; - // Resolve conversation id from v2 thread id string. - let conversation_id = match ConversationId::from_string(&thread_id) { - Ok(id) => id, - Err(err) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("invalid thread id: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - }; - - let Ok(conversation) = self - .conversation_manager - .get_conversation(conversation_id) - .await - else { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("conversation not found: {conversation_id}"), - data: None, + let (conversation_id, conversation) = + match self.conversation_from_thread_id(&thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } }; - self.outgoing.send_error(request_id, error).await; - return; - }; // Record the pending interrupt so we can reply when TurnAborted arrives. {