Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
85900: sql: implement `REPLACE FUNCTION` r=chengxiong-ruan a=chengxiong-ruan

This commit adds to the `REPLACE` path to the
`CREATE OR REPLACE FUNCTION` statement. Major changes are
(1) fetch function with same signature if exists, and validate
(2) remove refereces before replacing the function, and then
    add new references.

Fixes #83236 

Release note: None
Release justification: This commit includes a fix to avoid duplicate functions.

85964: externalconn: add gs support to External Connections r=rhu713 a=adityamaru

This change registers Google Storage `gs` as a supported
External Connection.

Release note (sql change): Users can now
`CREATE EXTERNAL CONNECTION` to represent an underlying
google storage resource.

Release justification: low risk change to new functionality around External Connections

86156: clusterversions: remove RowLevelTTL version r=rafiss a=rafiss

Release justification: low risk change to remove dead code

Release note: None

86179: roachtest: reduce slowness threshold in tpchvec/streamer r=yuzefovich a=yuzefovich

Release note: None

Release justification: test-only change.

86185: grunning: improve some tests r=irfansharif a=irfansharif

Release note: None
Release justification: test-only changes

86219: storage/metamorphic: skip TestPebbleEquivalence r=erikgrinaker a=jbowens

Skip TestPebbleEquivalence until #86102/#86088 is resolved.

Release note: None
Release justification: Non-production code changes.

86222: sql/logictest: attempt to deflake retry errors r=ajwerner a=ajwerner

When we run transactions in logictests, they can be exposed to retry errors.
The framework does not have tools to retry whole blocks. In #58217 we added
a mechanism to skip tests which hit such errors. Apply that logic here.

(hopefully)
Fixes #86215

Release justification: testing only change

Release note: None

86231: sqlsmith: skip crdb_internal.set_compaction_concurrency r=yuzefovich a=yuzefovich

Fixes: #86201

Release justification: test-only change.

Release note: None

Co-authored-by: Chengxiong Ruan <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
8 people committed Aug 16, 2022
9 parents aed288d + d71f834 + 2b4f176 + 4710f62 + 2ddb990 + 0ee5479 + 7f6691e + d364c31 + 47d0339 commit 7c38417
Show file tree
Hide file tree
Showing 27 changed files with 1,279 additions and 784 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,36 @@ DROP EXTERNAL CONNECTION "foo-userfile";
----

subtest end

subtest basic-gs

disable-check-external-storage
----

exec-sql
CREATE EXTERNAL CONNECTION "foo-gs" AS 'gs://bucket/path?AUTH=implicit&ASSUME_ROLE=soccer,cricket,football'
----

# Reject invalid gs external connections.
exec-sql
CREATE EXTERNAL CONNECTION "invalid-param-gs" AS 'gs://bucket/path?INVALIDPARAM=baz'
----
pq: failed to construct External Connection details: failed to create gs external connection: unknown GS query parameters: INVALIDPARAM

exec-sql
CREATE EXTERNAL CONNECTION "invalid-creds-gs" AS 'gs://bucket/path?AUTH=specified&CREDENTIALS=123'
----
pq: failed to construct External Connection details: failed to create gs external connection: error getting credentials from CREDENTIALS: illegal base64 data at input byte 0

inspect-system-table
----
foo-gs STORAGE {"provider": "gs", "simpleUri": {"uri": "gs://bucket/path?AUTH=implicit&ASSUME_ROLE=soccer,cricket,football"}}

exec-sql
DROP EXTERNAL CONNECTION "foo-gs";
----

enable-check-external-storage
----

subtest end
3 changes: 2 additions & 1 deletion pkg/ccl/cloudccl/gcp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "gcp_test",
srcs = [
"gcp_kms_connection_test.go",
"gcp_connection_test.go",
"main_test.go",
],
deps = [
Expand All @@ -30,6 +30,7 @@ go_test(
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_kms//apiv1",
"@com_google_cloud_go_storage//:storage",
"@org_golang_x_oauth2//google",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

kms "cloud.google.com/go/kms/apiv1"
gcs "cloud.google.com/go/storage"
"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
Expand All @@ -35,7 +36,7 @@ import (
"golang.org/x/oauth2/google"
)

func TestGCSKMSExternalConnection(t *testing.T) {
func TestGCPKMSExternalConnection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -79,8 +80,18 @@ func TestGCSKMSExternalConnection(t *testing.T) {
skip.IgnoreLint(t, "GOOGLE_KMS_KEY_NAME env var must be set")
}

bucket := os.Getenv("GOOGLE_BUCKET")
if bucket == "" {
skip.IgnoreLint(t, "GOOGLE_BUCKET env var must be set")
}

if !cloudtestutils.IsImplicitAuthConfigured() {
skip.IgnoreLint(t, "implicit auth is not configured")
}

// Create an external connection where we will write the backup.
backupURI := "nodelocal://1/backup"
backupURI := fmt.Sprintf("gs://%s/backup?%s=%s", bucket,
cloud.AuthParam, cloud.AuthParamImplicit)
backupExternalConnectionName := "backup"
createExternalConnection(backupExternalConnectionName, backupURI)

Expand Down Expand Up @@ -153,7 +164,7 @@ func TestGCSKMSExternalConnection(t *testing.T) {
})
}

func TestGCSExternalConnectionAssumeRole(t *testing.T) {
func TestGCPKMSExternalConnectionAssumeRole(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -209,8 +220,18 @@ func TestGCSExternalConnectionAssumeRole(t *testing.T) {
assumedAccount := os.Getenv("ASSUME_SERVICE_ACCOUNT")
encodedCredentials := base64.StdEncoding.EncodeToString([]byte(os.Getenv("GOOGLE_CREDENTIALS_JSON")))

bucket := os.Getenv("GOOGLE_BUCKET")
if bucket == "" {
skip.IgnoreLint(t, "GOOGLE_BUCKET env var must be set")
}

if !cloudtestutils.IsImplicitAuthConfigured() {
skip.IgnoreLint(t, "implicit auth is not configured")
}

// Create an external connection where we will write the backup.
backupURI := "nodelocal://1/backup"
backupURI := fmt.Sprintf("gs://%s/backup?%s=%s", bucket,
cloud.AuthParam, cloud.AuthParamImplicit)
backupExternalConnectionName := "backup"
createExternalConnection(backupExternalConnectionName, backupURI)

Expand Down Expand Up @@ -269,3 +290,242 @@ func TestGCSExternalConnectionAssumeRole(t *testing.T) {
backupAndRestoreFromExternalConnection(backupExternalConnectionName, "auth-assume-role-chaining")
})
}

func TestGCPAssumeRoleExternalConnection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir

tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(context.Background())

tc.WaitForNodeLiveness(t)
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

// Setup some dummy data.
sqlDB.Exec(t, `CREATE DATABASE foo`)
sqlDB.Exec(t, `USE foo`)
sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)

disallowedCreateExternalConnection := func(externalConnectionName, uri string) {
sqlDB.ExpectErr(t, "(PermissionDenied|AccessDenied|PERMISSION_DENIED|does not have storage.objects.create access)",
fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri))
}
createExternalConnection := func(externalConnectionName, uri string) {
sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri))
}
backupAndRestoreFromExternalConnection := func(backupExternalConnectionName string) {
backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName)
sqlDB.Exec(t, fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, backupURI))
sqlDB.Exec(t, fmt.Sprintf(`RESTORE DATABASE foo FROM LATEST IN '%s' WITH new_db_name = bar`, backupURI))
sqlDB.CheckQueryResults(t, `SELECT * FROM bar.foo`, [][]string{{"1"}, {"2"}, {"3"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{})
sqlDB.Exec(t, `DROP DATABASE bar CASCADE`)
}

limitedBucket := os.Getenv("GOOGLE_LIMITED_BUCKET")
if limitedBucket == "" {
skip.IgnoreLint(t, "GOOGLE_LIMITED_BUCKET env var must be set")
}
assumedAccount := os.Getenv("ASSUME_SERVICE_ACCOUNT")
if assumedAccount == "" {
skip.IgnoreLint(t, "ASSUME_SERVICE_ACCOUNT env var must be set")
}

t.Run("ec-assume-role-specified", func(t *testing.T) {
ecName := "ec-assume-role-specified"
disallowedECName := "ec-assume-role-specified-disallowed"
credentials := os.Getenv("GOOGLE_CREDENTIALS_JSON")
if credentials == "" {
skip.IgnoreLint(t, "GOOGLE_CREDENTIALS_JSON env var must be set")
}
encoded := base64.StdEncoding.EncodeToString([]byte(credentials))
disallowedURI := fmt.Sprintf("gs://%s/%s?%s=%s", limitedBucket, disallowedECName,
gcp.CredentialsParam, url.QueryEscape(encoded))
disallowedCreateExternalConnection(disallowedECName, disallowedURI)

uri := fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s&%s=%s",
limitedBucket,
ecName,
cloud.AuthParam,
cloud.AuthParamSpecified,
gcp.AssumeRoleParam,
assumedAccount, gcp.CredentialsParam,
url.QueryEscape(encoded),
)
createExternalConnection(ecName, uri)
backupAndRestoreFromExternalConnection(ecName)
})

t.Run("ec-assume-role-implicit", func(t *testing.T) {
if _, err := google.FindDefaultCredentials(context.Background()); err != nil {
skip.IgnoreLint(t, err)
}
ecName := "ec-assume-role-implicit"
disallowedECName := "ec-assume-role-implicit-disallowed"
disallowedURI := fmt.Sprintf("gs://%s/%s?%s=%s", limitedBucket, disallowedECName,
cloud.AuthParam, cloud.AuthParamImplicit)
disallowedCreateExternalConnection(disallowedECName, disallowedURI)

uri := fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s",
limitedBucket,
ecName,
cloud.AuthParam,
cloud.AuthParamImplicit,
gcp.AssumeRoleParam,
assumedAccount,
)
createExternalConnection(ecName, uri)
backupAndRestoreFromExternalConnection(ecName)
})

t.Run("ec-assume-role-chaining", func(t *testing.T) {
credentials := os.Getenv("GOOGLE_CREDENTIALS_JSON")
if credentials == "" {
skip.IgnoreLint(t, "GOOGLE_CREDENTIALS_JSON env var must be set")
}
encoded := base64.StdEncoding.EncodeToString([]byte(credentials))

roleChainStr := os.Getenv("ASSUME_SERVICE_ACCOUNT_CHAIN")
if roleChainStr == "" {
skip.IgnoreLint(t, "ASSUME_SERVICE_ACCOUNT_CHAIN env var must be set")
}

roleChain := strings.Split(roleChainStr, ",")

for _, tc := range []struct {
auth string
credentials string
}{
{cloud.AuthParamSpecified, encoded},
{cloud.AuthParamImplicit, ""},
} {
t.Run(tc.auth, func(t *testing.T) {
q := make(url.Values)
q.Set(cloud.AuthParam, tc.auth)
q.Set(gcp.CredentialsParam, tc.credentials)

// First verify that none of the individual roles in the chain can be used
// to access the storage.
for i, role := range roleChain {
i := i
q.Set(gcp.AssumeRoleParam, role)
disallowedECName := fmt.Sprintf("ec-assume-role-checking-%d", i)
disallowedBackupURI := fmt.Sprintf("gs://%s/%s?%s", limitedBucket,
disallowedECName, q.Encode())
disallowedCreateExternalConnection(disallowedECName, disallowedBackupURI)
}

// Finally, check that the chain of roles can be used to access the storage.
q.Set(gcp.AssumeRoleParam, roleChainStr)
ecName := fmt.Sprintf("ec-assume-role-checking-%s", tc.auth)
uri := fmt.Sprintf("gs://%s/%s?%s",
limitedBucket,
ecName,
q.Encode(),
)
createExternalConnection(ecName, uri)
backupAndRestoreFromExternalConnection(ecName)
})
}
})
}

func TestGCPExternalConnection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir

tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(context.Background())

tc.WaitForNodeLiveness(t)
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

// Setup some dummy data.
sqlDB.Exec(t, `CREATE DATABASE foo`)
sqlDB.Exec(t, `USE foo`)
sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)

createExternalConnection := func(externalConnectionName, uri string) {
sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri))
}
backupAndRestoreFromExternalConnection := func(backupExternalConnectionName string) {
backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName)
sqlDB.Exec(t, fmt.Sprintf(`BACKUP DATABASE foo INTO '%s'`, backupURI))
sqlDB.Exec(t, fmt.Sprintf(`RESTORE DATABASE foo FROM LATEST IN '%s' WITH new_db_name = bar`, backupURI))
sqlDB.CheckQueryResults(t, `SELECT * FROM bar.foo`, [][]string{{"1"}, {"2"}, {"3"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{})
sqlDB.Exec(t, `DROP DATABASE bar CASCADE`)
}

bucket := os.Getenv("GOOGLE_BUCKET")
if bucket == "" {
skip.IgnoreLint(t, "GOOGLE_BUCKET env var must be set")
}

t.Run("ec-auth-implicit", func(t *testing.T) {
if !cloudtestutils.IsImplicitAuthConfigured() {
skip.IgnoreLint(t, "implicit auth is not configured")
}

ecName := "ec-auth-implicit"
backupURI := fmt.Sprintf("gs://%s/%s?%s=%s", bucket, ecName, cloud.AuthParam,
cloud.AuthParamImplicit)
createExternalConnection(ecName, backupURI)
backupAndRestoreFromExternalConnection(ecName)
})

t.Run("ec-auth-specified", func(t *testing.T) {
credentials := os.Getenv("GOOGLE_CREDENTIALS_JSON")
if credentials == "" {
skip.IgnoreLint(t, "GOOGLE_CREDENTIALS_JSON env var must be set")
}
encoded := base64.StdEncoding.EncodeToString([]byte(credentials))
ecName := "ec-auth-specified"
backupURI := fmt.Sprintf("gs://%s/%s?%s=%s",
bucket,
ecName,
gcp.CredentialsParam,
url.QueryEscape(encoded),
)
createExternalConnection(ecName, backupURI)
backupAndRestoreFromExternalConnection(ecName)
})

t.Run("ec-auth-specified-bearer-token", func(t *testing.T) {
credentials := os.Getenv("GOOGLE_CREDENTIALS_JSON")
if credentials == "" {
skip.IgnoreLint(t, "GOOGLE_CREDENTIALS_JSON env var must be set")
}

ctx := context.Background()
source, err := google.JWTConfigFromJSON([]byte(credentials), gcs.ScopeReadWrite)
require.NoError(t, err, "creating GCS oauth token source from specified credentials")
ts := source.TokenSource(ctx)

token, err := ts.Token()
require.NoError(t, err, "getting token")
ecName := "ec-auth-specified-bearer-token"
backupURI := fmt.Sprintf("gs://%s/%s?%s=%s",
bucket,
ecName,
gcp.BearerTokenParam,
token.AccessToken,
)
createExternalConnection(ecName, backupURI)
backupAndRestoreFromExternalConnection(ecName)
})
}
2 changes: 1 addition & 1 deletion pkg/cloud/externalconn/connectionpb/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import "github.com/cockroachdb/errors"
// Type returns the ConnectionType of the receiver.
func (d *ConnectionDetails) Type() ConnectionType {
switch d.Provider {
case ConnectionProvider_nodelocal, ConnectionProvider_s3, ConnectionProvider_userfile:
case ConnectionProvider_nodelocal, ConnectionProvider_s3, ConnectionProvider_userfile, ConnectionProvider_gs:
return TypeStorage
case ConnectionProvider_gcp_kms:
return TypeKMS
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/externalconn/connectionpb/connection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ enum ConnectionProvider {
nodelocal = 1;
s3 = 4;
userfile = 5;
gs = 6;

// KMS providers.
gcp_kms = 2;
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/gcp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"gcp_kms.go",
"gcp_kms_connection.go",
"gcs_connection.go",
"gcs_retry.go",
"gcs_storage.go",
],
Expand Down
Loading

0 comments on commit 7c38417

Please sign in to comment.