[compute] map LIR to dataflow #29848

Merged
merged 17 commits into from
Nov 6, 2024
37 changes: 37 additions & 0 deletions doc/user/content/sql/system-catalog/mz_introspection.md
Expand Up @@ -222,6 +222,19 @@ The `mz_dataflow_channel_operators` view associates [dataflow] channels with the

<!-- RELATION_SPEC_UNDOCUMENTED mz_introspection.mz_dataflow_channel_operators_per_worker -->

## `mz_dataflow_global_ids`

The `mz_dataflow_global_ids` view associates [dataflow] IDs with global IDs (IDs of the form `u8` or `t5`).

<!-- RELATION_SPEC mz_introspection.mz_dataflow_global_ids -->

| Field | Type | Meaning |
|------------- | ------- | -------- |
| `id` | [`uint8`] | The dataflow ID. |
| `global_id` | [`text`] | A global ID associated with that dataflow. |

<!-- RELATION_SPEC_UNDOCUMENTED mz_introspection.mz_compute_dataflow_global_ids_per_worker -->

## `mz_dataflow_operators`

The `mz_dataflow_operators` view describes the [dataflow] operators in the system.
Expand Down Expand Up @@ -292,6 +305,29 @@ through a hierarchical scheme for either aggregation or Top K computations.
| `savings` | [`numeric`] | A conservative estimate of the amount of memory in bytes to be saved by applying the hint. |
| `hint` | [`double precision`] | The hint value that will eliminate `to_cut` levels from the region's hierarchy. |

## `mz_lir_mapping`

The `mz_lir_mapping` view describes the low-level internal representation (LIR) plan that corresponds to global IDs.
LIR is a higher-level representation than dataflows; this view is used for profiling and debugging indexes and materialized views.
Note that LIR is not a stable interface and may change at any time.
In particular, you should not attempt to parse `operator` descriptions.
Each LIR node is implemented by zero or more dataflow operators with sequential IDs.
The half-open range `[operator_id_start, operator_id_end)` records which operators those are.
If an LIR node was implemented without any dataflow operators, `operator_id_start` will be equal to `operator_id_end`.
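
The half-open range convention described above can be sketched in Rust. This is a minimal illustration rather than Materialize code: `OperatorSpan` and its methods are hypothetical names, and only the two column names are taken from the view.

```rust
use std::ops::Range;

/// Hypothetical stand-in for the range columns of one `mz_lir_mapping` row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OperatorSpan {
    /// First dataflow operator ID implementing the LIR node (inclusive).
    pub operator_id_start: u64,
    /// First dataflow operator ID after the LIR node (exclusive).
    pub operator_id_end: u64,
}

impl OperatorSpan {
    /// The covered dataflow operator IDs as a half-open range.
    pub fn ids(&self) -> Range<u64> {
        self.operator_id_start..self.operator_id_end
    }

    /// Number of dataflow operators implementing the node.
    pub fn operator_count(&self) -> u64 {
        self.operator_id_end.saturating_sub(self.operator_id_start)
    }

    /// True when the node was implemented without any dataflow operators.
    pub fn is_empty(&self) -> bool {
        self.operator_id_start == self.operator_id_end
    }

    /// Whether a given dataflow operator ID belongs to this node.
    pub fn contains(&self, operator_id: u64) -> bool {
        self.ids().contains(&operator_id)
    }
}
```

With this convention the end ID is never part of the node, so adjacent nodes can share a boundary value without overlapping.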

<!-- RELATION_SPEC mz_introspection.mz_lir_mapping -->

| Field               | Type       | Meaning |
| ------------------- | ---------- | ------- |
| `global_id`         | [`text`]   | The global ID. |
| `lir_id`            | [`uint8`]  | The LIR node ID. |
| `operator`          | [`text`]   | The LIR operator, in the format `OperatorName INPUTS [OPTIONS]`. |
| `parent_lir_id`     | [`uint8`]  | The parent of this LIR node. May be `NULL`. |
| `nesting`           | [`uint2`]  | The nesting level of this LIR node. |
| `operator_id_start` | [`uint8`]  | The first dataflow operator ID implementing this LIR operator (inclusive). |
| `operator_id_end`   | [`uint8`]  | The first dataflow operator ID _after_ this LIR operator (exclusive). |

<!-- RELATION_SPEC_UNDOCUMENTED mz_introspection.mz_compute_lir_mapping_per_worker -->
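
As an illustration of how a client might consume the `parent_lir_id` and `nesting` columns, the sketch below renders rows as an indented outline. Everything here is hypothetical: `LirRow`, `render_outline`, and `root_ids` are names invented for the example, and it assumes that `nesting` can be used directly as an indentation depth and that the rows arrive in the desired display order. It prints `operator` verbatim and does not parse it.

```rust
/// Hypothetical in-memory form of one `mz_lir_mapping` row.
#[derive(Debug, Clone)]
pub struct LirRow {
    pub lir_id: u64,
    pub operator: String,
    /// `None` models a `NULL` parent.
    pub parent_lir_id: Option<u64>,
    pub nesting: u16,
}

/// Renders rows as an outline, indenting two spaces per nesting level.
/// Assumes `rows` is already in display order.
pub fn render_outline(rows: &[LirRow]) -> String {
    rows.iter()
        .map(|row| {
            let indent = "  ".repeat(usize::from(row.nesting));
            format!("{}{} ({})", indent, row.operator, row.lir_id)
        })
        .collect::<Vec<String>>()
        .join("\n")
}

/// IDs of the rows that have no parent.
pub fn root_ids(rows: &[LirRow]) -> Vec<u64> {
    rows.iter()
        .filter(|row| row.parent_lir_id.is_none())
        .map(|row| row.lir_id)
        .collect()
}
```

Treating `operator` as an opaque string keeps the sketch in line with the warning that LIR descriptions are not a stable interface.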

## `mz_message_counts`

The `mz_message_counts` view describes the messages and message batches sent and received over the [dataflow] channels in the system.
Expand Down Expand Up @@ -395,6 +431,7 @@ The `mz_scheduling_parks_histogram` view describes a histogram of [dataflow] wor
[`numeric`]: /sql/types/numeric
[`text`]: /sql/types/text
[`uuid`]: /sql/types/uuid
[`uint2`]: /sql/types/uint2
[`uint8`]: /sql/types/uint8
[`uint8 list`]: /sql/types/list
[arrangement]: /get-started/arrangements/#arrangements
2 changes: 2 additions & 0 deletions src/buf.yaml
Expand Up @@ -37,6 +37,8 @@ breaking:
- compute-types/src/dataflows.proto
# reason: does currently not require backward-compatibility
- compute-types/src/plan.proto
# reason: does not currently require backward-compatibility
- compute-types/src/plan/flat_plan.proto
# reason: does currently not require backward-compatibility
- compute-types/src/plan/reduce.proto
# reason: does not currently require backward-compatibility
45 changes: 45 additions & 0 deletions src/catalog/src/builtin.rs
Expand Up @@ -1911,6 +1911,15 @@ pub static MZ_COMPUTE_EXPORTS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER: LazyLock<BuiltinLog> =
LazyLock::new(|| BuiltinLog {
name: "mz_compute_dataflow_global_ids_per_worker",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::LOG_MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER_OID,
variant: LogVariant::Compute(ComputeLog::DataflowGlobal),
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_FRONTIERS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
name: "mz_compute_frontiers_per_worker",
schema: MZ_INTROSPECTION_SCHEMA,
Expand Down Expand Up @@ -1953,6 +1962,14 @@ pub static MZ_ACTIVE_PEEKS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| B
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_LIR_MAPPING_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
name: "mz_compute_lir_mapping_per_worker",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::LOG_MZ_COMPUTE_LIR_MAPPING_PER_WORKER_OID,
variant: LogVariant::Compute(ComputeLog::LirMapping),
access: vec![PUBLIC_SELECT],
});

pub static MZ_PEEK_DURATIONS_HISTOGRAM_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
name: "mz_peek_durations_histogram_raw",
schema: MZ_INTROSPECTION_SCHEMA,
Expand Down Expand Up @@ -4054,6 +4071,30 @@ WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_DATAFLOW_GLOBAL_IDS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
name: "mz_dataflow_global_ids",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::VIEW_MZ_DATAFLOW_GLOBAL_IDS_OID,
column_defs: None,
sql: "
SELECT id, global_id
FROM mz_introspection.mz_compute_dataflow_global_ids_per_worker
WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_LIR_MAPPING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
name: "mz_lir_mapping",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::VIEW_MZ_LIR_MAPPING_OID,
column_defs: None,
sql: "
SELECT global_id, lir_id, operator, parent_lir_id, nesting, operator_id_start, operator_id_end
FROM mz_introspection.mz_compute_lir_mapping_per_worker
WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER: LazyLock<BuiltinView> =
LazyLock::new(|| BuiltinView {
name: "mz_dataflow_operator_dataflows_per_worker",
Expand Down Expand Up @@ -9095,6 +9136,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Log(&MZ_DATAFLOW_ADDRESSES_PER_WORKER),
Builtin::Log(&MZ_DATAFLOW_OPERATOR_REACHABILITY_RAW),
Builtin::Log(&MZ_COMPUTE_EXPORTS_PER_WORKER),
Builtin::Log(&MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER),
Builtin::Log(&MZ_MESSAGE_COUNTS_RECEIVED_RAW),
Builtin::Log(&MZ_MESSAGE_COUNTS_SENT_RAW),
Builtin::Log(&MZ_MESSAGE_BATCH_COUNTS_RECEIVED_RAW),
Expand Down Expand Up @@ -9190,6 +9232,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_DATAFLOW_ADDRESSES),
Builtin::View(&MZ_DATAFLOW_CHANNELS),
Builtin::View(&MZ_DATAFLOW_OPERATORS),
Builtin::View(&MZ_DATAFLOW_GLOBAL_IDS),
Builtin::View(&MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER),
Builtin::View(&MZ_DATAFLOW_OPERATOR_DATAFLOWS),
Builtin::View(&MZ_OBJECT_TRANSITIVE_DEPENDENCIES),
Expand Down Expand Up @@ -9369,6 +9412,8 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS),
Builtin::Source(&MZ_COMPUTE_ERROR_COUNTS_RAW_UNIFIED),
Builtin::Source(&MZ_COMPUTE_HYDRATION_TIMES),
Builtin::Log(&MZ_COMPUTE_LIR_MAPPING_PER_WORKER),
Builtin::View(&MZ_LIR_MAPPING),
Builtin::View(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES),
Builtin::Source(&MZ_CLUSTER_REPLICA_FRONTIERS),
Builtin::View(&MZ_COMPUTE_HYDRATION_STATUSES),
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/instance.rs
Expand Up @@ -2969,7 +2969,7 @@ impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
- Datum::UInt64(lir_id),
+ Datum::UInt64(lir_id.into()),
Datum::String(&self.replica_id.to_string()),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::from(*hydrated),
2 changes: 2 additions & 0 deletions src/compute-client/src/logging.proto
Expand Up @@ -63,6 +63,8 @@ message ProtoComputeLog {
google.protobuf.Empty shutdown_duration = 11;
google.protobuf.Empty error_count = 12;
google.protobuf.Empty hydration_time = 13;
google.protobuf.Empty lir_mapping = 14;
google.protobuf.Empty dataflow_global = 15;
}
}
message ProtoLogVariant {
33 changes: 32 additions & 1 deletion src/compute-client/src/logging.rs
Expand Up @@ -303,6 +303,10 @@ pub enum ComputeLog {
ErrorCount,
/// Hydration times of exported collections.
HydrationTime,
/// Mappings from `GlobalId`/`LirId` pairs to dataflow addresses.
LirMapping,
/// Mappings from dataflows to `GlobalId`s.
DataflowGlobal,
}

impl RustType<ProtoComputeLog> for ComputeLog {
Expand All @@ -321,6 +325,8 @@ impl RustType<ProtoComputeLog> for ComputeLog {
ComputeLog::ShutdownDuration => ShutdownDuration(()),
ComputeLog::ErrorCount => ErrorCount(()),
ComputeLog::HydrationTime => HydrationTime(()),
ComputeLog::LirMapping => LirMapping(()),
ComputeLog::DataflowGlobal => DataflowGlobal(()),
}),
}
}
Expand All @@ -339,6 +345,8 @@ impl RustType<ProtoComputeLog> for ComputeLog {
Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
Some(ErrorCount(())) => Ok(ComputeLog::ErrorCount),
Some(HydrationTime(())) => Ok(ComputeLog::HydrationTime),
Some(LirMapping(())) => Ok(ComputeLog::LirMapping),
Some(DataflowGlobal(())) => Ok(ComputeLog::DataflowGlobal),
None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
}
}
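
The two hunks above extend a paired conversion: every `ComputeLog` variant needs an arm in both directions so that the value survives a protobuf round trip. The sketch below shows that shape with stand-in types only. `SketchComputeLog`, `SketchProtoKind`, `into_proto`, and `from_proto` are hypothetical, the real code uses the `RustType` trait and prost-generated types, and a plain `String` replaces `TryFromProtoError`.

```rust
/// Hypothetical stand-in for the generated oneof; the real variants wrap
/// `google.protobuf.Empty`, modeled here as `()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchProtoKind {
    LirMapping(()),
    DataflowGlobal(()),
}

/// Hypothetical stand-in for the two new `ComputeLog` variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchComputeLog {
    LirMapping,
    DataflowGlobal,
}

/// Rust-to-proto direction: one arm per variant.
pub fn into_proto(log: SketchComputeLog) -> Option<SketchProtoKind> {
    Some(match log {
        SketchComputeLog::LirMapping => SketchProtoKind::LirMapping(()),
        SketchComputeLog::DataflowGlobal => SketchProtoKind::DataflowGlobal(()),
    })
}

/// Proto-to-Rust direction: a missing oneof is reported as an error.
pub fn from_proto(kind: Option<SketchProtoKind>) -> Result<SketchComputeLog, String> {
    match kind {
        Some(SketchProtoKind::LirMapping(())) => Ok(SketchComputeLog::LirMapping),
        Some(SketchProtoKind::DataflowGlobal(())) => Ok(SketchComputeLog::DataflowGlobal),
        None => Err("missing field: kind".to_string()),
    }
}
```

Because both functions use exhaustive `match` expressions, adding a variant to either enum without updating the conversions fails to compile in this sketch.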
Expand All @@ -360,7 +368,11 @@ impl LogVariant {
.unwrap_or_else(|| (0..arity).collect())
}

- /// TODO(database-issues#7533): Add documentation.
+ /// Relation schemas for the logs.
+ ///
+ /// These types need to agree with the values that are produced
+ /// in `logging::compute::construct` and with the description in
+ /// `catalog/src/builtin.rs`.
pub fn desc(&self) -> RelationDesc {
match self {
LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
Expand Down Expand Up @@ -521,6 +533,25 @@ impl LogVariant {
.with_column("time_ns", ScalarType::UInt64.nullable(true))
.with_key(vec![0, 1])
.finish(),

LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
.with_column("global_id", ScalarType::String.nullable(false))
.with_column("lir_id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("operator", ScalarType::String.nullable(false))
.with_column("parent_lir_id", ScalarType::UInt64.nullable(true))
.with_column("nesting", ScalarType::UInt16.nullable(false))
.with_column("operator_id_start", ScalarType::UInt64.nullable(true))
.with_column("operator_id_end", ScalarType::UInt64.nullable(true))
.with_key(vec![0, 1, 2])
.finish(),

LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
.with_column("id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("global_id", ScalarType::String.nullable(false))
.with_key(vec![0, 1])
.finish(),
}
}
}
43 changes: 42 additions & 1 deletion src/compute-types/src/plan.rs
Expand Up @@ -188,7 +188,28 @@ impl AvailableCollections {
}

/// An identifier for an LIR node.
- pub type LirId = u64;
+ ///
+ /// LirIds start at 1, not 0, which lets us get a better struct packing in `ComputeEvent::LirMapping`.
+ #[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
+ pub struct LirId(NonZeroU64);

impl LirId {
fn as_u64(&self) -> u64 {
self.0.into()
}
}

impl From<LirId> for u64 {
fn from(value: LirId) -> Self {
value.as_u64()
}
}

impl std::fmt::Display for LirId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
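
The packing benefit mentioned in the doc comment comes from Rust's niche optimization: because a `NonZeroU64` can never be 0, `Option` can use that bit pattern for `None` and needs no extra tag. A minimal standalone sketch follows, assuming nothing from the Materialize crates; `SketchLirId` and the two size helpers are hypothetical names.

```rust
use std::mem::size_of;
use std::num::NonZeroU64;

/// Illustrative copy of the newtype from the diff, not the real type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SketchLirId(NonZeroU64);

impl SketchLirId {
    /// Returns `None` for 0, which is never a valid ID in this scheme.
    pub fn new(raw: u64) -> Option<Self> {
        NonZeroU64::new(raw).map(SketchLirId)
    }

    /// The underlying integer value.
    pub fn as_u64(self) -> u64 {
        self.0.get()
    }
}

/// Size in bytes of an optional ID backed by `NonZeroU64`.
pub fn option_id_size() -> usize {
    size_of::<Option<SketchLirId>>()
}

/// Size in bytes of an optional plain `u64`, which needs a separate tag.
pub fn option_u64_size() -> usize {
    size_of::<Option<u64>>()
}
```

This is the property the `test_option_lirid_fits_in_usize` test later in the diff checks for the real `LirId`.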

/// A rendering plan with as much conditional logic as possible removed.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
Expand Down Expand Up @@ -504,6 +525,16 @@ impl Plan {
}
}

impl Arbitrary for LirId {
type Strategy = BoxedStrategy<LirId>;
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
let lir_id = NonZeroU64::arbitrary();
lir_id.prop_map(LirId).boxed()
}
}

impl Arbitrary for Plan {
type Strategy = BoxedStrategy<Plan>;
type Parameters = ();
Expand Down Expand Up @@ -1118,6 +1149,16 @@ mod tests {

use super::*;

#[mz_ore::test]
fn test_option_lirid_fits_in_usize() {
let option_lirid_size = std::mem::size_of::<Option<LirId>>();
let usize_size = std::mem::size_of::<usize>();
assert!(
option_lirid_size <= usize_size,
"Option<LirId> (size {option_lirid_size}) should fit in usize (size {usize_size})"
);
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
#[mz_ore::test]
14 changes: 11 additions & 3 deletions src/compute-types/src/plan/flat_plan.proto
Expand Up @@ -7,6 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// buf breaking: ignore (does not currently require backward-compatibility)

syntax = "proto3";

package mz_compute_types.plan.flat_plan;
Expand All @@ -23,16 +25,22 @@ import "expr/src/scalar.proto";
import "repr/src/row.proto";

message ProtoFlatPlan {
- message ProtoNode {
+ message ProtoStep {
uint64 id = 1;
- ProtoFlatPlanNode node = 2;
+ ProtoFlatPlanStep step = 2;
}

- repeated ProtoNode nodes = 1;
+ repeated ProtoStep steps = 1;
uint64 root = 2;
repeated uint64 topological_order = 3;
}

message ProtoFlatPlanStep {
ProtoFlatPlanNode node = 1;
optional uint64 parent = 2;
uint32 nesting = 3;
}

message ProtoFlatPlanNode {
message ProtoConstant {
ProtoConstantRows rows = 1;