From 33f4a9d325498ca0909be4c7ea653cc3f9ebf728 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 22 Jul 2022 14:56:01 +0000 Subject: [PATCH 1/4] upgrades: add checkpointing for `raftAppliedIndexTermMigration` The `raftAppliedIndexTermMigration` upgrade migration could be unreliable. It iterates over all ranges and runs a `Migrate` request which must be applied on all replicas. However, if any ranges merge or replicas are unavailable, the migration fails and starts over from the beginning. In large clusters with many ranges, this meant that it might never complete. This patch makes the upgrade more robust, by retrying each `Migrate` request 5 times, and checkpointing the progress after every fifth batch (1000 ranges), allowing resumption on failure. At some point this should be made part of the migration infrastructure. NB: This fix was initially submitted for 22.1, and even though the migration will be removed for 22.2, it is forward-ported for posterity. Release note: None --- .../upgrades/raft_applied_index_term.go | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/pkg/upgrade/upgrades/raft_applied_index_term.go b/pkg/upgrade/upgrades/raft_applied_index_term.go index fe33e1393a43..408302ef680d 100644 --- a/pkg/upgrade/upgrades/raft_applied_index_term.go +++ b/pkg/upgrade/upgrades/raft_applied_index_term.go @@ -14,14 +14,18 @@ import ( "bytes" "context" "fmt" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" ) // defaultPageSize controls how many ranges are paged in by default when @@ -37,12 +41,32 @@ import ( // page size of 200 ~ 50s const defaultPageSize = 200 +// persistWatermarkBatchInterval specifies how often to persist the progress +// watermark (in batches). 5 batches means we'll checkpoint every 1000 ranges. +const persistWatermarkBatchInterval = 5 + func raftAppliedIndexTermMigration( - ctx context.Context, cv clusterversion.ClusterVersion, deps upgrade.SystemDeps, _ *jobs.Job, + ctx context.Context, cv clusterversion.ClusterVersion, deps upgrade.SystemDeps, job *jobs.Job, ) error { + // Fetch the migration's watermark (latest migrated range's end key), in case + // we're resuming a previous migration. + var resumeWatermark roachpb.RKey + if progress, ok := job.Progress().Details.(*jobspb.Progress_Migration); ok { + if len(progress.Migration.Watermark) > 0 { + resumeWatermark = append(resumeWatermark, progress.Migration.Watermark...) + log.Infof(ctx, "loaded migration watermark %s, resuming", resumeWatermark) + } + } + + retryOpts := retry.Options{ + InitialBackoff: 100 * time.Millisecond, + MaxRetries: 5, + } + var batchIdx, numMigratedRanges int init := func() { batchIdx, numMigratedRanges = 1, 0 } if err := deps.Cluster.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + var progress jobspb.MigrationProgress for _, desc := range descriptors { // NB: This is a bit of a wart. We want to reach the first range, // but we can't address the (local) StartKey. 
However, keys.LocalMax @@ -51,8 +75,30 @@ func raftAppliedIndexTermMigration( if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 { start, _ = keys.Addr(keys.LocalMax) } - if err := deps.DB.Migrate(ctx, start, end, cv.Version); err != nil { + + // Skip any ranges below the resume watermark. + if bytes.Compare(end, resumeWatermark) <= 0 { + continue + } + + // Migrate the range, with a few retries. + if err := retryOpts.Do(ctx, func(ctx context.Context) error { + err := deps.DB.Migrate(ctx, start, end, cv.Version) + if err != nil { + log.Errorf(ctx, "range %d migration failed, retrying: %s", desc.RangeID, err) + } return err + }); err != nil { + return err + } + + progress.Watermark = end + } + + // Persist the migration's progress. + if batchIdx%persistWatermarkBatchInterval == 0 && len(progress.Watermark) > 0 { + if err := job.SetProgress(ctx, nil, progress); err != nil { + return errors.Wrap(err, "failed to record migration progress") } } From d5de88df5713981a485b6fef44ac9bf569218ad3 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 15 Jul 2022 18:50:32 -0400 Subject: [PATCH 2/4] backupccl: handle range keys in BACKUP Previously BACKUP would not back up range tombstones. With this patch, BACKUPs with revision_history will back up range tombstones. Non-revision history backups are not affected by this diff because MVCCExportToSST filters all tombstones out of the backup already. Specifically, this patch replaces the iterators used in the backup_processor with the pebbleIterator, which has baked in range key support. This refactor introduces a 5% regression in backup runtime, even when the backup has no range keys, though #83051 hopes to address this gap. See details below on the benchmark experiment. At this point a backup with range keys is restorable, thanks to #84214. Note that the restore codebase still touches iterators that are not range key aware. This is not a problem because restored data does not have range keys, nor do the empty ranges that restore dumps data into. These iterators (e.g. in SSTBatcher and in CheckSSTConflicts) will be updated when #70428 gets fixed. Fixes #71155 Release note: none To benchmark this diff, the following commands were run at sha a5ccdc3a49a73ce8791a133297e70871223f3d7e, with and without this commit, over three trials: ``` roachprod create -n 5 --gce-machine-type=n2-standard-16 $CLUSTER roachprod put $CLUSTER [build] cockroach roachprod wipe $CLUSTER; roachprod start $CLUSTER; roachprod run $CLUSTER:1 -- "./cockroach workload init bank --rows 1000000000" roachprod sql $CLUSTER:1 -- -e "BACKUP INTO 'gs://somebucket/michael-rangkey?AUTH=implicit'" ``` The backup on the control binary took 478 seconds on average with a standard deviation of 13 seconds, while the backup with the treatment binary took 499 seconds on average with a standard deviation of 8 seconds.
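Stepping back to the first patch in this series: its checkpoint-and-retry flow is easier to follow when condensed into one place. The sketch below is illustrative only; `migrateBatches`, `migrateRange`, `persistWatermark`, and `rangeDesc` are stand-ins rather than the actual upgrade, retry, or jobs APIs.

```
package example

import (
	"bytes"
	"context"
	"fmt"
)

// rangeDesc is a stand-in for a range descriptor: only the range ID and end
// key matter for the checkpointing logic.
type rangeDesc struct {
	RangeID int64
	EndKey  []byte
}

const persistEveryNBatches = 5

// migrateBatches walks range descriptors in batches, skips ranges at or below
// the previously persisted watermark, retries each migration a handful of
// times, and persists the watermark every few batches so that a failed run
// can resume where it left off instead of starting over.
func migrateBatches(
	ctx context.Context,
	batches [][]rangeDesc,
	resumeWatermark []byte,
	migrateRange func(context.Context, rangeDesc) error,
	persistWatermark func(context.Context, []byte) error,
) error {
	for batchIdx, batch := range batches {
		var watermark []byte
		for _, desc := range batch {
			// Already migrated by a previous attempt.
			if bytes.Compare(desc.EndKey, resumeWatermark) <= 0 {
				continue
			}
			// Retry transient failures before giving up on the whole run.
			var err error
			for attempt := 0; attempt < 5; attempt++ {
				if err = migrateRange(ctx, desc); err == nil {
					break
				}
			}
			if err != nil {
				return fmt.Errorf("range %d migration failed: %w", desc.RangeID, err)
			}
			watermark = desc.EndKey
		}
		// Checkpoint progress every few batches (every 1000 ranges at the
		// default page size of 200).
		if (batchIdx+1)%persistEveryNBatches == 0 && len(watermark) > 0 {
			if err := persistWatermark(ctx, watermark); err != nil {
				return fmt.Errorf("failed to record migration progress: %w", err)
			}
		}
	}
	return nil
}
```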
--- pkg/ccl/backupccl/datadriven_test.go | 45 ++++++ pkg/ccl/backupccl/file_sst_sink.go | 92 ++++++++--- .../testdata/backup-restore/rangekeys | 84 ++++++++++ .../backup-restore/rangekeys-revision-history | 148 ++++++++++++++++++ pkg/kv/kvclient/revision_reader.go | 7 +- 5 files changed, 351 insertions(+), 25 deletions(-) create mode 100644 pkg/ccl/backupccl/testdata/backup-restore/rangekeys create mode 100644 pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go index 8b76c42f563d..f421d257a9b0 100644 --- a/pkg/ccl/backupccl/datadriven_test.go +++ b/pkg/ccl/backupccl/datadriven_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -327,6 +328,15 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) // + expect-pausepoint: expects the schema change job to end up in a paused state because // of a pausepoint error. // +// - "kv" [args] +// Issues a kv request +// +// Supported arguments: +// +// + type: kv request type. Currently, only DeleteRange is supported +// +// + target: SQL target. Currently, only table names are supported. +// // - "nudge-and-wait-for-temp-cleanup" // Nudges the temporary object reconciliation loop to run, and waits for completion. func TestDataDriven(t *testing.T) { @@ -626,6 +636,15 @@ func TestDataDriven(t *testing.T) { } return "" + case "kv": + var request string + d.ScanArgs(t, "request", &request) + + var target string + d.ScanArgs(t, "target", &target) + handleKVRequest(ctx, t, lastCreatedServer, ds, request, target) + return "" + case "save-cluster-ts": server := lastCreatedServer user := "root" @@ -677,6 +696,32 @@ func TestDataDriven(t *testing.T) { }) } +func handleKVRequest( + ctx context.Context, t *testing.T, server string, ds datadrivenTestState, request, target string, +) { + user := "root" + if request == "DeleteRange" { + var tableID uint32 + err := ds.getSQLDB(t, server, user).QueryRow(`SELECT id FROM system.namespace WHERE name = $1`, + target).Scan(&tableID) + require.NoError(t, err) + bankSpan := makeTableSpan(tableID) + dr := roachpb.DeleteRangeRequest{ + // Bogus span to make it a valid request. + RequestHeader: roachpb.RequestHeader{ + Key: bankSpan.Key, + EndKey: bankSpan.EndKey, + }, + UseRangeTombstone: true, + } + if _, err := kv.SendWrapped(ctx, ds.servers[server].DistSenderI().(*kvcoord.DistSender), &dr); err != nil { + t.Fatal(err) + } + } else { + t.Fatalf("Unknown kv request") + } +} + // findMostRecentJobWithType returns the most recently created job of `job_type` // jobType. func findMostRecentJobWithType( diff --git a/pkg/ccl/backupccl/file_sst_sink.go b/pkg/ccl/backupccl/file_sst_sink.go index 3de271d2974c..b5c49da6d13e 100644 --- a/pkg/ccl/backupccl/file_sst_sink.go +++ b/pkg/ccl/backupccl/file_sst_sink.go @@ -268,32 +268,16 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error { log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName) - // Copy SST content. 
- sst, err := storage.NewMemSSTIterator(resp.dataSST, false) - if err != nil { + // To speed up SST reading, surface all the point keys first, flush, + // then surface all the range keys and flush. + // + // TODO(msbutler): investigate using single a single iterator that surfaces + // all point keys first and then all range keys + if err := s.copyPointKeys(resp.dataSST); err != nil { return err } - defer sst.Close() - - sst.SeekGE(storage.MVCCKey{Key: keys.MinKey}) - for { - if valid, err := sst.Valid(); !valid || err != nil { - if err != nil { - return err - } - break - } - k := sst.UnsafeKey() - if k.Timestamp.IsEmpty() { - if err := s.sst.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil { - return err - } - } else { - if err := s.sst.PutRawMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil { - return err - } - } - sst.Next() + if err := s.copyRangeKeys(resp.dataSST); err != nil { + return err } // If this span extended the last span added -- that is, picked up where it @@ -328,6 +312,66 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error { return nil } +func (s *fileSSTSink) copyPointKeys(dataSST []byte) error { + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storage.NewPebbleMemSSTIterator(dataSST, false, iterOpts) + if err != nil { + return err + } + defer iter.Close() + + for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() { + if valid, err := iter.Valid(); !valid || err != nil { + if err != nil { + return err + } + break + } + k := iter.UnsafeKey() + if k.Timestamp.IsEmpty() { + if err := s.sst.PutUnversioned(k.Key, iter.UnsafeValue()); err != nil { + return err + } + } else { + if err := s.sst.PutRawMVCC(iter.UnsafeKey(), iter.UnsafeValue()); err != nil { + return err + } + } + } + return nil +} + +func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error { + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypeRangesOnly, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + iter, err := storage.NewPebbleMemSSTIterator(dataSST, false, iterOpts) + if err != nil { + return err + } + defer iter.Close() + + for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return err + } else if !ok { + break + } + for _, rkv := range iter.RangeKeys() { + if err := s.sst.PutRawMVCCRangeKey(rkv.RangeKey, rkv.Value); err != nil { + return err + } + } + } + return nil +} + func generateUniqueSSTName(nodeID base.SQLInstanceID) string { // The data/ prefix, including a /, is intended to group SSTs in most of the // common file/bucket browse UIs. 
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/rangekeys b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys new file mode 100644 index 000000000000..ba9be196e212 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys @@ -0,0 +1,84 @@ +# Tests that Backups without Revisions History and Restore properly handle +# range keys + +new-server name=s1 +---- + +exec-sql +CREATE DATABASE orig; +USE orig; +CREATE TABLE foo (i INT PRIMARY KEY, s STRING); +INSERT INTO foo VALUES (1, 'x'),(2,'y'); +CREATE TABLE baz (i INT PRIMARY KEY, s STRING); +INSERT INTO baz VALUES (11, 'xx'),(22,'yy'); +---- + +# Ensure a full backup properly captures range keys +# - with foo, delete then insert, and ensure no original data surfaces in restore +# - with baz: chill for now + +kv request=DeleteRange target=foo +---- + +exec-sql +INSERT INTO foo VALUES (3,'z'); +---- + +exec-sql +BACKUP INTO 'nodelocal://0/test-root/'; +---- + +exec-sql +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' with new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo; +---- +1 + +query-sql +SELECT count(*) from orig1.baz; +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +# Ensure incremental backup without revision history +# handles range tombstones: +# - with foo, insert and ensure latest data from previous backup surfaces in RESTORE +# - with baz, delete then insert, and ensure no data from previous backup surfaces in RESTORE + +exec-sql +INSERT INTO foo VALUES (4,'a'),(5,'b'); +---- + +kv request=DeleteRange target=baz +---- + +exec-sql +INSERT INTO baz VALUES (33,'zz'); +---- + +exec-sql +BACKUP INTO LATEST IN 'nodelocal://0/test-root/'; +---- + +exec-sql +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' with new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +3 + +query-sql +SELECT count(*) from orig1.baz +---- +1 + + + diff --git a/pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history new file mode 100644 index 000000000000..0516db04b08b --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/rangekeys-revision-history @@ -0,0 +1,148 @@ +# Tests that Backups with Revisions History and As Of System Time +# Restore properly handle range keys in tables foo and baz +# - t0: inital dataset +# - t1: delrange on foo +# - t2: one insert in foo +# - full backup +# - t3: 2 inserts in foo; delrange on baz +# - t4: one insert in baz +# - incremental backup + +new-server name=s1 +---- + +exec-sql +CREATE DATABASE orig; +USE orig; +CREATE TABLE foo (i INT PRIMARY KEY, s STRING); +INSERT INTO foo VALUES (1, 'x'),(2,'y'); +CREATE TABLE baz (i INT PRIMARY KEY, s STRING); +INSERT INTO baz VALUES (11, 'xx'),(22,'yy'); +---- + +save-cluster-ts tag=t0 +---- + +kv request=DeleteRange target=foo +---- + +save-cluster-ts tag=t1 +---- + +exec-sql +INSERT INTO foo VALUES (3,'z'); +---- + +save-cluster-ts tag=t2 +---- + +exec-sql +BACKUP INTO 'nodelocal://0/test-root/' with revision_history; +---- + +exec-sql +INSERT INTO foo VALUES (4,'a'),(5,'b'); +---- + +kv request=DeleteRange target=baz +---- + +save-cluster-ts tag=t3 +---- + +exec-sql +INSERT INTO baz VALUES (33,'zz'); +---- + +save-cluster-ts tag=t4 +---- + +exec-sql +BACKUP INTO LATEST IN 'nodelocal://0/test-root/' with revision_history; +---- + +restore aost=t0 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t0 WITH new_db_name='orig1'; +---- + +query-sql +SELECT 
count(*) from orig1.foo +---- +2 + +query-sql +SELECT count(*) from orig1.baz +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t1 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t1 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +0 + +query-sql +SELECT count(*) from orig1.baz +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t2 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t2 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +1 + +query-sql +SELECT count(*) from orig1.baz +---- +2 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t3 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t3 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +3 + +query-sql +SELECT count(*) from orig1.baz +---- +0 + +exec-sql +DROP DATABASE orig1 CASCADE +---- + +restore aost=t4 +RESTORE DATABASE orig FROM LATEST IN 'nodelocal://0/test-root/' AS OF SYSTEM TIME t4 WITH new_db_name='orig1'; +---- + +query-sql +SELECT count(*) from orig1.foo +---- +3 + +query-sql +SELECT count(*) from orig1.baz +---- +1 diff --git a/pkg/kv/kvclient/revision_reader.go b/pkg/kv/kvclient/revision_reader.go index f51292a3e0f2..e3f87eaa8845 100644 --- a/pkg/kv/kvclient/revision_reader.go +++ b/pkg/kv/kvclient/revision_reader.go @@ -48,7 +48,12 @@ func GetAllRevisions( var res []VersionedValues for _, file := range resp.(*roachpb.ExportResponse).Files { - iter, err := storage.NewMemSSTIterator(file.SST, false) + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: file.Span.Key, + UpperBound: file.Span.EndKey, + } + iter, err := storage.NewPebbleMemSSTIterator(file.SST, true, iterOpts) if err != nil { return nil, err } From c2dd3e90de9b48e721e16eda49cdbcc03eedf32d Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Thu, 21 Jul 2022 16:12:30 -0400 Subject: [PATCH 3/4] kvserver: add server-side transaction retry metrics This patch adds a few new metrics to track successful/failed server-side transaction retries. Specifically, whenever we attempt to retry a read or write batch or run into a read within uncertainty interval error, we increment specific counters indicating whether the retry was successful or not.
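The accounting pattern applied at each retry site reduces to the following shape; this is a condensed, illustrative sketch (the counter type and function names are stand-ins, not the kvserver metrics or evaluation APIs):

```
package example

import "sync/atomic"

// serverSideRetryMetrics stands in for the pair of counters the patch adds
// per retry site: one for server-side retries that could be performed, one
// for retries that had to be abandoned.
type serverSideRetryMetrics struct {
	success atomic.Int64
	failure atomic.Int64
}

// evaluateWithOneServersideRetry evaluates a batch and allows at most one
// server-side retry. When a retry is possible, the success counter is bumped
// and the batch is re-evaluated; otherwise the failure counter is bumped and
// the original error is returned.
func evaluateWithOneServersideRetry(
	m *serverSideRetryMetrics,
	evaluate func() error,
	canDoServersideRetry func(err error) bool,
) error {
	for retries := 0; ; retries++ {
		err := evaluate()
		// Allow only one retry.
		if err == nil || retries > 0 {
			return err
		}
		if !canDoServersideRetry(err) {
			m.failure.Add(1)
			return err
		}
		m.success.Add(1)
		// Loop around and re-evaluate at the bumped timestamp.
	}
}
```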
Release note: None --- pkg/kv/kvserver/metrics.go | 54 ++++++++++++++++++++++++++++++-- pkg/kv/kvserver/replica_read.go | 10 ++++-- pkg/kv/kvserver/replica_send.go | 3 ++ pkg/kv/kvserver/replica_write.go | 10 ++++-- pkg/ts/catalog/chart_catalog.go | 11 +++++++ 5 files changed, 82 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 738e0689a080..fd1a11682a79 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -338,6 +338,44 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } + metaWriteEvaluationServerSideRetrySuccess = metric.Metadata{ + Name: "txn.server_side_retry.write_evaluation.success", + Help: "Number of write batches that were successfully refreshed server side", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } + metaWriteEvaluationServerSideRetryFailure = metric.Metadata{ + Name: "txn.server_side_retry.write_evaluation.failure", + Help: "Number of write batches that were not successfully refreshed server side", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } + metaReadEvaluationServerSideRetrySuccess = metric.Metadata{ + Name: "txn.server_side_retry.read_evaluation.success", + Help: "Number of read batches that were successfully refreshed server side", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } + metaReadEvaluationServerSideRetryFailure = metric.Metadata{ + Name: "txn.server_side_retry.read_evaluation.failure", + Help: "Number of read batches that were not successfully refreshed server side", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } + metaReadWithinUncertaintyIntervalErrorServerSideRetrySuccess = metric.Metadata{ + Name: "txn.server_side_retry.uncertainty_interval_error.success", + Help: "Number of batches that ran into uncertainty interval errors that were " + + "successfully refreshed server side", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } + metaReadWithinUncertaintyIntervalErrorServerSideRetryFailure = metric.Metadata{ + Name: "txn.server_side_retry.uncertainty_interval_error.failure", + Help: "Number of batches that ran into uncertainty interval errors that were not " + + "successfully refreshed server side", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } // RocksDB/Pebble metrics. metaRdbBlockCacheHits = metric.Metadata{ @@ -1548,7 +1586,13 @@ type StoreMetrics struct { FollowerReadsCount *metric.Counter // Server-side transaction metrics. - CommitWaitsBeforeCommitTrigger *metric.Counter + CommitWaitsBeforeCommitTrigger *metric.Counter + WriteEvaluationServerSideRetrySuccess *metric.Counter + WriteEvaluationServerSideRetryFailure *metric.Counter + ReadEvaluationServerSideRetrySuccess *metric.Counter + ReadEvaluationServerSideRetryFailure *metric.Counter + ReadWithinUncertaintyIntervalErrorServerSideRetrySuccess *metric.Counter + ReadWithinUncertaintyIntervalErrorServerSideRetryFailure *metric.Counter // Storage (pebble) metrics. Some are named RocksDB which is what we used // before pebble, and this name is kept for backwards compatibility despite @@ -2026,7 +2070,13 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), // Server-side transaction metrics. 
- CommitWaitsBeforeCommitTrigger: metric.NewCounter(metaCommitWaitBeforeCommitTriggerCount), + CommitWaitsBeforeCommitTrigger: metric.NewCounter(metaCommitWaitBeforeCommitTriggerCount), + WriteEvaluationServerSideRetrySuccess: metric.NewCounter(metaWriteEvaluationServerSideRetrySuccess), + WriteEvaluationServerSideRetryFailure: metric.NewCounter(metaWriteEvaluationServerSideRetryFailure), + ReadEvaluationServerSideRetrySuccess: metric.NewCounter(metaReadEvaluationServerSideRetrySuccess), + ReadEvaluationServerSideRetryFailure: metric.NewCounter(metaReadEvaluationServerSideRetryFailure), + ReadWithinUncertaintyIntervalErrorServerSideRetrySuccess: metric.NewCounter(metaReadWithinUncertaintyIntervalErrorServerSideRetrySuccess), + ReadWithinUncertaintyIntervalErrorServerSideRetryFailure: metric.NewCounter(metaReadWithinUncertaintyIntervalErrorServerSideRetryFailure), // RocksDB/Pebble metrics. RdbBlockCacheHits: metric.NewGauge(metaRdbBlockCacheHits), diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 93f1e061a385..fd247735881f 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -306,10 +306,16 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( now := timeutil.Now() br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, g, st, ui, true /* readOnly */) r.store.metrics.ReplicaReadBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) + // Allow only one retry. + if pErr == nil || retries > 0 { + break + } // If we can retry, set a higher batch timestamp and continue. - // Allow one retry only. - if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) { + if !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) { + r.store.Metrics().ReadEvaluationServerSideRetryFailure.Inc(1) break + } else { + r.store.Metrics().ReadEvaluationServerSideRetrySuccess.Inc(1) } } diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 522b4d668a1c..defa9408359b 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -795,8 +795,11 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError( // latchSpans, because we have already released our latches and plan to // re-acquire them if the retry is allowed. if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, nil /* deadline */) { + r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1) return nil, pErr } + r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetrySuccess.Inc(1) + if ba.Txn == nil { // If the request is non-transactional and it was refreshed into the future // after observing a value with a timestamp in the future, immediately sleep diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 4430d0e8a368..6d75b6d3384e 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -629,12 +629,18 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes( success = false } - // If we can retry, set a higher batch timestamp and continue. // Allow one retry only; a non-txn batch containing overlapping // spans will always experience WriteTooOldError. - if success || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, deadline) { + if success || retries > 0 { break } + // If we can retry, set a higher batch timestamp and continue. 
+ if !canDoServersideRetry(ctx, pErr, ba, br, g, deadline) { + r.store.Metrics().WriteEvaluationServerSideRetryFailure.Inc(1) + break + } else { + r.store.Metrics().WriteEvaluationServerSideRetrySuccess.Inc(1) + } } return batch, br, res, pErr } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 8d58080cc226..5cbd0aca4ddb 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1225,6 +1225,17 @@ var charts = []sectionDescription{ "txn.commit_waits.before_commit_trigger", }, }, + { + Title: "Server Side Retry", + Metrics: []string{ + "txn.server_side_retry.write_evaluation.success", + "txn.server_side_retry.write_evaluation.failure", + "txn.server_side_retry.read_evaluation.success", + "txn.server_side_retry.read_evaluation.failure", + "txn.server_side_retry.uncertainty_interval_error.success", + "txn.server_side_retry.uncertainty_interval_error.failure", + }, + }, { Title: "Durations", Metrics: []string{"txn.durations"}, From 6817eb67bcf5daf163334b5abc2c38b471f07952 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 26 Jul 2022 16:35:13 -0400 Subject: [PATCH 4/4] eval: stop ignoring all ResolveOIDFromOID errors The decision about whether an error is safe to ignore is made at the place where the error is created/returned. This way, the callers don't need to be aware of any new error codes that the implementation may start returning in the future. Release note (bug fix): Fixed incorrect error handling that could cause casts to OID types to fail in some cases. --- pkg/sql/faketreeeval/evalctx.go | 8 ++++---- pkg/sql/resolve_oid.go | 16 ++++++++-------- pkg/sql/sem/builtins/pg_builtins.go | 4 ++-- pkg/sql/sem/eval/cast.go | 5 ++++- pkg/sql/sem/eval/deps.go | 4 ++-- pkg/sql/sem/eval/parse_doid.go | 16 ++++++++++++---- 6 files changed, 32 insertions(+), 21 deletions(-) diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index fc2f609b72ee..4ef646506bc4 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -159,15 +159,15 @@ type DummyEvalPlanner struct{} // ResolveOIDFromString is part of the Planner interface. func (ep *DummyEvalPlanner) ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, -) (*tree.DOid, error) { - return nil, errors.WithStack(errEvalPlanner) +) (*tree.DOid, bool, error) { + return nil, false, errors.WithStack(errEvalPlanner) } // ResolveOIDFromOID is part of the Planner interface. func (ep *DummyEvalPlanner) ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, -) (*tree.DOid, error) { - return nil, errors.WithStack(errEvalPlanner) +) (*tree.DOid, bool, error) { + return nil, false, errors.WithStack(errEvalPlanner) } // UnsafeUpsertDescriptor is part of the Planner interface. diff --git a/pkg/sql/resolve_oid.go b/pkg/sql/resolve_oid.go index 7c755be98447..7aa67eb8f588 100644 --- a/pkg/sql/resolve_oid.go +++ b/pkg/sql/resolve_oid.go @@ -28,7 +28,7 @@ import ( // ResolveOIDFromString is part of tree.TypeResolver. func (p *planner) ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, -) (*tree.DOid, error) { +) (_ *tree.DOid, errSafeToIgnore bool, _ error) { ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) return resolveOID( ctx, p.Txn(), @@ -40,7 +40,7 @@ func (p *planner) ResolveOIDFromString( // ResolveOIDFromOID is part of tree.TypeResolver. 
func (p *planner) ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, -) (*tree.DOid, error) { +) (_ *tree.DOid, errSafeToIgnore bool, _ error) { ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) return resolveOID( ctx, p.Txn(), @@ -55,10 +55,10 @@ func resolveOID( ie sqlutil.InternalExecutor, resultType *types.T, toResolve tree.Datum, -) (*tree.DOid, error) { +) (_ *tree.DOid, errSafeToIgnore bool, _ error) { info, ok := regTypeInfos[resultType.Oid()] if !ok { - return nil, pgerror.Newf( + return nil, true, pgerror.Newf( pgcode.InvalidTextRepresentation, "invalid input syntax for type %s: %q", resultType, @@ -78,20 +78,20 @@ func resolveOID( sessiondata.NoSessionDataOverride, q, toResolve) if err != nil { if errors.HasType(err, (*tree.MultipleResultsError)(nil)) { - return nil, pgerror.Newf(pgcode.AmbiguousAlias, + return nil, false, pgerror.Newf(pgcode.AmbiguousAlias, "more than one %s named %s", info.objName, toResolve) } - return nil, err + return nil, false, err } if results.Len() == 0 { - return nil, pgerror.Newf(info.errType, + return nil, true, pgerror.Newf(info.errType, "%s %s does not exist", info.objName, toResolve) } return tree.NewDOidWithName( results[0].(*tree.DOid).Oid, resultType, tree.AsStringWithFlags(results[1], tree.FmtBareStrings), - ), nil + ), true, nil } // regTypeInfo contains details on a pg_catalog table that has a reg* type. diff --git a/pkg/sql/sem/builtins/pg_builtins.go b/pkg/sql/sem/builtins/pg_builtins.go index 8bbca082120b..19f91e953c99 100644 --- a/pkg/sql/sem/builtins/pg_builtins.go +++ b/pkg/sql/sem/builtins/pg_builtins.go @@ -772,13 +772,13 @@ var pgBuiltins = map[string]builtinDefinition{ // The session has not yet created a temporary schema. return tree.NewDOid(0), nil } - oid, err := ctx.Planner.ResolveOIDFromString( + oid, errSafeToIgnore, err := ctx.Planner.ResolveOIDFromString( ctx.Ctx(), types.RegNamespace, tree.NewDString(schema)) if err != nil { // If the OID lookup returns an UndefinedObject error, return 0 // instead. We can hit this path if the session created a temporary // schema in one database and then changed databases. - if pgerror.GetPGCode(err) == pgcode.UndefinedObject { + if errSafeToIgnore && pgerror.GetPGCode(err) == pgcode.UndefinedObject { return tree.NewDOid(0), nil } return nil, err diff --git a/pkg/sql/sem/eval/cast.go b/pkg/sql/sem/eval/cast.go index e861ee5ddf46..7fedc6246598 100644 --- a/pkg/sql/sem/eval/cast.go +++ b/pkg/sql/sem/eval/cast.go @@ -955,8 +955,11 @@ func performIntToOidCast( return tree.WrapAsZeroOid(t), nil } - dOid, err := res.ResolveOIDFromOID(ctx, t, tree.NewDOid(o)) + dOid, errSafeToIgnore, err := res.ResolveOIDFromOID(ctx, t, tree.NewDOid(o)) if err != nil { + if !errSafeToIgnore { + return nil, err + } dOid = tree.NewDOidWithType(o, t) } return dOid, nil diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index dfedf9b1778f..04a0e3fba986 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -171,7 +171,7 @@ type TypeResolver interface { // query, an error will be returned. ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, - ) (*tree.DOid, error) + ) (_ *tree.DOid, errSafeToIgnore bool, _ error) // ResolveOIDFromOID looks up the populated value of the oid with the // desired resultType which matches the provided oid. @@ -181,7 +181,7 @@ type TypeResolver interface { // query, an error will be returned. 
ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, - ) (*tree.DOid, error) + ) (_ *tree.DOid, errSafeToIgnore bool, _ error) } // Planner is a limited planner that can be used from EvalContext. diff --git a/pkg/sql/sem/eval/parse_doid.go b/pkg/sql/sem/eval/parse_doid.go index 86dd82229d4d..d8b7c69601b3 100644 --- a/pkg/sql/sem/eval/parse_doid.go +++ b/pkg/sql/sem/eval/parse_doid.go @@ -35,8 +35,11 @@ func ParseDOid(ctx *Context, s string, t *types.T) (*tree.DOid, error) { if err != nil { return nil, err } - oidRes, err := ctx.Planner.ResolveOIDFromOID(ctx.Ctx(), t, tmpOid) + oidRes, errSafeToIgnore, err := ctx.Planner.ResolveOIDFromOID(ctx.Ctx(), t, tmpOid) if err != nil { + if !errSafeToIgnore { + return nil, err + } oidRes = tmpOid *oidRes = tree.MakeDOid(tmpOid.Oid, t) } @@ -104,9 +107,13 @@ func ParseDOid(ctx *Context, s string, t *types.T) (*tree.DOid, error) { // Trim type modifiers, e.g. `numeric(10,3)` becomes `numeric`. s = pgSignatureRegexp.ReplaceAllString(s, "$1") - dOid, missingTypeErr := ctx.Planner.ResolveOIDFromString(ctx.Ctx(), t, tree.NewDString(tree.Name(s).Normalize())) + dOid, errSafeToIgnore, missingTypeErr := ctx.Planner.ResolveOIDFromString( + ctx.Ctx(), t, tree.NewDString(tree.Name(s).Normalize()), + ) if missingTypeErr == nil { - return dOid, missingTypeErr + return dOid, nil + } else if !errSafeToIgnore { + return nil, missingTypeErr } // Fall back to some special cases that we support for compatibility // only. Client use syntax like 'sometype'::regtype to produce the oid @@ -137,7 +144,8 @@ func ParseDOid(ctx *Context, s string, t *types.T) (*tree.DOid, error) { return tree.NewDOidWithTypeAndName(oid.Oid(id), t, tn.ObjectName.String()), nil default: - return ctx.Planner.ResolveOIDFromString(ctx.Ctx(), t, tree.NewDString(s)) + d, _ /* errSafeToIgnore */, err := ctx.Planner.ResolveOIDFromString(ctx.Ctx(), t, tree.NewDString(s)) + return d, err } }
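The convention this last patch establishes is worth stating on its own: the function that produces an error decides whether callers may ignore it, and callers branch on that flag rather than on specific error codes. A minimal illustrative sketch of the call-site shape (the names below are stand-ins, not the planner or eval interfaces):

```
package example

import "errors"

// lookupOID mimics the shape of ResolveOIDFromOID: it returns the result, a
// flag saying whether the error (if any) is safe for callers to swallow, and
// the error itself.
func lookupOID(oid int) (name string, errSafeToIgnore bool, _ error) {
	switch {
	case oid == 0:
		// The object simply does not exist; callers with a fallback may
		// ignore this.
		return "", true, errors.New("no object with that OID")
	case oid < 0:
		// An internal failure; callers must propagate it.
		return "", false, errors.New("lookup failed")
	default:
		return "resolved", false, nil
	}
}

// castToOID shows the intended call-site shape: fall back only when the
// producer of the error said that is safe.
func castToOID(oid int) (string, error) {
	name, safeToIgnore, err := lookupOID(oid)
	if err != nil {
		if !safeToIgnore {
			return "", err
		}
		// Safe to ignore: fall back, mirroring how performIntToOidCast falls
		// back to a plain OID datum when the lookup finds nothing.
		return "unknown", nil
	}
	return name, nil
}
```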