From 49c90b7a93ca676e6482e92fc4731fe43e283445 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 5 Aug 2022 14:25:25 -0400 Subject: [PATCH] amazon,externalconn: add s3 support to External Connections This change registers s3 as a URI that can be represented as an External Connection. Most notably we take a page from the CDC book and switch the s3 parse function to check for invalid parameters, and configurations. This allows us to catch certain misconfiguration at the time we create the external connection. Informs: #84753 Release note (sql change): Users can now `CREATE EXTERNAL CONNECTION` to represent an s3 URI. --- .../nightlies/cloud_unit_tests_impl.sh | 2 +- pkg/BUILD.bazel | 3 + .../changefeedccl/sink_kafka_connection.go | 2 +- pkg/ccl/cloudccl/amazon/BUILD.bazel | 34 ++++ pkg/ccl/cloudccl/amazon/main_test.go | 35 ++++ pkg/ccl/cloudccl/amazon/s3_connection_test.go | 184 ++++++++++++++++++ .../testdata/create_drop_external_connection | 30 +++ .../create_drop_external_connection | 30 +++ pkg/ccl/cloudccl/gcp/BUILD.bazel | 1 + .../cloudccl/gcp/gcs_kms_connection_test.go | 6 +- pkg/cloud/amazon/BUILD.bazel | 3 + pkg/cloud/amazon/s3_connection.go | 46 +++++ pkg/cloud/amazon/s3_storage.go | 82 ++++++-- pkg/cloud/externalconn/connection_kms.go | 5 + pkg/cloud/externalconn/connection_storage.go | 5 + .../externalconn/connectionpb/connection.go | 6 +- .../connectionpb/connection.proto | 9 +- pkg/cloud/externalconn/providers/BUILD.bazel | 1 + pkg/cloud/externalconn/providers/registry.go | 1 + pkg/cloud/gcp/gcs_kms_connection.go | 2 +- pkg/cloud/nodelocal/nodelocal_connection.go | 3 +- pkg/cloud/nodelocal/nodelocal_storage.go | 3 +- pkg/cloud/uris.go | 18 +- pkg/sql/importer/import_stmt_test.go | 6 +- 24 files changed, 477 insertions(+), 40 deletions(-) create mode 100644 pkg/ccl/cloudccl/amazon/BUILD.bazel create mode 100644 pkg/ccl/cloudccl/amazon/main_test.go create mode 100644 pkg/ccl/cloudccl/amazon/s3_connection_test.go create mode 100644 pkg/cloud/amazon/s3_connection.go diff --git a/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh b/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh index 3ac3fea88356..43474b384656 100755 --- a/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh +++ b/build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh @@ -26,7 +26,7 @@ log_into_aws exit_status=0 $BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci \ - test //pkg/cloud/gcp:gcp_test //pkg/cloud/amazon:amazon_test //pkg/ccl/cloudccl/gcp:gcp_test -- \ + test //pkg/cloud/gcp:gcp_test //pkg/cloud/amazon:amazon_test //pkg/ccl/cloudccl/gcp:gcp_test //pkg/ccl/cloudccl/amazon:amazon_test -- \ --test_env=GO_TEST_WRAP_TESTV=1 \ --test_env=GO_TEST_WRAP=1 \ --test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE \ diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ebf066ec9d49..178177aeff77 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -29,6 +29,7 @@ ALL_TESTS = [ "//pkg/ccl/changefeedccl/schemafeed:schemafeed_test", "//pkg/ccl/changefeedccl:changefeedccl_test", "//pkg/ccl/cliccl:cliccl_test", + "//pkg/ccl/cloudccl/amazon:amazon_test", "//pkg/ccl/cloudccl/externalconn:externalconn_test", "//pkg/ccl/cloudccl/gcp:gcp_test", "//pkg/ccl/importerccl:importerccl_test", @@ -662,6 +663,7 @@ GO_TARGETS = [ "//pkg/ccl/cliccl/cliflagsccl:cliflagsccl", "//pkg/ccl/cliccl:cliccl", "//pkg/ccl/cliccl:cliccl_test", + "//pkg/ccl/cloudccl/amazon:amazon_test", "//pkg/ccl/cloudccl/externalconn:externalconn_test", "//pkg/ccl/cloudccl/gcp:gcp_test", "//pkg/ccl/cmdccl/enc_utils:enc_utils", @@ -2080,6 +2082,7 @@ GET_X_DATA_TARGETS = [ "//pkg/ccl/changefeedccl/schemafeed/schematestutils:get_x_data", "//pkg/ccl/cliccl:get_x_data", "//pkg/ccl/cliccl/cliflagsccl:get_x_data", + "//pkg/ccl/cloudccl/amazon:get_x_data", "//pkg/ccl/cloudccl/externalconn:get_x_data", "//pkg/ccl/cloudccl/gcp:get_x_data", "//pkg/ccl/cmdccl/enc_utils:get_x_data", diff --git a/pkg/ccl/changefeedccl/sink_kafka_connection.go b/pkg/ccl/changefeedccl/sink_kafka_connection.go index 8327d74b9f31..03ea49e20d44 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_connection.go +++ b/pkg/ccl/changefeedccl/sink_kafka_connection.go @@ -32,7 +32,7 @@ func parseAndValidateKafkaSinkURI( } connDetails := connectionpb.ConnectionDetails{ - Provider: connectionpb.ConnectionProvider_TypeKafka, + Provider: connectionpb.ConnectionProvider_kafka, Details: &connectionpb.ConnectionDetails_SimpleURI{ SimpleURI: &connectionpb.SimpleURI{ URI: uri.String(), diff --git a/pkg/ccl/cloudccl/amazon/BUILD.bazel b/pkg/ccl/cloudccl/amazon/BUILD.bazel new file mode 100644 index 000000000000..19d2545adf56 --- /dev/null +++ b/pkg/ccl/cloudccl/amazon/BUILD.bazel @@ -0,0 +1,34 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "amazon_test", + srcs = [ + "main_test.go", + "s3_connection_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/ccl", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/utilccl", + "//pkg/cloud", + "//pkg/cloud/amazon", + "//pkg/cloud/cloudpb", + "//pkg/cloud/externalconn/providers", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_aws_aws_sdk_go//aws/credentials", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/ccl/cloudccl/amazon/main_test.go b/pkg/ccl/cloudccl/amazon/main_test.go new file mode 100644 index 000000000000..0cb075ce35bb --- /dev/null +++ b/pkg/ccl/cloudccl/amazon/main_test.go @@ -0,0 +1,35 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package amazon_test + +import ( + "os" + "testing" + + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/cloudccl/amazon/s3_connection_test.go b/pkg/ccl/cloudccl/amazon/s3_connection_test.go new file mode 100644 index 000000000000..802fb15bb8a5 --- /dev/null +++ b/pkg/ccl/cloudccl/amazon/s3_connection_test.go @@ -0,0 +1,184 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package amazon + +import ( + "context" + "fmt" + "net/url" + "os" + "testing" + + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/amazon" + "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + _ "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers" // import External Connection providers. + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func TestS3ExternalConnection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + dir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + + params := base.TestClusterArgs{} + params.ServerArgs.ExternalIODir = dir + + tc := testcluster.StartTestCluster(t, 1, params) + defer tc.Stopper().Stop(context.Background()) + + tc.WaitForNodeLiveness(t) + sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0]) + + // Setup some dummy data. + sqlDB.Exec(t, `CREATE DATABASE foo`) + sqlDB.Exec(t, `USE foo`) + sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`) + + createExternalConnection := func(externalConnectionName, uri string) { + sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri)) + } + backupAndRestoreFromExternalConnection := func(backupExternalConnectionName string) { + backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName) + sqlDB.Exec(t, fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, backupURI)) + sqlDB.Exec(t, fmt.Sprintf(`RESTORE DATABASE foo FROM LATEST IN '%s' WITH new_db_name = bar`, backupURI)) + sqlDB.CheckQueryResults(t, `SELECT * FROM bar.foo`, [][]string{{"1"}, {"2"}, {"3"}}) + sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{}) + sqlDB.Exec(t, `DROP DATABASE bar CASCADE`) + } + + // If environment credentials are not present, we want to + // skip all S3 tests, including auth-implicit, even though + // it is not used in auth-implicit. + creds, err := credentials.NewEnvCredentials().Get() + if err != nil { + skip.IgnoreLint(t, "No AWS credentials") + } + bucket := os.Getenv("AWS_S3_BUCKET") + if bucket == "" { + skip.IgnoreLint(t, "AWS_S3_BUCKET env var must be set") + } + + t.Run("auth-implicit", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + skip.IgnoreLintf(t, "we only run this test if a default role exists, "+ + "refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err) + } + + // Set the AUTH to implicit. + params := make(url.Values) + params.Add(cloud.AuthParam, cloud.AuthParamImplicit) + + s3URI := fmt.Sprintf("s3://%s/backup-ec-test-default?%s", bucket, params.Encode()) + ecName := "auth-implicit-s3" + createExternalConnection(ecName, s3URI) + backupAndRestoreFromExternalConnection(ecName) + }) + + t.Run("auth-specified", func(t *testing.T) { + s3URI := amazon.S3URI(bucket, "backup-ec-test-default", + &cloudpb.ExternalStorage_S3{ + AccessKey: creds.AccessKeyID, + Secret: creds.SecretAccessKey, + Region: "us-east-1", + Auth: cloud.AuthParamSpecified, + }, + ) + ecName := "auth-specified-s3" + createExternalConnection(ecName, s3URI) + backupAndRestoreFromExternalConnection(ecName) + }) + + // Tests that we can put an object with server side encryption specified. + t.Run("server-side-encryption", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + skip.IgnoreLintf(t, "we only run this test if a default role exists, "+ + "refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err) + } + + s3URI := amazon.S3URI(bucket, "backup-ec-test-sse-256", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "AES256", + }) + ecName := "server-side-encryption-s3" + createExternalConnection(ecName, s3URI) + backupAndRestoreFromExternalConnection(ecName) + + v := os.Getenv("AWS_KMS_KEY_ARN") + if v == "" { + skip.IgnoreLint(t, "AWS_KMS_KEY_ARN env var must be set") + } + s3KMSURI := amazon.S3URI(bucket, "backup-ec-test-sse-kms", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "aws:kms", + ServerKMSID: v, + }) + ecName = "server-side-encryption-kms-s3" + createExternalConnection(ecName, s3KMSURI) + backupAndRestoreFromExternalConnection(ecName) + }) + + t.Run("server-side-encryption-invalid-params", func(t *testing.T) { + // You can create an IAM that can access S3 + // in the AWS console, then set it up locally. + // https://docs.aws.com/cli/latest/userguide/cli-configure-role.html + // We only run this test if default role exists. + credentialsProvider := credentials.SharedCredentialsProvider{} + _, err := credentialsProvider.Retrieve() + if err != nil { + skip.IgnoreLintf(t, "we only run this test if a default role exists, "+ + "refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err) + } + + // Unsupported server side encryption option. + invalidS3URI := amazon.S3URI(bucket, "backup-ec-test-sse-256", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "unsupported-algorithm", + }) + sqlDB.ExpectErr(t, + "unsupported server encryption mode unsupported-algorithm. Supported values are `aws:kms` and `AES256", + fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, invalidS3URI)) + + invalidS3URI = amazon.S3URI(bucket, "backup-ec-test-sse-256", &cloudpb.ExternalStorage_S3{ + Region: "us-east-1", + Auth: cloud.AuthParamImplicit, + ServerEncMode: "aws:kms", + }) + + // Specify aws:kms encryption mode but don't specify kms ID. + sqlDB.ExpectErr(t, "AWS_SERVER_KMS_ID param must be set when using aws:kms server side encryption mode.", fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, + invalidS3URI)) + }) +} diff --git a/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection b/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection index 0dca2fc492fc..2aae1d87d312 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection +++ b/pkg/ccl/cloudccl/externalconn/testdata/create_drop_external_connection @@ -170,6 +170,36 @@ inspect-system-table subtest end +subtest basic-s3 + +exec-sql +CREATE EXTERNAL CONNECTION "foo-s3" AS 's3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno'; +---- + +# Reject invalid S3 URIs. +exec-sql +CREATE EXTERNAL CONNECTION "missing-host-s3" AS 's3:///?AUTH=implicit'; +---- +pq: failed to construct External Connection details: empty host component; s3 URI must specify a target bucket + +exec-sql +CREATE EXTERNAL CONNECTION "invalid-params-s3" AS 's3://foo/bar?AUTH=implicit&INVALIDPARAM=baz'; +---- +pq: failed to construct External Connection details: unknown S3 query parameters: INVALIDPARAM + +inspect-system-table +---- +foo-s3 STORAGE {"provider": "s3", "simpleUri": {"uri": "s3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno"}} + +exec-sql +DROP EXTERNAL CONNECTION "foo-s3"; +---- + +inspect-system-table +---- + +subtest end + subtest basic-kafka-sink exec-sql diff --git a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection index 405dcde27c75..b3aba38f0eab 100644 --- a/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection +++ b/pkg/ccl/cloudccl/externalconn/testdata/multi-tenant/create_drop_external_connection @@ -162,6 +162,36 @@ inspect-system-table subtest end +subtest basic-s3 + +exec-sql +CREATE EXTERNAL CONNECTION "foo-s3" AS 's3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno'; +---- + +# Reject invalid S3 URIs. +exec-sql +CREATE EXTERNAL CONNECTION "missing-host-s3" AS 's3:///?AUTH=implicit'; +---- +pq: failed to construct External Connection details: empty host component; s3 URI must specify a target bucket + +exec-sql +CREATE EXTERNAL CONNECTION "invalid-params-s3" AS 's3://foo/bar?AUTH=implicit&INVALIDPARAM=baz'; +---- +pq: failed to construct External Connection details: unknown S3 query parameters: INVALIDPARAM + +inspect-system-table +---- +foo-s3 STORAGE {"provider": "s3", "simpleUri": {"uri": "s3://foo/bar?AUTH=implicit&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&ASSUME_ROLE=ronaldo,rashford,bruno"}} + +exec-sql +DROP EXTERNAL CONNECTION "foo-s3"; +---- + +inspect-system-table +---- + +subtest end + subtest basic-kafka-sink exec-sql diff --git a/pkg/ccl/cloudccl/gcp/BUILD.bazel b/pkg/ccl/cloudccl/gcp/BUILD.bazel index e673ccd2188c..585f1609787d 100644 --- a/pkg/ccl/cloudccl/gcp/BUILD.bazel +++ b/pkg/ccl/cloudccl/gcp/BUILD.bazel @@ -16,6 +16,7 @@ go_test( "//pkg/cloud/cloudtestutils", "//pkg/cloud/externalconn/providers", "//pkg/cloud/gcp", + "//pkg/cloud/impl:cloudimpl", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", diff --git a/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go b/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go index 8b4aff76e249..1555a25c6c3d 100644 --- a/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go +++ b/pkg/ccl/cloudccl/gcp/gcs_kms_connection_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudtestutils" _ "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers" // import External Connection providers. "github.com/cockroachdb/cockroach/pkg/cloud/gcp" + _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register ExternalStorage providers. "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -146,8 +147,7 @@ func TestGCSKMSExternalConnection(t *testing.T) { // ExternalStorage. This should be disallowed. backupExternalConnectionURI := fmt.Sprintf("external://%s", backupExternalConnectionName) sqlDB.ExpectErr(t, - "failed to load external connection object: expected External Connection object of type KMS but "+ - "'backup' is of type STORAGE", + "KMS cannot use object of type STORAGE", fmt.Sprintf(`BACKUP DATABASE foo INTO '%s' WITH kms='%s'`, backupExternalConnectionURI, backupExternalConnectionURI)) }) @@ -177,7 +177,6 @@ func TestGCSExternalConnectionAssumeRole(t *testing.T) { createExternalConnection := func(externalConnectionName, uri string) { sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri)) - fmt.Printf("created external connection %s\n\n", externalConnectionName) } backupAndRestoreFromExternalConnection := func(backupExternalConnectionName, kmsExternalConnectionName string) { backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName) @@ -191,7 +190,6 @@ func TestGCSExternalConnectionAssumeRole(t *testing.T) { disallowedBackupToExternalConnection := func(backupExternalConnectionName, kmsExternalConnectionName string) { backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName) kmsURI := fmt.Sprintf("external://%s", kmsExternalConnectionName) - fmt.Printf("backing up into %s with kms %s\n\n", backupURI, kmsURI) sqlDB.ExpectErr(t, "(PermissionDenied|AccessDenied|PERMISSION_DENIED)", fmt.Sprintf(`BACKUP INTO '%s' WITH kms='%s'`, backupURI, kmsURI)) } diff --git a/pkg/cloud/amazon/BUILD.bazel b/pkg/cloud/amazon/BUILD.bazel index 615043d63766..d6ca46f02ba2 100644 --- a/pkg/cloud/amazon/BUILD.bazel +++ b/pkg/cloud/amazon/BUILD.bazel @@ -5,6 +5,7 @@ go_library( name = "amazon", srcs = [ "aws_kms.go", + "s3_connection.go", "s3_storage.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/cloud/amazon", @@ -13,6 +14,8 @@ go_library( "//pkg/base", "//pkg/cloud", "//pkg/cloud/cloudpb", + "//pkg/cloud/externalconn", + "//pkg/cloud/externalconn/connectionpb", "//pkg/server/telemetry", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/cloud/amazon/s3_connection.go b/pkg/cloud/amazon/s3_connection.go new file mode 100644 index 000000000000..ff875ca7917a --- /dev/null +++ b/pkg/cloud/amazon/s3_connection.go @@ -0,0 +1,46 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package amazon + +import ( + "context" + "net/url" + + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb" +) + +func parseAndValidateS3ConnectionURI( + _ context.Context, uri *url.URL, +) (externalconn.ExternalConnection, error) { + // Parse and validate the S3 URL. + if _, err := parseS3URL(cloud.ExternalStorageURIContext{}, uri); err != nil { + return nil, err + } + + connDetails := connectionpb.ConnectionDetails{ + Provider: connectionpb.ConnectionProvider_s3, + Details: &connectionpb.ConnectionDetails_SimpleURI{ + SimpleURI: &connectionpb.SimpleURI{ + URI: uri.String(), + }, + }, + } + return externalconn.NewExternalConnection(connDetails), nil +} + +func init() { + externalconn.RegisterConnectionDetailsFromURIFactory( + scheme, + parseAndValidateS3ConnectionURI, + ) +} diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index 53257103cb4b..9a22521916ec 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -76,6 +76,9 @@ const ( // AssumeRoleParam is the query parameter for the chain of AWS Role ARNs to // assume. AssumeRoleParam = "ASSUME_ROLE" + + // scheme component of an S3 URI. + scheme = "s3" ) type s3Storage struct { @@ -211,21 +214,26 @@ func S3URI(bucket, path string, conf *cloudpb.ExternalStorage_S3) string { } func parseS3URL(_ cloud.ExternalStorageURIContext, uri *url.URL) (cloudpb.ExternalStorage, error) { + s3URL := cloud.ConsumeURL{URL: uri} conf := cloudpb.ExternalStorage{} + if s3URL.Host == "" { + return conf, errors.New("empty host component; s3 URI must specify a target bucket") + } + conf.Provider = cloudpb.ExternalStorageProvider_s3 - assumeRole, delegateRoles := cloud.ParseRoleString(uri.Query().Get(AssumeRoleParam)) + assumeRole, delegateRoles := cloud.ParseRoleString(s3URL.ConsumeParam(AssumeRoleParam)) conf.S3Config = &cloudpb.ExternalStorage_S3{ - Bucket: uri.Host, - Prefix: uri.Path, - AccessKey: uri.Query().Get(AWSAccessKeyParam), - Secret: uri.Query().Get(AWSSecretParam), - TempToken: uri.Query().Get(AWSTempTokenParam), - Endpoint: uri.Query().Get(AWSEndpointParam), - Region: uri.Query().Get(S3RegionParam), - Auth: uri.Query().Get(cloud.AuthParam), - ServerEncMode: uri.Query().Get(AWSServerSideEncryptionMode), - ServerKMSID: uri.Query().Get(AWSServerSideEncryptionKMSID), - StorageClass: uri.Query().Get(S3StorageClassParam), + Bucket: s3URL.Host, + Prefix: s3URL.Path, + AccessKey: s3URL.ConsumeParam(AWSAccessKeyParam), + Secret: s3URL.ConsumeParam(AWSSecretParam), + TempToken: s3URL.ConsumeParam(AWSTempTokenParam), + Endpoint: s3URL.ConsumeParam(AWSEndpointParam), + Region: s3URL.ConsumeParam(S3RegionParam), + Auth: s3URL.ConsumeParam(cloud.AuthParam), + ServerEncMode: s3URL.ConsumeParam(AWSServerSideEncryptionMode), + ServerKMSID: s3URL.ConsumeParam(AWSServerSideEncryptionKMSID), + StorageClass: s3URL.ConsumeParam(S3StorageClassParam), RoleARN: assumeRole, DelegateRoleARNs: delegateRoles, /* NB: additions here should also update s3QueryParams() serializer */ @@ -239,6 +247,54 @@ func parseS3URL(_ cloud.ExternalStorageURIContext, uri *url.URL) (cloudpb.Extern // contain spaces. We can convert any space characters we see to + // characters to recover the original secret. conf.S3Config.Secret = strings.Replace(conf.S3Config.Secret, " ", "+", -1) + + // Validate that all the passed in parameters are supported. + if unknownParams := s3URL.RemainingQueryParams(); len(unknownParams) > 0 { + return cloudpb.ExternalStorage{}, errors.Errorf( + `unknown S3 query parameters: %s`, strings.Join(unknownParams, ", ")) + } + + // Validate the authentication parameters are set correctly. + switch conf.S3Config.Auth { + case "", cloud.AuthParamSpecified: + if conf.S3Config.AccessKey == "" { + return cloudpb.ExternalStorage{}, errors.Errorf( + "%s is set to '%s', but %s is not set", + cloud.AuthParam, + cloud.AuthParamSpecified, + AWSAccessKeyParam, + ) + } + if conf.S3Config.Secret == "" { + return cloudpb.ExternalStorage{}, errors.Errorf( + "%s is set to '%s', but %s is not set", + cloud.AuthParam, + cloud.AuthParamSpecified, + AWSSecretParam, + ) + } + case cloud.AuthParamImplicit: + default: + return cloudpb.ExternalStorage{}, errors.Errorf("unsupported value %s for %s", + conf.S3Config.Auth, cloud.AuthParam) + } + + // Ensure that a KMS ID is specified if server side encryption is set to use + // KMS. + if conf.S3Config.ServerEncMode != "" { + switch conf.S3Config.ServerEncMode { + case string(aes256Enc): + case string(kmsEnc): + if conf.S3Config.ServerKMSID == "" { + return cloudpb.ExternalStorage{}, errors.New("AWS_SERVER_KMS_ID param must be set" + + " when using aws:kms server side encryption mode.") + } + default: + return cloudpb.ExternalStorage{}, errors.Newf("unsupported server encryption mode %s. "+ + "Supported values are `aws:kms` and `AES256`.", conf.S3Config.ServerEncMode) + } + } + return conf, nil } @@ -695,5 +751,5 @@ func s3ErrDelay(err error) time.Duration { func init() { cloud.RegisterExternalStorageProvider(cloudpb.ExternalStorageProvider_s3, - parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3") + parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), scheme) } diff --git a/pkg/cloud/externalconn/connection_kms.go b/pkg/cloud/externalconn/connection_kms.go index a2f35af29e0a..fb6247fc83eb 100644 --- a/pkg/cloud/externalconn/connection_kms.go +++ b/pkg/cloud/externalconn/connection_kms.go @@ -46,6 +46,11 @@ func makeExternalConnectionKMS( return nil, errors.Wrap(err, "failed to load external connection object") } + // Sanity check that we are connecting to a KMS object. + if ec.ConnectionType() != connectionpb.TypeKMS { + return nil, errors.Newf("KMS cannot use object of type %s", ec.ConnectionType().String()) + } + // Construct a KMS handle for the underlying resource represented by the // external connection object. switch d := ec.ConnectionProto().Details.(type) { diff --git a/pkg/cloud/externalconn/connection_storage.go b/pkg/cloud/externalconn/connection_storage.go index 46c2ee128af2..902563ed9ec6 100644 --- a/pkg/cloud/externalconn/connection_storage.go +++ b/pkg/cloud/externalconn/connection_storage.go @@ -74,6 +74,11 @@ func makeExternalConnectionStorage( return nil, errors.Wrap(err, "failed to load external connection object") } + // Sanity check that we are connecting to a STORAGE object. + if ec.ConnectionType() != connectionpb.TypeStorage { + return nil, errors.Newf("STORAGE cannot use object of type %s", ec.ConnectionType().String()) + } + // Construct an ExternalStorage handle for the underlying resource represented // by the external connection object. switch d := ec.ConnectionProto().Details.(type) { diff --git a/pkg/cloud/externalconn/connectionpb/connection.go b/pkg/cloud/externalconn/connectionpb/connection.go index 27904c7d735a..dc8248bedc81 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.go +++ b/pkg/cloud/externalconn/connectionpb/connection.go @@ -15,11 +15,11 @@ import "github.com/cockroachdb/errors" // Type returns the ConnectionType of the receiver. func (d *ConnectionDetails) Type() ConnectionType { switch d.Provider { - case ConnectionProvider_TypeNodelocal: + case ConnectionProvider_nodelocal, ConnectionProvider_s3: return TypeStorage - case ConnectionProvider_TypeGSKMS: + case ConnectionProvider_gs_kms: return TypeKMS - case ConnectionProvider_TypeKafka: + case ConnectionProvider_kafka: return TypeStorage default: panic(errors.AssertionFailedf("ConnectionDetails.Type called on a details with an unknown type: %T", d.Provider.String())) diff --git a/pkg/cloud/externalconn/connectionpb/connection.proto b/pkg/cloud/externalconn/connectionpb/connection.proto index 7f9098e17ddf..c53121e2c806 100644 --- a/pkg/cloud/externalconn/connectionpb/connection.proto +++ b/pkg/cloud/externalconn/connectionpb/connection.proto @@ -15,16 +15,17 @@ option go_package = "connectionpb"; import "gogoproto/gogo.proto"; enum ConnectionProvider { - Unknown = 0 [(gogoproto.enumvalue_customname) = "TypeUnspecified"]; + Unknown = 0; // External Storage providers. - nodelocal = 1 [(gogoproto.enumvalue_customname) = "TypeNodelocal"]; + nodelocal = 1; + s3 = 4; // KMS providers. - gs_kms = 2 [(gogoproto.enumvalue_customname) = "TypeGSKMS"]; + gs_kms = 2; // Sink providers. - kafka = 3 [(gogoproto.enumvalue_customname) = "TypeKafka"]; + kafka = 3; } // ConnectionType is the type of the External Connection object. diff --git a/pkg/cloud/externalconn/providers/BUILD.bazel b/pkg/cloud/externalconn/providers/BUILD.bazel index 6fda41be3e91..478c7a0f6498 100644 --- a/pkg/cloud/externalconn/providers/BUILD.bazel +++ b/pkg/cloud/externalconn/providers/BUILD.bazel @@ -7,6 +7,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers", visibility = ["//visibility:public"], deps = [ + "//pkg/cloud/amazon", "//pkg/cloud/gcp", "//pkg/cloud/nodelocal", ], diff --git a/pkg/cloud/externalconn/providers/registry.go b/pkg/cloud/externalconn/providers/registry.go index 8ef0c1599a80..6563e5ba9348 100644 --- a/pkg/cloud/externalconn/providers/registry.go +++ b/pkg/cloud/externalconn/providers/registry.go @@ -17,6 +17,7 @@ package providers import ( // import all the cloud provider packages to register them. + _ "github.com/cockroachdb/cockroach/pkg/cloud/amazon" _ "github.com/cockroachdb/cockroach/pkg/cloud/gcp" _ "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" ) diff --git a/pkg/cloud/gcp/gcs_kms_connection.go b/pkg/cloud/gcp/gcs_kms_connection.go index 1812325b7372..8f13eb92f55d 100644 --- a/pkg/cloud/gcp/gcs_kms_connection.go +++ b/pkg/cloud/gcp/gcs_kms_connection.go @@ -26,7 +26,7 @@ func parseAndValidateGCSKMSConnectionURI( } connDetails := connectionpb.ConnectionDetails{ - Provider: connectionpb.ConnectionProvider_TypeGSKMS, + Provider: connectionpb.ConnectionProvider_gs_kms, Details: &connectionpb.ConnectionDetails_SimpleURI{ SimpleURI: &connectionpb.SimpleURI{ URI: uri.String(), diff --git a/pkg/cloud/nodelocal/nodelocal_connection.go b/pkg/cloud/nodelocal/nodelocal_connection.go index 911267256833..a714066f6757 100644 --- a/pkg/cloud/nodelocal/nodelocal_connection.go +++ b/pkg/cloud/nodelocal/nodelocal_connection.go @@ -27,7 +27,7 @@ func parseAndValidateLocalFileConnectionURI( } connDetails := connectionpb.ConnectionDetails{ - Provider: connectionpb.ConnectionProvider_TypeNodelocal, + Provider: connectionpb.ConnectionProvider_nodelocal, Details: &connectionpb.ConnectionDetails_SimpleURI{ SimpleURI: &connectionpb.SimpleURI{ URI: uri.String(), @@ -38,7 +38,6 @@ func parseAndValidateLocalFileConnectionURI( } func init() { - const scheme = "nodelocal" externalconn.RegisterConnectionDetailsFromURIFactory( scheme, parseAndValidateLocalFileConnectionURI, diff --git a/pkg/cloud/nodelocal/nodelocal_storage.go b/pkg/cloud/nodelocal/nodelocal_storage.go index 13d879ceb97b..1479cb668e07 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/cloud/nodelocal/nodelocal_storage.go @@ -34,6 +34,8 @@ import ( "google.golang.org/grpc/status" ) +const scheme = "nodelocal" + func validateLocalFileURI(uri *url.URL) error { if uri.Host == "" { return errors.Newf( @@ -231,7 +233,6 @@ func (*localFileStorage) Close() error { } func init() { - const scheme = "nodelocal" cloud.RegisterExternalStorageProvider(cloudpb.ExternalStorageProvider_nodelocal, parseLocalFileURI, makeLocalFileStorage, cloud.RedactedParams(), scheme) } diff --git a/pkg/cloud/uris.go b/pkg/cloud/uris.go index 1722095ea4fa..8b6dd86b0e3a 100644 --- a/pkg/cloud/uris.go +++ b/pkg/cloud/uris.go @@ -96,14 +96,16 @@ func ParseRoleString(roleString string) (assumeRole string, delegateRoles []stri return assumeRole, delegateRoles } -// consumeURL is a helper struct which for "consuming" URL query +// ConsumeURL is a helper struct which for "consuming" URL query // parameters from the underlying URL. -type consumeURL struct { +type ConsumeURL struct { *url.URL q url.Values } -func (u *consumeURL) consumeParam(p string) string { +// ConsumeParam returns the value of the parameter p from the underlying URL, +// and deletes the parameter from the URL. +func (u *ConsumeURL) ConsumeParam(p string) string { if u.q == nil { u.q = u.Query() } @@ -112,7 +114,9 @@ func (u *consumeURL) consumeParam(p string) string { return v } -func (u *consumeURL) remainingQueryParams() (res []string) { +// RemainingQueryParams returns the query parameters that have not been consumed +// from the underlying URL. +func (u *ConsumeURL) RemainingQueryParams() (res []string) { if u.q == nil { u.q = u.Query() } @@ -126,12 +130,12 @@ func (u *consumeURL) remainingQueryParams() (res []string) { // are not part of the supportedParameters. func ValidateQueryParameters(uri url.URL, supportedParameters []string) error { u := uri - validateURL := consumeURL{URL: &u} + validateURL := ConsumeURL{URL: &u} for _, option := range supportedParameters { - validateURL.consumeParam(option) + validateURL.ConsumeParam(option) } - if unknownParams := validateURL.remainingQueryParams(); len(unknownParams) > 0 { + if unknownParams := validateURL.RemainingQueryParams(); len(unknownParams) > 0 { return errors.Errorf( `unknown query parameters: %s for %s URI`, strings.Join(unknownParams, ", "), uri.Scheme) diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 5245b18aa0d8..9eafde101016 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -2685,12 +2685,12 @@ func TestURIRequiresAdminRole(t *testing.T) { }, { name: "s3-specified", - uri: "s3://foo/bar?AUTH=specified", + uri: "s3://foo/bar?AUTH=specified&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456", requiresAdmin: false, }, { name: "s3-custom", - uri: "s3://foo/bar?AUTH=specified&AWS_ENDPOINT=baz", + uri: "s3://foo/bar?AUTH=specified&AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456&AWS_ENDPOINT=baz", requiresAdmin: true, }, { @@ -2736,7 +2736,7 @@ func TestURIRequiresAdminRole(t *testing.T) { t.Run(tc.name+"-direct", func(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI(tc.uri, username.RootUserName()) require.NoError(t, err) - require.Equal(t, conf.AccessIsWithExplicitAuth(), !tc.requiresAdmin) + require.Equal(t, !tc.requiresAdmin, conf.AccessIsWithExplicitAuth()) }) } }