This PR adds a central `AuthManager` struct that manages the auth information used across conversations and the MCP server. Prior to this, each conversation and the MCP server got their own private snapshots of the auth information, and changes to one (such as a logout or token refresh) were not seen by others. This is especially problematic when multiple instances of the CLI are run. For example, consider the case where you start CLI 1 and log in to ChatGPT account X and then start CLI 2 and log out and then log in to ChatGPT account Y. The conversation in CLI 1 is still using account X, but if you create a new conversation, it will suddenly (and unexpectedly) switch to account Y. With the `AuthManager`, auth information is read from disk at the time the `ConversationManager` is constructed, and it is cached in memory. All new conversations use this same auth information, as do any token refreshes. The `AuthManager` is also used by the MCP server's GetAuthStatus command, which now returns the auth method currently used by the MCP server. This PR also includes an enhancement to the GetAuthStatus command. It now accepts two new (optional) input parameters: `include_token` and `refresh_token`. Callers can use this to request the in-use auth token and can optionally request to refresh the token. The PR also adds tests for the login and auth APIs that I recently added to the MCP server.
136 lines
4.3 KiB
Rust
136 lines
4.3 KiB
Rust
use std::io::IsTerminal;
|
|
|
|
use clap::Parser;
|
|
use codex_common::CliConfigOverrides;
|
|
use codex_core::ConversationManager;
|
|
use codex_core::NewConversation;
|
|
use codex_core::config::Config;
|
|
use codex_core::config::ConfigOverrides;
|
|
use codex_core::protocol::Event;
|
|
use codex_core::protocol::EventMsg;
|
|
use codex_core::protocol::Submission;
|
|
use codex_login::AuthManager;
|
|
use tokio::io::AsyncBufReadExt;
|
|
use tokio::io::BufReader;
|
|
use tracing::error;
|
|
use tracing::info;
|
|
|
|
#[derive(Debug, Parser)]
|
|
pub struct ProtoCli {
|
|
#[clap(skip)]
|
|
pub config_overrides: CliConfigOverrides,
|
|
}
|
|
|
|
pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
|
|
if std::io::stdin().is_terminal() {
|
|
anyhow::bail!("Protocol mode expects stdin to be a pipe, not a terminal");
|
|
}
|
|
|
|
tracing_subscriber::fmt()
|
|
.with_writer(std::io::stderr)
|
|
.init();
|
|
|
|
let ProtoCli { config_overrides } = opts;
|
|
let overrides_vec = config_overrides
|
|
.parse_overrides()
|
|
.map_err(anyhow::Error::msg)?;
|
|
|
|
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
|
|
// Use conversation_manager API to start a conversation
|
|
let conversation_manager = ConversationManager::new(AuthManager::shared(
|
|
config.codex_home.clone(),
|
|
config.preferred_auth_method,
|
|
));
|
|
let NewConversation {
|
|
conversation_id: _,
|
|
conversation,
|
|
session_configured,
|
|
} = conversation_manager.new_conversation(config).await?;
|
|
|
|
// Simulate streaming the session_configured event.
|
|
let synthetic_event = Event {
|
|
// Fake id value.
|
|
id: "".to_string(),
|
|
msg: EventMsg::SessionConfigured(session_configured),
|
|
};
|
|
let session_configured_event = match serde_json::to_string(&synthetic_event) {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
error!("Failed to serialize session_configured: {e}");
|
|
return Err(anyhow::Error::from(e));
|
|
}
|
|
};
|
|
println!("{session_configured_event}");
|
|
|
|
// Task that reads JSON lines from stdin and forwards to Submission Queue
|
|
let sq_fut = {
|
|
let conversation = conversation.clone();
|
|
async move {
|
|
let stdin = BufReader::new(tokio::io::stdin());
|
|
let mut lines = stdin.lines();
|
|
loop {
|
|
let result = tokio::select! {
|
|
_ = tokio::signal::ctrl_c() => {
|
|
break
|
|
},
|
|
res = lines.next_line() => res,
|
|
};
|
|
|
|
match result {
|
|
Ok(Some(line)) => {
|
|
let line = line.trim();
|
|
if line.is_empty() {
|
|
continue;
|
|
}
|
|
match serde_json::from_str::<Submission>(line) {
|
|
Ok(sub) => {
|
|
if let Err(e) = conversation.submit_with_id(sub).await {
|
|
error!("{e:#}");
|
|
break;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
error!("invalid submission: {e}");
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
info!("Submission queue closed");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
// Task that reads events from the agent and prints them as JSON lines to stdout
|
|
let eq_fut = async move {
|
|
loop {
|
|
let event = tokio::select! {
|
|
_ = tokio::signal::ctrl_c() => break,
|
|
event = conversation.next_event() => event,
|
|
};
|
|
match event {
|
|
Ok(event) => {
|
|
let event_str = match serde_json::to_string(&event) {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
error!("Failed to serialize event: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
println!("{event_str}");
|
|
}
|
|
Err(e) => {
|
|
error!("{e:#}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
info!("Event queue closed");
|
|
};
|
|
|
|
tokio::join!(sq_fut, eq_fut);
|
|
Ok(())
|
|
}
|