102875: backupccl: return not ok during import key elision in rewriter r=adityamaru a=msbutler

Previously, when the key rewriter encountered a key from an in-progress import during restore, it returned an error that the restore data processor then had to handle specially. This patch modifies the key rewriter to instead return not ok in this case, and simplifies the rewriter to always return early when a key is not ok.
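
To make the new contract concrete, here is a minimal, self-contained Go sketch of the convention (toy code written for illustration, not taken from this patch; the helper name `rewriteKey` and the toy key format are assumptions):

```go
package main

import "fmt"

// rewriteKey is a toy stand-in for backupccl's (*KeyRewriter).RewriteKey. It
// returns the rewritten key, ok=false when the key should simply be skipped
// (for example, a key from an in-progress import), and an error only for real
// failures -- no sentinel error for the import case.
func rewriteKey(key []byte, walltime, importStart int64) ([]byte, bool, error) {
	if walltime > 0 && importStart > 0 && walltime >= importStart {
		return nil, false, nil // elide importing keys by returning not ok
	}
	return append([]byte("new/"), key...), true, nil
}

func main() {
	const importStart = int64(10)
	for _, kv := range []struct {
		key      string
		walltime int64
	}{{"a", 5}, {"b", 15}} {
		newKey, ok, err := rewriteKey([]byte(kv.key), kv.walltime, importStart)
		if err != nil {
			panic(err)
		}
		if !ok {
			// A single !ok branch covers both "not a table we restore" and
			// "in-progress import" keys, mirroring the simplified caller.
			fmt.Printf("skipping %s\n", kv.key)
			continue
		}
		fmt.Printf("ingesting %s\n", newKey)
	}
}
```

With an import start walltime of 10, the sketch ingests the key at walltime 5 and skips the key at walltime 15.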

Epic: none

Release note: None

102916: roachtest: add `failover` variants with expiration leases r=erikgrinaker a=erikgrinaker

Expiration-based leases have different availability properties than epoch leases under most failure modes. This patch adds `failover` test variants that use expiration-based leases only.
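
For illustration, a small self-contained sketch of how a harness could toggle the lease type before a failover run; it issues the same `SET CLUSTER SETTING kv.expiration_leases_only.enabled` statement the new variants execute, but the driver, DSN, and program structure here are assumptions, not part of this patch:

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-wire driver can talk to CockroachDB
)

// setExpirationLeases flips the cluster between epoch-based and
// expiration-based leases, mirroring the statement the failover variants run
// before starting their workload.
func setExpirationLeases(ctx context.Context, db *sql.DB, enabled bool) error {
	_, err := db.ExecContext(ctx,
		`SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`, enabled)
	return err
}

func main() {
	// Placeholder DSN for a local, insecure single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := setExpirationLeases(context.Background(), db, true); err != nil {
		log.Fatal(err)
	}
}
```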

Epic: none
Release note: None

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
3 people committed May 10, 2023
3 parents 9af73d5 + e81f3d6 + 8cafeae commit 506a55a
Showing 7 changed files with 109 additions and 83 deletions.
24 changes: 13 additions & 11 deletions pkg/ccl/backupccl/key_rewriter.go
@@ -221,7 +221,9 @@ func MakeKeyRewriterPrefixIgnoringInterleaved(tableID descpb.ID, indexID descpb.
// an error when it encounters a key from an in-progress import. Currently, this
// is only relevant for RESTORE. See the checkAndRewriteTableKey function for
// more details.
func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, error) {
func (kr *KeyRewriter) RewriteKey(
key []byte, walltimeForImportElision int64,
) ([]byte, bool, error) {
// If we are reading a system tenant backup and this is a tenant key then it
// is part of a backup *of* that tenant, so we only restore it if we have a
// tenant rekey for it, i.e. we're restoring that tenant.
Expand Down Expand Up @@ -254,8 +256,8 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err
return nil, false, err
}

rekeyed, ok, err := kr.checkAndRewriteTableKey(noTenantPrefix, wallTime)
if err != nil {
rekeyed, ok, err := kr.checkAndRewriteTableKey(noTenantPrefix, walltimeForImportElision)
if err != nil || !ok {
return nil, false, err
}

@@ -276,20 +278,20 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err
return rekeyed, ok, err
}

// ErrImportingKeyError indicates the current key is a part of an in-progress import
var ErrImportingKeyError = errors.New("skipping rewrite of an importing key")

// checkAndRewriteTableKey rewrites the table IDs in the key. It assumes that
// any tenant ID has been stripped from the key so it operates with the system
// codec. It is the responsibility of the caller to either remap, or re-prepend
// any required tenant prefix.
// any required tenant prefix. The function returns the rewritten key (if possible),
// a boolean indicating if the key was rewritten, and an error, if any.
//
// The caller may also pass the key's walltime (part of the MVCC key's
// timestamp), which the function uses to detect and filter out keys from
// in-progress imports. If the caller passes a zero valued walltime, no
// filtering occurs. Filtering is necessary during restore because the restoring
// cluster should not contain keys from an in-progress import.
func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]byte, bool, error) {
func (kr *KeyRewriter) checkAndRewriteTableKey(
key []byte, walltimeForImportElision int64,
) ([]byte, bool, error) {
// Fetch the original table ID for descriptor lookup. Ignore errors because
// they will be caught later on if tableID isn't in descs or kr doesn't
// perform a rewrite.
@@ -298,7 +300,7 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by
// Skip keys from ephemeral cluster status tables so that the restored cluster
// does not observe stale leases/liveness until it expires.
if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID {
return key, false, nil
return nil, false, nil
}

desc := kr.descs[descpb.ID(tableID)]
@@ -311,8 +313,8 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by
// GetInProgressImportStartTime), then this function returns an error if this
// key is a part of the import -- i.e. the key's walltime is greater than the
// import start time. It is up to the caller to handle this error properly.
if importTime := desc.GetInProgressImportStartTime(); wallTime > 0 && importTime > 0 && wallTime >= importTime {
return nil, false, ErrImportingKeyError
if importTime := desc.GetInProgressImportStartTime(); walltimeForImportElision > 0 && importTime > 0 && walltimeForImportElision >= importTime {
return nil, false, nil
}

// Rewrite the first table ID.
8 changes: 5 additions & 3 deletions pkg/ccl/backupccl/key_rewriter_test.go
@@ -161,9 +161,11 @@ func TestKeyRewriter(t *testing.T) {
key := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec,
systemschema.NamespaceTable.GetID(), desc.GetPrimaryIndexID())

// If the passed in walltime is at or above the ImportStartWalltime, an error should return
_, _, err = newKr.RewriteKey(key, 2)
require.Error(t, err, ErrImportingKeyError.Error())
// If the passed in walltime is at or above the ImportStartWalltime,
// rewriting should not have occurred.
_, ok, err := newKr.RewriteKey(key, 2)
require.NoError(t, err)
require.False(t, ok)

// Else, the key should get encoded normally.
newKey, ok, err := newKr.RewriteKey(key, 1)
11 changes: 4 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -471,19 +471,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(

key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime)

if errors.Is(err, ErrImportingKeyError) {
// The keyRewriter returns ErrImportingKeyError iff the key is part of an
// in-progress import. Keys from in-progress imports never get restored,
// since the key's table gets restored to its pre-import state. Therefore,
// elide ingesting this key.
continue
}
if err != nil {
return summary, err
}
if !ok {
// If the key rewriter didn't match this key, it's not data for the
// table(s) we're interested in.
//
// As an example, keys from in-progress imports never get restored,
// since the key's table gets restored to its pre-import state. Therefore,
// we elide ingesting this key.
if verbose {
log.Infof(ctx, "skipping %s %s", key.Key, value.PrettyPrint())
}
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
@@ -119,7 +119,8 @@ var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
func rewriteBackupSpanKey(
codec keys.SQLCodec, kr *KeyRewriter, key roachpb.Key,
) (roachpb.Key, error) {
newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...), 0 /*wallTime*/)
newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...),
0 /*wallTimeForImportElision*/)
if err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err,
"could not rewrite span start key: %s", key)
@@ -656,7 +656,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
}

func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, bool, error) {
return sip.rekeyer.RewriteKey(key, 0 /*wallTime*/)
return sip.rekeyer.RewriteKey(key, 0 /*wallTimeForImportElision*/)
}

func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error {
@@ -688,7 +688,7 @@ func noteKeyVal(
validator *streamClientValidator, keyVal roachpb.KeyValue, spec streamclient.SubscriptionToken,
) {
if validator.rekeyer != nil {
rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key, 0 /* wallTime*/)
rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key, 0 /* wallTimeForImportElision*/)
if err != nil {
panic(err.Error())
}
142 changes: 83 additions & 59 deletions pkg/cmd/roachtest/tests/failover.go
@@ -30,65 +30,76 @@ import (
)

func registerFailover(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "failover/partial/lease-liveness",
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
Cluster: r.MakeClusterSpec(6, spec.CPU(4)),
Run: runDisconnect,
})
for _, failureMode := range []failureMode{
failureModeBlackhole,
failureModeBlackholeRecv,
failureModeBlackholeSend,
failureModeCrash,
failureModeDiskStall,
failureModePause,
} {
failureMode := failureMode // pin loop variable
makeSpec := func(nNodes, nCPU int) spec.ClusterSpec {
s := r.MakeClusterSpec(nNodes, spec.CPU(nCPU))
if failureMode == failureModeDiskStall {
// Use PDs in an attempt to work around flakes encountered when using
// SSDs. See #97968.
s.PreferLocalSSD = false
}
return s
}
var postValidation registry.PostValidation = 0
if failureMode == failureModeDiskStall {
postValidation = registry.PostValidationNoDeadNodes
for _, expirationLeases := range []bool{false, true} {
expirationLeases := expirationLeases // pin loop variable
var suffix string
if expirationLeases {
suffix = "/lease=expiration"
}

r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/non-system/%s", failureMode),
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
SkipPostValidations: postValidation,
Cluster: makeSpec(7 /* nodes */, 4 /* cpus */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverNonSystem(ctx, t, c, failureMode)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/liveness/%s", failureMode),
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
SkipPostValidations: postValidation,
Cluster: makeSpec(5 /* nodes */, 4 /* cpus */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverLiveness(ctx, t, c, failureMode)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/system-non-liveness/%s", failureMode),
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
SkipPostValidations: postValidation,
Cluster: makeSpec(7 /* nodes */, 4 /* cpus */),
Name: "failover/partial/lease-liveness" + suffix,
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
Cluster: r.MakeClusterSpec(6, spec.CPU(4)),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverSystemNonLiveness(ctx, t, c, failureMode)
runDisconnect(ctx, t, c, expirationLeases)
},
})

for _, failureMode := range []failureMode{
failureModeBlackhole,
failureModeBlackholeRecv,
failureModeBlackholeSend,
failureModeCrash,
failureModeDiskStall,
failureModePause,
} {
failureMode := failureMode // pin loop variable
makeSpec := func(nNodes, nCPU int) spec.ClusterSpec {
s := r.MakeClusterSpec(nNodes, spec.CPU(nCPU))
if failureMode == failureModeDiskStall {
// Use PDs in an attempt to work around flakes encountered when using
// SSDs. See #97968.
s.PreferLocalSSD = false
}
return s
}
var postValidation registry.PostValidation = 0
if failureMode == failureModeDiskStall {
postValidation = registry.PostValidationNoDeadNodes
}
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/non-system/%s%s", failureMode, suffix),
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
SkipPostValidations: postValidation,
Cluster: makeSpec(7 /* nodes */, 4 /* cpus */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverNonSystem(ctx, t, c, failureMode, expirationLeases)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/liveness/%s%s", failureMode, suffix),
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
SkipPostValidations: postValidation,
Cluster: makeSpec(5 /* nodes */, 4 /* cpus */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverLiveness(ctx, t, c, failureMode, expirationLeases)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/system-non-liveness/%s%s", failureMode, suffix),
Owner: registry.OwnerKV,
Timeout: 30 * time.Minute,
SkipPostValidations: postValidation,
Cluster: makeSpec(7 /* nodes */, 4 /* cpus */),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverSystemNonLiveness(ctx, t, c, failureMode, expirationLeases)
},
})
}
}
}

@@ -103,7 +114,7 @@ func randSleep(ctx context.Context, rng *rand.Rand, max time.Duration) {
// 5 nodes fully connected. Break the connection between a pair of nodes 4 and 5
// while running a workload against nodes 1 through 3. Before each disconnect,
// move all the leases to nodes 4 and 5 in a different pattern.
func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster) {
func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster, expLeases bool) {
require.Equal(t, 6, c.Spec().NodeCount)

rng, _ := randutil.NewTestRand()
@@ -117,13 +128,17 @@ func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster) {
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`,
expLeases)
require.NoError(t, err)

constrainAllConfig(t, ctx, conn, 3, []int{4, 5}, 0)
constrainConfig(t, ctx, conn, `RANGE liveness`, 3, []int{3, 5}, 4)
// Wait for upreplication.
require.NoError(t, WaitFor3XReplication(ctx, t, conn))

t.Status("creating workload database")
_, err := conn.ExecContext(ctx, `CREATE DATABASE kv`)
_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
require.NoError(t, err)
constrainConfig(t, ctx, conn, `DATABASE kv`, 3, []int{2, 3, 5}, 0)

@@ -209,7 +224,7 @@ func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster) {
// order, with 1 minute between each operation, for 3 cycles totaling 9
// failures.
func runFailoverNonSystem(
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode, expLeases bool,
) {
require.Equal(t, 7, c.Spec().NodeCount)

@@ -233,6 +248,9 @@ func runFailoverNonSystem(
t.Status("configuring cluster")
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'`)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`,
expLeases)
require.NoError(t, err)

// Constrain all existing zone configs to n1-n3.
constrainAllConfig(t, ctx, conn, 3, []int{4, 5, 6}, 0)
@@ -350,7 +368,7 @@ func runFailoverNonSystem(
// have currently. Prometheus scraping more often isn't enough, because CRDB
// itself only samples every 10 seconds.
func runFailoverLiveness(
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode, expLeases bool,
) {
require.Equal(t, 5, c.Spec().NodeCount)

@@ -374,6 +392,9 @@ func runFailoverLiveness(
t.Status("configuring cluster")
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'`)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`,
expLeases)
require.NoError(t, err)

// Constrain all existing zone configs to n1-n3.
constrainAllConfig(t, ctx, conn, 3, []int{4}, 0)
@@ -490,7 +511,7 @@ func runFailoverLiveness(
// order, with 1 minute between each operation, for 3 cycles totaling 9
// failures.
func runFailoverSystemNonLiveness(
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode, expLeases bool,
) {
require.Equal(t, 7, c.Spec().NodeCount)

@@ -514,6 +535,9 @@ func runFailoverSystemNonLiveness(
t.Status("configuring cluster")
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'`)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`,
expLeases)
require.NoError(t, err)

// Constrain all existing zone configs to n4-n6, except liveness which is
// constrained to n1-n3.
