[app-server] feat: expose additional fields on Thread (#6338)
Add the following fields to Thread:
```
pub preview: String,
pub model_provider: String,
pub created_at: i64,
```
Will prob need another PR once this lands:
https://github.com/openai/codex/pull/6337
This commit is contained in:
@@ -354,6 +354,11 @@ pub struct ThreadCompactResponse {}
|
|||||||
#[ts(export_to = "v2/")]
|
#[ts(export_to = "v2/")]
|
||||||
pub struct Thread {
|
pub struct Thread {
|
||||||
pub id: String,
|
pub id: String,
|
||||||
|
/// Usually the first user message in the thread, if available.
|
||||||
|
pub preview: String,
|
||||||
|
pub model_provider: String,
|
||||||
|
/// Unix timestamp (in seconds) when the thread was created.
|
||||||
|
pub created_at: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ use crate::fuzzy_file_search::run_fuzzy_file_search;
|
|||||||
use crate::models::supported_models;
|
use crate::models::supported_models;
|
||||||
use crate::outgoing_message::OutgoingMessageSender;
|
use crate::outgoing_message::OutgoingMessageSender;
|
||||||
use crate::outgoing_message::OutgoingNotification;
|
use crate::outgoing_message::OutgoingNotification;
|
||||||
|
use chrono::DateTime;
|
||||||
|
use chrono::Utc;
|
||||||
use codex_app_server_protocol::Account;
|
use codex_app_server_protocol::Account;
|
||||||
use codex_app_server_protocol::AccountLoginCompletedNotification;
|
use codex_app_server_protocol::AccountLoginCompletedNotification;
|
||||||
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
|
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
|
||||||
@@ -1202,8 +1204,31 @@ impl CodexMessageProcessor {
|
|||||||
|
|
||||||
match self.conversation_manager.new_conversation(config).await {
|
match self.conversation_manager.new_conversation(config).await {
|
||||||
Ok(new_conv) => {
|
Ok(new_conv) => {
|
||||||
let thread = Thread {
|
let conversation_id = new_conv.conversation_id;
|
||||||
id: new_conv.conversation_id.to_string(),
|
let rollout_path = new_conv.session_configured.rollout_path.clone();
|
||||||
|
let fallback_provider = self.config.model_provider_id.as_str();
|
||||||
|
|
||||||
|
// A bit hacky, but the summary contains a lot of useful information for the thread
|
||||||
|
// that unfortunately does not get returned from conversation_manager.new_conversation().
|
||||||
|
let thread = match read_summary_from_rollout(
|
||||||
|
rollout_path.as_path(),
|
||||||
|
fallback_provider,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(summary) => summary_to_thread(summary),
|
||||||
|
Err(err) => {
|
||||||
|
warn!(
|
||||||
|
"failed to load summary for new thread {}: {}",
|
||||||
|
conversation_id, err
|
||||||
|
);
|
||||||
|
Thread {
|
||||||
|
id: conversation_id.to_string(),
|
||||||
|
preview: String::new(),
|
||||||
|
model_provider: self.config.model_provider_id.clone(),
|
||||||
|
created_at: chrono::Utc::now().timestamp(),
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = ThreadStartResponse {
|
let response = ThreadStartResponse {
|
||||||
@@ -1213,12 +1238,12 @@ impl CodexMessageProcessor {
|
|||||||
// Auto-attach a conversation listener when starting a thread.
|
// Auto-attach a conversation listener when starting a thread.
|
||||||
// Use the same behavior as the v1 API with experimental_raw_events=false.
|
// Use the same behavior as the v1 API with experimental_raw_events=false.
|
||||||
if let Err(err) = self
|
if let Err(err) = self
|
||||||
.attach_conversation_listener(new_conv.conversation_id, false)
|
.attach_conversation_listener(conversation_id, false)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
"failed to attach listener for conversation {}: {}",
|
"failed to attach listener for conversation {}: {}",
|
||||||
new_conv.conversation_id,
|
conversation_id,
|
||||||
err.message
|
err.message
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -1316,12 +1341,7 @@ impl CodexMessageProcessor {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let data = summaries
|
let data = summaries.into_iter().map(summary_to_thread).collect();
|
||||||
.into_iter()
|
|
||||||
.map(|s| Thread {
|
|
||||||
id: s.conversation_id.to_string(),
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let response = ThreadListResponse { data, next_cursor };
|
let response = ThreadListResponse { data, next_cursor };
|
||||||
self.outgoing.send_response(request_id, response).await;
|
self.outgoing.send_response(request_id, response).await;
|
||||||
@@ -1408,6 +1428,8 @@ impl CodexMessageProcessor {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
let thread = summary_to_thread(summary);
|
||||||
|
|
||||||
// Auto-attach a conversation listener when resuming a thread.
|
// Auto-attach a conversation listener when resuming a thread.
|
||||||
if let Err(err) = self
|
if let Err(err) = self
|
||||||
.attach_conversation_listener(conversation_id, false)
|
.attach_conversation_listener(conversation_id, false)
|
||||||
@@ -1420,11 +1442,7 @@ impl CodexMessageProcessor {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let response = ThreadResumeResponse {
|
let response = ThreadResumeResponse { thread };
|
||||||
thread: Thread {
|
|
||||||
id: conversation_id.to_string(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
self.outgoing.send_response(request_id, response).await;
|
self.outgoing.send_response(request_id, response).await;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -2837,6 +2855,33 @@ fn map_git_info(git_info: &GitInfo) -> ConversationGitInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_datetime(timestamp: Option<&str>) -> Option<DateTime<Utc>> {
|
||||||
|
timestamp.and_then(|ts| {
|
||||||
|
chrono::DateTime::parse_from_rfc3339(ts)
|
||||||
|
.ok()
|
||||||
|
.map(|dt| dt.with_timezone(&chrono::Utc))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||||
|
let ConversationSummary {
|
||||||
|
conversation_id,
|
||||||
|
preview,
|
||||||
|
timestamp,
|
||||||
|
model_provider,
|
||||||
|
..
|
||||||
|
} = summary;
|
||||||
|
|
||||||
|
let created_at = parse_datetime(timestamp.as_deref());
|
||||||
|
|
||||||
|
Thread {
|
||||||
|
id: conversation_id.to_string(),
|
||||||
|
preview,
|
||||||
|
model_provider,
|
||||||
|
created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -102,6 +102,11 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
|
|||||||
next_cursor: cursor1,
|
next_cursor: cursor1,
|
||||||
} = to_response::<ThreadListResponse>(page1_resp)?;
|
} = to_response::<ThreadListResponse>(page1_resp)?;
|
||||||
assert_eq!(data1.len(), 2);
|
assert_eq!(data1.len(), 2);
|
||||||
|
for thread in &data1 {
|
||||||
|
assert_eq!(thread.preview, "Hello");
|
||||||
|
assert_eq!(thread.model_provider, "mock_provider");
|
||||||
|
assert!(thread.created_at > 0);
|
||||||
|
}
|
||||||
let cursor1 = cursor1.expect("expected nextCursor on first page");
|
let cursor1 = cursor1.expect("expected nextCursor on first page");
|
||||||
|
|
||||||
// Page 2: with cursor → expect next_cursor None when no more results.
|
// Page 2: with cursor → expect next_cursor None when no more results.
|
||||||
@@ -122,6 +127,11 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
|
|||||||
next_cursor: cursor2,
|
next_cursor: cursor2,
|
||||||
} = to_response::<ThreadListResponse>(page2_resp)?;
|
} = to_response::<ThreadListResponse>(page2_resp)?;
|
||||||
assert!(data2.len() <= 2);
|
assert!(data2.len() <= 2);
|
||||||
|
for thread in &data2 {
|
||||||
|
assert_eq!(thread.preview, "Hello");
|
||||||
|
assert_eq!(thread.model_provider, "mock_provider");
|
||||||
|
assert!(thread.created_at > 0);
|
||||||
|
}
|
||||||
assert_eq!(cursor2, None, "expected nextCursor to be null on last page");
|
assert_eq!(cursor2, None, "expected nextCursor to be null on last page");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -200,6 +210,11 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
|
|||||||
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?;
|
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?;
|
||||||
assert_eq!(data.len(), 1);
|
assert_eq!(data.len(), 1);
|
||||||
assert_eq!(next_cursor, None);
|
assert_eq!(next_cursor, None);
|
||||||
|
let thread = &data[0];
|
||||||
|
assert_eq!(thread.preview, "X");
|
||||||
|
assert_eq!(thread.model_provider, "other_provider");
|
||||||
|
let expected_ts = chrono::DateTime::parse_from_rfc3339("2025-01-02T11:00:00Z")?.timestamp();
|
||||||
|
assert_eq!(thread.created_at, expected_ts);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ async fn thread_resume_returns_existing_thread() -> Result<()> {
|
|||||||
.await??;
|
.await??;
|
||||||
let ThreadResumeResponse { thread: resumed } =
|
let ThreadResumeResponse { thread: resumed } =
|
||||||
to_response::<ThreadResumeResponse>(resume_resp)?;
|
to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||||
assert_eq!(resumed.id, thread.id);
|
assert_eq!(resumed, thread);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,6 +42,15 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
|||||||
.await??;
|
.await??;
|
||||||
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(resp)?;
|
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(resp)?;
|
||||||
assert!(!thread.id.is_empty(), "thread id should not be empty");
|
assert!(!thread.id.is_empty(), "thread id should not be empty");
|
||||||
|
assert!(
|
||||||
|
thread.preview.is_empty(),
|
||||||
|
"new threads should start with an empty preview"
|
||||||
|
);
|
||||||
|
assert_eq!(thread.model_provider, "mock_provider");
|
||||||
|
assert!(
|
||||||
|
thread.created_at > 0,
|
||||||
|
"created_at should be a positive UNIX timestamp"
|
||||||
|
);
|
||||||
|
|
||||||
// A corresponding thread/started notification should arrive.
|
// A corresponding thread/started notification should arrive.
|
||||||
let notif: JSONRPCNotification = timeout(
|
let notif: JSONRPCNotification = timeout(
|
||||||
@@ -51,7 +60,7 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
|||||||
.await??;
|
.await??;
|
||||||
let started: ThreadStartedNotification =
|
let started: ThreadStartedNotification =
|
||||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||||
assert_eq!(started.thread.id, thread.id);
|
assert_eq!(started.thread, thread);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user