From a0923179b1e499a8cfa0478d7cd544195461a07c Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 4 Sep 2024 12:46:42 +0200 Subject: [PATCH 1/3] Add `mz_cluster_replica_status_history` source This commit adds a new `mz_cluster_replica_status_history` source, which is a log-style version of `mz_cluster_replica_statuses`. This deprecates `mz_cluster_replica_statuses` but doesn't remove it yet, to give us time to migrate internal clients. --- src/adapter/src/coord/message_handler.rs | 21 ++++++++++++++ src/catalog/src/builtin.rs | 28 +++++++++++++++++-- src/environmentd/tests/testdata/http/ws | 4 +-- src/pgrepr-consts/src/oid.rs | 2 ++ src/storage-client/src/controller.rs | 1 + src/storage-client/src/healthcheck.rs | 13 +++++++++ src/storage-controller/src/lib.rs | 13 +++++++++ test/sqllogictest/cluster.slt | 4 +-- .../information_schema_tables.slt | 4 +++ .../mz_catalog_server_index_accounting.slt | 6 ++++ test/sqllogictest/oid.slt | 2 ++ test/testdrive/catalog.td | 3 +- test/testdrive/distinct-arrangements.td | 4 +-- test/testdrive/indexes.td | 1 + 14 files changed, 97 insertions(+), 9 deletions(-) diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 4d3676ce94025..60e99dd1bb4c6 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -26,9 +26,11 @@ use mz_ore::option::OptionExt; use mz_ore::tracing::OpenTelemetryContext; use mz_ore::{soft_assert_or_log, task}; use mz_persist_client::usage::ShardsUsageReferenced; +use mz_repr::{Datum, Row}; use mz_sql::ast::Statement; use mz_sql::names::ResolvedIds; use mz_sql::pure::PurifiedStatement; +use mz_storage_client::controller::IntrospectionType; use mz_storage_types::controller::CollectionMetadata; use opentelemetry::trace::TraceContextExt; use rand::{rngs, Rng, SeedableRng}; @@ -742,6 +744,25 @@ impl Coordinator { }; if event.status != replica_statues[&event.process_id].status { + if !self.controller.read_only() { + let offline_reason = match event.status { + ClusterStatus::Online => None, + ClusterStatus::Offline(None) => None, + ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()), + }; + let row = Row::pack_slice(&[ + Datum::String(&event.replica_id.to_string()), + Datum::UInt64(event.process_id), + Datum::String(event.status.as_kebab_case_str()), + Datum::from(offline_reason.as_deref()), + Datum::TimestampTz(event.time.try_into().expect("must fit")), + ]); + self.controller.storage.append_introspection_updates( + IntrospectionType::ReplicaStatusHistory, + vec![(row, 1)], + ); + } + let old_replica_status = ClusterReplicaStatuses::cluster_replica_status(replica_statues); let old_process_status = replica_statues diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index edbdaad560904..0d2bf2c93daed 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -50,11 +50,11 @@ use mz_sql::session::user::{ MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER_NAME, SYSTEM_USER_NAME, }; use mz_storage_client::controller::IntrospectionType; -use mz_storage_client::healthcheck::REPLICA_METRICS_HISTORY_DESC; use mz_storage_client::healthcheck::{ MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_PREPARED_STATEMENT_HISTORY_DESC, MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, - MZ_SQL_TEXT_DESC, MZ_STATEMENT_EXECUTION_HISTORY_DESC, + MZ_SQL_TEXT_DESC, MZ_STATEMENT_EXECUTION_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC, + REPLICA_STATUS_HISTORY_DESC, }; use 
mz_storage_client::statistics::{MZ_SINK_STATISTICS_RAW_DESC, MZ_SOURCE_STATISTICS_RAW_DESC}; use rand::Rng; @@ -2727,6 +2727,8 @@ pub static MZ_PENDING_CLUSTER_REPLICAS: LazyLock = LazyLock::new(| access: vec![PUBLIC_SELECT], }); +// TODO(teskje) Remove this table in favor of `MZ_CLUSTER_REPLICA_STATUS_HISTORY`, once internal +// clients have been migrated. pub static MZ_CLUSTER_REPLICA_STATUSES: LazyLock = LazyLock::new(|| BuiltinTable { name: "mz_cluster_replica_statuses", schema: MZ_INTERNAL_SCHEMA, @@ -2745,6 +2747,17 @@ pub static MZ_CLUSTER_REPLICA_STATUSES: LazyLock = LazyLock::new(| access: vec![PUBLIC_SELECT], }); +pub static MZ_CLUSTER_REPLICA_STATUS_HISTORY: LazyLock = + LazyLock::new(|| BuiltinSource { + name: "mz_cluster_replica_status_history", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::SOURCE_MZ_CLUSTER_REPLICA_STATUS_HISTORY_OID, + data_source: IntrospectionType::ReplicaStatusHistory, + desc: REPLICA_STATUS_HISTORY_DESC.clone(), + is_retained_metrics_object: false, + access: vec![PUBLIC_SELECT], + }); + pub static MZ_CLUSTER_REPLICA_SIZES: LazyLock = LazyLock::new(|| BuiltinTable { name: "mz_cluster_replica_sizes", schema: MZ_CATALOG_SCHEMA, @@ -7655,6 +7668,15 @@ ON mz_internal.mz_cluster_replica_statuses (replica_id)", is_retained_metrics_object: true, }; +pub const MZ_CLUSTER_REPLICA_STATUS_HISTORY_IND: BuiltinIndex = BuiltinIndex { + name: "mz_cluster_replica_status_history_ind", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::INDEX_MZ_CLUSTER_REPLICA_STATUS_HISTORY_IND_OID, + sql: "IN CLUSTER mz_catalog_server +ON mz_internal.mz_cluster_replica_status_history (replica_id)", + is_retained_metrics_object: false, +}; + pub const MZ_CLUSTER_REPLICA_METRICS_IND: BuiltinIndex = BuiltinIndex { name: "mz_cluster_replica_metrics_ind", schema: MZ_INTERNAL_SCHEMA, @@ -8097,6 +8119,7 @@ pub static BUILTINS_STATIC: LazyLock>> = LazyLock::ne Builtin::Source(&MZ_CLUSTER_REPLICA_METRICS_HISTORY), Builtin::Table(&MZ_CLUSTER_REPLICA_SIZES), Builtin::Table(&MZ_CLUSTER_REPLICA_STATUSES), + Builtin::Source(&MZ_CLUSTER_REPLICA_STATUS_HISTORY), Builtin::Table(&MZ_INTERNAL_CLUSTER_REPLICAS), Builtin::Table(&MZ_PENDING_CLUSTER_REPLICAS), Builtin::Table(&MZ_AUDIT_EVENTS), @@ -8332,6 +8355,7 @@ pub static BUILTINS_STATIC: LazyLock>> = LazyLock::ne Builtin::Index(&MZ_CLUSTER_REPLICAS_IND), Builtin::Index(&MZ_CLUSTER_REPLICA_SIZES_IND), Builtin::Index(&MZ_CLUSTER_REPLICA_STATUSES_IND), + Builtin::Index(&MZ_CLUSTER_REPLICA_STATUS_HISTORY_IND), Builtin::Index(&MZ_CLUSTER_REPLICA_METRICS_IND), Builtin::Index(&MZ_CLUSTER_REPLICA_METRICS_HISTORY_IND), Builtin::Index(&MZ_CLUSTER_REPLICA_HISTORY_IND), diff --git a/src/environmentd/tests/testdata/http/ws b/src/environmentd/tests/testdata/http/ws index c331c6b6e8991..58dead001bdc1 100644 --- a/src/environmentd/tests/testdata/http/ws +++ b/src/environmentd/tests/testdata/http/ws @@ -402,7 +402,7 @@ ws-text ws-text {"query": "SELECT 1 FROM mz_sources LIMIT 1"} ---- -{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n 
\"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"data\": [\n 42,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t72:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t71\\n\\nt71:\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t72\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 71\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t71\",\n \"plan\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 717\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 42,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n 
\"PeekExisting\": [\n {\n \"System\": 459\n },\n {\n \"System\": 717\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 42,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s717\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' FROM [s459 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} +{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"data\": [\n 42,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t72:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t71\\n\\nt71:\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t72\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 71\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t71\",\n \"plan\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": 
\"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 718\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 42,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 459\n },\n {\n \"System\": 718\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 42,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s718\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' FROM [s459 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} {"type":"CommandStarting","payload":{"has_rows":true,"is_streaming":false}} {"type":"Rows","payload":{"columns":[{"name":"?column?","type_oid":23,"type_len":4,"type_mod":-1}]}} {"type":"Row","payload":["1"]} @@ -412,7 +412,7 @@ ws-text ws-text {"query": "SELECT 1 / 0 FROM mz_sources LIMIT 1"} ---- -{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map ((1 / 0))\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n 
\"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"CallBinary\": {\n \"func\": \"DivInt32\",\n \"expr1\": {\n \"Literal\": [\n {\n \"data\": [\n 42,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n },\n \"expr2\": {\n \"Literal\": [\n {\n \"data\": [\n 41\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n }\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t75:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t74\\n\\nt74:\\n Map (error(\\\"division by zero\\\"))\\n Project ()\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t75\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 74\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t74\",\n \"plan\": {\n \"Map\": {\n \"input\": {\n \"Project\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 717\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"outputs\": []\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (error(\\\"division by zero\\\"))\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 459\n },\n {\n \"System\": 717\n },\n null,\n 
{\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s717\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' / '' FROM [s459 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} +{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map ((1 / 0))\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"CallBinary\": {\n \"func\": \"DivInt32\",\n \"expr1\": {\n \"Literal\": [\n {\n \"data\": [\n 42,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n },\n \"expr2\": {\n \"Literal\": [\n {\n \"data\": [\n 41\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n }\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t75:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t74\\n\\nt74:\\n Map (error(\\\"division by zero\\\"))\\n Project ()\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t75\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 74\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t74\",\n \"plan\": {\n \"Map\": {\n \"input\": {\n \"Project\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 459\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n 
\"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 718\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"outputs\": []\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (error(\\\"division by zero\\\"))\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 459\n },\n {\n \"System\": 718\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s718\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' / '' FROM [s459 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} {"type":"CommandStarting","payload":{"has_rows":false,"is_streaming":false}} {"type":"Error","payload":{"message":"division by zero","code":"XX000"}} {"type":"ReadyForQuery","payload":"I"} diff --git a/src/pgrepr-consts/src/oid.rs b/src/pgrepr-consts/src/oid.rs index d80e846205670..9a49c2e1b8d25 100644 --- a/src/pgrepr-consts/src/oid.rs +++ b/src/pgrepr-consts/src/oid.rs @@ -738,3 +738,5 @@ pub const TABLE_MZ_SOURCE_REFERENCES_OID: u32 = 17015; pub const SOURCE_MZ_CLUSTER_REPLICA_METRICS_HISTORY_OID: u32 = 17016; pub const INDEX_MZ_CLUSTER_REPLICA_METRICS_HISTORY_IND_OID: u32 = 17017; pub const VIEW_MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY_OID: u32 = 17018; +pub const SOURCE_MZ_CLUSTER_REPLICA_STATUS_HISTORY_OID: u32 = 17019; +pub const INDEX_MZ_CLUSTER_REPLICA_STATUS_HISTORY_IND_OID: u32 = 17020; diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 72f9ec3230c86..74bcf75bd1b28 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -66,6 +66,7 @@ pub enum IntrospectionType { Frontiers, 
ReplicaFrontiers, + ReplicaStatusHistory, ReplicaMetricsHistory, // Note that this single-shard introspection source will be changed to per-replica, diff --git a/src/storage-client/src/healthcheck.rs b/src/storage-client/src/healthcheck.rs index 6308fdad75442..3cc161c30d0f0 100644 --- a/src/storage-client/src/healthcheck.rs +++ b/src/storage-client/src/healthcheck.rs @@ -139,6 +139,19 @@ pub static MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC: LazyLock<RelationDesc> = LazyLock::new(|| { +pub static REPLICA_STATUS_HISTORY_DESC: LazyLock<RelationDesc> = LazyLock::new(|| { + RelationDesc::builder() + .with_column("replica_id", ScalarType::String.nullable(false)) + .with_column("process_id", ScalarType::UInt64.nullable(false)) + .with_column("status", ScalarType::String.nullable(false)) + .with_column("reason", ScalarType::String.nullable(true)) + .with_column( + "occurred_at", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .finish() +}); + pub static REPLICA_METRICS_HISTORY_DESC: LazyLock<RelationDesc> = LazyLock::new(|| { RelationDesc::builder() .with_column("replica_id", ScalarType::String.nullable(false)) diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 2b65d1c8868ef..4f5592f3b6ce4 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -2857,6 +2857,7 @@ where IntrospectionType::SourceStatusHistory | IntrospectionType::SinkStatusHistory | IntrospectionType::PrivatelinkConnectionStatusHistory + | IntrospectionType::ReplicaStatusHistory | IntrospectionType::ReplicaMetricsHistory => { if !self.read_only { self.prepare_introspection_collection( @@ -3075,6 +3076,14 @@ where ) .await; } + IntrospectionType::ReplicaStatusHistory => { + let write_handle = write_handle.expect("filled in by caller"); + self.partially_truncate_status_history( + IntrospectionType::ReplicaStatusHistory, + write_handle, + ) + .await; + } // Truncate compute-maintained collections.
IntrospectionType::ComputeDependencies @@ -3194,6 +3203,10 @@ where .expect("schema has not changed") .0, ), + IntrospectionType::ReplicaStatusHistory => { + // FIXME: needs some rewriting to return a two-column key (replica_id, process_id) + return BTreeMap::new(); + } _ => unreachable!(), }; diff --git a/test/sqllogictest/cluster.slt b/test/sqllogictest/cluster.slt index 2a32f8abcc5b5..4e1127be7b881 100644 --- a/test/sqllogictest/cluster.slt +++ b/test/sqllogictest/cluster.slt @@ -420,7 +420,7 @@ CREATE CLUSTER test REPLICAS (foo (SIZE '1')); query I SELECT COUNT(name) FROM mz_indexes; ---- -265 +266 statement ok DROP CLUSTER test CASCADE @@ -428,7 +428,7 @@ DROP CLUSTER test CASCADE query T SELECT COUNT(name) FROM mz_indexes; ---- -236 +237 simple conn=mz_system,user=mz_system ALTER CLUSTER quickstart OWNER TO materialize diff --git a/test/sqllogictest/information_schema_tables.slt b/test/sqllogictest/information_schema_tables.slt index 36bc80d376af0..33b27e772c2d7 100644 --- a/test/sqllogictest/information_schema_tables.slt +++ b/test/sqllogictest/information_schema_tables.slt @@ -293,6 +293,10 @@ mz_cluster_replica_metrics_history SOURCE materialize mz_internal +mz_cluster_replica_status_history +SOURCE +materialize +mz_internal mz_cluster_replica_statuses BASE TABLE materialize diff --git a/test/sqllogictest/mz_catalog_server_index_accounting.slt b/test/sqllogictest/mz_catalog_server_index_accounting.slt index a799ca72d99dc..8e06cb5a0366c 100644 --- a/test/sqllogictest/mz_catalog_server_index_accounting.slt +++ b/test/sqllogictest/mz_catalog_server_index_accounting.slt @@ -41,6 +41,7 @@ mz_cluster_replica_history_ind CREATE␠INDEX␠"mz_cluster_replica_history_ind mz_cluster_replica_metrics_history_ind CREATE␠INDEX␠"mz_cluster_replica_metrics_history_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_internal"."mz_cluster_replica_metrics_history"␠("replica_id") mz_cluster_replica_metrics_ind CREATE␠INDEX␠"mz_cluster_replica_metrics_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_internal"."mz_cluster_replica_metrics"␠("replica_id") mz_cluster_replica_sizes_ind CREATE␠INDEX␠"mz_cluster_replica_sizes_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_catalog"."mz_cluster_replica_sizes"␠("size") +mz_cluster_replica_status_history_ind CREATE␠INDEX␠"mz_cluster_replica_status_history_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_internal"."mz_cluster_replica_status_history"␠("replica_id") mz_cluster_replica_statuses_ind CREATE␠INDEX␠"mz_cluster_replica_statuses_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_internal"."mz_cluster_replica_statuses"␠("replica_id") mz_cluster_replicas_ind CREATE␠INDEX␠"mz_cluster_replicas_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_catalog"."mz_cluster_replicas"␠("id") mz_clusters_ind CREATE␠INDEX␠"mz_clusters_ind"␠IN␠CLUSTER␠[s2]␠ON␠"mz_catalog"."mz_clusters"␠("id") @@ -222,6 +223,11 @@ mz_cluster_replica_sizes memory_bytes mz_cluster_replica_sizes processes mz_cluster_replica_sizes size mz_cluster_replica_sizes workers +mz_cluster_replica_status_history occurred_at +mz_cluster_replica_status_history process_id +mz_cluster_replica_status_history reason +mz_cluster_replica_status_history replica_id +mz_cluster_replica_status_history status mz_cluster_replica_statuses process_id mz_cluster_replica_statuses reason mz_cluster_replica_statuses replica_id diff --git a/test/sqllogictest/oid.slt b/test/sqllogictest/oid.slt index 618262393d87f..04757df957869 100644 --- a/test/sqllogictest/oid.slt +++ b/test/sqllogictest/oid.slt @@ -1124,3 +1124,5 @@ SELECT oid, name FROM mz_objects WHERE id LIKE 's%' AND oid < 20000 ORDER BY oid 17016 mz_cluster_replica_metrics_history 17017 
mz_cluster_replica_metrics_history_ind 17018 mz_cluster_replica_utilization_history +17019 mz_cluster_replica_status_history +17020 mz_cluster_replica_status_history_ind diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index 631c538e3e522..03ea91ed4d6ba 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -567,6 +567,7 @@ name type cluster comment mz_aws_privatelink_connection_status_history source "" mz_cluster_replica_frontiers source "" mz_cluster_replica_metrics_history source "" +mz_cluster_replica_status_history source "" mz_compute_dependencies source "" mz_compute_error_counts_raw_unified source "" mz_compute_hydration_times source "" @@ -776,7 +777,7 @@ test_table "" # There is one entry in mz_indexes for each field_number/expression of the index. > SELECT COUNT(id) FROM mz_indexes WHERE id LIKE 's%' -236 +237 # Create a second schema with the same table name as above > CREATE SCHEMA tester2 diff --git a/test/testdrive/distinct-arrangements.td b/test/testdrive/distinct-arrangements.td index dd6fa71fbb06d..708f41c801bf9 100644 --- a/test/testdrive/distinct-arrangements.td +++ b/test/testdrive/distinct-arrangements.td @@ -731,8 +731,8 @@ ReduceMinsMaxes "ArrangeBy[[Column(0), Column(1)]]" 1 "ArrangeBy[[Column(0), Column(2)]]" 3 "ArrangeBy[[Column(1), Column(2)]]" 1 -"ArrangeBy[[Column(0)]]-errors" 38 -"ArrangeBy[[Column(0)]]" 141 +"ArrangeBy[[Column(0)]]-errors" 39 +"ArrangeBy[[Column(0)]]" 142 "ArrangeBy[[Column(1), Column(3)]]" 1 "ArrangeBy[[Column(1)]]-errors" 6 ArrangeBy[[Column(13)]] 1 diff --git a/test/testdrive/indexes.td b/test/testdrive/indexes.td index 1701cf11e6d68..919e7f49156a8 100644 --- a/test/testdrive/indexes.td +++ b/test/testdrive/indexes.td @@ -296,6 +296,7 @@ mz_cluster_replica_metrics_ind mz_cluster_replica_m mz_cluster_replica_metrics_history_ind mz_cluster_replica_metrics_history mz_catalog_server {replica_id} "" mz_cluster_replica_sizes_ind mz_cluster_replica_sizes mz_catalog_server {size} "" mz_cluster_replica_statuses_ind mz_cluster_replica_statuses mz_catalog_server {replica_id} "" +mz_cluster_replica_status_history_ind mz_cluster_replica_status_history mz_catalog_server {replica_id} "" mz_cluster_replicas_ind mz_cluster_replicas mz_catalog_server {id} "" mz_clusters_ind mz_clusters mz_catalog_server {id} "" mz_columns_ind mz_columns mz_catalog_server {name} "" From 043e71241c696b42db026beff9da09d32bc7802b Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 4 Sep 2024 15:24:53 +0200 Subject: [PATCH 2/3] storage: truncate replica status history This commit adds support for truncation of the `ReplicaStatusHistory` collection during storage controller startup. In contrast to the other status histories, the replica status history is keyed by two columns (`replica_id`, `process_id`), so we need to modify the truncation code to work with multi-column keys. To this end, a `StatusHistoryDesc` type is introduced that allows the caller to configure the truncation behavior while being generic over the key type. 
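In essence, the truncation pass walks a consolidated snapshot of the collection once, keeps a bounded min-heap of (timestamp, row) entries per key, and retracts whatever falls out of a heap. The following is a minimal, self-contained Rust sketch of that idea, with `u64` timestamps and `String` rows standing in for the real `CheckedTimestamp` and `Row` types (illustrative only, not the controller code itself):

    use std::cmp::Reverse;
    use std::collections::{BTreeMap, BinaryHeap};

    /// Returns the rows to retract so that only the `keep_n` most recent
    /// entries survive per key. `K` models a possibly multi-column key,
    /// e.g. `(replica_id, process_id)`.
    fn rows_to_delete<K: Ord>(
        events: impl IntoIterator<Item = (K, u64, String)>,
        keep_n: usize,
    ) -> Vec<String> {
        let mut per_key: BTreeMap<K, BinaryHeap<Reverse<(u64, String)>>> = BTreeMap::new();
        let mut deletions = Vec::new();
        for (key, time, row) in events {
            let entries = per_key.entry(key).or_default();
            // `Reverse` turns the max-heap into a min-heap, so `pop` yields
            // the oldest entry once the heap exceeds `keep_n`.
            entries.push(Reverse((time, row)));
            while entries.len() > keep_n {
                if let Some(Reverse((_, row))) = entries.pop() {
                    deletions.push(row);
                }
            }
        }
        deletions
    }

For the replica status history, `K` is the two-column key `(GlobalId, u64)` extracted by `StatusHistoryDesc::extract_key`.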
--- src/adapter/src/flags.rs | 1 + src/sql/src/session/vars.rs | 5 + src/sql/src/session/vars/definitions.rs | 9 ++ src/storage-controller/src/lib.rs | 205 +++++++++++++----------- src/storage-types/src/parameters.proto | 1 + src/storage-types/src/parameters.rs | 10 ++ 6 files changed, 135 insertions(+), 96 deletions(-) diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index 490ded118f129..0e0d0ec30cccf 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -53,6 +53,7 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters { keep_n_sink_status_history_entries: config.keep_n_sink_status_history_entries(), keep_n_privatelink_status_history_entries: config .keep_n_privatelink_status_history_entries(), + keep_n_replica_status_history_entries: config.keep_n_replica_status_history_entries(), upsert_rocksdb_tuning_config: { match mz_rocksdb_types::RocksDBTuningParameters::from_parameters( config.upsert_rocksdb_compaction_style(), diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 638cc393c8be6..fb53f9d1efe71 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1242,6 +1242,7 @@ impl SystemVars { &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES, &KEEP_N_SINK_STATUS_HISTORY_ENTRIES, &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES, + &KEEP_N_REPLICA_STATUS_HISTORY_ENTRIES, &ARRANGEMENT_EXERT_PROPORTIONALITY, &ENABLE_STORAGE_SHARD_FINALIZATION, &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE, @@ -2031,6 +2032,10 @@ impl SystemVars { *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES) } + pub fn keep_n_replica_status_history_entries(&self) -> usize { + *self.expect_value(&KEEP_N_REPLICA_STATUS_HISTORY_ENTRIES) + } + /// Returns the `arrangement_exert_proportionality` configuration parameter. pub fn arrangement_exert_proportionality(&self) -> u32 { *self.expect_value(&ARRANGEMENT_EXERT_PROPORTIONALITY) diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 5f908a3de51e9..de079c0bd787b 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1429,6 +1429,15 @@ pub static KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinit false, ); +/// Controls [`mz_storage_types::parameters::StorageParameters::keep_n_replica_status_history_entries`]. 
+pub static KEEP_N_REPLICA_STATUS_HISTORY_ENTRIES: VarDefinition = VarDefinition::new( + "keep_n_replica_status_history_entries", + value!(usize; 5), + "On reboot, truncate all but the last n entries per ID in the mz_cluster_replica_status_history \ + collection (Materialize).", + false, +); + pub static ENABLE_STORAGE_SHARD_FINALIZATION: VarDefinition = VarDefinition::new( "enable_storage_shard_finalization", value!(bool; true), diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 4f5592f3b6ce4..8ef41720ffc47 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -3016,6 +3016,7 @@ where .partially_truncate_status_history( IntrospectionType::SourceStatusHistory, write_handle, + source_status_history_desc(&self.config.parameters), ) .await; @@ -3045,6 +3046,7 @@ where .partially_truncate_status_history( IntrospectionType::SinkStatusHistory, write_handle, + sink_status_history_desc(&self.config.parameters), ) .await; @@ -3073,6 +3075,7 @@ where self.partially_truncate_status_history( IntrospectionType::PrivatelinkConnectionStatusHistory, write_handle, + privatelink_status_history_desc(&self.config.parameters), ) .await; } @@ -3081,6 +3084,7 @@ where self.partially_truncate_status_history( IntrospectionType::ReplicaStatusHistory, write_handle, + replica_status_history_desc(&self.config.parameters), ) .await; } @@ -3153,7 +3157,7 @@ where } /// Effectively truncates the status history shard except for the most - /// recent updates from each ID. + /// recent updates from each key. /// /// NOTE: The history collections are really append-only collections, but /// every-now-and-then we want to retract old updates so that the collection @@ -3161,55 +3165,16 @@ where /// collections, they are not derived from a state at some time `t` and we /// cannot maintain a desired state for them. /// - /// Returns a map with latest unpacked row per id. - async fn partially_truncate_status_history( + /// Returns a map with latest unpacked row per key. 
+ async fn partially_truncate_status_history<K>( &mut self, collection: IntrospectionType, write_handle: &mut WriteHandle<SourceData, (), T, Diff>, - ) -> BTreeMap<GlobalId, Row> { - let (keep_n, occurred_at_col, id_col) = match collection { - IntrospectionType::SourceStatusHistory => ( - self.config.parameters.keep_n_source_status_history_entries, - collection_status::MZ_SOURCE_STATUS_HISTORY_DESC - .get_by_name(&ColumnName::from("occurred_at")) - .expect("schema has not changed") - .0, - collection_status::MZ_SOURCE_STATUS_HISTORY_DESC - .get_by_name(&ColumnName::from("source_id")) - .expect("schema has not changed") - .0, - ), - IntrospectionType::SinkStatusHistory => ( - self.config.parameters.keep_n_sink_status_history_entries, - collection_status::MZ_SINK_STATUS_HISTORY_DESC - .get_by_name(&ColumnName::from("occurred_at")) - .expect("schema has not changed") - .0, - collection_status::MZ_SINK_STATUS_HISTORY_DESC - .get_by_name(&ColumnName::from("sink_id")) - .expect("schema has not changed") - .0, - ), - IntrospectionType::PrivatelinkConnectionStatusHistory => ( - self.config - .parameters - .keep_n_privatelink_status_history_entries, - collection_status::MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC - .get_by_name(&ColumnName::from("occurred_at")) - .expect("schema has not changed") - .0, - collection_status::MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC - .get_by_name(&ColumnName::from("connection_id")) - .expect("schema has not changed") - .0, - ), - IntrospectionType::ReplicaStatusHistory => { - // FIXME: needs some rewriting to return a two-column key (replica_id, process_id) - return BTreeMap::new(); - } - _ => unreachable!(), - }; - + status_history_desc: StatusHistoryDesc<K>, + ) -> BTreeMap<K, Row> + where + K: Clone + Debug + Ord, + { let id = self.introspection_ids.lock().expect("poisoned")[&collection]; let upper = write_handle.fetch_recent_upper().await.clone(); @@ -3225,13 +3190,14 @@ where _ => return BTreeMap::new(), }; - // BTreeMap<Id, MinHeap<(OccurredAt, Row)>>, to track the - // earliest events for each id. - let mut last_n_entries_per_id: BTreeMap<Datum, BinaryHeap<Reverse<(Datum, Vec<Datum>)>>> = - BTreeMap::new(); + // BTreeMap to track the earliest events for each key. + let mut last_n_entries_per_key: BTreeMap< + K, + BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>, + > = BTreeMap::new(); - // BTreeMap to keep track of the row with the latest timestamp for each id - let mut latest_row_per_id: BTreeMap<Datum, (CheckedTimestamp<DateTime<Utc>>, Vec<Datum>)> = + // BTreeMap to keep track of the row with the latest timestamp for each key. + let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> = BTreeMap::new(); // Consolidate the snapshot, so we can process it correctly below. @@ -3239,35 +3205,30 @@ where let mut deletions = vec![]; - for (row, diff) in rows.iter() { - let status_row = row.unpack(); - let id = status_row[id_col]; - let occurred_at = status_row[occurred_at_col]; + for (row, diff) in rows { + let datums = row.unpack(); + let key = (status_history_desc.extract_key)(&datums); + let timestamp = (status_history_desc.extract_time)(&datums); // Duplicate rows ARE possible if many status changes happen in VERY quick succession, // so we go ahead and handle them. assert!( - *diff > 0, + diff > 0, "only know how to operate over consolidated data with diffs > 0, \ - found diff {} for object {} in {:?}", - diff, - id, - collection + found diff {diff} for object {key:?} in {collection:?}", ); - // Keep track of the timestamp of the latest row per id - let timestamp = occurred_at.unwrap_timestamptz(); - match latest_row_per_id.get(&id) { + // Keep track of the timestamp of the latest row per key.
+ match latest_row_per_key.get(&key) { Some(existing) if &existing.0 > &timestamp => {} _ => { - latest_row_per_id.insert(id, (timestamp, status_row.clone())); + latest_row_per_key.insert(key.clone(), (timestamp, row.clone())); } } // Consider duplicated rows separately. - for _ in 0..*diff { - let entries = last_n_entries_per_id.entry(id).or_default(); - + let entries = last_n_entries_per_key.entry(key).or_default(); + for _ in 0..diff { // We CAN have multiple statuses (most likely Starting and Running) at the exact same // millisecond, depending on how the `health_operator` is scheduled. // // so we don't bother being careful about it. // // TODO(guswynn): unpack these into health-status objects and use - // their `Ord1 impl. + // their `Ord` impl. - entries.push(Reverse((occurred_at, status_row.clone()))); + entries.push(Reverse((timestamp, row.clone()))); // Retain some number of entries, using pop to mark the oldest entries for // deletion. - while entries.len() > keep_n { + while entries.len() > status_history_desc.keep_n { if let Some(Reverse((_, r))) = entries.pop() { deletions.push(r); } @@ -3296,20 +3257,10 @@ where let expected_upper = upper.into_option().expect("checked above"); let new_upper = TimestampManipulation::step_forward(&expected_upper); - let mut row_buf = Row::default(); // Updates are only deletes because everything else is already in the shard. let updates = deletions .into_iter() - .map(|unpacked_row| { - // Re-pack all rows - let mut packer = row_buf.packer(); - packer.extend(unpacked_row.into_iter()); - ( - (SourceData(Ok(row_buf.clone())), ()), - expected_upper.clone(), - -1, - ) - }) + .map(|row| ((SourceData(Ok(row)), ()), expected_upper.clone(), -1)) .collect::<Vec<_>>(); let res = write_handle @@ -3333,23 +3284,16 @@ where // NOTE: We might want to attempt these partial // retractions on an interval, instead of only when // starting up! - info!(%id, ?expected_upper, current_upper = ?err.current, "failed to append partial truncation"); + info!( + %id, ?expected_upper, current_upper = ?err.current, + "failed to append partial truncation", + ); } } - latest_row_per_id + latest_row_per_key .into_iter() - .filter_map(|(key, (_, row_vec))| { match GlobalId::from_str(key.unwrap_str()) { Ok(id) => { let mut packer = row_buf.packer(); packer.extend(row_vec.into_iter()); Some((id, row_buf.clone())) } // Ignore any rows that can't be unwrapped correctly Err(_) => None, } }) + .map(|(key, (_, row))| (key, row)) .collect() } @@ -3931,3 +3875,72 @@ struct IngestionState { /// The ID of the instance in which the ingestion is running. pub instance_id: StorageInstanceId, } + +/// A description of a status history collection. +/// +/// Used to inform partial truncation, see [`Controller::partially_truncate_status_history`].
+struct StatusHistoryDesc<K> { + keep_n: usize, + extract_key: Box<dyn Fn(&[Datum]) -> K>, + extract_time: Box<dyn Fn(&[Datum]) -> CheckedTimestamp<DateTime<Utc>>>, +} + +fn source_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> { + let desc = &collection_status::MZ_SOURCE_STATUS_HISTORY_DESC; + let (key_idx, _) = desc.get_by_name(&"source_id".into()).expect("exists"); + let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists"); + + StatusHistoryDesc { + keep_n: params.keep_n_source_status_history_entries, + extract_key: Box::new(move |datums| { + GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column") + }), + extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()), + } +} + +fn sink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> { + let desc = &collection_status::MZ_SINK_STATUS_HISTORY_DESC; + let (key_idx, _) = desc.get_by_name(&"sink_id".into()).expect("exists"); + let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists"); + + StatusHistoryDesc { + keep_n: params.keep_n_sink_status_history_entries, + extract_key: Box::new(move |datums| { + GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column") + }), + extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()), + } +} + +fn privatelink_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<GlobalId> { + let desc = &collection_status::MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC; + let (key_idx, _) = desc.get_by_name(&"connection_id".into()).expect("exists"); + let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists"); + + StatusHistoryDesc { + keep_n: params.keep_n_privatelink_status_history_entries, + extract_key: Box::new(move |datums| { + GlobalId::from_str(datums[key_idx].unwrap_str()).expect("GlobalId column") + }), + extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()), + } +} + +fn replica_status_history_desc(params: &StorageParameters) -> StatusHistoryDesc<(GlobalId, u64)> { + let desc = &collection_status::REPLICA_STATUS_HISTORY_DESC; + let (replica_idx, _) = desc.get_by_name(&"replica_id".into()).expect("exists"); + let (process_idx, _) = desc.get_by_name(&"process_id".into()).expect("exists"); + let (time_idx, _) = desc.get_by_name(&"occurred_at".into()).expect("exists"); + + StatusHistoryDesc { + keep_n: params.keep_n_replica_status_history_entries, + extract_key: Box::new(move |datums| { + ( + GlobalId::from_str(datums[replica_idx].unwrap_str()).expect("GlobalId column"), + datums[process_idx].unwrap_uint64(), + ) + }), + extract_time: Box::new(move |datums| datums[time_idx].unwrap_timestamptz()), + } +} diff --git a/src/storage-types/src/parameters.proto b/src/storage-types/src/parameters.proto index 496c1ad7e6a48..28dad393f79ca 100644 --- a/src/storage-types/src/parameters.proto +++ b/src/storage-types/src/parameters.proto @@ -46,6 +46,7 @@ message ProtoStorageParameters { optional mz_proto.ProtoDuration pg_source_tcp_keepalives_interval = 36; optional mz_proto.ProtoDuration pg_source_tcp_user_timeout = 37; bool pg_source_tcp_configure_server = 38; + uint64 keep_n_replica_status_history_entries = 39; mz_dyncfg.ConfigUpdates dyncfg_updates = 30; diff --git a/src/storage-types/src/parameters.rs b/src/storage-types/src/parameters.rs index 0d37418ff6a69..e087684522dc5 100644 --- a/src/storage-types/src/parameters.rs +++ b/src/storage-types/src/parameters.rs @@ -65,6 +65,7 @@ pub struct StorageParameters { pub keep_n_source_status_history_entries: usize, pub 
keep_n_sink_status_history_entries: usize, pub keep_n_privatelink_status_history_entries: usize, + pub keep_n_replica_status_history_entries: usize, /// A set of parameters used to tune RocksDB when used with `UPSERT` sources. pub upsert_rocksdb_tuning_config: mz_rocksdb_types::RocksDBTuningParameters, /// Whether or not to allow shard finalization to occur. Note that this will @@ -130,6 +131,7 @@ impl Default for StorageParameters { keep_n_source_status_history_entries: Default::default(), keep_n_sink_status_history_entries: Default::default(), keep_n_privatelink_status_history_entries: Default::default(), + keep_n_replica_status_history_entries: Default::default(), upsert_rocksdb_tuning_config: Default::default(), finalize_shards: Default::default(), tracing: Default::default(), @@ -234,6 +236,7 @@ impl StorageParameters { keep_n_source_status_history_entries, keep_n_sink_status_history_entries, keep_n_privatelink_status_history_entries, + keep_n_replica_status_history_entries, upsert_rocksdb_tuning_config, finalize_shards, tracing, @@ -263,6 +266,7 @@ impl StorageParameters { self.keep_n_source_status_history_entries = keep_n_source_status_history_entries; self.keep_n_sink_status_history_entries = keep_n_sink_status_history_entries; self.keep_n_privatelink_status_history_entries = keep_n_privatelink_status_history_entries; + self.keep_n_replica_status_history_entries = keep_n_replica_status_history_entries; self.upsert_rocksdb_tuning_config = upsert_rocksdb_tuning_config; self.finalize_shards = finalize_shards; self.tracing.update(tracing); @@ -313,6 +317,9 @@ impl RustType<ProtoStorageParameters> for StorageParameters { keep_n_privatelink_status_history_entries: u64::cast_from( self.keep_n_privatelink_status_history_entries, ), + keep_n_replica_status_history_entries: u64::cast_from( + self.keep_n_replica_status_history_entries, + ), upsert_rocksdb_tuning_config: Some(self.upsert_rocksdb_tuning_config.into_proto()), finalize_shards: self.finalize_shards, tracing: Some(self.tracing.into_proto()), @@ -366,6 +373,9 @@ impl RustType<ProtoStorageParameters> for StorageParameters { keep_n_privatelink_status_history_entries: usize::cast_from( proto.keep_n_privatelink_status_history_entries, ), + keep_n_replica_status_history_entries: usize::cast_from( + proto.keep_n_replica_status_history_entries, + ), upsert_rocksdb_tuning_config: proto .upsert_rocksdb_tuning_config .into_rust_if_some("ProtoStorageParameters::upsert_rocksdb_tuning_config")?, From c96b861dc5fb9d5510b84412576b7fafe8b245f8 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 4 Sep 2024 15:32:00 +0200 Subject: [PATCH 3/3] doc/user: document mz_cluster_replica_status_history --- .../content/sql/system-catalog/mz_internal.md | 23 +++++++++++++++---- .../autogenerated/mz_internal.slt | 10 ++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index 88da12ca982c2..3bcbf16ca7a2b 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -176,7 +176,7 @@ At this time, we do not make any guarantees about the exactness or freshness of | Field | Type | Meaning | | ------------------- | ------------ | -------- | | `replica_id` | [`text`] | The ID of a cluster replica. | -| `process_id` | [`uint8`] | An identifier of a compute process within a replica. | +| `process_id` | [`uint8`] | The ID of a process within the replica.
| | `cpu_nano_cores` | [`uint8`] | Approximate CPU usage, in billionths of a vCPU core. | | `memory_bytes` | [`uint8`] | Approximate RAM usage, in bytes. | | `disk_bytes` | [`uint8`] | Approximate disk usage in bytes. | @@ -193,7 +193,7 @@ At this time, we do not make any guarantees about the exactness or freshness of | Field | Type | Meaning | ---------------- | --------- | -------- | `replica_id` | [`text`] | The ID of a cluster replica. -| `process_id` | [`uint8`] | An identifier of a process within the replica. +| `process_id` | [`uint8`] | The ID of a process within the replica. | `cpu_nano_cores` | [`uint8`] | Approximate CPU usage in billionths of a vCPU core. | `memory_bytes` | [`uint8`] | Approximate memory usage in bytes. | `disk_bytes` | [`uint8`] | Approximate disk usage in bytes. @@ -213,6 +213,21 @@ of each process in each cluster replica in the system. | `reason` | [`text`] | If the cluster replica is in a `offline` state, the reason (if available). For example, `oom-killed`. | | `updated_at` | [`timestamp with time zone`] | The time at which the status was last updated. | +## `mz_cluster_replica_status_history` + +{{< warn-if-unreleased v0.116 >}} +The `mz_cluster_replica_status_history` table records status changes +for all processes of all extant cluster replicas. + + +| Field | Type | Meaning +|---------------|-----------|-------- +| `replica_id` | [`text`] | The ID of a cluster replica. +| `process_id` | [`uint8`] | The ID of a process within the replica. +| `status` | [`text`] | The status of the cluster replica: `online` or `offline`. +| `reason` | [`text`] | If the cluster replica is in an `offline` state, the reason (if available). For example, `oom-killed`. +| `occurred_at` | [`timestamp with time zone`] | Wall-clock timestamp at which the event occurred. + ## `mz_cluster_replica_utilization` The `mz_cluster_replica_utilization` view gives the last known CPU and RAM utilization statistics @@ -224,7 +239,7 @@ At this time, we do not make any guarantees about the exactness or freshness of | Field | Type | Meaning | |------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `replica_id` | [`text`] | The ID of a cluster replica. | -| `process_id` | [`uint8`] | An identifier of a compute process within a replica. | +| `process_id` | [`uint8`] | The ID of a process within the replica. | | `cpu_percent` | [`double precision`] | Approximate CPU usage in percent of the total allocation. | | `memory_percent` | [`double precision`] | Approximate RAM usage in percent of the total allocation. | | `disk_percent` | [`double precision`] | Approximate disk usage in percent of the total allocation. | @@ -241,7 +256,7 @@ At this time, we do not make any guarantees about the exactness or freshness of | Field | Type | Meaning |------------------|----------------------|-------- | `replica_id` | [`text`] | The ID of a cluster replica. -| `process_id` | [`uint8`] | An identifier of a compute process within the replica. +| `process_id` | [`uint8`] | The ID of a process within the replica. | `cpu_percent` | [`double precision`] | Approximate CPU usage in percent of the total allocation. | `memory_percent` | [`double precision`] | Approximate RAM usage in percent of the total allocation. | `disk_percent` | [`double precision`] | Approximate disk usage in percent of the total allocation. 
diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index e751566ca75fa..0dc490ba221b4 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -136,6 +136,15 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 4 reason text 5 updated_at timestamp␠with␠time␠zone +query ITT +SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_cluster_replica_status_history' ORDER BY position +---- +1 replica_id text +2 process_id uint8 +3 status text +4 reason text +5 occurred_at timestamp␠with␠time␠zone + query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_cluster_replica_utilization' ORDER BY position ---- @@ -590,6 +599,7 @@ mz_cluster_replica_frontiers mz_cluster_replica_history mz_cluster_replica_metrics mz_cluster_replica_metrics_history +mz_cluster_replica_status_history mz_cluster_replica_statuses mz_cluster_replica_utilization mz_cluster_replica_utilization_history
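As a usage sketch, not part of the patch itself: once the new source is available, recent status changes can be inspected with an ordinary query against `mz_internal`. For example, assuming the column set documented above, the ten most recent offline events could be fetched with:

    -- Hypothetical example query; the filter and limit are arbitrary.
    SELECT replica_id, process_id, status, reason, occurred_at
    FROM mz_internal.mz_cluster_replica_status_history
    WHERE status = 'offline'
    ORDER BY occurred_at DESC
    LIMIT 10;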