chore: update Codex::spawn() to return a struct instead of a tuple (#1677)
Also update `init_codex()` to return a `struct` instead of a tuple, as well.
This commit is contained in:
@@ -4,6 +4,7 @@ use std::sync::Arc;
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use codex_common::CliConfigOverrides;
|
use codex_common::CliConfigOverrides;
|
||||||
use codex_core::Codex;
|
use codex_core::Codex;
|
||||||
|
use codex_core::CodexSpawnOk;
|
||||||
use codex_core::config::Config;
|
use codex_core::config::Config;
|
||||||
use codex_core::config::ConfigOverrides;
|
use codex_core::config::ConfigOverrides;
|
||||||
use codex_core::protocol::Submission;
|
use codex_core::protocol::Submission;
|
||||||
@@ -35,7 +36,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
|
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
|
||||||
let ctrl_c = notify_on_sigint();
|
let ctrl_c = notify_on_sigint();
|
||||||
let (codex, _init_id, _session_id) = Codex::spawn(config, ctrl_c.clone()).await?;
|
let CodexSpawnOk { codex, .. } = Codex::spawn(config, ctrl_c.clone()).await?;
|
||||||
let codex = Arc::new(codex);
|
let codex = Arc::new(codex);
|
||||||
|
|
||||||
// Task that reads JSON lines from stdin and forwards to Submission Queue
|
// Task that reads JSON lines from stdin and forwards to Submission Queue
|
||||||
|
|||||||
@@ -97,11 +97,18 @@ pub struct Codex {
|
|||||||
rx_event: Receiver<Event>,
|
rx_event: Receiver<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
|
||||||
|
/// the submission id for the initial `ConfigureSession` request and the
|
||||||
|
/// unique session id.
|
||||||
|
pub struct CodexSpawnOk {
|
||||||
|
pub codex: Codex,
|
||||||
|
pub init_id: String,
|
||||||
|
pub session_id: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
impl Codex {
|
impl Codex {
|
||||||
/// Spawn a new [`Codex`] and initialize the session. Returns the instance
|
/// Spawn a new [`Codex`] and initialize the session.
|
||||||
/// of `Codex` and the ID of the `SessionInitialized` event that was
|
pub async fn spawn(config: Config, ctrl_c: Arc<Notify>) -> CodexResult<CodexSpawnOk> {
|
||||||
/// submitted to start the session.
|
|
||||||
pub async fn spawn(config: Config, ctrl_c: Arc<Notify>) -> CodexResult<(Codex, String, Uuid)> {
|
|
||||||
// experimental resume path (undocumented)
|
// experimental resume path (undocumented)
|
||||||
let resume_path = config.experimental_resume.clone();
|
let resume_path = config.experimental_resume.clone();
|
||||||
info!("resume_path: {resume_path:?}");
|
info!("resume_path: {resume_path:?}");
|
||||||
@@ -139,7 +146,11 @@ impl Codex {
|
|||||||
};
|
};
|
||||||
let init_id = codex.submit(configure_session).await?;
|
let init_id = codex.submit(configure_session).await?;
|
||||||
|
|
||||||
Ok((codex, init_id, session_id))
|
Ok(CodexSpawnOk {
|
||||||
|
codex,
|
||||||
|
init_id,
|
||||||
|
session_id,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Submit the `op` wrapped in a `Submission` with a unique ID.
|
/// Submit the `op` wrapped in a `Submission` with a unique ID.
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::Codex;
|
use crate::Codex;
|
||||||
|
use crate::CodexSpawnOk;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::protocol::Event;
|
use crate::protocol::Event;
|
||||||
use crate::protocol::EventMsg;
|
use crate::protocol::EventMsg;
|
||||||
@@ -8,14 +9,27 @@ use crate::util::notify_on_sigint;
|
|||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
/// Represents an active Codex conversation, including the first event
|
||||||
|
/// (which is [`EventMsg::SessionConfigured`]).
|
||||||
|
pub struct CodexConversation {
|
||||||
|
pub codex: Codex,
|
||||||
|
pub session_id: Uuid,
|
||||||
|
pub session_configured: Event,
|
||||||
|
pub ctrl_c: Arc<Notify>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn a new [`Codex`] and initialize the session.
|
/// Spawn a new [`Codex`] and initialize the session.
|
||||||
///
|
///
|
||||||
/// Returns the wrapped [`Codex`] **and** the `SessionInitialized` event that
|
/// Returns the wrapped [`Codex`] **and** the `SessionInitialized` event that
|
||||||
/// is received as a response to the initial `ConfigureSession` submission so
|
/// is received as a response to the initial `ConfigureSession` submission so
|
||||||
/// that callers can surface the information to the UI.
|
/// that callers can surface the information to the UI.
|
||||||
pub async fn init_codex(config: Config) -> anyhow::Result<(Codex, Event, Arc<Notify>, Uuid)> {
|
pub async fn init_codex(config: Config) -> anyhow::Result<CodexConversation> {
|
||||||
let ctrl_c = notify_on_sigint();
|
let ctrl_c = notify_on_sigint();
|
||||||
let (codex, init_id, session_id) = Codex::spawn(config, ctrl_c.clone()).await?;
|
let CodexSpawnOk {
|
||||||
|
codex,
|
||||||
|
init_id,
|
||||||
|
session_id,
|
||||||
|
} = Codex::spawn(config, ctrl_c.clone()).await?;
|
||||||
|
|
||||||
// The first event must be `SessionInitialized`. Validate and forward it to
|
// The first event must be `SessionInitialized`. Validate and forward it to
|
||||||
// the caller so that they can display it in the conversation history.
|
// the caller so that they can display it in the conversation history.
|
||||||
@@ -34,5 +48,10 @@ pub async fn init_codex(config: Config) -> anyhow::Result<(Codex, Event, Arc<Not
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((codex, event, ctrl_c, session_id))
|
Ok(CodexConversation {
|
||||||
|
codex,
|
||||||
|
session_id,
|
||||||
|
session_configured: event,
|
||||||
|
ctrl_c,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ mod client;
|
|||||||
mod client_common;
|
mod client_common;
|
||||||
pub mod codex;
|
pub mod codex;
|
||||||
pub use codex::Codex;
|
pub use codex::Codex;
|
||||||
|
pub use codex::CodexSpawnOk;
|
||||||
pub mod codex_wrapper;
|
pub mod codex_wrapper;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod config_profile;
|
pub mod config_profile;
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use codex_core::Codex;
|
use codex_core::Codex;
|
||||||
|
use codex_core::CodexSpawnOk;
|
||||||
use codex_core::ModelProviderInfo;
|
use codex_core::ModelProviderInfo;
|
||||||
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||||
use codex_core::protocol::EventMsg;
|
use codex_core::protocol::EventMsg;
|
||||||
@@ -72,7 +73,7 @@ async fn includes_session_id_and_model_headers_in_request() {
|
|||||||
let mut config = load_default_config_for_test(&codex_home);
|
let mut config = load_default_config_for_test(&codex_home);
|
||||||
config.model_provider = model_provider;
|
config.model_provider = model_provider;
|
||||||
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
|
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
|
||||||
let (codex, _init_id, _session_id) = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
|
let CodexSpawnOk { codex, .. } = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
|
||||||
|
|
||||||
codex
|
codex
|
||||||
.submit(Op::UserInput {
|
.submit(Op::UserInput {
|
||||||
@@ -148,7 +149,7 @@ async fn includes_base_instructions_override_in_request() {
|
|||||||
config.model_provider = model_provider;
|
config.model_provider = model_provider;
|
||||||
|
|
||||||
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
|
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
|
||||||
let (codex, ..) = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
|
let CodexSpawnOk { codex, .. } = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
|
||||||
|
|
||||||
codex
|
codex
|
||||||
.submit(Op::UserInput {
|
.submit(Op::UserInput {
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use codex_core::Codex;
|
use codex_core::Codex;
|
||||||
|
use codex_core::CodexSpawnOk;
|
||||||
use codex_core::error::CodexErr;
|
use codex_core::error::CodexErr;
|
||||||
use codex_core::protocol::AgentMessageEvent;
|
use codex_core::protocol::AgentMessageEvent;
|
||||||
use codex_core::protocol::ErrorEvent;
|
use codex_core::protocol::ErrorEvent;
|
||||||
@@ -48,7 +49,7 @@ async fn spawn_codex() -> Result<Codex, CodexErr> {
|
|||||||
let mut config = load_default_config_for_test(&codex_home);
|
let mut config = load_default_config_for_test(&codex_home);
|
||||||
config.model_provider.request_max_retries = Some(2);
|
config.model_provider.request_max_retries = Some(2);
|
||||||
config.model_provider.stream_max_retries = Some(2);
|
config.model_provider.stream_max_retries = Some(2);
|
||||||
let (agent, _init_id, _session_id) =
|
let CodexSpawnOk { codex: agent, .. } =
|
||||||
Codex::spawn(config, std::sync::Arc::new(Notify::new())).await?;
|
Codex::spawn(config, std::sync::Arc::new(Notify::new())).await?;
|
||||||
|
|
||||||
Ok(agent)
|
Ok(agent)
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use codex_core::Codex;
|
use codex_core::Codex;
|
||||||
|
use codex_core::CodexSpawnOk;
|
||||||
use codex_core::ModelProviderInfo;
|
use codex_core::ModelProviderInfo;
|
||||||
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||||
use codex_core::protocol::EventMsg;
|
use codex_core::protocol::EventMsg;
|
||||||
@@ -94,7 +95,7 @@ async fn retries_on_early_close() {
|
|||||||
let codex_home = TempDir::new().unwrap();
|
let codex_home = TempDir::new().unwrap();
|
||||||
let mut config = load_default_config_for_test(&codex_home);
|
let mut config = load_default_config_for_test(&codex_home);
|
||||||
config.model_provider = model_provider;
|
config.model_provider = model_provider;
|
||||||
let (codex, _init_id, _session_id) = Codex::spawn(config, ctrl_c).await.unwrap();
|
let CodexSpawnOk { codex, .. } = Codex::spawn(config, ctrl_c).await.unwrap();
|
||||||
|
|
||||||
codex
|
codex
|
||||||
.submit(Op::UserInput {
|
.submit(Op::UserInput {
|
||||||
|
|||||||
@@ -9,7 +9,8 @@ use std::path::PathBuf;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub use cli::Cli;
|
pub use cli::Cli;
|
||||||
use codex_core::codex_wrapper;
|
use codex_core::codex_wrapper::CodexConversation;
|
||||||
|
use codex_core::codex_wrapper::{self};
|
||||||
use codex_core::config::Config;
|
use codex_core::config::Config;
|
||||||
use codex_core::config::ConfigOverrides;
|
use codex_core::config::ConfigOverrides;
|
||||||
use codex_core::config_types::SandboxMode;
|
use codex_core::config_types::SandboxMode;
|
||||||
@@ -155,9 +156,14 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
|||||||
.with_writer(std::io::stderr)
|
.with_writer(std::io::stderr)
|
||||||
.try_init();
|
.try_init();
|
||||||
|
|
||||||
let (codex_wrapper, event, ctrl_c, _session_id) = codex_wrapper::init_codex(config).await?;
|
let CodexConversation {
|
||||||
|
codex: codex_wrapper,
|
||||||
|
session_configured,
|
||||||
|
ctrl_c,
|
||||||
|
..
|
||||||
|
} = codex_wrapper::init_codex(config).await?;
|
||||||
let codex = Arc::new(codex_wrapper);
|
let codex = Arc::new(codex_wrapper);
|
||||||
info!("Codex initialized with event: {event:?}");
|
info!("Codex initialized with event: {session_configured:?}");
|
||||||
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use codex_core::Codex;
|
use codex_core::Codex;
|
||||||
|
use codex_core::codex_wrapper::CodexConversation;
|
||||||
use codex_core::codex_wrapper::init_codex;
|
use codex_core::codex_wrapper::init_codex;
|
||||||
use codex_core::config::Config as CodexConfig;
|
use codex_core::config::Config as CodexConfig;
|
||||||
use codex_core::protocol::AgentMessageEvent;
|
use codex_core::protocol::AgentMessageEvent;
|
||||||
@@ -42,7 +43,12 @@ pub async fn run_codex_tool_session(
|
|||||||
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
|
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
|
||||||
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
|
||||||
) {
|
) {
|
||||||
let (codex, first_event, _ctrl_c, session_id) = match init_codex(config).await {
|
let CodexConversation {
|
||||||
|
codex,
|
||||||
|
session_configured,
|
||||||
|
session_id,
|
||||||
|
..
|
||||||
|
} = match init_codex(config).await {
|
||||||
Ok(res) => res,
|
Ok(res) => res,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let result = CallToolResult {
|
let result = CallToolResult {
|
||||||
@@ -66,7 +72,9 @@ pub async fn run_codex_tool_session(
|
|||||||
drop(session_map);
|
drop(session_map);
|
||||||
|
|
||||||
// Send initial SessionConfigured event.
|
// Send initial SessionConfigured event.
|
||||||
outgoing.send_event_as_notification(&first_event).await;
|
outgoing
|
||||||
|
.send_event_as_notification(&session_configured)
|
||||||
|
.await;
|
||||||
|
|
||||||
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
||||||
// any events emitted for this tool-call can be correlated with the
|
// any events emitted for this tool-call can be correlated with the
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use codex_core::codex_wrapper::CodexConversation;
|
||||||
use codex_core::codex_wrapper::init_codex;
|
use codex_core::codex_wrapper::init_codex;
|
||||||
use codex_core::config::Config;
|
use codex_core::config::Config;
|
||||||
use codex_core::protocol::AgentMessageDeltaEvent;
|
use codex_core::protocol::AgentMessageDeltaEvent;
|
||||||
@@ -89,19 +90,22 @@ impl ChatWidget<'_> {
|
|||||||
// Create the Codex asynchronously so the UI loads as quickly as possible.
|
// Create the Codex asynchronously so the UI loads as quickly as possible.
|
||||||
let config_for_agent_loop = config.clone();
|
let config_for_agent_loop = config.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let (codex, session_event, _ctrl_c, _session_id) =
|
let CodexConversation {
|
||||||
match init_codex(config_for_agent_loop).await {
|
codex,
|
||||||
Ok(vals) => vals,
|
session_configured,
|
||||||
Err(e) => {
|
..
|
||||||
// TODO: surface this error to the user.
|
} = match init_codex(config_for_agent_loop).await {
|
||||||
tracing::error!("failed to initialize codex: {e}");
|
Ok(vals) => vals,
|
||||||
return;
|
Err(e) => {
|
||||||
}
|
// TODO: surface this error to the user.
|
||||||
};
|
tracing::error!("failed to initialize codex: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Forward the captured `SessionInitialized` event that was consumed
|
// Forward the captured `SessionInitialized` event that was consumed
|
||||||
// inside `init_codex()` so it can be rendered in the UI.
|
// inside `init_codex()` so it can be rendered in the UI.
|
||||||
app_event_tx_clone.send(AppEvent::CodexEvent(session_event.clone()));
|
app_event_tx_clone.send(AppEvent::CodexEvent(session_configured.clone()));
|
||||||
let codex = Arc::new(codex);
|
let codex = Arc::new(codex);
|
||||||
let codex_clone = codex.clone();
|
let codex_clone = codex.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|||||||
Reference in New Issue
Block a user