Skip to content

Commit

Permalink
use timely-differential#593 to use operator spans instead of addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
mgree committed Oct 22, 2024
1 parent c26c135 commit 002cd64
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 58 deletions.
30 changes: 25 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ debug = 2
# version of Materialize.
[patch.crates-io]
# Projects that do not reliably release to crates.io.
timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow.git", rev = "0d453df386ed6589d2f970540c3e1793f63d79f0" }
timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4027,7 +4027,7 @@ pub static MZ_LIR_MAPPING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView
oid: oid::VIEW_MZ_LIR_MAPPING_OID,
column_defs: None,
sql: "
SELECT global_id, lir_id, operator, address
SELECT global_id, lir_id, operator, operator_id_start, operator_id_end
FROM mz_internal.mz_compute_lir_mapping_per_worker
WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
Expand Down
10 changes: 2 additions & 8 deletions src/compute-client/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,8 @@ impl LogVariant {
.with_column("lir_id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("operator", ScalarType::String.nullable(false))
.with_column(
"address",
ScalarType::List {
element_type: Box::new(ScalarType::UInt64),
custom_id: None,
}
.nullable(false),
)
.with_column("operator_id_start", ScalarType::UInt64.nullable(true))
.with_column("operator_id_end", ScalarType::UInt64.nullable(true))
.with_key(vec![0, 1])
.finish(),

Expand Down
73 changes: 33 additions & 40 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ pub enum ComputeEvent {
lir_id: LirId,
/// The LIR operator, as a string (see `FlatPlanNode::humanize`).
operator: String,
/// Operator address.
address: Rc<[usize]>,
/// Operator id span; may not be present if no operators were rendered.
operator_span: Option<(usize, usize)>,
},
DataflowGlobal {
/// The identifier of the dataflow.
Expand Down Expand Up @@ -413,35 +413,30 @@ pub(super) fn construct<A: Allocate + 'static>(
}
});

let mut packer = PermutedRowPacker::new(ComputeLog::LirMapping);
let packer = PermutedRowPacker::new(ComputeLog::LirMapping);
let lir_mapping = lir_mapping.as_collection().map({
move |datum| {
// we can't use `pack_slice` because we need `RowPacker::push_list`
packer.pack_by_index(|packer, index| match index {
0 => {
let mut scratch = String::new();
packer.push(make_string_datum(datum.global_id, &mut scratch))
}
1 => packer.push(Datum::UInt64(datum.lir_id)),
2 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
3 => {
let mut scratch = String::new();
packer.push(make_string_datum(&datum.operator, &mut scratch))
}
4 => packer.push_list(
datum
.address
.iter()
.copied()
.map(u64::cast_from)
.map(Datum::UInt64),
let mut scratch1 = String::new();
let mut scratch2 = String::new();
let (span_start, span_end) = match datum.operator_span {
Some((start, end)) => (
Datum::UInt64(u64::cast_from(start)),
Datum::UInt64(u64::cast_from(end)),
),
_ => unreachable!("bad index {index} in lir_mapping"),
})
None => (Datum::Null, Datum::Null),
};
packer.pack_slice(&[
make_string_datum(datum.global_id, &mut scratch1),
Datum::UInt64(datum.lir_id),
Datum::UInt64(u64::cast_from(worker_id)),
make_string_datum(&datum.operator, &mut scratch2),
span_start,
span_end,
])
}
});

let mut packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
let packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
let dataflow_global_ids = dataflow_global_ids.as_collection().map({
move |datum| {
let mut scratch = String::new();
Expand Down Expand Up @@ -521,8 +516,8 @@ struct DemuxState<A: Allocate> {
peek_stash: BTreeMap<Uuid, Duration>,
/// Arrangement size stash.
arrangement_size: BTreeMap<usize, ArrangementSizeState>,
/// LIR -> address mapping.
lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, (String, Vec<usize>)>>,
/// LIR -> operator span mapping.
lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, (String, Option<(usize, usize)>)>>,
/// Dataflow -> `GlobalId` mapping (many-to-one).
dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}
Expand Down Expand Up @@ -655,7 +650,7 @@ struct LirMappingDatum {
global_id: GlobalId,
lir_id: LirId,
operator: String,
address: Vec<usize>,
operator_span: Option<(usize, usize)>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -737,8 +732,8 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
global_id,
lir_id,
operator,
address,
} => self.handle_lir_mapping(global_id, lir_id, operator, address),
operator_span,
} => self.handle_lir_mapping(global_id, lir_id, operator, operator_span),
DataflowGlobal { id, global_id } => self.handle_dataflow_global(id, global_id),
}
}
Expand Down Expand Up @@ -848,12 +843,12 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {

// Remove LIR mapping.
if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
for (lir_id, (operator, address)) in mappings {
for (lir_id, (operator, operator_span)) in mappings {
let datum = LirMappingDatum {
global_id,
lir_id,
operator,
address,
operator_span,
};
self.output.lir_mapping.give((datum, ts, -1));
}
Expand Down Expand Up @@ -1081,29 +1076,27 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
global_id: GlobalId,
lir_id: LirId,
operator: String,
address: Rc<[usize]>,
operator_span: Option<(usize, usize)>,
) {
let address = address.to_vec();

// record the state (for the later drop)
self.state
.lir_mapping
.entry(global_id)
.and_modify(|id_mapping| {
let existing = id_mapping.insert(lir_id, (operator.clone(), address.clone()));
if let Some(old_address) = existing {
error!(%global_id, %lir_id, "lir mapping to {address:?} already registered as {old_address:?}");
let existing = id_mapping.insert(lir_id, (operator.clone(), operator_span.clone()));
if let Some(old_operator_span) = existing {
error!(%global_id, %lir_id, "lir mapping to operator span {operator_span:?} already registered as {old_operator_span:?}");
}
})
.or_insert_with(|| BTreeMap::from([(lir_id, (operator.clone(), address.clone()))]));
.or_insert_with(|| BTreeMap::from([(lir_id, (operator.clone(), operator_span.clone()))]));

// send the datum out
let ts = self.ts();
let datum = LirMappingDatum {
global_id,
lir_id,
operator,
address,
operator_span,
};
self.output.lir_mapping.give((datum, ts, 1));
}
Expand Down
14 changes: 11 additions & 3 deletions src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,9 +862,17 @@ where
// TODO(mgree) need ExprHumanizer in DataflowDescription (ActiveComputeState doesn't have a catalog reference)
let operator = node.humanize(&DummyHumanizer);

let operator_id_start = self.scope.peek_identifier();
let mut bundle = self.render_plan_node(node, &collections);
let operator_id_end = self.scope.peek_identifier();

self.log_lir_mapping(object_id, lir_id, operator, bundle.scope().addr());
let operator_span = if operator_id_start == operator_id_end {
None
} else {
Some((operator_id_start, operator_id_end))
};

self.log_lir_mapping(object_id, lir_id, operator, operator_span);
self.log_operator_hydration(&mut bundle, lir_id);

collections.insert(lir_id, bundle);
Expand Down Expand Up @@ -1084,14 +1092,14 @@ where
global_id: GlobalId,
lir_id: LirId,
operator: String,
address: Rc<[usize]>,
operator_span: Option<(usize, usize)>,
) {
if let Some(logger) = &self.compute_logger {
logger.log(ComputeEvent::LirMapping {
global_id,
lir_id,
operator,
address,
operator_span,
});
}
}
Expand Down

0 comments on commit 002cd64

Please sign in to comment.