Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
115616: cloud,backupccl: add object-locked test variants r=adityamaru a=stevendanna

This adds a test variant that runs backup/restore against an object-locked bucket. To run these tests, we assume that a second AWS bucket exists whose name is the regular bucket's name with a `-locked` suffix.

This test would have caught a recent near-miss:

    --- FAIL: TestCloudBackupRestoreS3/object-locked-bucket (2.34s)
    --- PASS: TestCloudBackupRestoreS3/regular-bucket (19.41s)

Epic: none

Fixes #79771

Release note: None

115712: sql: protect WaitGroup decrement in CopyIn via sync.Once r=yuzefovich a=yuzefovich

We've recently seen the "negative WaitGroup counter" server crash during COPY FROM execution a few times, but we have been unable to understand the root cause. It appears that the problem can happen right after the COPY execution is canceled due to `statement_timeout`. The synchronization setup is as follows:
- the network-handling goroutine calls `wg.Add(1)`, pushes CopyIn command onto the stmt buf, and then blocks via `wg.Wait()`
- the copy-handling connExecutor calls `wg.Done()` in the defer of `execCopyIn`. It must be the case that that defer is executed at least twice, but it's unclear to me how that can happen.

In the absence of understanding of how this can happen and with no reproduction, this commit attempts to mitigate the problem by ensuring that `wg.Done()` is called exactly once. This is achieved via `sync.Once`.

Fixes: #112095.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Dec 7, 2023
3 parents 44fd29d + 4f08661 + d017766 commit af1fda5
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 143 deletions.
25 changes: 18 additions & 7 deletions pkg/ccl/backupccl/backup_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,27 @@ import (
func TestCloudBackupRestoreS3(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
creds, bucket := requiredS3CredsAndBucket(t)
creds, baseBucket := requiredS3CredsAndBucket(t)

const numAccounts = 1000

ctx := context.Background()
tc, db, _, cleanupFn := backupRestoreTestSetup(t, 1, numAccounts, InitManualReplication)
defer cleanupFn()
prefix := fmt.Sprintf("TestBackupRestoreS3-%d", timeutil.Now().UnixNano())
uri := setupS3URI(t, db, bucket, prefix, creds)
backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts, nil)

for _, locked := range []bool{true, false} {
bucket := baseBucket
testName := "regular-bucket"
if locked {
testName = "object-locked-bucket"
bucket += "-locked"
}

t.Run(testName, func(t *testing.T) {
tc, db, _, cleanupFn := backupRestoreTestSetup(t, 1, numAccounts, InitManualReplication)
defer cleanupFn()
prefix := fmt.Sprintf("TestBackupRestoreS3-%d", timeutil.Now().UnixNano())
uri := setupS3URI(t, db, bucket, prefix, creds)
backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts, nil)
})
}
}

// TestCloudBackupRestoreS3WithLegacyPut tests that backup/restore works when
Expand Down
244 changes: 127 additions & 117 deletions pkg/cloud/amazon/s3_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func TestPutS3(t *testing.T) {
if err != nil {
skip.IgnoreLint(t, "No AWS credentials")
}
bucket := os.Getenv("AWS_S3_BUCKET")
if bucket == "" {
baseBucket := os.Getenv("AWS_S3_BUCKET")
if baseBucket == "" {
skip.IgnoreLint(t, "AWS_S3_BUCKET env var must be set")
}

Expand All @@ -80,128 +80,138 @@ func TestPutS3(t *testing.T) {
ctx := context.Background()
user := username.RootUserName()
testID := cloudtestutils.NewTestID()

t.Run("auth-empty-no-cred", func(t *testing.T) {
_, err := cloud.ExternalStorageFromURI(ctx, fmt.Sprintf("s3://%s/%s-%d", bucket,
"backup-test-default", testID), base.ExternalIODirConfig{}, testSettings,
blobs.TestEmptyBlobClientFactory, user,
nil, /* ie */
nil, /* ief */
nil, /* kvDB */
nil, /* limiters */
nil, /* metrics */
)
require.EqualError(t, err, fmt.Sprintf(
`%s is set to '%s', but %s is not set`,
cloud.AuthParam,
cloud.AuthParamSpecified,
AWSAccessKeyParam,
))
})
t.Run("auth-implicit", func(t *testing.T) {
// You can create an IAM that can access S3
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLintf(t, "we only run this test if a default role exists, "+
"refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err)
for _, locked := range []bool{true, false} {
bucket := baseBucket
testName := "regular-bucket"
if locked {
testName = "object-locked-bucket"
bucket += "-locked"
}
t.Run(testName, func(t *testing.T) {
t.Run("auth-empty-no-cred", func(t *testing.T) {
_, err := cloud.ExternalStorageFromURI(ctx, fmt.Sprintf("s3://%s/%s-%d", bucket,
"backup-test-default", testID), base.ExternalIODirConfig{}, testSettings,
blobs.TestEmptyBlobClientFactory, user,
nil, /* ie */
nil, /* ief */
nil, /* kvDB */
nil, /* limiters */
nil, /* metrics */
)
require.EqualError(t, err, fmt.Sprintf(
`%s is set to '%s', but %s is not set`,
cloud.AuthParam,
cloud.AuthParamSpecified,
AWSAccessKeyParam,
))
})
t.Run("auth-implicit", func(t *testing.T) {
// You can create an IAM that can access S3
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLintf(t, "we only run this test if a default role exists, "+
"refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err)
}

cloudtestutils.CheckExportStore(t, fmt.Sprintf(
"s3://%s/%s-%d?%s=%s",
bucket, "backup-test-default", testID,
cloud.AuthParam, cloud.AuthParamImplicit,
), false, user,
nil, /* db */
testSettings)
})
t.Run("auth-specified", func(t *testing.T) {
uri := S3URI(bucket, fmt.Sprintf("backup-test-%d", testID),
&cloudpb.ExternalStorage_S3{AccessKey: creds.AccessKeyID, Secret: creds.SecretAccessKey, Region: "us-east-1"},
)
cloudtestutils.CheckExportStore(
t, uri, false, user, nil /* db */, testSettings,
)
cloudtestutils.CheckListFiles(
t, uri, user, nil /* db */, testSettings,
)
})
cloudtestutils.CheckExportStore(t, fmt.Sprintf(
"s3://%s/%s-%d?%s=%s",
bucket, "backup-test-default", testID,
cloud.AuthParam, cloud.AuthParamImplicit,
), false, user,
nil, /* db */
testSettings)
})
t.Run("auth-specified", func(t *testing.T) {
uri := S3URI(bucket, fmt.Sprintf("backup-test-%d", testID),
&cloudpb.ExternalStorage_S3{AccessKey: creds.AccessKeyID, Secret: creds.SecretAccessKey, Region: "us-east-1"},
)
cloudtestutils.CheckExportStore(
t, uri, false, user, nil /* db */, testSettings,
)
cloudtestutils.CheckListFiles(
t, uri, user, nil /* db */, testSettings,
)
})

// Tests that we can put an object with server side encryption specified.
t.Run("server-side-encryption", func(t *testing.T) {
// You can create an IAM that can access S3
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLintf(t, "we only run this test if a default role exists, "+
"refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err)
}
// Tests that we can put an object with server side encryption specified.
t.Run("server-side-encryption", func(t *testing.T) {
// You can create an IAM that can access S3
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLintf(t, "we only run this test if a default role exists, "+
"refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err)
}

cloudtestutils.CheckExportStore(t, fmt.Sprintf(
"s3://%s/%s-%d?%s=%s&%s=%s",
bucket, "backup-test-sse-256", testID,
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"AES256",
),
false,
user,
nil, /* db */
testSettings,
)
cloudtestutils.CheckExportStore(t, fmt.Sprintf(
"s3://%s/%s-%d?%s=%s&%s=%s",
bucket, "backup-test-sse-256", testID,
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"AES256",
),
false,
user,
nil, /* db */
testSettings,
)

v := os.Getenv("AWS_KMS_KEY_ARN")
if v == "" {
skip.IgnoreLint(t, "AWS_KMS_KEY_ARN env var must be set")
}
cloudtestutils.CheckExportStore(t, fmt.Sprintf(
"s3://%s/%s-%d?%s=%s&%s=%s&%s=%s",
bucket, "backup-test-sse-kms", testID,
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"aws:kms", AWSServerSideEncryptionKMSID, v,
),
false,
user,
nil, /* db */
testSettings)
})
v := os.Getenv("AWS_KMS_KEY_ARN")
if v == "" {
skip.IgnoreLint(t, "AWS_KMS_KEY_ARN env var must be set")
}
cloudtestutils.CheckExportStore(t, fmt.Sprintf(
"s3://%s/%s-%d?%s=%s&%s=%s&%s=%s",
bucket, "backup-test-sse-kms", testID,
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"aws:kms", AWSServerSideEncryptionKMSID, v,
),
false,
user,
nil, /* db */
testSettings)
})

t.Run("server-side-encryption-invalid-params", func(t *testing.T) {
// You can create an IAM that can access S3
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLintf(t, "we only run this test if a default role exists, "+
"refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err)
}
t.Run("server-side-encryption-invalid-params", func(t *testing.T) {
// You can create an IAM that can access S3
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLintf(t, "we only run this test if a default role exists, "+
"refer to https://docs.aws.com/cli/latest/userguide/cli-configure-role.html: %s", err)
}

// Unsupported server side encryption option.
invalidSSEModeURI := fmt.Sprintf(
"s3://%s/%s?%s=%s&%s=%s",
bucket, "backup-test-sse-256",
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"unsupported-algorithm")

_, err = makeS3Storage(ctx, invalidSSEModeURI, user)
require.True(t, testutils.IsError(err, "unsupported server encryption mode unsupported-algorithm. Supported values are `aws:kms` and `AES256"))

// Specify aws:kms encryption mode but don't specify kms ID.
invalidKMSURI := fmt.Sprintf(
"s3://%s/%s?%s=%s&%s=%s",
bucket, "backup-test-sse-256",
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"aws:kms")
_, err = makeS3Storage(ctx, invalidKMSURI, user)
require.True(t, testutils.IsError(err, "AWS_SERVER_KMS_ID param must be set when using aws:kms server side encryption mode."))
})
// Unsupported server side encryption option.
invalidSSEModeURI := fmt.Sprintf(
"s3://%s/%s?%s=%s&%s=%s",
bucket, "backup-test-sse-256",
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"unsupported-algorithm")

_, err = makeS3Storage(ctx, invalidSSEModeURI, user)
require.True(t, testutils.IsError(err, "unsupported server encryption mode unsupported-algorithm. Supported values are `aws:kms` and `AES256"))

// Specify aws:kms encryption mode but don't specify kms ID.
invalidKMSURI := fmt.Sprintf(
"s3://%s/%s?%s=%s&%s=%s",
bucket, "backup-test-sse-256",
cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode,
"aws:kms")
_, err = makeS3Storage(ctx, invalidKMSURI, user)
require.True(t, testutils.IsError(err, "AWS_SERVER_KMS_ID param must be set when using aws:kms server side encryption mode."))
})
})

}
}

func TestPutS3AssumeRole(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3008,7 +3008,12 @@ func (ex *connExecutor) execCopyIn(
}()

// When we're done, unblock the network connection.
defer cmd.CopyDone.Done()
defer func() {
// We've seen cases where this deferred function is executed multiple
// times for the same CopyIn command (#112095), so we protect the wait
// group to be decremented exactly once via sync.Once.
cmd.CopyDone.Once.Do(cmd.CopyDone.WaitGroup.Done)
}()

// The connExecutor state machine has already set us up with a txn at this
// point.
Expand Down
11 changes: 8 additions & 3 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,14 @@ type CopyIn struct {
// Conn is the network connection. Execution of the CopyFrom statement takes
// control of the connection.
Conn pgwirebase.Conn
// CopyDone is decremented once execution finishes, signaling that control of
// the connection is being handed back to the network routine.
CopyDone *sync.WaitGroup
// CopyDone is used to signal that control of the connection is being handed
// back to the network routine.
CopyDone struct {
// WaitGroup is decremented once execution finishes.
*sync.WaitGroup
// Once is used to decrement the WaitGroup exactly once.
*sync.Once
}
// TimeReceived is the time at which the message was received
// from the client. Used to compute the service latency.
TimeReceived time.Time
Expand Down
30 changes: 15 additions & 15 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,23 @@ func (c *conn) handleSimpleQuery(
"COPY together with other statements in a query string is not supported"),
})
}
copyDone := sync.WaitGroup{}
copyDone.Add(1)
if err := c.stmtBuf.Push(
ctx,
sql.CopyIn{
Conn: c,
ParsedStmt: stmts[i],
Stmt: cp,
CopyDone: &copyDone,
TimeReceived: timeReceived,
ParseStart: startParse,
ParseEnd: endParse,
},
); err != nil {
var wg sync.WaitGroup
var once sync.Once
wg.Add(1)
cmd := sql.CopyIn{
Conn: c,
ParsedStmt: stmts[i],
Stmt: cp,
TimeReceived: timeReceived,
ParseStart: startParse,
ParseEnd: endParse,
}
cmd.CopyDone.WaitGroup = &wg
cmd.CopyDone.Once = &once
if err := c.stmtBuf.Push(ctx, cmd); err != nil {
return err
}
copyDone.Wait()
wg.Wait()
return nil
}
if cp, ok := stmts[i].AST.(*tree.CopyTo); ok {
Expand Down

0 comments on commit af1fda5

Please sign in to comment.