63589: server, security: Fix one-way connectivity with connect cmd r=knz a=itsbilal

Informs cockroachdb#60632.

Previously, non-trust-leader nodes couldn't connect back
to the trust leader due to the presence of the wrong
`ca-client.crt` on their disk; the main CA cert/key was
being written in its place.

This change fixes that bug, and also creates a new
`client.node.crt` certificate to prevent subsequent errors
from being thrown.
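
For background: node-to-node client authentication uses a client
certificate whose CommonName is `node`, which is what the new
`client.node.crt` provides. The following is a hedged Go sketch for
inspecting that property of a PEM file; it is not repository code, and the
path is a hypothetical example:

package main

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"os"
)

func main() {
	// Hypothetical certs directory layout; adjust to your deployment.
	data, err := os.ReadFile("certs/client.node.crt")
	if err != nil {
		panic(err)
	}
	block, _ := pem.Decode(data)
	if block == nil {
		panic("no PEM block in file")
	}
	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		panic(err)
	}
	// For node-to-node client auth, the subject CommonName should be "node".
	fmt.Println("CN:", cert.Subject.CommonName)
}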

Fixes cockroachdb#61624.

Release note: None.

63672: kvserver: fix write below closedts bug  r=andreimatei a=andreimatei

This patch fixes a bug in our closed timestamp management. This bug was
making it possible for a command to close a timestamp even though other
requests writing at lower timestamps were still evaluating. The
problem was that we were assuming that, if a replica is proposing a new
lease, there can't be any requests in flight, and every future write
evaluated on the range will wait for the new lease and evaluate
above the lease start time. Based on that reasoning, the proposal buffer
was recording the lease start time as its assignedClosedTimestamp. This
matched what it does for every write, where assignedClosedTimestamp
corresponds to the closed timestamp carried by the command.

It turns out that the replica's reasoning was wrong. It is, in fact,
possible for writes to be evaluating on the range when the lease
acquisition is proposed. And these evaluations might be done at
timestamps below the would-be lease's start time. This happens when the
replica has already received a lease through a lease transfer. The
transfer must have applied after the previous lease expired and the
replica decided to start acquiring a new one.
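
For intuition, here is a minimal Go sketch of the invariant the patch
restores: a timestamp may only be closed if no write is still evaluating
at or below it. This toy tracker illustrates the concept and is not
CockroachDB's actual proposal buffer:

package main

import (
	"fmt"
	"sync"
)

// tracker is a toy stand-in for a replica's proposal buffer: it refuses to
// close a timestamp while a write is still evaluating at or below it.
type tracker struct {
	mu       sync.Mutex
	inFlight map[int64]int // evaluation timestamp -> count of evaluations
}

func newTracker() *tracker {
	return &tracker{inFlight: make(map[int64]int)}
}

// beginEval registers a write evaluating at ts and returns a func that
// unregisters it once evaluation finishes.
func (t *tracker) beginEval(ts int64) (done func()) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inFlight[ts]++
	return func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		if t.inFlight[ts]--; t.inFlight[ts] == 0 {
			delete(t.inFlight, ts)
		}
	}
}

// canClose reports whether closing ts is safe. The bug described above
// amounted to assuming this always holds at lease-acquisition time.
func (t *tracker) canClose(ts int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	for evalTS := range t.inFlight {
		if evalTS <= ts {
			return false // an in-flight write would end up below the closed ts
		}
	}
	return true
}

func main() {
	tr := newTracker()
	done := tr.beginEval(90)      // a write evaluating at ts=90
	fmt.Println(tr.canClose(100)) // false: ts=90 is still evaluating
	done()
	fmt.Println(tr.canClose(100)) // true: nothing in flight
}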

This fixes one of the assertion failures seen in cockroachdb#62655.

Release note (bug fix): A bug leading to crashes with the message
"writing below closed ts" has been fixed.

63756: backupccl: reset restored jobs during cluster restore r=dt a=pbardea

Previously, jobs were restored without modification during cluster
restore. Due to a recently discovered bug where backup may miss
non-transactional writes written to offline spans by these jobs, their
progress may no longer be accurate on the restored cluster.

IMPORT and RESTORE jobs perform non-transactional writes that may be
missed. When a cluster RESTORE brings back these OFFLINE tables, it will
also bring back its associated job. To ensure the underlying data in
these tables is correct, the jobs are now set in a reverting state so
that they can clean up after themselves.
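
Conceptually, the reset boils down to something like the following Go
sketch. This is an illustration only: the real logic is jobsMigrationFunc
in pkg/ccl/backupccl/system_schema.go (shown in the diff below), which
first selects job IDs by unmarshalling each payload; the staging table
name and IDs here are placeholders.

package main

import "fmt"

// buildResetQuery builds the (simplified) statement that flips restored
// IMPORT/RESTORE jobs into a reverting state inside the staging table
// before it is loaded into system.jobs.
func buildResetQuery(stagingTable string, ids []int64) string {
	q := fmt.Sprintf("UPDATE %s SET status = 'cancel-requested' WHERE id IN (", stagingTable)
	for i, id := range ids {
		if i > 0 {
			q += ", "
		}
		q += fmt.Sprintf("%d", id)
	}
	return q + ")"
}

func main() {
	fmt.Println(buildResetQuery("temp_system.jobs", []int64{101, 102}))
}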

In-progress schema change jobs that are affected will fail upon
validation.

Release note (bug fix): Fixed a bug where restored jobs may have assumed
progress that was not captured in the backup. Such restored jobs are now
canceled during cluster restore.

63837: build: update the go version requirement for `make` r=otan a=knz

Fixes cockroachdb#63837.

The builder image already requires go 1.15.10. This patch
modifies the check for a non-builder `make` command to require
at least the same version.
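
For illustration, the comparison the script performs might look like this
in Go. This is a hypothetical helper, not repository code; the actual
check is the shell script in build/go-version-check.sh, shown below.

package main

import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
)

// goVersionOK mirrors the intent of build/go-version-check.sh: require a
// go 1.x toolchain, with 1.15 at patch 10 or later and 1.16 at patch 3 or
// later. It does not handle "devel" toolchain strings.
func goVersionOK(version string) bool {
	parts := strings.SplitN(strings.TrimPrefix(version, "go"), ".", 3)
	if len(parts) < 2 {
		return false
	}
	major, err1 := strconv.Atoi(parts[0])
	minor, err2 := strconv.Atoi(parts[1])
	if err1 != nil || err2 != nil {
		return false
	}
	patch := 0
	if len(parts) == 3 {
		patch, _ = strconv.Atoi(parts[2]) // best effort; 0 on parse error
	}
	if major != 1 || minor < 15 {
		return false
	}
	switch minor {
	case 15:
		return patch >= 10
	case 16:
		return patch >= 3
	default:
		return true // later minors satisfy the minimum
	}
}

func main() {
	fmt.Println(goVersionOK(runtime.Version())) // e.g. "go1.16.3" -> true
}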

Release note: None

Co-authored-by: Bilal Akhtar <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
5 people committed Apr 19, 2021
5 parents f73a761 + 60dba10 + 467ae98 + 1c882a0 + e184d51 commit 4dc05cc
Showing 22 changed files with 1,632 additions and 978 deletions.
3 changes: 2 additions & 1 deletion build/go-version-check.sh
@@ -7,7 +7,8 @@

required_version_major=1
minimum_version_minor=15
minimum_version_15_patch=3
minimum_version_15_patch=10 # update to 11 when issue #63836 is addressed
minimum_version_16_patch=3

go=${1-go}

68 changes: 48 additions & 20 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -596,8 +596,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
// This test retries the job (by injecting a retry error) after restoring
// every system table that has a custom restore function. This tries to tease
// out any errors that may occur if some of the system table restoration
// functions are not idempotent (e.g. jobs table), but are retried by the
// restore anyway.
// functions are not idempotent.
t.Run("retry-during-custom-system-table-restore", func(t *testing.T) {
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

@@ -612,14 +611,16 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupEmptyCluster()

// Inject a retry error
// Inject a retry error that returns once.
alreadyErrored := false
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func(systemTableName string) error {
if systemTableName == customRestoreSystemTable {
if !alreadyErrored && systemTableName == customRestoreSystemTable {
alreadyErrored = true
return jobs.NewRetryJobError("injected error")
}
return nil
@@ -629,10 +630,9 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
}
}

// The initial restore will fail, and restart.
// The initial restore will return an error, and restart.
sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo)
// Expect the job to succeed. If the job fails, it's likely due to
// attempting to restore the same system table data twice.
// Expect the restore to succeed.
sqlDBRestore.CheckQueryResultsRetry(t,
`SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' AND status = 'succeeded'`,
[][]string{{"1"}})
@@ -841,9 +841,7 @@ func TestReintroduceOfflineSpans(t *testing.T) {

const numAccounts = 1000
ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupSrc()
defer cleanupDst()

dbBackupLoc := "nodelocal://0/my_db_backup"
clusterBackupLoc := "nodelocal://0/my_cluster_backup"
@@ -867,7 +865,10 @@
<-dbRestoreStarted
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// All the restore to finish. This will issue AddSSTable requests at a
var tsMidRestore string
srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsMidRestore)

// Allow the restore to finish. This will issue AddSSTable requests at a
// timestamp that is before the last incremental we just took.
close(blockDBRestore)

@@ -884,16 +885,43 @@

srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)
t.Run("spans-reintroduced", func(t *testing.T) {
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupDst()

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)

// Assert that the restored database has the same number of rows in both the
// source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
})

t.Run("restore-canceled", func(t *testing.T) {
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
defer cleanupDst()

// Assert that the restored database has the same number
// of rows in both the source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
// Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE
// job to cancel.
destDB.CheckQueryResultsRetry(t, `
SELECT description, status FROM [SHOW JOBS]
WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`,
[][]string{},
)
// The cluster restore should succeed, but the table restore should have failed.
destDB.CheckQueryResults(t,
`SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`,
[][]string{{"canceled", "1"}, {"succeeded", "1"}})

destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`)
})
}
18 changes: 18 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
@@ -9,6 +9,7 @@
package backupccl

import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -107,3 +108,20 @@ func (b *restorationDataBase) isEmpty() bool {

// isMainBundle implements restorationData.
func (restorationDataBase) isMainBundle() bool { return false }

// checkForMigratedData checks to see if any of the system tables in the set of
// data that is to be restored has already been restored. If this is the case,
// it is not safe to try and restore the data again since the migration may have
// written to the temporary system table.
func checkForMigratedData(details jobspb.RestoreDetails, dataToRestore restorationData) bool {
for _, systemTable := range dataToRestore.getSystemTables() {
// We only need to check if _any_ of the system tables in this batch of
// data have been migrated. This is because the migration can only
// happen after all of the data in the batch has been restored.
if _, ok := details.SystemTablesMigrated[systemTable.GetName()]; ok {
return true
}
}

return false
}
57 changes: 39 additions & 18 deletions pkg/ccl/backupccl/restore_job.go
@@ -604,6 +604,14 @@ func restore(
return emptyRowCount, nil
}

// If we've already migrated some of the system tables we're about to
// restore, this implies that a previous attempt restored all of this data.
// We want to avoid restoring again since we'll be shadowing migrated keys.
details := job.Details().(jobspb.RestoreDetails)
if alreadyMigrated := checkForMigratedData(details, dataToRestore); alreadyMigrated {
return emptyRowCount, nil
}

mu := struct {
syncutil.Mutex
highWaterMark int
@@ -1670,8 +1678,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
}
// Reload the details as we may have updated the job.
details = r.job.Details().(jobspb.RestoreDetails)
}
if details.DescriptorCoverage == tree.AllDescriptors {

if err := r.cleanupTempSystemTables(ctx); err != nil {
return err
}
@@ -2426,8 +2433,8 @@ func (r *restoreResumer) restoreSystemTables(
) error {
tempSystemDBID := getTempSystemDBID(restoreDetails)
details := r.job.Details().(jobspb.RestoreDetails)
if details.SystemTablesRestored == nil {
details.SystemTablesRestored = make(map[string]bool)
if details.SystemTablesMigrated == nil {
details.SystemTablesMigrated = make(map[string]bool)
}

// Iterate through all the tables that we're restoring, and if it was restored
@@ -2438,18 +2445,36 @@
continue
}
systemTableName := table.GetName()
if details.SystemTablesRestored[systemTableName] {
// We've already restored this table.
continue
stagingTableName := restoreTempSystemDB + "." + systemTableName

config, ok := systemTableBackupConfiguration[systemTableName]
if !ok {
log.Warningf(ctx, "no configuration specified for table %s... skipping restoration",
systemTableName)
}

if config.migrationFunc != nil {
if details.SystemTablesMigrated[systemTableName] {
continue
}

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := config.migrationFunc(ctx, r.execCfg, txn, stagingTableName); err != nil {
return err
}

// Keep track of which system tables we've migrated so that future job
// restarts don't try to import data over our migrated data. This would
// fail since the restored data would shadow the migrated keys.
details.SystemTablesMigrated[systemTableName] = true
return r.job.SetDetails(ctx, txn, details)
}); err != nil {
return err
}
}

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetDebugName("system-restore-txn")
config, ok := systemTableBackupConfiguration[systemTableName]
if !ok {
log.Warningf(ctx, "no configuration specified for table %s... skipping restoration",
systemTableName)
}

restoreFunc := defaultSystemTableRestoreFunc
if config.customRestoreFunc != nil {
Expand All @@ -2458,15 +2483,11 @@ func (r *restoreResumer) restoreSystemTables(
}

log.Eventf(ctx, "restoring system table %s", systemTableName)
err := restoreFunc(ctx, r.execCfg, txn, systemTableName, restoreTempSystemDB+"."+systemTableName)
err := restoreFunc(ctx, r.execCfg, txn, systemTableName, stagingTableName)
if err != nil {
return errors.Wrapf(err, "restoring system table %s", systemTableName)
}

// System table restoration may not be idempotent, so we need to keep
// track of what we've restored.
details.SystemTablesRestored[systemTableName] = true
return r.job.SetDetails(ctx, txn, details)
return nil
}); err != nil {
return err
}
84 changes: 83 additions & 1 deletion pkg/ccl/backupccl/system_schema.go
@@ -11,10 +11,16 @@ package backupccl
import (
"context"
fmt "fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
@@ -55,6 +61,10 @@ type systemBackupConfiguration struct {
// to support the restore (e.g. users that can run the restore, cluster settings
// that control how the restore runs, etc...).
restoreBeforeData bool
// migrationFunc performs the necessary migrations on the system table data in
// the crdb_temp staging table before it is loaded into the actual system
// table.
migrationFunc func(ctx context.Context, execCtx *sql.ExecutorConfig, txn *kv.Txn, tempTableName string) error
// customRestoreFunc is responsible for restoring the data from a table that
// holds the restore system table data into the given system table. If none
// is provided then `defaultRestoreFunc` is used.
@@ -93,6 +103,77 @@ func defaultSystemTableRestoreFunc(

// Custom restore functions for different system tables.

// jobsMigrationFunc resets the progress on schema change jobs, and marks all
// other jobs as reverting.
func jobsMigrationFunc(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string,
) (err error) {
executor := execCfg.InternalExecutor

// Jobs found in these resumable states are scanned below; of those, only
// IMPORT and RESTORE jobs are reverted.
const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` +
`'` + string(jobs.StatusPauseRequested) + `', ` +
`'` + string(jobs.StatusPaused) + `')`

jobsToRevert := make([]int64, 0)
query := `SELECT id, payload FROM ` + tempTableName + ` WHERE status IN ` + statesToRevert
it, err := executor.QueryIteratorEx(
ctx, "restore-fetching-job-payloads", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
query)
if err != nil {
return errors.Wrap(err, "fetching job payloads")
}
defer func() {
closeErr := it.Close()
if err == nil {
err = closeErr
}
}()
for {
ok, err := it.Next(ctx)
if !ok {
if err != nil {
return err
}
break
}

r := it.Cur()
id, payloadBytes := r[0], r[1]
rawJobID, ok := id.(*tree.DInt)
if !ok {
return errors.Errorf("job: failed to read job id as DInt (was %T)", id)
}
jobID := int64(*rawJobID)

payload, err := jobs.UnmarshalPayload(payloadBytes)
if err != nil {
return errors.Wrap(err, "failed to unmarshal job to restore")
}
if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore {
jobsToRevert = append(jobsToRevert, jobID)
}
}

// Update the status of the selected jobs to cancel-requested.
var updateStatusQuery strings.Builder
fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", tempTableName)
fmt.Fprint(&updateStatusQuery, "(")
for i, job := range jobsToRevert {
if i > 0 {
fmt.Fprint(&updateStatusQuery, ", ")
}
fmt.Fprintf(&updateStatusQuery, "'%d'", job)
}
fmt.Fprint(&updateStatusQuery, ")")

if _, err := executor.Exec(ctx, "updating-job-status", txn, updateStatusQuery.String(), jobs.StatusCancelRequested); err != nil {
return errors.Wrap(err, "updating restored jobs as reverting")
}

return nil
}

// When restoring the jobs table we don't want to remove existing jobs, since
// that includes the restore that we're running.
func jobsRestoreFunc(
@@ -105,7 +186,7 @@ func jobsRestoreFunc(

// When restoring jobs, don't clear the existing table.

restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s);",
restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s) ON CONFLICT DO NOTHING;",
systemTableName, tempTableName)
opName := systemTableName + "-data-insert"
if _, err := executor.Exec(ctx, opName, txn, restoreQuery); err != nil {
@@ -178,6 +259,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
},
systemschema.JobsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
migrationFunc: jobsMigrationFunc,
customRestoreFunc: jobsRestoreFunc,
},
systemschema.ScheduledJobsTable.GetName(): {