Skip to content

Commit

Permalink
Merge pull request #52003 from pbardea/backport20.1-51999
Browse files Browse the repository at this point in the history
release-20.1: backupccl: stop blocking backup jobs on client disconnect
  • Loading branch information
pbardea authored Jul 29, 2020
2 parents b42f9f1 + 845e688 commit e72a892
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 10 deletions.
6 changes: 1 addition & 5 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,11 +723,7 @@ func backupPlanHook(
}
}

errCh, err := sj.Start(ctx)
if err != nil {
return err
}
return <-errCh
return sj.Run(ctx)
}
return fn, header, nil, false, nil
}
Expand Down
121 changes: 121 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand All @@ -60,8 +61,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/workload/bank"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/gogo/protobuf/proto"
"github.com/jackc/pgx"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -3997,3 +4000,121 @@ func getFirstStoreReplica(
})
return store, repl
}

// TestClientDisconnect ensures that an backup job can complete even if
// the client connection which started it closes.
func TestClientDisconnect(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const restoreDB = "restoredb"

testCases := []struct {
jobType string
jobCommand string
}{
{
jobType: "BACKUP",
jobCommand: fmt.Sprintf("BACKUP TO '%s'", localFoo),
},
{
jobType: "RESTORE",
jobCommand: fmt.Sprintf("RESTORE data.* FROM '%s' WITH into_db='%s'", localFoo, restoreDB),
},
}

for _, testCase := range testCases {
t.Run(testCase.jobType, func(t *testing.T) {
// When completing an export request, signal the a request has been sent and
// then wait to be signaled.
allowResponse := make(chan struct{})
gotRequest := make(chan struct{}, 1)
args := base.TestClusterArgs{}
args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse, *roachpb.ImportResponse:
select {
case gotRequest <- struct{}{}:
default:
}
<-allowResponse
}
}
return nil
},
}
ctx, tc, sqlDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 1 /* numAccounts */, initNone, args)
defer cleanup()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

conn := tc.ServerConn(0)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '100ms';")

// If we're testing restore, we first create a backup file to restore.
if testCase.jobType == "RESTORE" {
close(allowResponse)
sqlDB.Exec(t, fmt.Sprintf("CREATE DATABASE %s", restoreDB))
sqlDB.Exec(t, "BACKUP TO $1", localFoo)
allowResponse = make(chan struct{})
}

// Make credentials for the new connection.
sqlDB.Exec(t, `CREATE USER testuser`)
sqlDB.Exec(t, `GRANT admin TO testuser`)
pgURL, cleanup := sqlutils.PGUrl(t, tc.Server(0).ServingSQLAddr(),
"TestClientDisconnect-testuser", url.User("testuser"))
defer cleanup()

// Kick off the job on a new connection which we're going to close.
done := make(chan struct{})
ctxToCancel, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
defer close(done)
connCfg, err := pgx.ParseConnectionString(pgURL.String())
assert.NoError(t, err)
db, err := pgx.Connect(connCfg)
assert.NoError(t, err)
defer func() { _ = db.Close() }()
_, err = db.ExecEx(ctxToCancel, testCase.jobCommand, nil /* options */)
assert.Equal(t, context.Canceled, err)
}()

// Wait for the job to start.
var jobID string
testutils.SucceedsSoon(t, func() error {
row := conn.QueryRow(
"SELECT job_id FROM [SHOW JOBS] WHERE job_type = $1 ORDER BY created DESC LIMIT 1",
testCase.jobType,
)
return row.Scan(&jobID)
})

// Wait for it to actually start.
<-gotRequest

// Cancel the job's context and wait for the goroutine to exit.
cancel()
<-done

// Allow the job to proceed.
close(allowResponse)

// Wait for the job to get marked as succeeded.
testutils.SucceedsSoon(t, func() error {
var status string
if err := conn.QueryRow("SELECT status FROM [SHOW JOB " + jobID + "]").Scan(&status); err != nil {
return err
}
const succeeded = "succeeded"
if status != succeeded {
return errors.Errorf("expected %s, got %v", succeeded, status)
}
return nil
})
})
}
}
23 changes: 18 additions & 5 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -843,7 +844,7 @@ func doRestorePlan(
}
}

_, errCh, err := p.ExecCfg().JobRegistry.CreateAndStartJob(ctx, resultsCh, jobs.Record{
jr := jobs.Record{
Description: description,
Username: p.User(),
DescriptorIDs: func() (sqlDescIDs []sqlbase.ID) {
Expand All @@ -863,11 +864,23 @@ func doRestorePlan(
Encryption: encryption,
},
Progress: jobspb.RestoreProgress{},
})
if err != nil {
return err
}
return <-errCh
var sj *jobs.StartableJob
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn, resultsCh)
if err != nil {
return err
}
return nil
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr)
}
}
}

return sj.Run(ctx)
}

func init() {
Expand Down

0 comments on commit e72a892

Please sign in to comment.