Skip to content

Commit

Permalink
Move raw output into a separate file
Browse files Browse the repository at this point in the history
Summary: Split long file.

Reviewed By: bobyangyf

Differential Revision: D38682729

fbshipit-source-id: c7b9b30b455d5ac3ec12cc902cba0aed7ceaf2de
  • Loading branch information
stepancheg authored and facebook-github-bot committed Aug 16, 2022
1 parent 4cbcace commit 3b5eb89
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 72 deletions.
4 changes: 2 additions & 2 deletions cli/src/daemon/server/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use crate::daemon::server::dice_tracker::BuckDiceTracker;
use crate::daemon::server::file_watcher::FileWatcher;
use crate::daemon::server::heartbeat_guard::HeartbeatGuard;
use crate::daemon::server::host_info;
use crate::daemon::server::RawOuputGuard;
use crate::daemon::server::RawOutputWriter;
use crate::daemon::server::raw_output::RawOuputGuard;
use crate::daemon::server::raw_output::RawOutputWriter;

#[derive(Debug, thiserror::Error)]
enum DaemonCommunicationError {
Expand Down
71 changes: 1 addition & 70 deletions cli/src/daemon/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

use std::collections::HashMap;
use std::io;
use std::io::BufWriter;
use std::io::Write;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
Expand Down Expand Up @@ -94,6 +92,7 @@ mod file_watcher;
pub(crate) mod heartbeat_guard;
mod host_info;
pub(crate) mod lsp;
mod raw_output;
mod snapshot;
pub(crate) mod state;

Expand Down Expand Up @@ -433,74 +432,6 @@ impl<T: Stream<Item = Result<CommandProgress, Status>> + Send> Stream for SyncSt
}
}

pub(crate) struct RawOuputGuard<'a> {
_phantom: PhantomData<&'a mut ServerCommandContext>,
inner: BufWriter<RawOutputWriter>,
}

/// A writer that fires InstantEvent (RawOutput) when `write` function is called.
/// Client is supposed to print the message to its stdout immediately as verbatim.
struct RawOutputWriter {
dispatcher: EventDispatcher,
/// Maximum bytes of a message that is delivered to cli per `write` call
chunk_size: usize,
}

impl<'a> Write for RawOuputGuard<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}

impl<'a> Drop for RawOuputGuard<'a> {
fn drop(&mut self) {
// This would only happen if we had output that isn't utf-8 and got flushed. For now we live with ignoring
// this.
if let Err(e) = self.inner.flush() {
tracing::error!("Discarded RawOutputWriter output: {:#}", e);
}
}
}

impl RawOutputWriter {
pub(crate) fn new(context: &ServerCommandContext) -> anyhow::Result<Self> {
Ok(Self {
dispatcher: context.base_context.events.dupe(),
chunk_size: RawOutputWriter::get_chunk_size()?,
})
}

fn get_chunk_size() -> anyhow::Result<usize> {
// protobuf recommends each message should be under 1MB
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
static CHUNK_SIZE: EnvHelper<usize> = EnvHelper::new("BUCK2_DEBUG_RAWOUTPUT_CHUNK_SIZE");
Ok(CHUNK_SIZE.get()?.unwrap_or(DEFAULT_CHUNK_SIZE))
}
}

impl Write for RawOutputWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let len = std::cmp::min(buf.len(), self.chunk_size);
if len > 0 {
let raw_output = buck2_data::RawOutput {
raw_output: String::from_utf8(buf[..len].to_vec()).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, "Output is not utf-8")
})?,
};
self.dispatcher.instant_event(raw_output);
}
Ok(len)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

struct EventsCtx {
dispatcher: EventDispatcher,
}
Expand Down
87 changes: 87 additions & 0 deletions cli/src/daemon/server/raw_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under both the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree and the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree.
*/

use std::io;
use std::io::BufWriter;
use std::io::Write;
use std::marker::PhantomData;

use buck2_core::env_helper::EnvHelper;
use events::dispatch::EventDispatcher;
use gazebo::dupe::Dupe;

use crate::daemon::server::ctx::ServerCommandContext;

pub(crate) struct RawOuputGuard<'a> {
pub(crate) _phantom: PhantomData<&'a mut ServerCommandContext>,
pub(crate) inner: BufWriter<RawOutputWriter>,
}

/// A writer that fires InstantEvent (RawOutput) when `write` function is called.
/// Client is supposed to print the message to its stdout immediately as verbatim.
pub struct RawOutputWriter {
dispatcher: EventDispatcher,
/// Maximum bytes of a message that is delivered to cli per `write` call
chunk_size: usize,
}

impl<'a> Write for RawOuputGuard<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}

impl<'a> Drop for RawOuputGuard<'a> {
fn drop(&mut self) {
// This would only happen if we had output that isn't utf-8 and got flushed. For now we live with ignoring
// this.
if let Err(e) = self.inner.flush() {
tracing::error!("Discarded RawOutputWriter output: {:#}", e);
}
}
}

impl RawOutputWriter {
pub(crate) fn new(context: &ServerCommandContext) -> anyhow::Result<Self> {
Ok(Self {
dispatcher: context.base_context.events.dupe(),
chunk_size: RawOutputWriter::get_chunk_size()?,
})
}

fn get_chunk_size() -> anyhow::Result<usize> {
// protobuf recommends each message should be under 1MB
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
static CHUNK_SIZE: EnvHelper<usize> = EnvHelper::new("BUCK2_DEBUG_RAWOUTPUT_CHUNK_SIZE");
Ok(CHUNK_SIZE.get()?.unwrap_or(DEFAULT_CHUNK_SIZE))
}
}

impl Write for RawOutputWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let len = std::cmp::min(buf.len(), self.chunk_size);
if len > 0 {
let raw_output = buck2_data::RawOutput {
raw_output: String::from_utf8(buf[..len].to_vec()).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidInput, "Output is not utf-8")
})?,
};
self.dispatcher.instant_event(raw_output);
}
Ok(len)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

0 comments on commit 3b5eb89

Please sign in to comment.