From e81f3d68c4516f570ac1b1764051b480068dec8e Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Sat, 6 May 2023 16:50:04 -0400 Subject: [PATCH] backupccl: return not ok during import key elision in rewriter Previously, when the key rewriter encountered a key from an in-progress import during restore, it would return an error, which would then be handled in the restore data processor. This patch modifies the key rewriter to now return not ok in this case, and simplifies the rewriter to always return early if a key is not ok. Epic: none Release note: None --- pkg/ccl/backupccl/key_rewriter.go | 24 ++++++++++--------- pkg/ccl/backupccl/key_rewriter_test.go | 8 ++++--- pkg/ccl/backupccl/restore_data_processor.go | 11 ++++----- pkg/ccl/backupccl/restore_job.go | 3 ++- .../stream_ingestion_processor.go | 2 +- .../stream_ingestion_processor_test.go | 2 +- 6 files changed, 26 insertions(+), 24 deletions(-) diff --git a/pkg/ccl/backupccl/key_rewriter.go b/pkg/ccl/backupccl/key_rewriter.go index cba36ab89768..e461e0531947 100644 --- a/pkg/ccl/backupccl/key_rewriter.go +++ b/pkg/ccl/backupccl/key_rewriter.go @@ -221,7 +221,9 @@ func MakeKeyRewriterPrefixIgnoringInterleaved(tableID descpb.ID, indexID descpb. // an error when it encounters a key from an in-progress import. Currently, this // is only relevant for RESTORE. See the checkAndRewriteTableKey function for // more details. -func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, error) { +func (kr *KeyRewriter) RewriteKey( + key []byte, walltimeForImportElision int64, +) ([]byte, bool, error) { // If we are reading a system tenant backup and this is a tenant key then it // is part of a backup *of* that tenant, so we only restore it if we have a // tenant rekey for it, i.e. we're restoring that tenant. 
@@ -254,8 +256,8 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err return nil, false, err } - rekeyed, ok, err := kr.checkAndRewriteTableKey(noTenantPrefix, wallTime) - if err != nil { + rekeyed, ok, err := kr.checkAndRewriteTableKey(noTenantPrefix, walltimeForImportElision) + if err != nil || !ok { return nil, false, err } @@ -276,20 +278,20 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err return rekeyed, ok, err } -// ErrImportingKeyError indicates the current key is apart of an in-progress import -var ErrImportingKeyError = errors.New("skipping rewrite of an importing key") - // checkAndRewriteTableKey rewrites the table IDs in the key. It assumes that // any tenant ID has been stripped from the key so it operates with the system // codec. It is the responsibility of the caller to either remap, or re-prepend -// any required tenant prefix. +// any required tenant prefix. The function returns the rewritten key (if possible), +// a boolean indicating if the key was rewritten, and an error, if any. // // The caller may also pass the key's walltime (part of the MVCC key's // timestamp), which the function uses to detect and filter out keys from // in-progress imports. If the caller passes a zero valued walltime, no // filtering occurs. Filtering is necessary during restore because the restoring // cluster should not contain keys from an in-progress import. -func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]byte, bool, error) { +func (kr *KeyRewriter) checkAndRewriteTableKey( + key []byte, walltimeForImportElision int64, +) ([]byte, bool, error) { // Fetch the original table ID for descriptor lookup. Ignore errors because // they will be caught later on if tableID isn't in descs or kr doesn't // perform a rewrite. 
@@ -298,7 +300,7 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by // Skip keys from ephemeral cluster status tables so that the restored cluster // does not observe stale leases/liveness until it expires. if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID { - return key, false, nil + return nil, false, nil } desc := kr.descs[descpb.ID(tableID)] @@ -311,8 +313,8 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by // GetInProgressImportStartTime), then this function returns an error if this // key is a part of the import -- i.e. the key's walltime is greater than the // import start time. It is up to the caller to handle this error properly. - if importTime := desc.GetInProgressImportStartTime(); wallTime > 0 && importTime > 0 && wallTime >= importTime { - return nil, false, ErrImportingKeyError + if importTime := desc.GetInProgressImportStartTime(); walltimeForImportElision > 0 && importTime > 0 && walltimeForImportElision >= importTime { + return nil, false, nil } // Rewrite the first table ID. diff --git a/pkg/ccl/backupccl/key_rewriter_test.go b/pkg/ccl/backupccl/key_rewriter_test.go index 8743df0f1ce8..7fe0572abae5 100644 --- a/pkg/ccl/backupccl/key_rewriter_test.go +++ b/pkg/ccl/backupccl/key_rewriter_test.go @@ -161,9 +161,11 @@ func TestKeyRewriter(t *testing.T) { key := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, systemschema.NamespaceTable.GetID(), desc.GetPrimaryIndexID()) - // If the passed in walltime is at or above the ImportStartWalltime, an error should return - _, _, err = newKr.RewriteKey(key, 2) - require.Error(t, err, ErrImportingKeyError.Error()) + // If the passed in walltime is at or above the ImportStartWalltime, + // rewriting should not have occurred. + _, ok, err := newKr.RewriteKey(key, 2) + require.NoError(t, err) + require.False(t, ok) // Else, the key should get encoded normally. 
newKey, ok, err := newKr.RewriteKey(key, 1) diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 45fd44ab5d03..6e284684478d 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -471,19 +471,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime) - if errors.Is(err, ErrImportingKeyError) { - // The keyRewriter returns ErrImportingKeyError iff the key is part of an - // in-progress import. Keys from in-progress imports never get restored, - // since the key's table gets restored to its pre-import state. Therefore, - // elide ingesting this key. - continue - } if err != nil { return summary, err } if !ok { // If the key rewriter didn't match this key, it's not data for the // table(s) we're interested in. + // + // As an example, keys from in-progress imports never get restored, + // since the key's table gets restored to its pre-import state. Therefore, + // we elide ingesting this key. 
if verbose { log.Infof(ctx, "skipping %s %s", key.Key, value.PrettyPrint()) } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index d65e4d3d4105..05c95fdaab43 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -119,7 +119,8 @@ var restoreStatsInsertionConcurrency = settings.RegisterIntSetting( func rewriteBackupSpanKey( codec keys.SQLCodec, kr *KeyRewriter, key roachpb.Key, ) (roachpb.Key, error) { - newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...), 0 /*wallTime*/) + newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...), + 0 /*wallTimeForImportElision*/) if err != nil { return nil, errors.NewAssertionErrorWithWrappedErrf(err, "could not rewrite span start key: %s", key) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index b2d45a502623..61f4e306930a 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -656,7 +656,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err } func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, bool, error) { - return sip.rekeyer.RewriteKey(key, 0 /*wallTime*/) + return sip.rekeyer.RewriteKey(key, 0 /*wallTimeForImportElision*/) } func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index d52c56639f61..aec8749f6247 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -688,7 +688,7 @@ func noteKeyVal( validator *streamClientValidator, keyVal roachpb.KeyValue, spec streamclient.SubscriptionToken, ) { if validator.rekeyer 
!= nil { - rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key, 0 /* wallTime*/) + rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key, 0 /* wallTimeForImportElision*/) if err != nil { panic(err.Error()) }