Skip to content

Commit

Permalink
batch lir metadata mapping, shrink ComputeEvent, and avoid work when …
Browse files Browse the repository at this point in the history
…not logging, per @antiguru
  • Loading branch information
mgree committed Nov 4, 2024
1 parent eeccbbf commit e94055a
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 128 deletions.
136 changes: 59 additions & 77 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,9 @@ pub enum ComputeEvent {
/// tracks the many-to-one relationship between `GlobalId`s and
/// dataflows.
global_id: GlobalId,
/// The LIR identifier (local to `export_id`).
lir_id: LirId,
/// The LIR operator, as a string (see `FlatPlanNode::humanize`).
/// We use `Box<str>` to reduce the size of the `ComputeEvent` representation.
operator: Box<str>,
/// The LIR identifier of the parent (if any).
/// Since `LirId`s are strictly positive, Rust can steal the low bit.
/// TODO(mgree) write a test to ensure that low bit is stolen
parent_lir_id: Option<LirId>,
/// How nested this operator is.
nesting: u8,
/// Operator id span start (inclusive) and end (exclusive).
/// If the two numbers are equal, then no operators were used.
operator_span: (usize, usize),
/// The actual mapping.
/// Represented this way to reduce the size of `ComputeEvent`.
mapping: Box<[(LirId, LirMetadata)]>,
},
DataflowGlobal {
/// The identifier of the dataflow.
Expand Down Expand Up @@ -198,6 +187,38 @@ impl Peek {
}
}

/// Metadata for LIR operators.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct LirMetadata {
/// The LIR operator, as a string (see `FlatPlanNode::humanize`).
operator: Box<str>,
/// The LIR identifier of the parent (if any).
/// Since `LirId`s are strictly positive, Rust can steal the low bit.
/// (See `test_option_lirid_fits_in_usize`.)
parent_lir_id: Option<LirId>,
/// How nested the operator is (for nice indentation).
nesting: u8,
/// The dataflow operator ids, given as start (inclusive) and end (exclusive).
/// If `start == end`, then no operators were used.
operator_span: (usize, usize),
}

impl LirMetadata {
pub fn new(
operator: Box<str>,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: (usize, usize),
) -> Self {
Self {
operator,
parent_lir_id,
nesting,
operator_span,
}
}
}

/// Constructs the logging dataflow for compute logs.
///
/// Params
Expand Down Expand Up @@ -528,19 +549,6 @@ struct DemuxState<A: Allocate> {
dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}

/// Metadata for LIR operators.
#[derive(Debug)]
struct LirMetadata {
/// The operator rendered as a string.
operator: Box<str>,
parent_lir_id: Option<LirId>,
/// How nested the operator is (for nice indentation).
nesting: u8,
/// The dataflow operator ids, given as start (inclusive) and end (exclusive).
/// If `start == end`, then no operators were used.
operator_span: (usize, usize),
}

impl<A: Allocate> DemuxState<A> {
fn new(worker: Worker<A>) -> Self {
Self {
Expand Down Expand Up @@ -749,21 +757,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
DataflowShutdown { dataflow_index } => self.handle_dataflow_shutdown(dataflow_index),
ErrorCount { export_id, diff } => self.handle_error_count(export_id, diff),
Hydration { export_id } => self.handle_hydration(export_id),
LirMapping {
global_id,
lir_id,
operator,
parent_lir_id,
nesting,
operator_span,
} => self.handle_lir_mapping(
global_id,
lir_id,
operator,
parent_lir_id,
nesting,
operator_span,
),
LirMapping { global_id, mapping } => self.handle_lir_mapping(global_id, mapping),
DataflowGlobal { id, global_id } => self.handle_dataflow_global(id, global_id),
}
}
Expand Down Expand Up @@ -1112,48 +1106,36 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
}

/// Indicate that a new LIR operator exists; record the dataflow address it maps to.
fn handle_lir_mapping(
&mut self,
global_id: GlobalId,
lir_id: LirId,
operator: Box<str>,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: (usize, usize),
) {
fn handle_lir_mapping(&mut self, global_id: GlobalId, mapping: Box<[(LirId, LirMetadata)]>) {
// record the state (for the later drop)
self.state
.lir_mapping
.entry(global_id)
.and_modify(|id_mapping| {
let existing = id_mapping.insert(lir_id, LirMetadata {
operator: operator.clone(),
parent_lir_id: parent_lir_id.clone(),
nesting,
operator_span: operator_span.clone(),
});
if let Some(old_operator_span) = existing {
error!(%global_id, %lir_id, "lir mapping to operator span {operator_span:?} already registered as {old_operator_span:?}");
}
})
.or_insert_with(|| BTreeMap::from([(lir_id, LirMetadata {
operator: operator.clone(),
parent_lir_id: parent_lir_id.clone(),
nesting,
operator_span: operator_span.clone(),
})]));
.and_modify(|existing_mapping| existing_mapping.extend(mapping.iter().cloned()))
.or_insert_with(|| mapping.iter().cloned().collect::<BTreeMap<_, _>>());

// send the datum out
let ts = self.ts();
let datum = LirMappingDatum {
global_id,
for (
lir_id,
operator,
parent_lir_id,
nesting,
operator_span,
};
self.output.lir_mapping.give((datum, ts, 1));
LirMetadata {
operator,
parent_lir_id,
nesting,
operator_span,
},
) in mapping
{
let datum = LirMappingDatum {
global_id,
lir_id,
operator,
parent_lir_id,
nesting,
operator_span,
};
self.output.lir_mapping.give((datum, ts, 1));
}
}

fn handle_dataflow_global(&mut self, id: usize, global_id: GlobalId) {
Expand Down Expand Up @@ -1356,6 +1338,6 @@ mod tests {
#[mz_ore::test]
fn test_compute_event_size() {
// This could be a static assertion, but we don't use those yet in this crate.
assert_eq!(72, std::mem::size_of::<ComputeEvent>())
assert_eq!(48, std::mem::size_of::<ComputeEvent>())
}
}
72 changes: 40 additions & 32 deletions src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::logging::compute::{ComputeEvent, LogDataflowErrors};
use crate::logging::compute::{ComputeEvent, LirMetadata, LogDataflowErrors};
use crate::render::context::{
ArrangementFlavor, Context, MzArrangement, MzArrangementImport, ShutdownToken,
};
Expand Down Expand Up @@ -881,32 +881,56 @@ where
// Rendered collections by their `LirId`.
let mut collections = BTreeMap::new();

// Mappings to send along.
// To save overhead, we'll only compute mappings when we need to,
// which means things get gated behind options. Unfortuantely, that means we
// have several `Option<...>` types that are _all_ `Some` or `None` together,
// but there's no convenient way to express the invariant.
let should_compute_lir_metadata = self.compute_logger.is_some();
let mut lir_mapping_metadata = if should_compute_lir_metadata {
Some(Vec::with_capacity(steps.len()))
} else {
None
};

for lir_id in topological_order {
let step = steps.remove(&lir_id).unwrap();

// TODO(mgree) need ExprHumanizer in DataflowDescription to get nice column names
// ActiveComputeState can't have a catalog reference, so we'll need to capture the names
// in some other structure and have that structure impl ExprHumanizer
let operator = step.node.humanize(&DummyHumanizer);
let metadata = if should_compute_lir_metadata {
let operator: Box<str> = step.node.humanize(&DummyHumanizer).into();
let operator_id_start = self.scope.peek_identifier();
Some((operator, operator_id_start))
} else {
None
};

let operator_id_start = self.scope.peek_identifier();
let mut bundle = self.render_plan_node(step.node, &collections);
let operator_id_end = self.scope.peek_identifier();
let operator_span = (operator_id_start, operator_id_end);

self.log_lir_mapping(
object_id,
lir_id,
operator,
step.parent,
step.nesting,
operator_span,
);

if let Some((operator, operator_id_start)) = metadata {
let operator_id_end = self.scope.peek_identifier();
let operator_span = (operator_id_start, operator_id_end);

if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
lir_mapping_metadata.push((
lir_id,
LirMetadata::new(operator, step.parent, step.nesting, operator_span),
))
}
}

self.log_operator_hydration(&mut bundle, lir_id);

collections.insert(lir_id, bundle);
}

if let Some(lir_mapping_metadata) = lir_mapping_metadata {
let mapping: Box<[(LirId, LirMetadata)]> = lir_mapping_metadata.into();
self.log_lir_mapping(object_id, mapping);
}

collections
.remove(&root_id)
.expect("FlatPlan invariant (1)")
Expand Down Expand Up @@ -1116,25 +1140,9 @@ where
}
}

fn log_lir_mapping(
&self,
global_id: GlobalId,
lir_id: LirId,
operator: String,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: (usize, usize),
) {
fn log_lir_mapping(&self, global_id: GlobalId, mapping: Box<[(LirId, LirMetadata)]>) {
if let Some(logger) = &self.compute_logger {
let operator = operator.into();
logger.log(ComputeEvent::LirMapping {
global_id,
lir_id,
operator,
parent_lir_id,
nesting,
operator_span,
});
logger.log(ComputeEvent::LirMapping { global_id, mapping });
}
}

Expand Down
Loading

0 comments on commit e94055a

Please sign in to comment.