From adac51242df1e729ea21ebb09fd63c776a8c1253 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 8 Aug 2022 15:32:40 -0700 Subject: [PATCH 01/10] rowexec: use the sorter's own context Release note: none --- pkg/sql/rowexec/sorter.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 2eb7b027a624..3d2bd1734b59 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -237,8 +237,6 @@ func (s *sortAllProcessor) Start(ctx context.Context) { // drain if it's not recoverable. It is possible for ok to be false even if no // error is returned - in case an error metadata was received. func (s *sortAllProcessor) fill() (ok bool, _ error) { - ctx := s.EvalCtx.Ctx() - for { row, meta := s.input.Next() if meta != nil { @@ -252,13 +250,13 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) { break } - if err := s.rows.AddRow(ctx, row); err != nil { + if err := s.rows.AddRow(s.Ctx, row); err != nil { return false, err } } - s.rows.Sort(ctx) + s.rows.Sort(s.Ctx) - s.i = s.rows.NewFinalIterator(ctx) + s.i = s.rows.NewFinalIterator(s.Ctx) s.i.Rewind() return true, nil } From 9b8100b21e893db36b19fd8aba55192a3c30a16c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 9 Aug 2022 16:14:49 -0700 Subject: [PATCH 02/10] streamingccl, rowexec: correctly mark eventStream as "streaming" This commit fixes an incomplete solution of 9bb5d30d280f53d5efe806525076bf419fb44882 which attempted to mark `eventStream` generator builtin as "streaming" so that the columnarizer on top of the `projectSetProcessor` would not buffer anything. As found by Steven, the solution in that commit was incomplete since the generators array is not populated at the time where `MustBeStreaming` check is performed. This commit fixes that oversight by using a different way of propagating the property - via the function properties. Release note: None --- .../streamproducer/event_stream.go | 4 ++-- pkg/sql/rowexec/project_set.go | 13 ++++++------- pkg/sql/sem/builtins/replication_builtins.go | 7 ++++--- pkg/sql/sem/eval/generators.go | 19 ------------------- pkg/sql/sem/tree/expr.go | 6 ++++++ pkg/sql/sem/tree/function_definition.go | 4 ++++ 6 files changed, 22 insertions(+), 31 deletions(-) diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index c308e6982bc4..a0b7a115f715 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -532,11 +532,11 @@ func streamPartition( for _, sp := range spec.Spans { subscribedSpans.Add(sp) } - return eval.MakeStreamingValueGenerator(&eventStream{ + return &eventStream{ streamID: streamID, spec: spec, subscribedSpans: subscribedSpans, execCfg: execCfg, mon: evalCtx.Mon, - }), nil + }, nil } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 852340a0ecd0..12c9b5b36d6f 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -43,6 +43,10 @@ type projectSetProcessor struct { // The size of the slice is the same as `exprHelpers` though. funcs []*tree.FuncExpr + // mustBeStreaming indicates whether at least one function in funcs is of + // "streaming" nature. + mustBeStreaming bool + // inputRowReady is set when there was a row of input data available // from the source. inputRowReady bool @@ -116,6 +120,7 @@ func newProjectSetProcessor( if tFunc, ok := helper.Expr.(*tree.FuncExpr); ok && tFunc.IsGeneratorApplication() { // Expr is a set-generating function. ps.funcs[i] = tFunc + ps.mustBeStreaming = ps.mustBeStreaming || tFunc.IsVectorizeStreaming() } ps.exprHelpers[i] = &helper } @@ -124,13 +129,7 @@ func newProjectSetProcessor( // MustBeStreaming implements the execinfra.Processor interface. func (ps *projectSetProcessor) MustBeStreaming() bool { - // If we have a single streaming generator, then the processor is such too. - for _, gen := range ps.gens { - if eval.IsStreamingValueGenerator(gen) { - return true - } - } - return false + return ps.mustBeStreaming } // Start is part of the RowSource interface. diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index 79f664a72cc8..8c65437a506f 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -227,9 +227,10 @@ var replicationBuiltins = map[string]builtinDefinition{ ), "crdb_internal.stream_partition": makeBuiltin( tree.FunctionProperties{ - Category: builtinconstants.CategoryStreamIngestion, - DistsqlBlocklist: false, - Class: tree.GeneratorClass, + Category: builtinconstants.CategoryStreamIngestion, + DistsqlBlocklist: false, + Class: tree.GeneratorClass, + VectorizeStreaming: true, }, makeGeneratorOverload( tree.ArgTypes{ diff --git a/pkg/sql/sem/eval/generators.go b/pkg/sql/sem/eval/generators.go index 754b773c3133..a3e9f3ad47bc 100644 --- a/pkg/sql/sem/eval/generators.go +++ b/pkg/sql/sem/eval/generators.go @@ -83,25 +83,6 @@ type ValueGenerator interface { Close(ctx context.Context) } -// streamingValueGenerator is a marker-type indicating that the wrapped -// generator is of "streaming" nature, thus, projectSet processor must be -// streaming too. -type streamingValueGenerator struct { - ValueGenerator -} - -// MakeStreamingValueGenerator marks the generator as "streaming". -func MakeStreamingValueGenerator(gen ValueGenerator) ValueGenerator { - return streamingValueGenerator{ValueGenerator: gen} -} - -// IsStreamingValueGenerator returns whether the generator is of the "streaming" -// nature. -func IsStreamingValueGenerator(gen ValueGenerator) bool { - _, ok := gen.(streamingValueGenerator) - return ok -} - // CallbackValueGenerator is a ValueGenerator that calls a supplied callback for // producing the values. To be used with // eval.TestingKnobs.CallbackGenerators. diff --git a/pkg/sql/sem/tree/expr.go b/pkg/sql/sem/tree/expr.go index 874332f6a538..da59ad13ec54 100644 --- a/pkg/sql/sem/tree/expr.go +++ b/pkg/sql/sem/tree/expr.go @@ -1295,6 +1295,12 @@ func (node *FuncExpr) IsDistSQLBlocklist() bool { return (node.fn != nil && node.fn.DistsqlBlocklist) || (node.fnProps != nil && node.fnProps.DistsqlBlocklist) } +// IsVectorizeStreaming returns whether the function is of "streaming" nature +// from the perspective of the vectorized execution engine. +func (node *FuncExpr) IsVectorizeStreaming() bool { + return node.fnProps != nil && node.fnProps.VectorizeStreaming +} + type funcType int // FuncExpr.Type diff --git a/pkg/sql/sem/tree/function_definition.go b/pkg/sql/sem/tree/function_definition.go index c10540dba848..1acf7ed86add 100644 --- a/pkg/sql/sem/tree/function_definition.go +++ b/pkg/sql/sem/tree/function_definition.go @@ -124,6 +124,10 @@ type FunctionProperties struct { // // See memo.CanBeCompositeSensitive. CompositeInsensitive bool + + // VectorizeStreaming indicates that the function is of "streaming" nature + // from the perspective of the vectorized execution engine. + VectorizeStreaming bool } // ShouldDocument returns whether the built-in function should be included in From 7f023c2617c125e9fb3d430f336601961bcb2c87 Mon Sep 17 00:00:00 2001 From: Marylia Gutierrez Date: Tue, 9 Aug 2022 18:49:41 -0400 Subject: [PATCH 03/10] ui: new insights table component Creation of the base insights table, that can be used on both schema insights and statement details page. This commit only introduce the table, with the different types, but still needs the proper cell formating for each type. Part of #83783 Release note: None --- .../insightsTable/insightsTable.module.scss | 13 ++ .../src/insightsTable/insightsTable.tsx | 125 ++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.module.scss create mode 100644 pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.tsx diff --git a/pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.module.scss b/pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.module.scss new file mode 100644 index 000000000000..2c822cbbf391 --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.module.scss @@ -0,0 +1,13 @@ +@import "src/core/index.module"; + +.insight-type { + color: $colors--functional-orange-4; + font-weight: $font-weight--bold; +} + +.label-bold { + font-family: $font-family--semi-bold; + font-size: $font-size--medium; + line-height: $line-height--x-small; + color: $colors--neutral-7; +} diff --git a/pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.tsx new file mode 100644 index 000000000000..da140a1ae7ce --- /dev/null +++ b/pkg/ui/workspaces/cluster-ui/src/insightsTable/insightsTable.tsx @@ -0,0 +1,125 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +import { Tooltip } from "@cockroachlabs/ui-components"; +import React from "react"; +import { ColumnDescriptor, SortedTable } from "../sortedtable"; +import classNames from "classnames/bind"; +import styles from "./insightsTable.module.scss"; + +const cx = classNames.bind(styles); + +export type InsightType = "DROP_INDEX" | "CREATE_INDEX" | "REPLACE_INDEX"; + +export interface InsightRecommendation { + type: InsightType; + database: string; + table: string; + index_id: number; + query: string; + exec_stmt: string; + exec_id: string; +} + +export class InsightsSortedTable extends SortedTable {} + +const insightColumnLabels = { + insights: "Insights", + details: "Details", +}; +export type InsightsTableColumnKeys = keyof typeof insightColumnLabels; + +type InsightsTableTitleType = { + [key in InsightsTableColumnKeys]: () => JSX.Element; +}; + +export const insightsTableTitles: InsightsTableTitleType = { + insights: () => { + return ( + + {insightColumnLabels.insights} + + ); + }, + details: () => { + return ( + + {insightColumnLabels.details} + + ); + }, +}; + +function insightType(type: InsightType): string { + switch (type) { + case "CREATE_INDEX": + return "Create New Index"; + case "DROP_INDEX": + return "Drop Unused Index"; + case "REPLACE_INDEX": + return "Replace Index"; + default: + return "Insight"; + } +} + +function typeCell(value: string): React.ReactElement { + return
{value}
; +} + +function descriptionCell( + insightRec: InsightRecommendation, +): React.ReactElement { + switch (insightRec.type) { + case "CREATE_INDEX": + case "REPLACE_INDEX": + return ( + <> +
+ Statement Execution: {" "} + {insightRec.exec_stmt} +
+
+ Recommendation: {" "} + {insightRec.query} +
+ + ); + case "DROP_INDEX": + return <>{`Index ${insightRec.index_id}`}; + default: + return <>{insightRec.query}; + } +} + +export function makeInsightsColumns(): ColumnDescriptor[] { + return [ + { + name: "insights", + title: insightsTableTitles.insights(), + cell: (item: InsightRecommendation) => typeCell(insightType(item.type)), + sort: (item: InsightRecommendation) => item.type, + }, + { + name: "details", + title: insightsTableTitles.details(), + cell: (item: InsightRecommendation) => descriptionCell(item), + sort: (item: InsightRecommendation) => item.type, + }, + ]; +} From 31d8c224eb41fb3f5088aa3ce1688b5e7ffb3a19 Mon Sep 17 00:00:00 2001 From: Xiang Gu Date: Tue, 9 Aug 2022 16:22:40 -0400 Subject: [PATCH 04/10] sql/catalog/tabledesc: relaxed validation for virtual col in SUFFIX cols One of the validation rule says that "we don't allow virtual columns to be in the SUFFIX columns of a secondary index", except for one case: `ALTER PRIMARY KYE USING HASH`, where the implicitly created virtual, shard column, will need to appear in the SUFFIX columns of the implicitly created unique, secondary index of the old PK key columns ( which btw is a CRDB unique feature). The validation has logic to exempt us from this special case but it's written specifically to the legacy schema changer. Namely, it uses the canonical `PrimaryKeySwap` mutation type as the signal but we don't have that in the declarative schema changer. This PR addresses this issue and allows the validation logic to also exempt the exact same case but in the declarative schema changer. Release note: None --- pkg/sql/catalog/tabledesc/validate.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sql/catalog/tabledesc/validate.go b/pkg/sql/catalog/tabledesc/validate.go index b12c084929bd..3e2ef6e64623 100644 --- a/pkg/sql/catalog/tabledesc/validate.go +++ b/pkg/sql/catalog/tabledesc/validate.go @@ -1289,6 +1289,20 @@ func (desc *wrapper) validateTableIndexes( newPKColIDs.UnionWith(newPK.CollectKeyColumnIDs()) } } + if newPKColIDs.Empty() { + // Sadly, if the `ALTER PRIMARY KEY USING HASH` is from declarative schema changer, + // we won't find the `PrimaryKeySwap` mutation. In that case, we will attempt to + // find a mutation of adding a primary index and allow its key columns to be used + // as SUFFIX columns in other indexes, even if they are virtual. + for _, mut := range desc.Mutations { + if pidx := mut.GetIndex(); pidx != nil && + pidx.EncodingType == descpb.PrimaryIndexEncoding && + mut.Direction == descpb.DescriptorMutation_ADD && + !mut.Rollback { + newPKColIDs.UnionWith(catalog.MakeTableColSet(pidx.KeyColumnIDs...)) + } + } + } for _, colID := range idx.IndexDesc().KeySuffixColumnIDs { if !vea.IsActive(clusterversion.Start22_1) { if col := columnsByID[colID]; col != nil && col.IsVirtual() { From 62b625c1a8ba3288959932c6e298f07a4fbd5123 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Thu, 4 Aug 2022 15:25:24 -0400 Subject: [PATCH 05/10] cloudpb: move configuration protos A previous version of external connections persisted the External Storage protos. Now we just persist the raw URI, and so we can move these protos back. Release note: None --- pkg/cloud/cloudpb/external_storage.proto | 39 ++++++++++---------- pkg/cloud/externalconn/connection_storage.go | 4 +- pkg/cloud/nodelocal/nodelocal_storage.go | 14 +++---- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/pkg/cloud/cloudpb/external_storage.proto b/pkg/cloud/cloudpb/external_storage.proto index b8bc88f0ed43..a9f6561f942f 100644 --- a/pkg/cloud/cloudpb/external_storage.proto +++ b/pkg/cloud/cloudpb/external_storage.proto @@ -27,29 +27,14 @@ enum ExternalStorageProvider { external = 9; } -message LocalFileConfig { - string path = 1; - uint32 node_id = 2 [(gogoproto.customname) = "NodeID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; -} - -// ExternalConnectionConfig is the ExternalStorage configuration for the -// `external` provider. -message ExternalConnectionConfig { - // Name identifies the External Connection object. - string name = 1; - // User interacting with the external storage. This is used to check access - // privileges of the external connection object. - string user = 2; - // Path will be appended to the endpoint of the resource represented by the - // external connection object. It is used to access subdirectories/buckets of - // the external resource. - string path = 3; -} - message ExternalStorage { ExternalStorageProvider provider = 1; + message LocalFileConfig { + string path = 1; + uint32 node_id = 2 [(gogoproto.customname) = "NodeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + } message Http { string baseUri = 1; } @@ -127,6 +112,20 @@ message ExternalStorage { // Path is the filename being read/written to via the FileTableSystem. string path = 3; } + // ExternalConnectionConfig is the ExternalStorage configuration for the + // `external` provider. + message ExternalConnectionConfig { + // Name identifies the External Connection object. + string name = 1; + // User interacting with the external storage. This is used to check access + // privileges of the external connection object. + string user = 2; + // Path will be appended to the endpoint of the resource represented by the + // external connection object. It is used to access subdirectories/buckets of + // the external resource. + string path = 3; + } + LocalFileConfig local_file_config = 2 [(gogoproto.nullable) = false]; Http HttpPath = 3 [(gogoproto.nullable) = false]; GCS GoogleCloudConfig = 4; diff --git a/pkg/cloud/externalconn/connection_storage.go b/pkg/cloud/externalconn/connection_storage.go index 1ff2c68a9342..46c2ee128af2 100644 --- a/pkg/cloud/externalconn/connection_storage.go +++ b/pkg/cloud/externalconn/connection_storage.go @@ -27,8 +27,8 @@ const scheme = "external" func makeExternalConnectionConfig( uri *url.URL, args cloud.ExternalStorageURIContext, -) (cloudpb.ExternalConnectionConfig, error) { - externalConnCfg := cloudpb.ExternalConnectionConfig{} +) (cloudpb.ExternalStorage_ExternalConnectionConfig, error) { + externalConnCfg := cloudpb.ExternalStorage_ExternalConnectionConfig{} if uri.Host == "" { return externalConnCfg, errors.Newf("host component of an external URI must refer to an "+ "existing External Connection object: %s", uri.String()) diff --git a/pkg/cloud/nodelocal/nodelocal_storage.go b/pkg/cloud/nodelocal/nodelocal_storage.go index a69b5dcde5e5..13d879ceb97b 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/cloud/nodelocal/nodelocal_storage.go @@ -57,8 +57,8 @@ func validateLocalFileURI(uri *url.URL) error { return nil } -func makeLocalFileConfig(uri *url.URL) (cloudpb.LocalFileConfig, error) { - localCfg := cloudpb.LocalFileConfig{} +func makeLocalFileConfig(uri *url.URL) (cloudpb.ExternalStorage_LocalFileConfig, error) { + localCfg := cloudpb.ExternalStorage_LocalFileConfig{} nodeID, err := strconv.Atoi(uri.Host) if err != nil { return localCfg, errors.Errorf("host component of nodelocal URI must be a node ID: %s", uri.String()) @@ -85,11 +85,11 @@ func parseLocalFileURI( } type localFileStorage struct { - cfg cloudpb.LocalFileConfig // contains un-prefixed filepath -- DO NOT use for I/O ops. - ioConf base.ExternalIODirConfig // server configurations for the ExternalStorage - base string // relative filepath prefixed with externalIODir, for I/O ops on this node. - blobClient blobs.BlobClient // inter-node file sharing service - settings *cluster.Settings // cluster settings for the ExternalStorage + cfg cloudpb.ExternalStorage_LocalFileConfig // contains un-prefixed filepath -- DO NOT use for I/O ops. + ioConf base.ExternalIODirConfig // server configurations for the ExternalStorage + base string // relative filepath prefixed with externalIODir, for I/O ops on this node. + blobClient blobs.BlobClient // inter-node file sharing service + settings *cluster.Settings // cluster settings for the ExternalStorage } var _ cloud.ExternalStorage = &localFileStorage{} From 5d51ab34156935fe1af3eb7a5f2b0941cc7f3d4d Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 10 Aug 2022 14:13:01 +0100 Subject: [PATCH 06/10] kvserver: add queue size metric to RaftTransport Currently, there is a RaftEnqueuePending metric in StoreMetrics which exposes the RaftTransport outgoing queue size. However, this is a per-Store metric, so it duplicates the same variable across all the stores. The metric should be tracked on a per-node/transport basis. This commit introduces such metric, and does the necessary plumbing to include it to other existing metrics, since it's the first metric in RaftTransport. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/raft_transport.go | 17 +++++++----- pkg/kv/kvserver/raft_transport_metrics.go | 33 +++++++++++++++++++++++ pkg/server/server.go | 1 + pkg/ts/catalog/chart_catalog.go | 9 +++++++ 5 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 pkg/kv/kvserver/raft_transport_metrics.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e21d795b08e1..d3622726defd 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "raft_log_truncator.go", "raft_snapshot_queue.go", "raft_transport.go", + "raft_transport_metrics.go", "raft_truncator_replica.go", "replica.go", "replica_application_cmd.go", diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 3a2ff8fd4ced..fa8b8104e004 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -176,10 +176,10 @@ func (s raftTransportStatsSlice) Less(i, j int) bool { return s[i].nodeID < s[j] // which remote hung up. type RaftTransport struct { log.AmbientContext - st *cluster.Settings - tracer *tracing.Tracer - + st *cluster.Settings + tracer *tracing.Tracer stopper *stop.Stopper + metrics *RaftTransportMetrics queues [rpc.NumConnectionClasses]syncutil.IntMap // map[roachpb.NodeID]*chan *RaftMessageRequest stats [rpc.NumConnectionClasses]syncutil.IntMap // map[roachpb.NodeID]*raftTransportStats @@ -210,10 +210,10 @@ func NewRaftTransport( AmbientContext: ambient, st: st, tracer: tracer, - - stopper: stopper, - dialer: dialer, + stopper: stopper, + dialer: dialer, } + t.initMetrics() if grpcServer != nil { RegisterMultiRaftServer(grpcServer, t) @@ -301,6 +301,11 @@ func NewRaftTransport( return t } +// Metrics returns metrics tracking this transport. +func (t *RaftTransport) Metrics() *RaftTransportMetrics { + return t.metrics +} + func (t *RaftTransport) queuedMessageCount() int64 { var n int64 addLength := func(k int64, v unsafe.Pointer) bool { diff --git a/pkg/kv/kvserver/raft_transport_metrics.go b/pkg/kv/kvserver/raft_transport_metrics.go new file mode 100644 index 000000000000..c500807435b1 --- /dev/null +++ b/pkg/kv/kvserver/raft_transport_metrics.go @@ -0,0 +1,33 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +// RaftTransportMetrics is the set of metrics for a given RaftTransport. +type RaftTransportMetrics struct { + SendQueueSize *metric.Gauge +} + +func (t *RaftTransport) initMetrics() { + t.metrics = &RaftTransportMetrics{ + SendQueueSize: metric.NewFunctionalGauge(metric.Metadata{ + Name: "raft.transport.send-queue-size", + Help: `Number of pending outgoing messages in the Raft Transport queue. + +The queue is composed of multiple bounded channels associated with different +peers. The overall size of tens of thousands could indicate issues streaming +messages to at least one peer.`, + Measurement: "Messages", + Unit: metric.Unit_COUNT, + }, t.queuedMessageCount), + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index afbd951c6af1..98aa504b633c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -467,6 +467,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { raftTransport := kvserver.NewRaftTransport( cfg.AmbientCtx, st, cfg.AmbientCtx.Tracer, nodeDialer, grpcServer.Server, stopper, ) + registry.AddMetricStruct(raftTransport.Metrics()) ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer) ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */) diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index e4517eb923f6..6a7817d2f8df 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1882,6 +1882,15 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{{ReplicationLayer, "Raft", "Transport"}}, + Charts: []chartDescription{ + { + Title: "Send Queue Messages Count", + Metrics: []string{"raft.transport.send-queue-size"}, + }, + }, + }, { Organization: [][]string{{ReplicationLayer, "Ranges"}}, Charts: []chartDescription{ From 827d310d021bcf79fad578b1022de675d669d990 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 10 Aug 2022 14:40:23 +0100 Subject: [PATCH 07/10] kvserver: remove obsolete RaftEnqueuePending metric This metric was replaced by SendQueueSize of RaftTransportMetircs. Release note: None --- pkg/kv/kvserver/metrics.go | 12 ------------ pkg/kv/kvserver/store.go | 2 -- pkg/ts/catalog/chart_catalog.go | 4 ---- .../cluster-ui/src/store/nodes/nodes.fixtures.ts | 1 - 4 files changed, 19 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 4cdf0ee95943..12e60c230231 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -968,15 +968,6 @@ handling consumes writes. Unit: metric.Unit_BYTES, } - metaRaftEnqueuedPending = metric.Metadata{ - Name: "raft.enqueued.pending", - Help: `Number of pending outgoing messages in the Raft Transport queue. - -The queue is bounded in size, so instead of unbounded growth one would observe a -ceiling value in the tens of thousands.`, - Measurement: "Messages", - Unit: metric.Unit_COUNT, - } metaRaftCoalescedHeartbeatsPending = metric.Metadata{ Name: "raft.heartbeats.pending", Help: "Number of pending heartbeats and responses waiting to be coalesced", @@ -1753,7 +1744,6 @@ type StoreMetrics struct { RaftPausedFollowerCount *metric.Gauge - RaftEnqueuedPending *metric.Gauge RaftCoalescedHeartbeatsPending *metric.Gauge // Replica queue metrics. @@ -2264,8 +2254,6 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RaftPausedFollowerCount: metric.NewGauge(metaRaftFollowerPaused), - RaftEnqueuedPending: metric.NewGauge(metaRaftEnqueuedPending), - // This Gauge measures the number of heartbeats queued up just before // the queue is cleared, to avoid flapping wildly. RaftCoalescedHeartbeatsPending: metric.NewGauge(metaRaftCoalescedHeartbeatsPending), diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 79361866caea..aa72c7f8c7af 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3287,8 +3287,6 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.ClosedTimestampMaxBehindNanos.Update(nanos) } - s.metrics.RaftEnqueuedPending.Update(s.cfg.Transport.queuedMessageCount()) - return nil } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 6a7817d2f8df..f20ff4fed463 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1728,10 +1728,6 @@ var charts = []sectionDescription{ Title: "Commands Count", Metrics: []string{"raft.commandsapplied"}, }, - { - Title: "Enqueued", - Metrics: []string{"raft.enqueued.pending"}, - }, { Title: "Keys/Sec Avg.", Metrics: []string{"rebalancing.writespersecond"}, diff --git a/pkg/ui/workspaces/cluster-ui/src/store/nodes/nodes.fixtures.ts b/pkg/ui/workspaces/cluster-ui/src/store/nodes/nodes.fixtures.ts index 12b69609d915..7aacd79d06b3 100644 --- a/pkg/ui/workspaces/cluster-ui/src/store/nodes/nodes.fixtures.ts +++ b/pkg/ui/workspaces/cluster-ui/src/store/nodes/nodes.fixtures.ts @@ -806,7 +806,6 @@ export const getNodeStatus = () => { "queue.tsmaintenance.process.success": 3, "queue.tsmaintenance.processingnanos": 174301000, "raft.commandsapplied": 0, - "raft.enqueued.pending": 0, "raft.entrycache.accesses": 485, "raft.entrycache.bytes": 217172, "raft.entrycache.hits": 331, From 655ae287ab4800cef6d94c45b61bfe1262f152bb Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 10 Aug 2022 13:03:49 -0400 Subject: [PATCH 08/10] eval: don't always ignore error from ResolveFunctionByOID This addresses a comment from the review of #85656. Release note: None --- pkg/sql/sem/eval/cast.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/sql/sem/eval/cast.go b/pkg/sql/sem/eval/cast.go index 38ba96256787..e50554f1fe59 100644 --- a/pkg/sql/sem/eval/cast.go +++ b/pkg/sql/sem/eval/cast.go @@ -940,12 +940,15 @@ func performIntToOidCast( return tree.NewDOidWithTypeAndName(o, t, name), nil case oid.T_regproc, oid.T_regprocedure: + if v == 0 { + return tree.WrapAsZeroOid(t), nil + } name, _, err := res.ResolveFunctionByOID(ctx, oid.Oid(v)) if err != nil { - if v == 0 { - return tree.WrapAsZeroOid(t), nil + if errors.Is(err, tree.ErrFunctionUndefined) { + return tree.NewDOidWithType(o, t), nil //nolint:returnerrcheck } - return tree.NewDOidWithType(o, t), nil //nolint:returnerrcheck + return nil, err } return tree.NewDOidWithTypeAndName(o, t, name), nil From 49c90b7a93ca676e6482e92fc4731fe43e283445 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 5 Aug 2022 14:25:25 -0400 Subject: [PATCH 09/10] amazon,externalconn: add s3 support to External Connections This change registers s3 as a URI that can be represented as an External Connection. Most notably we take a page from the CDC book and switch the s3 parse function to check for invalid parameters, and configurations. This allows us to catch certain misconfiguration at the time we create the external connection. Informs: #84753 Release note (sql change): Users can now `CREATE EXTERNAL CONNECTION` to represent an s3 URI. --- .../nightlies/cloud_unit_tests_impl.sh | 2 +- pkg/BUILD.bazel | 3 + .../changefeedccl/sink_kafka_connection.go | 2 +- pkg/ccl/cloudccl/amazon/BUILD.bazel | 34 ++++ pkg/ccl/cloudccl/amazon/main_test.go | 35 ++++ pkg/ccl/cloudccl/amazon/s3_connection_test.go | 184 ++++++++++++++++++ .../testdata/create_drop_external_connection | 30 +++ .../create_drop_external_connection | 30 +++ pkg/ccl/cloudccl/gcp/BUILD.bazel | 1 + .../cloudccl/gcp/gcs_kms_connection_test.go | 6 +- pkg/cloud/amazon/BUILD.bazel | 3 + pkg/cloud/amazon/s3_connection.go | 46 +++++ pkg/cloud/amazon/s3_storage.go | 82 ++++++-- pkg/cloud/externalconn/connection_kms.go | 5 + pkg/cloud/externalconn/connection_storage.go | 5 + .../externalconn/connectionpb/connection.go | 6 +- .../connectionpb/connection.proto | 9 +- pkg/cloud/externalconn/providers/BUILD.bazel | 1 + pkg/cloud/externalconn/providers/registry.go | 1 + pkg/cloud/gcp/gcs_kms_connection.go | 2 +- pkg/cloud/nodelocal/nodelocal_connection.go | 3 +- pkg/cloud/nodelocal/nodelocal_storage.go | 3 +- pkg/cloud/uris.go | 18 +- pkg/sql/importer/import_stmt_test.go | 6 +- 24 files changed, 477 insertions(+), 40 deletions(-) create mode 100644 pkg/ccl/cloudccl/amazon/BUILD.bazel create mode 100644 pkg/ccl/cloudccl/amazon/main_test.go create mode 100644 pkg/ccl/cloudccl/amazon/s3_connection_test.go create mode 100644 pkg/cloud/amazon/s3_connection.go diff --git a/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh b/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh index 3ac3fea88356..43474b384656 100755 --- a/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh +++ b/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh @@ -26,7 +26,7 @@ log_into_aws exit_status=0 $BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci \ - test //pkg/cloud/gcp:gcp_test //pkg/cloud/amazon:amazon_test //pkg/ccl/cloudccl/gcp:gcp_test -- \ + test //pkg/cloud/gcp:gcp_test //pkg/cloud/amazon:amazon_test //pkg/ccl/cloudccl/gcp:gcp_test //pkg/ccl/cloudccl/amazon:amazon_test -- \ --test_env=GO_TEST_WRAP_TESTV=1 \ --test_env=GO_TEST_WRAP=1 \ --test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE \ diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ebf066ec9d49..178177aeff77 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -29,6 +29,7 @@ ALL_TESTS = [ "//pkg/ccl/changefeedccl/schemafeed:schemafeed_test", "//pkg/ccl/changefeedccl:changefeedccl_test", "//pkg/ccl/cliccl:cliccl_test", + "//pkg/ccl/cloudccl/amazon:amazon_test", "//pkg/ccl/cloudccl/externalconn:externalconn_test", "//pkg/ccl/cloudccl/gcp:gcp_test", "//pkg/ccl/importerccl:importerccl_test", @@ -662,6 +663,7 @@ GO_TARGETS = [ "//pkg/ccl/cliccl/cliflagsccl:cliflagsccl", "//pkg/ccl/cliccl:cliccl", "//pkg/ccl/cliccl:cliccl_test", + "//pkg/ccl/cloudccl/amazon:amazon_test", "//pkg/ccl/cloudccl/externalconn:externalconn_test", "//pkg/ccl/cloudccl/gcp:gcp_test", "//pkg/ccl/cmdccl/enc_utils:enc_utils", @@ -2080,6 +2082,7 @@ GET_X_DATA_TARGETS = [ "//pkg/ccl/changefeedccl/schemafeed/schematestutils:get_x_data", "//pkg/ccl/cliccl:get_x_data", "//pkg/ccl/cliccl/cliflagsccl:get_x_data", + "//pkg/ccl/cloudccl/amazon:get_x_data", "//pkg/ccl/cloudccl/externalconn:get_x_data", "//pkg/ccl/cloudccl/gcp:get_x_data", "//pkg/ccl/cmdccl/enc_utils:get_x_data", diff --git a/pkg/ccl/changefeedccl/sink_kafka_connection.go b/pkg/ccl/changefeedccl/sink_kafka_connection.go index 8327d74b9f31..03ea49e20d44 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_connection.go +++ b/pkg/ccl/changefeedccl/sink_kafka_connection.go @@ -32,7 +32,7 @@ func parseAndValidateKafkaSinkURI( } connDetails := connectionpb.ConnectionDetails{ - Provider: connectionpb.ConnectionProvider_TypeKafka, + Provider: connectionpb.ConnectionProvider_kafka, Details: &connectionpb.ConnectionDetails_SimpleURI{ SimpleURI: &connectionpb.SimpleURI{ URI: uri.String(), diff --git a/pkg/ccl/cloudccl/amazon/BUILD.bazel b/pkg/ccl/cloudccl/amazon/BUILD.bazel new file mode 100644 index 000000000000..19d2545adf56 --- /dev/null +++ b/pkg/ccl/cloudccl/amazon/BUILD.bazel @@ -0,0 +1,34 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "amazon_test", + srcs = [ + "main_test.go", + "s3_connection_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/ccl", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/utilccl", + "//pkg/cloud", + "//pkg/cloud/amazon", + "//pkg/cloud/cloudpb", + "//pkg/cloud/externalconn/providers", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_aws_aws_sdk_go//aws/credentials", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/ccl/cloudccl/amazon/main_test.go b/pkg/ccl/cloudccl/amazon/main_test.go new file mode 100644 index 000000000000..0cb075ce35bb --- /dev/null +++ b/pkg/ccl/cloudccl/amazon/main_test.go @@ -0,0 +1,35 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package amazon_test + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/cloudccl/amazon/s3_connection_test.go b/pkg/ccl/cloudccl/amazon/s3_connection_test.go new file mode 100644 index 000000000000..802fb15bb8a5 --- /dev/null +++ b/pkg/ccl/cloudccl/amazon/s3_connection_test.go @@ -0,0 +1,184 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package amazon + +import ( + "context" + "fmt" + "net/url" + "os" + "testing" + + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/amazon" + "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + _ "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers" // import External Connection providers. + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func TestS3ExternalConnection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + dir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + + params := base.TestClusterArgs{} + params.ServerArgs.ExternalIODir = dir + + tc := testcluster.StartTestCluster(t, 1, params) + defer tc.Stopper().Stop(context.Background()) + + tc.WaitForNodeLiveness(t) + sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0]) + + // Setup some dummy data. + sqlDB.Exec(t, `CREATE DATABASE foo`) + sqlDB.Exec(t, `USE foo`) + sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`) + + createExternalConnection := func(externalConnectionName, uri string) { + sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri)) + } + backupAndRestoreFromExternalConnection := func(backupExternalConnectionName string) { + backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName) + sqlDB.Exec(t, fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, backupURI)) + sqlDB.Exec(t, fmt.Sprintf(`RESTORE DATABASE foo FROM LATEST IN '%s' WITH new_db_name = bar`, backupURI)) + sqlDB.CheckQueryResults(t, `SELECT * FROM bar.foo`, [][]string{{"1"}, {"2"}, {"3"}}) + sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{}) + sqlDB.Exec(t, `DROP DATABASE bar CASCADE`) + } + + // If environment credentials are not present, we want to + // skip all S3 tests, including auth-implicit, even though + // it is not used in auth-implicit. + creds, err := credentials.NewEnvCredentials().Get() + if err != nil { + skip.IgnoreLint(t, "No AWS credentials") + } + bucket := os.Getenv("AWS_S3_BUCKET") + if bucket == "" { + skip.IgnoreLint(t, "AWS_S3_BUCKET env var must be set") + } + + t.Run("auth-implicit", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + skip.IgnoreLintf(t, "we only run this test if a default role exists, "+ + "refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err) + } + + // Set the AUTH to implicit. + params := make(url.Values) + params.Add(cloud.AuthParam, cloud.AuthParamImplicit) + + s3URI := fmt.Sprintf("s3://%s/backup-ec-test-default?%s", bucket, params.Encode()) + ecName := "auth-implicit-s3" + createExternalConnection(ecName, s3URI) + backupAndRestoreFromExternalConnection(ecName) + }) + + t.Run("auth-specified", func(t *testing.T) { + s3URI := amazon.S3URI(bucket, "backup-ec-test-default", + &cloudpb.ExternalStorage_S3{ + AccessKey: creds.AccessKeyID, + Secret: creds.SecretAccessKey, + Region: "us-east-1", + Auth: cloud.AuthParamSpecified, + }, + ) + ecName := "auth-specified-s3" + createExternalConnection(ecName, s3URI) + backupAndRestoreFromExternalConnection(ecName) + }) + + // Tests that we can put an object with server side encryption specified. + t.Run("server-side-encryption", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + skip.IgnoreLintf(t, "we only run this test if a default role exists, "+ + "refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err) + } + + s3URI := amazon.S3URI(bucket, "backup-ec-test-sse-256", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "AES256", + }) + ecName := "server-side-encryption-s3" + createExternalConnection(ecName, s3URI) + backupAndRestoreFromExternalConnection(ecName) + + v := os.Getenv("AWS_KMS_KEY_ARN") + if v == "" { + skip.IgnoreLint(t, "AWS_KMS_KEY_ARN env var must be set") + } + s3KMSURI := amazon.S3URI(bucket, "backup-ec-test-sse-kms", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "aws:kms", + ServerKMSID: v, + }) + ecName = "server-side-encryption-kms-s3" + createExternalConnection(ecName, s3KMSURI) + backupAndRestoreFromExternalConnection(ecName) + }) + + t.Run("server-side-encryption-invalid-params", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + skip.IgnoreLintf(t, "we only run this test if a default role exists, "+ + "refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err) + } + + // Unsupported server side encryption option. + invalidS3URI := amazon.S3URI(bucket, "backup-ec-test-sse-256", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "unsupported-algorithm", + }) + sqlDB.ExpectErr(t, + "unsupported server encryption mode unsupported-algorithm. Supported values are `aws:kms` and `AES256", + fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, invalidS3URI)) + + invalidS3URI = amazon.S3URI(bucket, "backup-ec-test-sse-256", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "aws:kms", + }) + + // Specify aws:kms encryption mode but don't specify kms ID. + sqlDB.ExpectErr(t, "AWS_SERVER_KMS_ID param must be set when using aws:kms server side encryption mode.", fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, + invalidS3URI)) + }) +} diff --git a/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection b/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection index 0dca2fc492fc..2aae1d87d312 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection +++ b/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection @@ -170,6 +170,36 @@ inspect-system-table subtest end +subtest basic-s3 + +exec-sql +CREATE EXTERNAL CONNECTION "foo-s3" AS 's3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno'; +---- + +# Reject invalid S3 URIs. +exec-sql +CREATE EXTERNAL CONNECTION "missing-host-s3" AS 's3:///?AUTH=implicit'; +---- +pq: failed to construct External Connection details: empty host component; s3 URI must specify a target bucket + +exec-sql +CREATE EXTERNAL CONNECTION "invalid-params-s3" AS 's3://foo/bar?AUTH=implicit&INVALIDPARAM=baz'; +---- +pq: failed to construct External Connection details: unknown S3 query parameters: INVALIDPARAM + +inspect-system-table +---- +foo-s3 STORAGE {"provider": "s3", "simpleUri": {"uri": "s3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno"}} + +exec-sql +DROP EXTERNAL CONNECTION "foo-s3"; +---- + +inspect-system-table +---- + +subtest end + subtest basic-kafka-sink exec-sql diff --git a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection index 405dcde27c75..b3aba38f0eab 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection +++ b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection @@ -162,6 +162,36 @@ inspect-system-table subtest end +subtest basic-s3 + +exec-sql +CREATE EXTERNAL CONNECTION "foo-s3" AS 's3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno'; +---- + +# Reject invalid S3 URIs. +exec-sql +CREATE EXTERNAL CONNECTION "missing-host-s3" AS 's3:///?AUTH=implicit'; +---- +pq: failed to construct External Connection details: empty host component; s3 URI must specify a target bucket + +exec-sql +CREATE EXTERNAL CONNECTION "invalid-params-s3" AS 's3://foo/bar?AUTH=implicit&INVALIDPARAM=baz'; +---- +pq: failed to construct External Connection details: unknown S3 query parameters: INVALIDPARAM + +inspect-system-table +---- +foo-s3 STORAGE {"provider": "s3", "simpleUri": {"uri": "s3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno"}} + +exec-sql +DROP EXTERNAL CONNECTION "foo-s3"; +---- + +inspect-system-table +---- + +subtest end + subtest basic-kafka-sink exec-sql diff --git a/pkg/ccl/cloudccl/gcp/BUILD.bazel b/pkg/ccl/cloudccl/gcp/BUILD.bazel index e673ccd2188c..585f1609787d 100644 --- a/pkg/ccl/cloudccl/gcp/BUILD.bazel +++ b/pkg/ccl/cloudccl/gcp/BUILD.bazel @@ -16,6 +16,7 @@ go_test( "//pkg/cloud/cloudtestutils", "//pkg/cloud/externalconn/providers", "//pkg/cloud/gcp", + "//pkg/cloud/impl:cloudimpl", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", diff --git a/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go b/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go index 8b4aff76e249..1555a25c6c3d 100644 --- a/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go +++ b/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudtestutils" _ "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers" // import External Connection providers. "github.com/cockroachdb/cockroach/pkg/cloud/gcp" + _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register ExternalStorage providers. "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -146,8 +147,7 @@ func TestGCSKMSExternalConnection(t *testing.T) { // ExternalStorage. This should be disallowed. backupExternalConnectionURI := fmt.Sprintf("external://%s", backupExternalConnectionName) sqlDB.ExpectErr(t, - "failed to load external connection object: expected External Connection object of type KMS but "+ - "'backup' is of type STORAGE", + "KMS cannot use object of type STORAGE", fmt.Sprintf(`BACKUP DATABASE foo INTO '%s' WITH kms='%s'`, backupExternalConnectionURI, backupExternalConnectionURI)) }) @@ -177,7 +177,6 @@ func TestGCSExternalConnectionAssumeRole(t *testing.T) { createExternalConnection := func(externalConnectionName, uri string) { sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri)) - fmt.Printf("created external connection %s\n\n", externalConnectionName) } backupAndRestoreFromExternalConnection := func(backupExternalConnectionName, kmsExternalConnectionName string) { backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName) @@ -191,7 +190,6 @@ func TestGCSExternalConnectionAssumeRole(t *testing.T) { disallowedBackupToExternalConnection := func(backupExternalConnectionName, kmsExternalConnectionName string) { backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName) kmsURI := fmt.Sprintf("external://%s", kmsExternalConnectionName) - fmt.Printf("backing up into %s with kms %s\n\n", backupURI, kmsURI) sqlDB.ExpectErr(t, "(PermissionDenied|AccessDenied|PERMISSION_DENIED)", fmt.Sprintf(`BACKUP INTO '%s' WITH kms='%s'`, backupURI, kmsURI)) } diff --git a/pkg/cloud/amazon/BUILD.bazel b/pkg/cloud/amazon/BUILD.bazel index 615043d63766..d6ca46f02ba2 100644 --- a/pkg/cloud/amazon/BUILD.bazel +++ b/pkg/cloud/amazon/BUILD.bazel @@ -5,6 +5,7 @@ go_library( name = "amazon", srcs = [ "aws_kms.go", + "s3_connection.go", "s3_storage.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/cloud/amazon", @@ -13,6 +14,8 @@ go_library( "//pkg/base", "//pkg/cloud", "//pkg/cloud/cloudpb", + "//pkg/cloud/externalconn", + "//pkg/cloud/externalconn/connectionpb", "//pkg/server/telemetry", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/cloud/amazon/s3_connection.go b/pkg/cloud/amazon/s3_connection.go new file mode 100644 index 000000000000..ff875ca7917a --- /dev/null +++ b/pkg/cloud/amazon/s3_connection.go @@ -0,0 +1,46 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package amazon + +import ( + "context" + "net/url" + + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb" +) + +func parseAndValidateS3ConnectionURI( + _ context.Context, uri *url.URL, +) (externalconn.ExternalConnection, error) { + // Parse and validate the S3 URL. + if _, err := parseS3URL(cloud.ExternalStorageURIContext{}, uri); err != nil { + return nil, err + } + + connDetails := connectionpb.ConnectionDetails{ + Provider: connectionpb.ConnectionProvider_s3, + Details: &connectionpb.ConnectionDetails_SimpleURI{ + SimpleURI: &connectionpb.SimpleURI{ + URI: uri.String(), + }, + }, + } + return externalconn.NewExternalConnection(connDetails), nil +} + +func init() { + externalconn.RegisterConnectionDetailsFromURIFactory( + scheme, + parseAndValidateS3ConnectionURI, + ) +} diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index 53257103cb4b..9a22521916ec 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -76,6 +76,9 @@ const ( // AssumeRoleParam is the query parameter for the chain of AWS Role ARNs to // assume. AssumeRoleParam = "ASSUME_ROLE" + + // scheme component of an S3 URI. + scheme = "s3" ) type s3Storage struct { @@ -211,21 +214,26 @@ func S3URI(bucket, path string, conf *cloudpb.ExternalStorage_S3) string { } func parseS3URL(_ cloud.ExternalStorageURIContext, uri *url.URL) (cloudpb.ExternalStorage, error) { + s3URL := cloud.ConsumeURL{URL: uri} conf := cloudpb.ExternalStorage{} + if s3URL.Host == "" { + return conf, errors.New("empty host component; s3 URI must specify a target bucket") + } + conf.Provider = cloudpb.ExternalStorageProvider_s3 - assumeRole, delegateRoles := cloud.ParseRoleString(uri.Query().Get(AssumeRoleParam)) + assumeRole, delegateRoles := cloud.ParseRoleString(s3URL.ConsumeParam(AssumeRoleParam)) conf.S3Config = &cloudpb.ExternalStorage_S3{ - Bucket: uri.Host, - Prefix: uri.Path, - AccessKey: uri.Query().Get(AWSAccessKeyParam), - Secret: uri.Query().Get(AWSSecretParam), - TempToken: uri.Query().Get(AWSTempTokenParam), - Endpoint: uri.Query().Get(AWSEndpointParam), - Region: uri.Query().Get(S3RegionParam), - Auth: uri.Query().Get(cloud.AuthParam), - ServerEncMode: uri.Query().Get(AWSServerSideEncryptionMode), - ServerKMSID: uri.Query().Get(AWSServerSideEncryptionKMSID), - StorageClass: uri.Query().Get(S3StorageClassParam), + Bucket: s3URL.Host, + Prefix: s3URL.Path, + AccessKey: s3URL.ConsumeParam(AWSAccessKeyParam), + Secret: s3URL.ConsumeParam(AWSSecretParam), + TempToken: s3URL.ConsumeParam(AWSTempTokenParam), + Endpoint: s3URL.ConsumeParam(AWSEndpointParam), + Region: s3URL.ConsumeParam(S3RegionParam), + Auth: s3URL.ConsumeParam(cloud.AuthParam), + ServerEncMode: s3URL.ConsumeParam(AWSServerSideEncryptionMode), + ServerKMSID: s3URL.ConsumeParam(AWSServerSideEncryptionKMSID), + StorageClass: s3URL.ConsumeParam(S3StorageClassParam), RoleARN: assumeRole, DelegateRoleARNs: delegateRoles, /* NB: additions here should also update s3QueryParams() serializer */ @@ -239,6 +247,54 @@ func parseS3URL(_ cloud.ExternalStorageURIContext, uri *url.URL) (cloudpb.Extern // contain spaces. We can convert any space characters we see to + // characters to recover the original secret. conf.S3Config.Secret = strings.Replace(conf.S3Config.Secret, " ", "+", -1) + + // Validate that all the passed in parameters are supported. + if unknownParams := s3URL.RemainingQueryParams(); len(unknownParams) > 0 { + return cloudpb.ExternalStorage{}, errors.Errorf( + `unknown S3 query parameters: %s`, strings.Join(unknownParams, ", ")) + } + + // Validate the authentication parameters are set correctly. + switch conf.S3Config.Auth { + case "", cloud.AuthParamSpecified: + if conf.S3Config.AccessKey == "" { + return cloudpb.ExternalStorage{}, errors.Errorf( + "%s is set to '%s', but %s is not set", + cloud.AuthParam, + cloud.AuthParamSpecified, + AWSAccessKeyParam, + ) + } + if conf.S3Config.Secret == "" { + return cloudpb.ExternalStorage{}, errors.Errorf( + "%s is set to '%s', but %s is not set", + cloud.AuthParam, + cloud.AuthParamSpecified, + AWSSecretParam, + ) + } + case cloud.AuthParamImplicit: + default: + return cloudpb.ExternalStorage{}, errors.Errorf("unsupported value %s for %s", + conf.S3Config.Auth, cloud.AuthParam) + } + + // Ensure that a KMS ID is specified if server side encryption is set to use + // KMS. + if conf.S3Config.ServerEncMode != "" { + switch conf.S3Config.ServerEncMode { + case string(aes256Enc): + case string(kmsEnc): + if conf.S3Config.ServerKMSID == "" { + return cloudpb.ExternalStorage{}, errors.New("AWS_SERVER_KMS_ID param must be set" + + " when using aws:kms server side encryption mode.") + } + default: + return cloudpb.ExternalStorage{}, errors.Newf("unsupported server encryption mode %s. "+ + "Supported values are `aws:kms` and `AES256`.", conf.S3Config.ServerEncMode) + } + } + return conf, nil } @@ -695,5 +751,5 @@ func s3ErrDelay(err error) time.Duration { func init() { cloud.RegisterExternalStorageProvider(cloudpb.ExternalStorageProvider_s3, - parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3") + parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), scheme) } diff --git a/pkg/cloud/externalconn/connection_kms.go b/pkg/cloud/externalconn/connection_kms.go index a2f35af29e0a..fb6247fc83eb 100644 --- a/pkg/cloud/externalconn/connection_kms.go +++ b/pkg/cloud/externalconn/connection_kms.go @@ -46,6 +46,11 @@ func makeExternalConnectionKMS( return nil, errors.Wrap(err, "failed to load external connection object") } + // Sanity check that we are connecting to a KMS object. + if ec.ConnectionType() != connectionpb.TypeKMS { + return nil, errors.Newf("KMS cannot use object of type %s", ec.ConnectionType().String()) + } + // Construct a KMS handle for the underlying resource represented by the // external connection object. switch d := ec.ConnectionProto().Details.(type) { diff --git a/pkg/cloud/externalconn/connection_storage.go b/pkg/cloud/externalconn/connection_storage.go index 46c2ee128af2..902563ed9ec6 100644 --- a/pkg/cloud/externalconn/connection_storage.go +++ b/pkg/cloud/externalconn/connection_storage.go @@ -74,6 +74,11 @@ func makeExternalConnectionStorage( return nil, errors.Wrap(err, "failed to load external connection object") } + // Sanity check that we are connecting to a STORAGE object. + if ec.ConnectionType() != connectionpb.TypeStorage { + return nil, errors.Newf("STORAGE cannot use object of type %s", ec.ConnectionType().String()) + } + // Construct an ExternalStorage handle for the underlying resource represented // by the external connection object. switch d := ec.ConnectionProto().Details.(type) { diff --git a/pkg/cloud/externalconn/connectionpb/connection.go b/pkg/cloud/externalconn/connectionpb/connection.go index 27904c7d735a..dc8248bedc81 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.go +++ b/pkg/cloud/externalconn/connectionpb/connection.go @@ -15,11 +15,11 @@ import "github.com/cockroachdb/errors" // Type returns the ConnectionType of the receiver. func (d *ConnectionDetails) Type() ConnectionType { switch d.Provider { - case ConnectionProvider_TypeNodelocal: + case ConnectionProvider_nodelocal, ConnectionProvider_s3: return TypeStorage - case ConnectionProvider_TypeGSKMS: + case ConnectionProvider_gs_kms: return TypeKMS - case ConnectionProvider_TypeKafka: + case ConnectionProvider_kafka: return TypeStorage default: panic(errors.AssertionFailedf("ConnectionDetails.Type called on a details with an unknown type: %T", d.Provider.String())) diff --git a/pkg/cloud/externalconn/connectionpb/connection.proto b/pkg/cloud/externalconn/connectionpb/connection.proto index 7f9098e17ddf..c53121e2c806 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.proto +++ b/pkg/cloud/externalconn/connectionpb/connection.proto @@ -15,16 +15,17 @@ option go_package = "connectionpb"; import "gogoproto/gogo.proto"; enum ConnectionProvider { - Unknown = 0 [(gogoproto.enumvalue_customname) = "TypeUnspecified"]; + Unknown = 0; // External Storage providers. - nodelocal = 1 [(gogoproto.enumvalue_customname) = "TypeNodelocal"]; + nodelocal = 1; + s3 = 4; // KMS providers. - gs_kms = 2 [(gogoproto.enumvalue_customname) = "TypeGSKMS"]; + gs_kms = 2; // Sink providers. - kafka = 3 [(gogoproto.enumvalue_customname) = "TypeKafka"]; + kafka = 3; } // ConnectionType is the type of the External Connection object. diff --git a/pkg/cloud/externalconn/providers/BUILD.bazel b/pkg/cloud/externalconn/providers/BUILD.bazel index 6fda41be3e91..478c7a0f6498 100644 --- a/pkg/cloud/externalconn/providers/BUILD.bazel +++ b/pkg/cloud/externalconn/providers/BUILD.bazel @@ -7,6 +7,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers", visibility = ["//visibility:public"], deps = [ + "//pkg/cloud/amazon", "//pkg/cloud/gcp", "//pkg/cloud/nodelocal", ], diff --git a/pkg/cloud/externalconn/providers/registry.go b/pkg/cloud/externalconn/providers/registry.go index 8ef0c1599a80..6563e5ba9348 100644 --- a/pkg/cloud/externalconn/providers/registry.go +++ b/pkg/cloud/externalconn/providers/registry.go @@ -17,6 +17,7 @@ package providers import ( // import all the cloud provider packages to register them. + _ "github.com/cockroachdb/cockroach/pkg/cloud/amazon" _ "github.com/cockroachdb/cockroach/pkg/cloud/gcp" _ "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" ) diff --git a/pkg/cloud/gcp/gcs_kms_connection.go b/pkg/cloud/gcp/gcs_kms_connection.go index 1812325b7372..8f13eb92f55d 100644 --- a/pkg/cloud/gcp/gcs_kms_connection.go +++ b/pkg/cloud/gcp/gcs_kms_connection.go @@ -26,7 +26,7 @@ func parseAndValidateGCSKMSConnectionURI( } connDetails := connectionpb.ConnectionDetails{ - Provider: connectionpb.ConnectionProvider_TypeGSKMS, + Provider: connectionpb.ConnectionProvider_gs_kms, Details: &connectionpb.ConnectionDetails_SimpleURI{ SimpleURI: &connectionpb.SimpleURI{ URI: uri.String(), diff --git a/pkg/cloud/nodelocal/nodelocal_connection.go b/pkg/cloud/nodelocal/nodelocal_connection.go index 911267256833..a714066f6757 100644 --- a/pkg/cloud/nodelocal/nodelocal_connection.go +++ b/pkg/cloud/nodelocal/nodelocal_connection.go @@ -27,7 +27,7 @@ func parseAndValidateLocalFileConnectionURI( } connDetails := connectionpb.ConnectionDetails{ - Provider: connectionpb.ConnectionProvider_TypeNodelocal, + Provider: connectionpb.ConnectionProvider_nodelocal, Details: &connectionpb.ConnectionDetails_SimpleURI{ SimpleURI: &connectionpb.SimpleURI{ URI: uri.String(), @@ -38,7 +38,6 @@ func parseAndValidateLocalFileConnectionURI( } func init() { - const scheme = "nodelocal" externalconn.RegisterConnectionDetailsFromURIFactory( scheme, parseAndValidateLocalFileConnectionURI, diff --git a/pkg/cloud/nodelocal/nodelocal_storage.go b/pkg/cloud/nodelocal/nodelocal_storage.go index 13d879ceb97b..1479cb668e07 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/cloud/nodelocal/nodelocal_storage.go @@ -34,6 +34,8 @@ import ( "google.golang.org/grpc/status" ) +const scheme = "nodelocal" + func validateLocalFileURI(uri *url.URL) error { if uri.Host == "" { return errors.Newf( @@ -231,7 +233,6 @@ func (*localFileStorage) Close() error { } func init() { - const scheme = "nodelocal" cloud.RegisterExternalStorageProvider(cloudpb.ExternalStorageProvider_nodelocal, parseLocalFileURI, makeLocalFileStorage, cloud.RedactedParams(), scheme) } diff --git a/pkg/cloud/uris.go b/pkg/cloud/uris.go index 1722095ea4fa..8b6dd86b0e3a 100644 --- a/pkg/cloud/uris.go +++ b/pkg/cloud/uris.go @@ -96,14 +96,16 @@ func ParseRoleString(roleString string) (assumeRole string, delegateRoles []stri return assumeRole, delegateRoles } -// consumeURL is a helper struct which for "consuming" URL query +// ConsumeURL is a helper struct which for "consuming" URL query // parameters from the underlying URL. -type consumeURL struct { +type ConsumeURL struct { *url.URL q url.Values } -func (u *consumeURL) consumeParam(p string) string { +// ConsumeParam returns the value of the parameter p from the underlying URL, +// and deletes the parameter from the URL. +func (u *ConsumeURL) ConsumeParam(p string) string { if u.q == nil { u.q = u.Query() } @@ -112,7 +114,9 @@ func (u *consumeURL) consumeParam(p string) string { return v } -func (u *consumeURL) remainingQueryParams() (res []string) { +// RemainingQueryParams returns the query parameters that have not been consumed +// from the underlying URL. +func (u *ConsumeURL) RemainingQueryParams() (res []string) { if u.q == nil { u.q = u.Query() } @@ -126,12 +130,12 @@ func (u *consumeURL) remainingQueryParams() (res []string) { // are not part of the supportedParameters. func ValidateQueryParameters(uri url.URL, supportedParameters []string) error { u := uri - validateURL := consumeURL{URL: &u} + validateURL := ConsumeURL{URL: &u} for _, option := range supportedParameters { - validateURL.consumeParam(option) + validateURL.ConsumeParam(option) } - if unknownParams := validateURL.remainingQueryParams(); len(unknownParams) > 0 { + if unknownParams := validateURL.RemainingQueryParams(); len(unknownParams) > 0 { return errors.Errorf( `unknown query parameters: %s for %s URI`, strings.Join(unknownParams, ", "), uri.Scheme) diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 5245b18aa0d8..9eafde101016 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -2685,12 +2685,12 @@ func TestURIRequiresAdminRole(t *testing.T) { }, { name: "s3-specified", - uri: "s3://foo/bar?AUTH=specified", + uri: "s3://foo/bar?AUTH=specified&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456", requiresAdmin: false, }, { name: "s3-custom", - uri: "s3://foo/bar?AUTH=specified&AWS_ENDPOINT=baz", + uri: "s3://foo/bar?AUTH=specified&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&AWS_ENDPOINT=baz", requiresAdmin: true, }, { @@ -2736,7 +2736,7 @@ func TestURIRequiresAdminRole(t *testing.T) { t.Run(tc.name+"-direct", func(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI(tc.uri, username.RootUserName()) require.NoError(t, err) - require.Equal(t, conf.AccessIsWithExplicitAuth(), !tc.requiresAdmin) + require.Equal(t, !tc.requiresAdmin, conf.AccessIsWithExplicitAuth()) }) } } From caec19c18b9960f0f94ac07e4114e6ffcfd349a4 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 10 Aug 2022 14:20:42 -0400 Subject: [PATCH 10/10] systemschema_test: fix timestamp stripping regex The test in this package was flaky because hlc.Timestamp values inside descriptors (modification time, for instance) with a logical component would be improperly redacted. This commit fixes this. Fixes #85799. Release note: None --- pkg/sql/catalog/systemschema_test/systemschema_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sql/catalog/systemschema_test/systemschema_test.go b/pkg/sql/catalog/systemschema_test/systemschema_test.go index 90dadc306915..6c8067ebc606 100644 --- a/pkg/sql/catalog/systemschema_test/systemschema_test.go +++ b/pkg/sql/catalog/systemschema_test/systemschema_test.go @@ -43,7 +43,7 @@ func TestValidateSystemSchemaAfterBootStrap(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - wallTimeRE, err := regexp.Compile(`"wallTime":"\d*"`) + hlcRE, err := regexp.Compile(`"wallTime":"\d*"(,"logical":\d*)?`) require.NoError(t, err) datadriven.Walk(t, testutils.TestDataPath(t, "bootstrap"), func(t *testing.T, path string) { @@ -114,7 +114,7 @@ func TestValidateSystemSchemaAfterBootStrap(t *testing.T) { require.NotNilf(t, ev.Desc, "unexpectedly missing descriptor in %s", ev) str, err := je.MarshalToString(ev.Desc) require.NoError(t, err, "unexpected descriptor marshal error") - str = wallTimeRE.ReplaceAllString(str, `"wallTime":"0"`) + str = hlcRE.ReplaceAllString(str, `"wallTime":"0"`) sb.WriteString(str) sb.WriteRune('\n') }