chore: use one write call per item in rollout_writer() (#1679)
Most of the time, we expect the `String` returned by
`serde_json::to_string()` to have extra capacity, so `push('\n')` is
unlikely to allocate, which seems cheaper than an extra `write(2)` call,
on average?
This commit is contained in:
@@ -288,11 +288,13 @@ fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result<LogFile
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn rollout_writer(
|
async fn rollout_writer(
|
||||||
mut file: tokio::fs::File,
|
file: tokio::fs::File,
|
||||||
mut rx: mpsc::Receiver<RolloutCmd>,
|
mut rx: mpsc::Receiver<RolloutCmd>,
|
||||||
mut meta: Option<SessionMeta>,
|
mut meta: Option<SessionMeta>,
|
||||||
cwd: std::path::PathBuf,
|
cwd: std::path::PathBuf,
|
||||||
) {
|
) -> std::io::Result<()> {
|
||||||
|
let mut writer = JsonlWriter { file };
|
||||||
|
|
||||||
// If we have a meta, collect git info asynchronously and write meta first
|
// If we have a meta, collect git info asynchronously and write meta first
|
||||||
if let Some(session_meta) = meta.take() {
|
if let Some(session_meta) = meta.take() {
|
||||||
let git_info = collect_git_info(&cwd).await;
|
let git_info = collect_git_info(&cwd).await;
|
||||||
@@ -302,11 +304,7 @@ async fn rollout_writer(
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Write the SessionMeta as the first item in the file
|
// Write the SessionMeta as the first item in the file
|
||||||
if let Ok(json) = serde_json::to_string(&session_meta_with_git) {
|
writer.write_line(&session_meta_with_git).await?;
|
||||||
let _ = file.write_all(json.as_bytes()).await;
|
|
||||||
let _ = file.write_all(b"\n").await;
|
|
||||||
let _ = file.flush().await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process rollout commands
|
// Process rollout commands
|
||||||
@@ -320,15 +318,11 @@ async fn rollout_writer(
|
|||||||
| ResponseItem::FunctionCall { .. }
|
| ResponseItem::FunctionCall { .. }
|
||||||
| ResponseItem::FunctionCallOutput { .. }
|
| ResponseItem::FunctionCallOutput { .. }
|
||||||
| ResponseItem::Reasoning { .. } => {
|
| ResponseItem::Reasoning { .. } => {
|
||||||
if let Ok(json) = serde_json::to_string(&item) {
|
writer.write_line(&item).await?;
|
||||||
let _ = file.write_all(json.as_bytes()).await;
|
|
||||||
let _ = file.write_all(b"\n").await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ResponseItem::Other => {}
|
ResponseItem::Other => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = file.flush().await;
|
|
||||||
}
|
}
|
||||||
RolloutCmd::UpdateState(state) => {
|
RolloutCmd::UpdateState(state) => {
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
@@ -337,18 +331,32 @@ async fn rollout_writer(
|
|||||||
#[serde(flatten)]
|
#[serde(flatten)]
|
||||||
state: &'a SessionStateSnapshot,
|
state: &'a SessionStateSnapshot,
|
||||||
}
|
}
|
||||||
if let Ok(json) = serde_json::to_string(&StateLine {
|
writer
|
||||||
record_type: "state",
|
.write_line(&StateLine {
|
||||||
state: &state,
|
record_type: "state",
|
||||||
}) {
|
state: &state,
|
||||||
let _ = file.write_all(json.as_bytes()).await;
|
})
|
||||||
let _ = file.write_all(b"\n").await;
|
.await?;
|
||||||
let _ = file.flush().await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
RolloutCmd::Shutdown { ack } => {
|
RolloutCmd::Shutdown { ack } => {
|
||||||
let _ = ack.send(());
|
let _ = ack.send(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
struct JsonlWriter {
|
||||||
|
file: tokio::fs::File,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JsonlWriter {
|
||||||
|
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
|
||||||
|
let mut json = serde_json::to_string(item)?;
|
||||||
|
json.push('\n');
|
||||||
|
let _ = self.file.write_all(json.as_bytes()).await;
|
||||||
|
self.file.flush().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user