diff --git a/cli/src/daemon/server/ctx.rs b/cli/src/daemon/server/ctx.rs index 61a3dbef02ce..859cfd0dd6c1 100644 --- a/cli/src/daemon/server/ctx.rs +++ b/cli/src/daemon/server/ctx.rs @@ -70,8 +70,8 @@ use crate::daemon::server::dice_tracker::BuckDiceTracker; use crate::daemon::server::file_watcher::FileWatcher; use crate::daemon::server::heartbeat_guard::HeartbeatGuard; use crate::daemon::server::host_info; -use crate::daemon::server::RawOuputGuard; -use crate::daemon::server::RawOutputWriter; +use crate::daemon::server::raw_output::RawOuputGuard; +use crate::daemon::server::raw_output::RawOutputWriter; #[derive(Debug, thiserror::Error)] enum DaemonCommunicationError { diff --git a/cli/src/daemon/server/mod.rs b/cli/src/daemon/server/mod.rs index 995592efd14e..ab21330b6e5d 100644 --- a/cli/src/daemon/server/mod.rs +++ b/cli/src/daemon/server/mod.rs @@ -11,8 +11,6 @@ use std::collections::HashMap; use std::io; -use std::io::BufWriter; -use std::io::Write; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; @@ -94,6 +92,7 @@ mod file_watcher; pub(crate) mod heartbeat_guard; mod host_info; pub(crate) mod lsp; +mod raw_output; mod snapshot; pub(crate) mod state; @@ -433,74 +432,6 @@ impl> + Send> Stream for SyncSt } } -pub(crate) struct RawOuputGuard<'a> { - _phantom: PhantomData<&'a mut ServerCommandContext>, - inner: BufWriter, -} - -/// A writer that fires InstantEvent (RawOutput) when `write` function is called. -/// Client is supposed to print the message to its stdout immediately as verbatim. -struct RawOutputWriter { - dispatcher: EventDispatcher, - /// Maximum bytes of a message that is delivered to cli per `write` call - chunk_size: usize, -} - -impl<'a> Write for RawOuputGuard<'a> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - -impl<'a> Drop for RawOuputGuard<'a> { - fn drop(&mut self) { - // This would only happen if we had output that isn't utf-8 and got flushed. For now we live with ignoring - // this. - if let Err(e) = self.inner.flush() { - tracing::error!("Discarded RawOutputWriter output: {:#}", e); - } - } -} - -impl RawOutputWriter { - pub(crate) fn new(context: &ServerCommandContext) -> anyhow::Result { - Ok(Self { - dispatcher: context.base_context.events.dupe(), - chunk_size: RawOutputWriter::get_chunk_size()?, - }) - } - - fn get_chunk_size() -> anyhow::Result { - // protobuf recommends each message should be under 1MB - const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024; - static CHUNK_SIZE: EnvHelper = EnvHelper::new("BUCK2_DEBUG_RAWOUTPUT_CHUNK_SIZE"); - Ok(CHUNK_SIZE.get()?.unwrap_or(DEFAULT_CHUNK_SIZE)) - } -} - -impl Write for RawOutputWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let len = std::cmp::min(buf.len(), self.chunk_size); - if len > 0 { - let raw_output = buck2_data::RawOutput { - raw_output: String::from_utf8(buf[..len].to_vec()).map_err(|_| { - io::Error::new(io::ErrorKind::InvalidInput, "Output is not utf-8") - })?, - }; - self.dispatcher.instant_event(raw_output); - } - Ok(len) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - struct EventsCtx { dispatcher: EventDispatcher, } diff --git a/cli/src/daemon/server/raw_output.rs b/cli/src/daemon/server/raw_output.rs new file mode 100644 index 000000000000..4cae31f79586 --- /dev/null +++ b/cli/src/daemon/server/raw_output.rs @@ -0,0 +1,87 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under both the MIT license found in the + * LICENSE-MIT file in the root directory of this source tree and the Apache + * License, Version 2.0 found in the LICENSE-APACHE file in the root directory + * of this source tree. + */ + +use std::io; +use std::io::BufWriter; +use std::io::Write; +use std::marker::PhantomData; + +use buck2_core::env_helper::EnvHelper; +use events::dispatch::EventDispatcher; +use gazebo::dupe::Dupe; + +use crate::daemon::server::ctx::ServerCommandContext; + +pub(crate) struct RawOuputGuard<'a> { + pub(crate) _phantom: PhantomData<&'a mut ServerCommandContext>, + pub(crate) inner: BufWriter, +} + +/// A writer that fires InstantEvent (RawOutput) when `write` function is called. +/// Client is supposed to print the message to its stdout immediately as verbatim. +pub struct RawOutputWriter { + dispatcher: EventDispatcher, + /// Maximum bytes of a message that is delivered to cli per `write` call + chunk_size: usize, +} + +impl<'a> Write for RawOuputGuard<'a> { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl<'a> Drop for RawOuputGuard<'a> { + fn drop(&mut self) { + // This would only happen if we had output that isn't utf-8 and got flushed. For now we live with ignoring + // this. + if let Err(e) = self.inner.flush() { + tracing::error!("Discarded RawOutputWriter output: {:#}", e); + } + } +} + +impl RawOutputWriter { + pub(crate) fn new(context: &ServerCommandContext) -> anyhow::Result { + Ok(Self { + dispatcher: context.base_context.events.dupe(), + chunk_size: RawOutputWriter::get_chunk_size()?, + }) + } + + fn get_chunk_size() -> anyhow::Result { + // protobuf recommends each message should be under 1MB + const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024; + static CHUNK_SIZE: EnvHelper = EnvHelper::new("BUCK2_DEBUG_RAWOUTPUT_CHUNK_SIZE"); + Ok(CHUNK_SIZE.get()?.unwrap_or(DEFAULT_CHUNK_SIZE)) + } +} + +impl Write for RawOutputWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let len = std::cmp::min(buf.len(), self.chunk_size); + if len > 0 { + let raw_output = buck2_data::RawOutput { + raw_output: String::from_utf8(buf[..len].to_vec()).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "Output is not utf-8") + })?, + }; + self.dispatcher.instant_event(raw_output); + } + Ok(len) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +}