Skip to content

Commit

Permalink
newtype LirId; shrink ComputeEvent::LirMapping per @antiguru
Browse files Browse the repository at this point in the history
  • Loading branch information
mgree committed Oct 31, 2024
1 parent 5854a26 commit a79f9e6
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 207 deletions.
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2960,7 +2960,7 @@ impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
Datum::UInt64(lir_id),
Datum::UInt64(lir_id.into()),
Datum::String(&self.replica_id.to_string()),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::from(*hydrated),
Expand Down
31 changes: 30 additions & 1 deletion src/compute-types/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,26 @@ impl AvailableCollections {
}

/// An identifier for an LIR node.
pub type LirId = u64;
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct LirId(NonZeroU64);

impl LirId {
fn as_u64(&self) -> u64 {
self.0.into()
}
}

impl From<LirId> for u64 {
fn from(value: LirId) -> Self {
value.as_u64()
}
}

impl std::fmt::Display for LirId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// A rendering plan with as much conditional logic as possible removed.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
Expand Down Expand Up @@ -504,6 +523,16 @@ impl Plan {
}
}

impl Arbitrary for LirId {
type Strategy = BoxedStrategy<LirId>;
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
let lir_id = NonZeroU64::arbitrary();
lir_id.prop_map(LirId).boxed()
}
}

impl Arbitrary for Plan {
type Strategy = BoxedStrategy<Plan>;
type Parameters = ();
Expand Down
65 changes: 41 additions & 24 deletions src/compute-types/src/plan/flat_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,19 +991,29 @@ impl Arbitrary for FlatPlan {
}
}

impl RustType<u64> for LirId {
fn into_proto(&self) -> u64 {
u64::from(self.0)
}

fn from_proto(proto: u64) -> Result<Self, mz_proto::TryFromProtoError> {
Ok(Self(proto.try_into()?))
}
}

impl RustType<ProtoFlatPlan> for FlatPlan {
fn into_proto(&self) -> ProtoFlatPlan {
ProtoFlatPlan {
steps: self.steps.into_proto(),
root: self.root,
root: self.root.into_proto(),
topological_order: self.topological_order.into_proto(),
}
}

fn from_proto(proto: ProtoFlatPlan) -> Result<Self, mz_proto::TryFromProtoError> {
Ok(Self {
steps: proto.steps.into_rust()?,
root: proto.root,
root: LirId::from_proto(proto.root)?,
topological_order: proto.topological_order.into_rust()?,
})
}
Expand All @@ -1012,13 +1022,16 @@ impl RustType<ProtoFlatPlan> for FlatPlan {
impl ProtoMapEntry<LirId, FlatPlanStep> for proto_flat_plan::ProtoStep {
fn from_rust(entry: (&LirId, &FlatPlanStep)) -> Self {
Self {
id: *entry.0,
id: entry.0.into_proto(),
step: Some(entry.1.into_proto()),
}
}

fn into_rust(self) -> Result<(LirId, FlatPlanStep), TryFromProtoError> {
Ok((self.id, self.step.into_rust_if_some("ProtoStep::step")?))
Ok((
LirId::from_proto(self.id)?,
self.step.into_rust_if_some("ProtoStep::step")?,
))
}
}

Expand Down Expand Up @@ -1070,8 +1083,8 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
}),
Self::Let { id, value, body } => Kind::Let(ProtoLet {
id: Some(id.into_proto()),
value: *value,
body: *body,
value: value.into_proto(),
body: body.into_proto(),
}),
Self::LetRec {
ids,
Expand Down Expand Up @@ -1105,15 +1118,15 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
values: values.into_proto(),
limits: proto_limits,
limit_is_some: proto_limit_is_some,
body: *body,
body: body.into_proto(),
})
}
Self::Mfp {
input,
mfp,
input_key_val,
} => Kind::Mfp(ProtoMfp {
input: *input,
input: input.into_proto(),
mfp: Some(mfp.into_proto()),
input_key_val: input_kv_into(input_key_val),
}),
Expand All @@ -1124,7 +1137,7 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
mfp_after,
input_key,
} => Kind::FlatMap(ProtoFlatMap {
input: *input,
input: input.into_proto(),
func: Some(func.into_proto()),
exprs: exprs.into_proto(),
mfp_after: Some(mfp_after.into_proto()),
Expand All @@ -1141,22 +1154,24 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
input_key,
mfp_after,
} => Kind::Reduce(ProtoReduce {
input: *input,
input: input.into_proto(),
key_val_plan: Some(key_val_plan.into_proto()),
plan: Some(plan.into_proto()),
input_key: input_k_into(input_key),
mfp_after: Some(mfp_after.into_proto()),
}),
Self::TopK { input, top_k_plan } => Kind::TopK(ProtoTopK {
input: *input,
input: input.into_proto(),
top_k_plan: Some(top_k_plan.into_proto()),
}),
Self::Negate { input } => Kind::Negate(ProtoNegate { input: *input }),
Self::Negate { input } => Kind::Negate(ProtoNegate {
input: input.into_proto(),
}),
Self::Threshold {
input,
threshold_plan,
} => Kind::Threshold(ProtoThreshold {
input: *input,
input: input.into_proto(),
threshold_plan: Some(threshold_plan.into_proto()),
}),
Self::Union {
Expand All @@ -1172,7 +1187,7 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
input_key,
input_mfp,
} => Kind::ArrangeBy(ProtoArrangeBy {
input: *input,
input: input.into_proto(),
forms: Some(forms.into_proto()),
input_key: input_k_into(input_key),
input_mfp: Some(input_mfp.into_proto()),
Expand Down Expand Up @@ -1220,8 +1235,8 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
},
Kind::Let(proto) => Self::Let {
id: proto.id.into_rust_if_some("ProtoLet::id")?,
value: proto.value,
body: proto.body,
value: LirId::from_proto(proto.value)?,
body: LirId::from_proto(proto.body)?,
},
Kind::LetRec(proto) => {
let mut limits = Vec::with_capacity(proto.limits.len());
Expand All @@ -1237,16 +1252,16 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
ids: proto.ids.into_rust()?,
values: proto.values.into_rust()?,
limits,
body: proto.body,
body: LirId::from_proto(proto.body)?,
}
}
Kind::Mfp(proto) => Self::Mfp {
input: proto.input,
input: LirId::from_proto(proto.input)?,
mfp: proto.mfp.into_rust_if_some("ProtoMfp::mfp")?,
input_key_val: input_kv_try_into(proto.input_key_val)?,
},
Kind::FlatMap(proto) => Self::FlatMap {
input: proto.input,
input: LirId::from_proto(proto.input)?,
func: proto.func.into_rust_if_some("ProtoFlatMap::func")?,
exprs: proto.exprs.into_rust()?,
mfp_after: proto
Expand All @@ -1259,7 +1274,7 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
plan: proto.plan.into_rust_if_some("ProtoJoin::plan")?,
},
Kind::Reduce(proto) => Self::Reduce {
input: proto.input,
input: LirId::from_proto(proto.input)?,
key_val_plan: proto
.key_val_plan
.into_rust_if_some("ProtoReduce::key_val_plan")?,
Expand All @@ -1268,14 +1283,16 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
mfp_after: proto.mfp_after.into_rust_if_some("Proto::mfp_after")?,
},
Kind::TopK(proto) => Self::TopK {
input: proto.input,
input: LirId::from_proto(proto.input)?,
top_k_plan: proto
.top_k_plan
.into_rust_if_some("ProtoTopK::top_k_plan")?,
},
Kind::Negate(proto) => Self::Negate { input: proto.input },
Kind::Negate(proto) => Self::Negate {
input: LirId::from_proto(proto.input)?,
},
Kind::Threshold(proto) => Self::Threshold {
input: proto.input,
input: LirId::from_proto(proto.input)?,
threshold_plan: proto
.threshold_plan
.into_rust_if_some("ProtoThreshold::threshold_plan")?,
Expand All @@ -1285,7 +1302,7 @@ impl RustType<ProtoFlatPlanNode> for FlatPlanNode {
consolidate_output: proto.consolidate_output,
},
Kind::ArrangeBy(proto) => Self::ArrangeBy {
input: proto.input,
input: LirId::from_proto(proto.input)?,
forms: proto.forms.into_rust_if_some("ProtoArrangeBy::forms")?,
input_key: input_k_try_into(proto.input_key)?,
input_mfp: proto
Expand Down
8 changes: 6 additions & 2 deletions src/compute-types/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Context {
pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
Self {
arrangements: Default::default(),
next_lir_id: 0,
next_lir_id: LirId(std::num::NonZero::new(1).unwrap()),
debug_info: LirDebugInfo {
debug_name,
id: GlobalId::Transient(0),
Expand All @@ -57,7 +57,11 @@ impl Context {

fn allocate_lir_id(&mut self) -> LirId {
let id = self.next_lir_id;
self.next_lir_id += 1;
if let Some(next_lir_id) = self.next_lir_id.0.checked_add(1) {
self.next_lir_id = LirId(next_lir_id);
} else {
panic!("ran out of LirIds")
}
id
}

Expand Down
33 changes: 15 additions & 18 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ pub enum ComputeEvent {
/// The LIR identifier (local to `export_id`).
lir_id: LirId,
/// The LIR operator, as a string (see `FlatPlanNode::humanize`).
/// We use `Box<str>` to reduce the size of the `ComputeEvent` representation.
operator: Box<str>,
/// The LIR identifier of the parent (if any).
/// Since `LirId`s are strictly positive, Rust can steal the low bit.
/// TODO(mgree) write a test to ensure that low bit is stolen
parent_lir_id: Option<LirId>,
/// How nested this operator is.
nesting: u8,
Expand Down Expand Up @@ -423,25 +426,18 @@ pub(super) fn construct<A: Allocate + 'static>(
move |datum| {
let mut scratch1 = String::new();
let mut scratch2 = String::new();
let (span_start, span_end) = match datum.operator_span {
Some((start, end)) => (
Datum::UInt64(u64::cast_from(start)),
Datum::UInt64(u64::cast_from(end)),
),
None => (Datum::Null, Datum::Null),
};
packer.pack_slice(&[
make_string_datum(datum.global_id, &mut scratch1),
Datum::UInt64(datum.lir_id),
Datum::UInt64(datum.lir_id.into()),
Datum::UInt64(u64::cast_from(worker_id)),
make_string_datum(&datum.operator, &mut scratch2),
datum
.parent_lir_id
.map(Datum::UInt64)
.map(|lir_id| Datum::UInt64(lir_id.into()))
.unwrap_or_else(|| Datum::Null),
Datum::UInt16(u16::cast_from(datum.nesting)),
span_start,
span_end,
Datum::UInt64(u64::cast_from(datum.operator_span.0)),
Datum::UInt64(u64::cast_from(datum.operator_span.1)),
])
}
});
Expand Down Expand Up @@ -536,12 +532,13 @@ struct DemuxState<A: Allocate> {
#[derive(Debug)]
struct LirMetadata {
/// The operator rendered as a string.
operator: String,
operator: Box<str>,
parent_lir_id: Option<LirId>,
/// How nested the operator is (for nice indentation).
nesting: u8,
/// The dataflow operator ids, given as start (inclusive) and end (exclusive).
operator_span: Option<(usize, usize)>,
/// If `start == end`, then no operators were used.
operator_span: (usize, usize),
}

impl<A: Allocate> DemuxState<A> {
Expand Down Expand Up @@ -671,10 +668,10 @@ struct ErrorCountDatum {
struct LirMappingDatum {
global_id: GlobalId,
lir_id: LirId,
operator: String,
operator: Box<str>,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: Option<(usize, usize)>,
operator_span: (usize, usize),
}

#[derive(Clone)]
Expand Down Expand Up @@ -1119,10 +1116,10 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
&mut self,
global_id: GlobalId,
lir_id: LirId,
operator: String,
operator: Box<str>,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: Option<(usize, usize)>,
operator_span: (usize, usize),
) {
// record the state (for the later drop)
self.state
Expand Down Expand Up @@ -1359,6 +1356,6 @@ mod tests {
#[mz_ore::test]
fn test_compute_event_size() {
// This could be a static assertion, but we don't use those yet in this crate.
assert_eq!(96, std::mem::size_of::<ComputeEvent>())
assert_eq!(72, std::mem::size_of::<ComputeEvent>())
}
}
Loading

0 comments on commit a79f9e6

Please sign in to comment.