This PR does two things, because once I got deep into the first one I started pulling on the thread that led to the second:

- Makes `ConversationManager` the place where all in-memory conversations are created and stored. Previously, `MessageProcessor` in the `codex-mcp-server` crate was doing this via its `session_map`, but this is something that should be done in `codex-core`.
- Unwinds the `ctrl_c: tokio::sync::Notify` that was threaded throughout our code. I think this made sense at one time, but now that we handle Ctrl-C within the TUI and have a proper `Op::Interrupt` event, I don't think this was quite right, so I removed it. For `codex exec` and `codex proto`, we now use `tokio::signal::ctrl_c()` directly, but we no longer make `Notify` a field of `Codex` or `CodexConversation`.

Changes of note:

- Adds the files `conversation_manager.rs` and `codex_conversation.rs` to `codex-core`.
- `Codex` and `CodexSpawnOk` are no longer exported from `codex-core`: other crates must use `CodexConversation` instead (which is created via `ConversationManager`).
- `core/src/codex_wrapper.rs` has been deleted in favor of `ConversationManager`.
- `ConversationManager::new_conversation()` returns `NewConversation`, which is in line with the `new_conversation` tool we want to add to the MCP server. Note `NewConversation` includes `SessionConfiguredEvent`, so we eliminate checks in cases like `codex-rs/core/tests/client.rs` to verify `SessionConfiguredEvent` is the first event because that is now internal to `ConversationManager`.
- Quite a bit of code was deleted from `codex-rs/mcp-server/src/message_processor.rs` since it no longer has to manage multiple conversations itself: it goes through `ConversationManager` instead.
- `core/tests/live_agent.rs` has been deleted because I had to update a bunch of tests and all the tests in here were ignored, and I don't think anyone ever ran them, so this was just technical debt at this point.
- Removed `notify_on_sigint()` from `util.rs` (and in a follow-up, I hope to refactor the blandly-named `util.rs` into more descriptive files).
- In general, I started renaming local variables named `codex` to `conversation`, where appropriate, though admittedly I didn't do it through all the integration tests because that would have added a lot of noise to this PR.

---

[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2240).
* #2264
* #2263
* __->__ #2240
132 lines
4.2 KiB
Rust
132 lines
4.2 KiB
Rust
use std::io::IsTerminal;
|
|
|
|
use clap::Parser;
|
|
use codex_common::CliConfigOverrides;
|
|
use codex_core::ConversationManager;
|
|
use codex_core::NewConversation;
|
|
use codex_core::config::Config;
|
|
use codex_core::config::ConfigOverrides;
|
|
use codex_core::protocol::Event;
|
|
use codex_core::protocol::EventMsg;
|
|
use codex_core::protocol::Submission;
|
|
use tokio::io::AsyncBufReadExt;
|
|
use tokio::io::BufReader;
|
|
use tracing::error;
|
|
use tracing::info;
|
|
|
|
#[derive(Debug, Parser)]
|
|
pub struct ProtoCli {
|
|
#[clap(skip)]
|
|
pub config_overrides: CliConfigOverrides,
|
|
}
|
|
|
|
pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
|
|
if std::io::stdin().is_terminal() {
|
|
anyhow::bail!("Protocol mode expects stdin to be a pipe, not a terminal");
|
|
}
|
|
|
|
tracing_subscriber::fmt()
|
|
.with_writer(std::io::stderr)
|
|
.init();
|
|
|
|
let ProtoCli { config_overrides } = opts;
|
|
let overrides_vec = config_overrides
|
|
.parse_overrides()
|
|
.map_err(anyhow::Error::msg)?;
|
|
|
|
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
|
|
// Use conversation_manager API to start a conversation
|
|
let conversation_manager = ConversationManager::default();
|
|
let NewConversation {
|
|
conversation_id: _,
|
|
conversation,
|
|
session_configured,
|
|
} = conversation_manager.new_conversation(config).await?;
|
|
|
|
// Simulate streaming the session_configured event.
|
|
let synthetic_event = Event {
|
|
// Fake id value.
|
|
id: "".to_string(),
|
|
msg: EventMsg::SessionConfigured(session_configured),
|
|
};
|
|
let session_configured_event = match serde_json::to_string(&synthetic_event) {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
error!("Failed to serialize session_configured: {e}");
|
|
return Err(anyhow::Error::from(e));
|
|
}
|
|
};
|
|
println!("{session_configured_event}");
|
|
|
|
// Task that reads JSON lines from stdin and forwards to Submission Queue
|
|
let sq_fut = {
|
|
let conversation = conversation.clone();
|
|
async move {
|
|
let stdin = BufReader::new(tokio::io::stdin());
|
|
let mut lines = stdin.lines();
|
|
loop {
|
|
let result = tokio::select! {
|
|
_ = tokio::signal::ctrl_c() => {
|
|
break
|
|
},
|
|
res = lines.next_line() => res,
|
|
};
|
|
|
|
match result {
|
|
Ok(Some(line)) => {
|
|
let line = line.trim();
|
|
if line.is_empty() {
|
|
continue;
|
|
}
|
|
match serde_json::from_str::<Submission>(line) {
|
|
Ok(sub) => {
|
|
if let Err(e) = conversation.submit_with_id(sub).await {
|
|
error!("{e:#}");
|
|
break;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
error!("invalid submission: {e}");
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
info!("Submission queue closed");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
// Task that reads events from the agent and prints them as JSON lines to stdout
|
|
let eq_fut = async move {
|
|
loop {
|
|
let event = tokio::select! {
|
|
_ = tokio::signal::ctrl_c() => break,
|
|
event = conversation.next_event() => event,
|
|
};
|
|
match event {
|
|
Ok(event) => {
|
|
let event_str = match serde_json::to_string(&event) {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
error!("Failed to serialize event: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
println!("{event_str}");
|
|
}
|
|
Err(e) => {
|
|
error!("{e:#}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
info!("Event queue closed");
|
|
};
|
|
|
|
tokio::join!(sq_fut, eq_fut);
|
|
Ok(())
|
|
}
|