Skip to content

Commit

Permalink
[compute] map LIR to dataflow (#29848)
Browse files Browse the repository at this point in the history
This PR introduces two new introspection sources and two new
introspection views. These novel forms of introspection allow us to map
LIR operators down to dataflow operators; we can now attribute existing
introspection data about dataflows to LIR operators.

# Introspection

The two new sources are `ComputeLog` sources that run per worker.

## `mz_introspection.mz_compute_dataflow_global_ids_per_worker`

Maps dataflow identifiers to the global IDs used internally for things
that get built as dataflows.

```
   name    | nullable | type  | comment
-----------+----------+-------+---------
 id        | f        | uint8 | dataflow ID
 worker_id | f        | uint8 |
 global_id | f        | text  |
```

## `mz_introspection.mz_compute_lir_mapping_per_worker`

Tracks attribution information for LIR terms (in terms of `FlatPlan`).

```
       name        | nullable | type  | comment
-------------------+----------+-------+---------
 global_id         | f        | text  |
 lir_id            | f        | uint8 | AST node number
 worker_id         | f        | uint8 |
 operator          | f        | text  | rendered string
 parent_lir_id     | t        | uint8 | parent AST node number
 nesting           | f        | uint2 | nesting (used for indentation)
 operator_id_start | t        | uint8 | first dataflow operator (inclusive)
 operator_id_end   | t        | uint8 | last dataflow oeprator (exclusive)
```

## Views

We use two introspection views to work with these per-worker sources. It
ought to be the case that all workers agree about _this_ metadata
(though they may not agree on, say, the amount of memory a dataflow
operator is using!).

So: these are just views that set `worker_id = 0`.

### `mz_introspection.mz_dataflow_global_ids`

```
   name    | nullable | type  | comment
-----------+----------+-------+---------
 id        | f        | uint8 |
 global_id | f        | text  |
```

### `mz_introspection.mz_lir_mapping`

```
       name        | nullable | type  | comment
-------------------+----------+-------+---------
 global_id         | f        | text  |
 lir_id            | f        | uint8 |
 operator          | f        | text  |
 parent_lir_id     | t        | uint8 |
 nesting           | f        | uint2 |
 operator_id_start | t        | uint8 |
 operator_id_end   | t        | uint8 |
```

# Attributing to LIR

We can see a sample interaction as follows:

```sql
CREATE TABLE t(x INT NOT NULL, y INT, z TEXT);
CREATE VIEW v AS
  SELECT t1.x AS x, t1.z AS z1, t2.z AS z2
  FROM t AS t1, t AS t2
  WHERE t1.x = t2.y;
CREATE INDEX v_idx_x ON v(x);

\! sleep 1

SELECT global_id, lir_id, REPEAT(' ', MAX(nesting) * 2) || operator AS operator, SUM(duration_ns) AS duration, SUM(count) AS count
    FROM           mz_introspection.mz_lir_mapping mlm
         LEFT JOIN mz_introspection.mz_compute_operator_durations_histogram mcodh
         ON (mlm.operator_id_start <= mcodh.id AND mcodh.id < mlm.operator_id_end)
GROUP BY global_id, lir_id, operator
ORDER BY global_id, lir_id DESC;
```

which yields an output like:

```
 global_id | lir_id |          operator          | duration | count
-----------+--------+----------------------------+----------+-------
 u2        | 4      | Join::Differential 1 » 3   |  1261568 |    17
 u2        | 3      |   Arrange 2                |   466944 |    16
 u2        | 2      |     Get::Collection u1     |    69632 |     7
 u2        | 1      |   Arrange 0                |   417792 |    16
 u2        | 0      |     Get::Collection u1     |    73728 |     7
 u3        | 6      | Arrange 5                  |   454656 |    17
 u3        | 5      |   Get::PassArrangements u2 |          |
```

### Motivation

  * This PR adds a known-desirable feature.

The first step in attribution/plan profiling.
MaterializeInc/database-issues#6551
  • Loading branch information
mgree authored Nov 6, 2024
1 parent 8d2610c commit c094caf
Show file tree
Hide file tree
Showing 31 changed files with 1,405 additions and 376 deletions.
37 changes: 37 additions & 0 deletions doc/user/content/sql/system-catalog/mz_introspection.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,19 @@ The `mz_dataflow_channel_operators` view associates [dataflow] channels with the

<!-- RELATION_SPEC_UNDOCUMENTED mz_introspection.mz_dataflow_channel_operators_per_worker -->

## `mz_dataflow_global_ids`

The `mz_dataflow_global_ids` view associates [dataflow] ids with global ids (ids of the form `u8` or `t5`).

<!-- RELATION_SPEC mz_introspection.mz_dataflow_global_ids -->

| Field | Type | Meaning |
|------------- | ------- | -------- |
| `id` | [`uint8`] | The dataflow ID. |
| `global_id` | [`text`] | A global ID associated with that dataflow. |

<!-- RELATION_SPEC_UNDOCUMENTED mz_introspection.mz_compute_dataflow_global_ids_per_worker -->

## `mz_dataflow_operators`

The `mz_dataflow_operators` view describes the [dataflow] operators in the system.
Expand Down Expand Up @@ -292,6 +305,29 @@ through a hierarchical scheme for either aggregation or Top K computations.
| `savings` | [`numeric`] | A conservative estimate of the amount of memory in bytes to be saved by applying the hint. |
| `hint` | [`double precision`] | The hint value that will eliminate `to_cut` levels from the region's hierarchy. |

## `mz_lir_mapping`

The `mz_lir_mapping` view describes the low-level internal representation (LIR) plan that corresponds to global ids.
LIR is a higher-level representation than dataflows; this view is used for profiling and debugging indices and materialized views.
Note that LIR is not a stable interface and may change at any time.
In particular, you should not attempt to parse `operator` descriptions.
LIR nodes are implemented by zero or more dataflow operators with sequential ids.
We use the range `[operator_id_start, operator_id_end)` to record this information.
If an LIR node was implemented without any dataflow operators, `operator_id_start` will be equal to `operator_id_end`.

<!-- RELATION_SPEC mz_introspection.mz_lir_mapping -->
| Field | Type | Meaning
| --------- | -------- | -----------
| global_id | [`text`] | The global ID.
| lir_id | [`uint8`] | The LIR node ID.
| operator | [`text`] | The LIR operator, in the format `OperatorName INPUTS [OPTIONS]`.
| parent_lir_id | [`uint8`] | The parent of this LIR node. May be `NULL`.
| nesting | [`uint2`] | The nesting level of this LIR node.
| operator_id_start | [`uint8`] | The first dataflow operator ID implementing this LIR operator (inclusive).
| operator_id_end | [`uint8`] | The first dataflow operator ID _after_ this LIR operator (exclusive).

<!-- RELATION_SPEC_UNDOCUMENTED mz_introspection.mz_compute_lir_mapping_per_worker -->

## `mz_message_counts`

The `mz_message_counts` view describes the messages and message batches sent and received over the [dataflow] channels in the system.
Expand Down Expand Up @@ -395,6 +431,7 @@ The `mz_scheduling_parks_histogram` view describes a histogram of [dataflow] wor
[`numeric`]: /sql/types/numeric
[`text`]: /sql/types/text
[`uuid`]: /sql/types/uuid
[`uint2`]: /sql/types/uint2
[`uint8`]: /sql/types/uint8
[`uint8 list`]: /sql/types/list
[arrangement]: /get-started/arrangements/#arrangements
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ breaking:
- compute-types/src/dataflows.proto
# reason: does currently not require backward-compatibility
- compute-types/src/plan.proto
# reason: does not currently require backward-compatibility
- compute-types/src/plan/flat_plan.proto
# reason: does currently not require backward-compatibility
- compute-types/src/plan/reduce.proto
# reason: does not currently require backward-compatibility
Expand Down
45 changes: 45 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,15 @@ pub static MZ_COMPUTE_EXPORTS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER: LazyLock<BuiltinLog> =
LazyLock::new(|| BuiltinLog {
name: "mz_compute_dataflow_global_ids_per_worker",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::LOG_MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER_OID,
variant: LogVariant::Compute(ComputeLog::DataflowGlobal),
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_FRONTIERS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
name: "mz_compute_frontiers_per_worker",
schema: MZ_INTROSPECTION_SCHEMA,
Expand Down Expand Up @@ -1953,6 +1962,14 @@ pub static MZ_ACTIVE_PEEKS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| B
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_LIR_MAPPING_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
name: "mz_compute_lir_mapping_per_worker",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::LOG_MZ_COMPUTE_LIR_MAPPING_PER_WORKER_OID,
variant: LogVariant::Compute(ComputeLog::LirMapping),
access: vec![PUBLIC_SELECT],
});

pub static MZ_PEEK_DURATIONS_HISTOGRAM_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
name: "mz_peek_durations_histogram_raw",
schema: MZ_INTROSPECTION_SCHEMA,
Expand Down Expand Up @@ -4054,6 +4071,30 @@ WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_DATAFLOW_GLOBAL_IDS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
name: "mz_dataflow_global_ids",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::VIEW_MZ_DATAFLOW_GLOBAL_IDS_OID,
column_defs: None,
sql: "
SELECT id, global_id
FROM mz_introspection.mz_compute_dataflow_global_ids_per_worker
WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_LIR_MAPPING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
name: "mz_lir_mapping",
schema: MZ_INTROSPECTION_SCHEMA,
oid: oid::VIEW_MZ_LIR_MAPPING_OID,
column_defs: None,
sql: "
SELECT global_id, lir_id, operator, parent_lir_id, nesting, operator_id_start, operator_id_end
FROM mz_introspection.mz_compute_lir_mapping_per_worker
WHERE worker_id = 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER: LazyLock<BuiltinView> =
LazyLock::new(|| BuiltinView {
name: "mz_dataflow_operator_dataflows_per_worker",
Expand Down Expand Up @@ -9095,6 +9136,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Log(&MZ_DATAFLOW_ADDRESSES_PER_WORKER),
Builtin::Log(&MZ_DATAFLOW_OPERATOR_REACHABILITY_RAW),
Builtin::Log(&MZ_COMPUTE_EXPORTS_PER_WORKER),
Builtin::Log(&MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER),
Builtin::Log(&MZ_MESSAGE_COUNTS_RECEIVED_RAW),
Builtin::Log(&MZ_MESSAGE_COUNTS_SENT_RAW),
Builtin::Log(&MZ_MESSAGE_BATCH_COUNTS_RECEIVED_RAW),
Expand Down Expand Up @@ -9190,6 +9232,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_DATAFLOW_ADDRESSES),
Builtin::View(&MZ_DATAFLOW_CHANNELS),
Builtin::View(&MZ_DATAFLOW_OPERATORS),
Builtin::View(&MZ_DATAFLOW_GLOBAL_IDS),
Builtin::View(&MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER),
Builtin::View(&MZ_DATAFLOW_OPERATOR_DATAFLOWS),
Builtin::View(&MZ_OBJECT_TRANSITIVE_DEPENDENCIES),
Expand Down Expand Up @@ -9369,6 +9412,8 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS),
Builtin::Source(&MZ_COMPUTE_ERROR_COUNTS_RAW_UNIFIED),
Builtin::Source(&MZ_COMPUTE_HYDRATION_TIMES),
Builtin::Log(&MZ_COMPUTE_LIR_MAPPING_PER_WORKER),
Builtin::View(&MZ_LIR_MAPPING),
Builtin::View(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES),
Builtin::Source(&MZ_CLUSTER_REPLICA_FRONTIERS),
Builtin::View(&MZ_COMPUTE_HYDRATION_STATUSES),
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2969,7 +2969,7 @@ impl<T: ComputeControllerTimestamp> ReplicaCollectionIntrospection<T> {
self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
Datum::UInt64(lir_id),
Datum::UInt64(lir_id.into()),
Datum::String(&self.replica_id.to_string()),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::from(*hydrated),
Expand Down
2 changes: 2 additions & 0 deletions src/compute-client/src/logging.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ message ProtoComputeLog {
google.protobuf.Empty shutdown_duration = 11;
google.protobuf.Empty error_count = 12;
google.protobuf.Empty hydration_time = 13;
google.protobuf.Empty lir_mapping = 14;
google.protobuf.Empty dataflow_global = 15;
}
}
message ProtoLogVariant {
Expand Down
33 changes: 32 additions & 1 deletion src/compute-client/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ pub enum ComputeLog {
ErrorCount,
/// Hydration times of exported collections.
HydrationTime,
/// Mappings from `GlobalId`/`LirId`` pairs to dataflow addresses.
LirMapping,
/// Mappings from dataflows to `GlobalId`s.
DataflowGlobal,
}

impl RustType<ProtoComputeLog> for ComputeLog {
Expand All @@ -321,6 +325,8 @@ impl RustType<ProtoComputeLog> for ComputeLog {
ComputeLog::ShutdownDuration => ShutdownDuration(()),
ComputeLog::ErrorCount => ErrorCount(()),
ComputeLog::HydrationTime => HydrationTime(()),
ComputeLog::LirMapping => LirMapping(()),
ComputeLog::DataflowGlobal => DataflowGlobal(()),
}),
}
}
Expand All @@ -339,6 +345,8 @@ impl RustType<ProtoComputeLog> for ComputeLog {
Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
Some(ErrorCount(())) => Ok(ComputeLog::ErrorCount),
Some(HydrationTime(())) => Ok(ComputeLog::HydrationTime),
Some(LirMapping(())) => Ok(ComputeLog::LirMapping),
Some(DataflowGlobal(())) => Ok(ComputeLog::DataflowGlobal),
None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
}
}
Expand All @@ -360,7 +368,11 @@ impl LogVariant {
.unwrap_or_else(|| (0..arity).collect())
}

/// TODO(database-issues#7533): Add documentation.
/// Relation schemas for the logs.
///
/// This types need to agree with the values that are produced
/// in `logging::compute::construct` and with the description in
/// `catalog/src/builtin.rs`.
pub fn desc(&self) -> RelationDesc {
match self {
LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
Expand Down Expand Up @@ -521,6 +533,25 @@ impl LogVariant {
.with_column("time_ns", ScalarType::UInt64.nullable(true))
.with_key(vec![0, 1])
.finish(),

LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
.with_column("global_id", ScalarType::String.nullable(false))
.with_column("lir_id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("operator", ScalarType::String.nullable(false))
.with_column("parent_lir_id", ScalarType::UInt64.nullable(true))
.with_column("nesting", ScalarType::UInt16.nullable(false))
.with_column("operator_id_start", ScalarType::UInt64.nullable(true))
.with_column("operator_id_end", ScalarType::UInt64.nullable(true))
.with_key(vec![0, 1, 2])
.finish(),

LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
.with_column("id", ScalarType::UInt64.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("global_id", ScalarType::String.nullable(false))
.with_key(vec![0, 1])
.finish(),
}
}
}
Expand Down
43 changes: 42 additions & 1 deletion src/compute-types/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,28 @@ impl AvailableCollections {
}

/// An identifier for an LIR node.
pub type LirId = u64;
///
/// LirIds start at 1, not 0, which let's us get a better struct packing in `ComputeEvent::LirMapping`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
pub struct LirId(NonZeroU64);

impl LirId {
fn as_u64(&self) -> u64 {
self.0.into()
}
}

impl From<LirId> for u64 {
fn from(value: LirId) -> Self {
value.as_u64()
}
}

impl std::fmt::Display for LirId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// A rendering plan with as much conditional logic as possible removed.
#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
Expand Down Expand Up @@ -504,6 +525,16 @@ impl Plan {
}
}

impl Arbitrary for LirId {
type Strategy = BoxedStrategy<LirId>;
type Parameters = ();

fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
let lir_id = NonZeroU64::arbitrary();
lir_id.prop_map(LirId).boxed()
}
}

impl Arbitrary for Plan {
type Strategy = BoxedStrategy<Plan>;
type Parameters = ();
Expand Down Expand Up @@ -1118,6 +1149,16 @@ mod tests {

use super::*;

#[mz_ore::test]
fn test_option_lirid_fits_in_usize() {
let option_lirid_size = std::mem::size_of::<Option<LirId>>();
let usize_size = std::mem::size_of::<usize>();
assert!(
option_lirid_size <= usize_size,
"Option<LirId> (size {option_lirid_size}) should fit in usize (size {usize_size})"
);
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
#[mz_ore::test]
Expand Down
14 changes: 11 additions & 3 deletions src/compute-types/src/plan/flat_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// buf breaking: ignore (does not currently require backward-compatibility)

syntax = "proto3";

package mz_compute_types.plan.flat_plan;
Expand All @@ -23,16 +25,22 @@ import "expr/src/scalar.proto";
import "repr/src/row.proto";

message ProtoFlatPlan {
message ProtoNode {
message ProtoStep {
uint64 id = 1;
ProtoFlatPlanNode node = 2;
ProtoFlatPlanStep step = 2;
}

repeated ProtoNode nodes = 1;
repeated ProtoStep steps = 1;
uint64 root = 2;
repeated uint64 topological_order = 3;
}

message ProtoFlatPlanStep {
ProtoFlatPlanNode node = 1;
optional uint64 parent = 2;
uint32 nesting = 3;
}

message ProtoFlatPlanNode {
message ProtoConstant {
ProtoConstantRows rows = 1;
Expand Down
Loading

0 comments on commit c094caf

Please sign in to comment.