tui: switch to using tokio + EventStream for processing crossterm events (#2489)

bringing the tui more into tokio-land to make it easier to factorize.

fyi @bolinfest
This commit is contained in:
Jeremy Rose
2025-08-20 10:11:09 -07:00
committed by GitHub
parent 8481eb4c6e
commit 61bbabe7d9
13 changed files with 396 additions and 390 deletions

View File

@@ -30,7 +30,6 @@ use std::io::BufRead;
use std::io::BufReader;
use std::io::Read;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use tokio::sync::mpsc::unbounded_channel;
fn test_config() -> Config {
@@ -45,7 +44,7 @@ fn test_config() -> Config {
#[test]
fn final_answer_without_newline_is_flushed_immediately() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Set up a VT100 test terminal to capture ANSI visual output
let width: u16 = 80;
@@ -73,7 +72,7 @@ fn final_answer_without_newline_is_flushed_immediately() {
});
// Drain history insertions and verify the final line is present.
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert!(
cells.iter().any(|lines| {
let s = lines
@@ -101,7 +100,7 @@ fn final_answer_without_newline_is_flushed_immediately() {
#[tokio::test(flavor = "current_thread")]
async fn helpers_are_available_and_do_not_panic() {
let (tx_raw, _rx) = channel::<AppEvent>();
let (tx_raw, _rx) = unbounded_channel::<AppEvent>();
let tx = AppEventSender::new(tx_raw);
let cfg = test_config();
let conversation_manager = Arc::new(ConversationManager::default());
@@ -113,10 +112,10 @@ async fn helpers_are_available_and_do_not_panic() {
// --- Helpers for tests that need direct construction and event draining ---
fn make_chatwidget_manual() -> (
ChatWidget<'static>,
std::sync::mpsc::Receiver<AppEvent>,
tokio::sync::mpsc::UnboundedReceiver<AppEvent>,
tokio::sync::mpsc::UnboundedReceiver<Op>,
) {
let (tx_raw, rx) = channel::<AppEvent>();
let (tx_raw, rx) = unbounded_channel::<AppEvent>();
let app_event_tx = AppEventSender::new(tx_raw);
let (op_tx, op_rx) = unbounded_channel::<Op>();
let cfg = test_config();
@@ -148,7 +147,7 @@ fn make_chatwidget_manual() -> (
}
fn drain_insert_history(
rx: &std::sync::mpsc::Receiver<AppEvent>,
rx: &mut tokio::sync::mpsc::UnboundedReceiver<AppEvent>,
) -> Vec<Vec<ratatui::text::Line<'static>>> {
let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() {
@@ -196,7 +195,7 @@ fn open_fixture(name: &str) -> std::fs::File {
#[test]
fn exec_history_cell_shows_working_then_completed() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Begin command
chat.handle_codex_event(Event {
@@ -226,7 +225,7 @@ fn exec_history_cell_shows_working_then_completed() {
}),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert_eq!(
cells.len(),
1,
@@ -241,7 +240,7 @@ fn exec_history_cell_shows_working_then_completed() {
#[test]
fn exec_history_cell_shows_working_then_failed() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Begin command
chat.handle_codex_event(Event {
@@ -271,7 +270,7 @@ fn exec_history_cell_shows_working_then_failed() {
}),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert_eq!(
cells.len(),
1,
@@ -286,7 +285,7 @@ fn exec_history_cell_shows_working_then_failed() {
#[tokio::test(flavor = "current_thread")]
async fn binary_size_transcript_matches_ideal_fixture() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Set up a VT100 test terminal to capture ANSI visual output
let width: u16 = 80;
@@ -423,7 +422,7 @@ async fn binary_size_transcript_matches_ideal_fixture() {
#[test]
fn apply_patch_events_emit_history_cells() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// 1) Approval request -> proposed patch summary cell
let mut changes = HashMap::new();
@@ -443,7 +442,7 @@ fn apply_patch_events_emit_history_cells() {
id: "s1".into(),
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert!(!cells.is_empty(), "expected pending patch cell to be sent");
let blob = lines_to_single_string(cells.last().unwrap());
assert!(
@@ -468,7 +467,7 @@ fn apply_patch_events_emit_history_cells() {
id: "s1".into(),
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert!(!cells.is_empty(), "expected applying patch cell to be sent");
let blob = lines_to_single_string(cells.last().unwrap());
assert!(
@@ -487,7 +486,7 @@ fn apply_patch_events_emit_history_cells() {
id: "s1".into(),
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert!(!cells.is_empty(), "expected applied patch cell to be sent");
let blob = lines_to_single_string(cells.last().unwrap());
assert!(
@@ -498,7 +497,7 @@ fn apply_patch_events_emit_history_cells() {
#[test]
fn apply_patch_approval_sends_op_with_submission_id() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Simulate receiving an approval request with a distinct submission id and call id
let mut changes = HashMap::new();
changes.insert(
@@ -539,7 +538,7 @@ fn apply_patch_approval_sends_op_with_submission_id() {
#[test]
fn apply_patch_full_flow_integration_like() {
let (mut chat, rx, mut op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual();
// 1) Backend requests approval
let mut changes = HashMap::new();
@@ -655,7 +654,7 @@ fn apply_patch_untrusted_shows_approval_modal() {
#[test]
fn apply_patch_request_shows_diff_summary() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Ensure we are in OnRequest so an approval is surfaced
chat.config.approval_policy = codex_core::protocol::AskForApproval::OnRequest;
@@ -680,7 +679,7 @@ fn apply_patch_request_shows_diff_summary() {
});
// Drain history insertions and verify the diff summary is present
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert!(
!cells.is_empty(),
"expected a history cell with the proposed patch summary"
@@ -702,7 +701,7 @@ fn apply_patch_request_shows_diff_summary() {
#[test]
fn plan_update_renders_history_cell() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
let update = UpdatePlanArgs {
explanation: Some("Adapting plan".to_string()),
plan: vec![
@@ -724,7 +723,7 @@ fn plan_update_renders_history_cell() {
id: "sub-1".into(),
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
assert!(!cells.is_empty(), "expected plan update cell to be sent");
let blob = lines_to_single_string(cells.last().unwrap());
assert!(
@@ -738,7 +737,7 @@ fn plan_update_renders_history_cell() {
#[test]
fn headers_emitted_on_stream_begin_for_answer_and_reasoning() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Answer: no header until a newline commit
chat.handle_codex_event(Event {
@@ -796,7 +795,7 @@ fn headers_emitted_on_stream_begin_for_answer_and_reasoning() {
);
// Reasoning: header immediately
let (mut chat2, rx2, _op_rx2) = make_chatwidget_manual();
let (mut chat2, mut rx2, _op_rx2) = make_chatwidget_manual();
chat2.handle_codex_event(Event {
id: "sub-b".into(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
@@ -826,7 +825,7 @@ fn headers_emitted_on_stream_begin_for_answer_and_reasoning() {
#[test]
fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Begin turn
chat.handle_codex_event(Event {
@@ -858,7 +857,7 @@ fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
}),
});
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
let mut header_count = 0usize;
let mut combined = String::new();
for lines in &cells {
@@ -894,7 +893,7 @@ fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
#[test]
fn final_reasoning_then_message_without_deltas_are_rendered() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
@@ -911,7 +910,7 @@ fn final_reasoning_then_message_without_deltas_are_rendered() {
});
// Drain history and snapshot the combined visible content.
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
let combined = cells
.iter()
.map(|lines| lines_to_single_string(lines))
@@ -921,7 +920,7 @@ fn final_reasoning_then_message_without_deltas_are_rendered() {
#[test]
fn deltas_then_same_final_message_are_rendered_snapshot() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
@@ -972,7 +971,7 @@ fn deltas_then_same_final_message_are_rendered_snapshot() {
// Snapshot the combined visible content to ensure we render as expected
// when deltas are followed by the identical final message.
let cells = drain_insert_history(&rx);
let cells = drain_insert_history(&mut rx);
let combined = cells
.iter()
.map(|lines| lines_to_single_string(lines))