Skip to content

Commit

Permalink
backupccl: return not ok during import key elision in rewriter
Browse files Browse the repository at this point in the history
Previously, when the key rewriter encountered a key from an in progress import
during restore, it would throw an error which would then be handled in the
restore data processor. This patch modifies the key rewriter to now return not
ok in this case, and simplifies the rewriter to always return early if a key
is not ok.

Epic: none

Release note: None
  • Loading branch information
msbutler committed May 8, 2023
1 parent 53abb2f commit e81f3d6
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 24 deletions.
24 changes: 13 additions & 11 deletions pkg/ccl/backupccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ func MakeKeyRewriterPrefixIgnoringInterleaved(tableID descpb.ID, indexID descpb.
// an error when it encounters a key from an in-progress import. Currently, this
// is only relevant for RESTORE. See the checkAndRewriteTableKey function for
// more details.
func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, error) {
func (kr *KeyRewriter) RewriteKey(
key []byte, walltimeForImportElision int64,
) ([]byte, bool, error) {
// If we are reading a system tenant backup and this is a tenant key then it
// is part of a backup *of* that tenant, so we only restore it if we have a
// tenant rekey for it, i.e. we're restoring that tenant.
Expand Down Expand Up @@ -254,8 +256,8 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err
return nil, false, err
}

rekeyed, ok, err := kr.checkAndRewriteTableKey(noTenantPrefix, wallTime)
if err != nil {
rekeyed, ok, err := kr.checkAndRewriteTableKey(noTenantPrefix, walltimeForImportElision)
if err != nil || !ok {
return nil, false, err
}

Expand All @@ -276,20 +278,20 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err
return rekeyed, ok, err
}

// ErrImportingKeyError indicates the current key is apart of an in-progress import
var ErrImportingKeyError = errors.New("skipping rewrite of an importing key")

// checkAndRewriteTableKey rewrites the table IDs in the key. It assumes that
// any tenant ID has been stripped from the key so it operates with the system
// codec. It is the responsibility of the caller to either remap, or re-prepend
// any required tenant prefix.
// any required tenant prefix. The function returns the rewritten key (if possible),
// a boolean indicating if the key was rewritten, and an error, if any.
//
// The caller may also pass the key's walltime (part of the MVCC key's
// timestamp), which the function uses to detect and filter out keys from
// in-progress imports. If the caller passes a zero valued walltime, no
// filtering occurs. Filtering is necessary during restore because the restoring
// cluster should not contain keys from an in-progress import.
func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]byte, bool, error) {
func (kr *KeyRewriter) checkAndRewriteTableKey(
key []byte, walltimeForImportElision int64,
) ([]byte, bool, error) {
// Fetch the original table ID for descriptor lookup. Ignore errors because
// they will be caught later on if tableID isn't in descs or kr doesn't
// perform a rewrite.
Expand All @@ -298,7 +300,7 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by
// Skip keys from ephemeral cluster status tables so that the restored cluster
// does not observe stale leases/liveness until it expires.
if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID {
return key, false, nil
return nil, false, nil
}

desc := kr.descs[descpb.ID(tableID)]
Expand All @@ -311,8 +313,8 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by
// GetInProgressImportStartTime), then this function returns an error if this
// key is a part of the import -- i.e. the key's walltime is greater than the
// import start time. It is up to the caller to handle this error properly.
if importTime := desc.GetInProgressImportStartTime(); wallTime > 0 && importTime > 0 && wallTime >= importTime {
return nil, false, ErrImportingKeyError
if importTime := desc.GetInProgressImportStartTime(); walltimeForImportElision > 0 && importTime > 0 && walltimeForImportElision >= importTime {
return nil, false, nil
}

// Rewrite the first table ID.
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/backupccl/key_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,11 @@ func TestKeyRewriter(t *testing.T) {
key := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec,
systemschema.NamespaceTable.GetID(), desc.GetPrimaryIndexID())

// If the passed in walltime is at or above the ImportStartWalltime, an error should return
_, _, err = newKr.RewriteKey(key, 2)
require.Error(t, err, ErrImportingKeyError.Error())
// If the passed in walltime is at or above the ImportStartWalltime,
// rewriting should not have occurred.
_, ok, err := newKr.RewriteKey(key, 2)
require.NoError(t, err)
require.False(t, ok)

// Else, the key should get encoded normally.
newKey, ok, err := newKr.RewriteKey(key, 1)
Expand Down
11 changes: 4 additions & 7 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,19 +471,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(

key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime)

if errors.Is(err, ErrImportingKeyError) {
// The keyRewriter returns ErrImportingKeyError iff the key is part of an
// in-progress import. Keys from in-progress imports never get restored,
// since the key's table gets restored to its pre-import state. Therefore,
// elide ingesting this key.
continue
}
if err != nil {
return summary, err
}
if !ok {
// If the key rewriter didn't match this key, it's not data for the
// table(s) we're interested in.
//
// As an example, keys from in-progress imports never get restored,
// since the key's table gets restored to its pre-import state. Therefore,
// we elide ingesting this key.
if verbose {
log.Infof(ctx, "skipping %s %s", key.Key, value.PrettyPrint())
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
func rewriteBackupSpanKey(
codec keys.SQLCodec, kr *KeyRewriter, key roachpb.Key,
) (roachpb.Key, error) {
newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...), 0 /*wallTime*/)
newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...),
0 /*wallTimeForImportElision*/)
if err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err,
"could not rewrite span start key: %s", key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
}

func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, bool, error) {
return sip.rekeyer.RewriteKey(key, 0 /*wallTime*/)
return sip.rekeyer.RewriteKey(key, 0 /*wallTimeForImportElision*/)
}

func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func noteKeyVal(
validator *streamClientValidator, keyVal roachpb.KeyValue, spec streamclient.SubscriptionToken,
) {
if validator.rekeyer != nil {
rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key, 0 /* wallTime*/)
rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key, 0 /* wallTimeForImportElision*/)
if err != nil {
panic(err.Error())
}
Expand Down

0 comments on commit e81f3d6

Please sign in to comment.