From b761eba20b218aee12b295df027d22c4a17c9b61 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 4 Aug 2021 12:31:22 -0400 Subject: [PATCH] kv: assert txn unused in SetFixedTimestamp This commit asserts that a transaction has not been used to read or to write by the time that `SetFixedTimestamp` is called on it. This was extracted from #68194 and modified to return an error from `SetFixedTimestamp` on misuse instead of fatal-ing. This provides a sufficient, temporary backstop for #68216 until the conn executor logic is fixed: ``` root@127.0.0.1:26257/movr> create table t (x int); CREATE TABLE root@127.0.0.1:26257/movr> insert into t values (1); INSERT 1 root@127.0.0.1:26257/movr> select crdb_internal_mvcc_timestamp, * from t; crdb_internal_mvcc_timestamp | x ---------------------------------+---- 1628094563935439000.0000000000 | 1 (1 row) root@127.0.0.1:26257/movr> begin as of system time (1628094563935439000.0000000000-1)::string; BEGIN root@127.0.0.1:26257/movr OPEN> select * from t; x ----- (0 rows) root@127.0.0.1:26257/movr OPEN> prepare y as select * from t as of system time 1628094563935439000.0000000000; ERROR: internal error: cannot set fixed timestamp, txn "sql txn" meta={id=e5e81c19 pri=0.01517572 epo=0 ts=1628094563.935438999,0 min=1628094563.935438999,0 seq=0} lock=false stat=PENDING rts=1628094563.935438999,0 wto=false gul=1628094563.935438999,0 already performed reads SQLSTATE: XX000 DETAIL: stack trace: github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord/txn_coord_sender.go:1016: SetFixedTimestamp() github.com/cockroachdb/cockroach/pkg/kv/txn.go:1200: SetFixedTimestamp() github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:278: populatePrepared() github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:220: func1() github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:226: prepare() github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:112: addPreparedStmt() github.com/cockroachdb/cockroach/pkg/sql/conn_executor_exec.go:570: execStmtInOpenState() github.com/cockroachdb/cockroach/pkg/sql/conn_executor_exec.go:126: execStmt() github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1626: func1() github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1628: execCmd() github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1550: run() github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:627: ServeConn() github.com/cockroachdb/cockroach/pkg/sql/pgwire/conn.go:645: func1() runtime/asm_amd64.s:1371: goexit() HINT: You have encountered an unexpected error. Please check the public issue tracker to check whether this problem is already tracked. If you cannot find it there, please report the error with details by creating a new issue. If you would rather not post publicly, please contact us directly using the support form. We appreciate your feedback. root@127.0.0.1:26257/? ERROR> ``` --- pkg/ccl/backupccl/backup_planning.go | 4 +- pkg/ccl/backupccl/backupresolver/targets.go | 7 +- pkg/ccl/changefeedccl/changefeed_dist.go | 4 +- pkg/ccl/changefeedccl/kvfeed/scanner.go | 4 +- pkg/ccl/changefeedccl/rowfetcher_cache.go | 6 +- .../changefeedccl/schemafeed/schema_feed.go | 4 +- .../kvfollowerreadsccl/followerreads_test.go | 6 +- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 13 ++- .../kvclient/kvcoord/txn_coord_sender_test.go | 88 +++++++++++++++++++ pkg/kv/kvclient/rangefeed/db_adapter.go | 4 +- pkg/kv/kvserver/client_replica_test.go | 2 +- pkg/kv/kvserver/replica_rangefeed_test.go | 8 +- pkg/kv/mock_transactional_sender.go | 5 +- pkg/kv/sender.go | 11 +-- pkg/kv/txn.go | 21 +++-- pkg/sql/backfill.go | 4 +- pkg/sql/catalog/lease/lease.go | 4 +- pkg/sql/catalog/lease/lease_internal_test.go | 4 +- pkg/sql/catalog/lease/lease_test.go | 2 +- pkg/sql/catalog/lease/storage.go | 7 +- pkg/sql/conn_executor.go | 4 +- pkg/sql/conn_executor_exec.go | 4 +- pkg/sql/conn_executor_prepare.go | 4 +- pkg/sql/create_stats.go | 4 +- pkg/sql/index_backfiller.go | 4 +- pkg/sql/row/fetcher.go | 8 +- pkg/sql/row/row_converter.go | 4 +- pkg/sql/rowexec/indexbackfiller.go | 4 +- pkg/sql/schema_change_plan_node.go | 4 +- pkg/sql/schema_changer.go | 4 +- pkg/sql/txn_state.go | 15 +++- 31 files changed, 218 insertions(+), 49 deletions(-) diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 965e2f531566..a56c99b5c428 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -328,7 +328,9 @@ func spansForAllTableIndexes( checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) { var foundKV bool err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, endTime) + if err := txn.SetFixedTimestamp(ctx, endTime); err != nil { + return err + } res, err := txn.Scan(ctx, start, end, 1 /* maxRows */) if err != nil { return err diff --git a/pkg/ccl/backupccl/backupresolver/targets.go b/pkg/ccl/backupccl/backupresolver/targets.go index 03b64e66bfa3..e9ece2248c56 100644 --- a/pkg/ccl/backupccl/backupresolver/targets.go +++ b/pkg/ccl/backupccl/backupresolver/targets.go @@ -571,8 +571,11 @@ func LoadAllDescs( var allDescs []catalog.Descriptor if err := db.Txn( ctx, - func(ctx context.Context, txn *kv.Txn) (err error) { - txn.SetFixedTimestamp(ctx, asOf) + func(ctx context.Context, txn *kv.Txn) error { + err := txn.SetFixedTimestamp(ctx, asOf) + if err != nil { + return err + } allDescs, err = catalogkv.GetAllDescriptors( ctx, txn, codec, true, /* shouldRunPostDeserializationChanges */ ) diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 3e6b617309fb..ee5f6bfe1ed3 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -129,7 +129,9 @@ func fetchSpansForTargets( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { spans = nil - txn.SetFixedTimestamp(ctx, ts) + if err := txn.SetFixedTimestamp(ctx, ts); err != nil { + return err + } // Note that all targets are currently guaranteed to be tables. for tableID := range targets { flags := tree.ObjectLookupFlagsWithRequired() diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index cd42b11ef699..3c098202fefd 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -103,7 +103,9 @@ func (p *scanRequestScanner) exportSpan( if log.V(2) { log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts) } - txn.SetFixedTimestamp(ctx, ts) + if err := txn.SetFixedTimestamp(ctx, ts); err != nil { + return err + } stopwatchStart := timeutil.Now() var scanDuration, bufferDuration time.Duration const targetBytesPerScan = 16 << 20 // 16 MiB diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 63d6d715a60f..6e8eb87b0c70 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -101,8 +101,10 @@ func (c *rowFetcherCache) TableDescForKey( // descs.Collection directly here. // TODO (SQL Schema): #53751. if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts) - var err error + err := txn.SetFixedTimestamp(ctx, ts) + if err != nil { + return err + } tableDesc, err = c.collection.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{}) return err }); err != nil { diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 43303029a2f4..041f94475016 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -274,7 +274,9 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error { ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { initialDescs = initialDescs[:0] - txn.SetFixedTimestamp(ctx, initialTableDescTs) + if err := txn.SetFixedTimestamp(ctx, initialTableDescTs); err != nil { + return err + } // Note that all targets are currently guaranteed to be tables. for tableID := range tf.targets { flags := tree.ObjectLookupFlagsWithRequired() diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 5a6fbc6a5023..cf11f95a3503 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -400,11 +400,11 @@ func TestOracle(t *testing.T) { c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper) staleTxn := kv.NewTxn(ctx, c, 0) - staleTxn.SetFixedTimestamp(ctx, stale) + require.NoError(t, staleTxn.SetFixedTimestamp(ctx, stale)) currentTxn := kv.NewTxn(ctx, c, 0) - currentTxn.SetFixedTimestamp(ctx, current) + require.NoError(t, currentTxn.SetFixedTimestamp(ctx, current)) futureTxn := kv.NewTxn(ctx, c, 0) - futureTxn.SetFixedTimestamp(ctx, future) + require.NoError(t, futureTxn.SetFixedTimestamp(ctx, future)) nodes := mockNodeStore{ {NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")}, diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 1525b79b8157..be30aa59d065 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1008,9 +1008,19 @@ func (tc *TxnCoordSender) CommitTimestampFixed() bool { } // SetFixedTimestamp is part of the client.TxnSender interface. -func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) { +func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error { tc.mu.Lock() defer tc.mu.Unlock() + // The transaction must not have already been used in this epoch. + if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() { + return errors.WithContextTags(errors.AssertionFailedf( + "cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx) + } + if tc.mu.txn.Sequence != 0 { + return errors.WithContextTags(errors.AssertionFailedf( + "cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx) + } + tc.mu.txn.ReadTimestamp = ts tc.mu.txn.WriteTimestamp = ts tc.mu.txn.GlobalUncertaintyLimit = ts @@ -1019,6 +1029,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam // Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed // timestamp. This ensures that the MinTimestamp is always <= the other timestamps. tc.mu.txn.MinTimestamp.Backward(ts) + return nil } // RequiredFrontier is part of the client.TxnSender interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index c19659d09041..3bc03b680640 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2672,3 +2672,91 @@ func TestTxnManualRefresh(t *testing.T) { }) } } + +// TestTxnCoordSenderSetFixedTimestamp tests that SetFixedTimestamp cannot be +// called after a transaction has already been used in the current epoch to read +// or write. +func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + for _, test := range []struct { + name string + before func(*testing.T, *kv.Txn) + expErr string + }{ + { + name: "nothing before", + before: func(t *testing.T, txn *kv.Txn) {}, + }, + { + name: "read before", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + }, + expErr: "cannot set fixed timestamp, .* already performed reads", + }, + { + name: "write before", + before: func(t *testing.T, txn *kv.Txn) { + require.NoError(t, txn.Put(ctx, "k", "v")) + }, + expErr: "cannot set fixed timestamp, .* already performed writes", + }, + { + name: "read and write before", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + require.NoError(t, txn.Put(ctx, "k", "v")) + }, + expErr: "cannot set fixed timestamp, .* already performed reads", + }, + { + name: "read before, in prior epoch", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + txn.ManualRestart(ctx, txn.ReadTimestamp().Next()) + }, + }, + { + name: "write before, in prior epoch", + before: func(t *testing.T, txn *kv.Txn) { + require.NoError(t, txn.Put(ctx, "k", "v")) + txn.ManualRestart(ctx, txn.ReadTimestamp().Next()) + }, + }, + { + name: "read and write before, in prior epoch", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + require.NoError(t, txn.Put(ctx, "k", "v")) + txn.ManualRestart(ctx, txn.ReadTimestamp().Next()) + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + s := createTestDB(t) + defer s.Stop() + + txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */) + test.before(t, txn) + + ts := s.Clock.Now() + err := txn.SetFixedTimestamp(ctx, ts) + if test.expErr != "" { + require.Error(t, err) + require.Regexp(t, test.expErr, err) + require.False(t, txn.CommitTimestampFixed()) + } else { + require.NoError(t, err) + require.True(t, txn.CommitTimestampFixed()) + require.Equal(t, ts, txn.CommitTimestamp()) + } + }) + } +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go index 5e1cbfaad504..7bf47154392c 100644 --- a/pkg/kv/kvclient/rangefeed/db_adapter.go +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -74,7 +74,9 @@ func (dbc *dbAdapter) Scan( ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), ) error { return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, asOf) + if err := txn.SetFixedTimestamp(ctx, asOf); err != nil { + return err + } sp := span var b kv.Batch for { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 29d102407c1b..7dad74e8e880 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3174,7 +3174,7 @@ func TestStrictGCEnforcement(t *testing.T) { } mkStaleTxn = func() *kv.Txn { txn := db.NewTxn(ctx, "foo") - txn.SetFixedTimestamp(ctx, tenSecondsAgo) + require.NoError(t, txn.SetFixedTimestamp(ctx, tenSecondsAgo)) return txn } getRejectedMsg = func() string { diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 99b4ebbf4f87..906df5c836f5 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -219,7 +219,9 @@ func TestReplicaRangefeed(t *testing.T) { // Insert a second key transactionally. ts3 := initTime.Add(0, 3) if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts3) + if err := txn.SetFixedTimestamp(ctx, ts3); err != nil { + return err + } return txn.Put(ctx, roachpb.Key("m"), []byte("val3")) }); err != nil { t.Fatal(err) @@ -239,7 +241,9 @@ func TestReplicaRangefeed(t *testing.T) { // Update the originally incremented key transactionally. ts5 := initTime.Add(0, 5) if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts5) + if err := txn.SetFixedTimestamp(ctx, ts5); err != nil { + return err + } _, err := txn.Inc(ctx, incArgs.Key, 7) return err }); err != nil { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index abb99de4ab0b..eacffe47115b 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -109,11 +109,11 @@ func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp { // CommitTimestampFixed is part of the TxnSender interface. func (m *MockTransactionalSender) CommitTimestampFixed() bool { - panic("unimplemented") + return m.txn.CommitTimestampFixed } // SetFixedTimestamp is part of the TxnSender interface. -func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) { +func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) error { m.txn.WriteTimestamp = ts m.txn.ReadTimestamp = ts m.txn.GlobalUncertaintyLimit = ts @@ -122,6 +122,7 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti // Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed // timestamp. This ensures that the MinTimestamp is always <= the other timestamps. m.txn.MinTimestamp.Backward(ts) + return nil } // RequiredFrontier is part of the TxnSender interface. diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 5097cff213e1..9bf6cdf424a7 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -170,11 +170,12 @@ type TxnSender interface { // such that the transaction can't be pushed to a different // timestamp. // - // This is used to support historical queries (AS OF SYSTEM TIME - // queries and backups). This method must be called on every - // transaction retry (but note that retries should be rare for - // read-only queries with no clock uncertainty). - SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) + // This is used to support historical queries (AS OF SYSTEM TIME queries + // and backups). This method must be called on every transaction retry + // (but note that retries should be rare for read-only queries with no + // clock uncertainty). The method must not be called after the + // transaction has been used in the current epoch to read or write. + SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error // ManualRestart bumps the transactions epoch, and can upgrade the // timestamp and priority. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 1e4e3a449c6b..94cc3482df8f 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -328,6 +328,14 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp { return txn.mu.sender.CommitTimestamp() } +// CommitTimestampFixed returns true if the commit timestamp has +// been fixed to the start timestamp and cannot be pushed forward. +func (txn *Txn) CommitTimestampFixed() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.CommitTimestampFixed() +} + // ProvisionalCommitTimestamp returns the transaction's provisional // commit timestamp. This can evolve throughout a txn's lifecycle. See // the comment on the WriteTimestamp field of TxnMeta for details. @@ -1177,16 +1185,19 @@ func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) { // This is used to support historical queries (AS OF SYSTEM TIME queries and // backups). This method must be called on every transaction retry (but note // that retries should be rare for read-only queries with no clock uncertainty). -func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) { +func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error { if txn.typ != RootTxn { - panic(errors.WithContextTags( - errors.AssertionFailedf("SetFixedTimestamp() called on leaf txn"), ctx)) + return errors.WithContextTags(errors.AssertionFailedf( + "SetFixedTimestamp() called on leaf txn"), ctx) } if ts.IsEmpty() { - log.Fatalf(ctx, "empty timestamp is invalid for SetFixedTimestamp()") + return errors.WithContextTags(errors.AssertionFailedf( + "empty timestamp is invalid for SetFixedTimestamp()"), ctx) } - txn.mu.sender.SetFixedTimestamp(ctx, ts) + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.SetFixedTimestamp(ctx, ts) } // GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 7c8f9d1129bc..76633df7d470 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -186,7 +186,9 @@ func (sc *SchemaChanger) fixedTimestampTxn( retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error, ) error { return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - txn.SetFixedTimestamp(ctx, readAsOf) + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } return retryable(ctx, txn, descriptors) }) } diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 5a608e71db41..ad1a66ac1d70 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -733,7 +733,9 @@ func (m *Manager) resolveName( if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil { return err } - txn.SetFixedTimestamp(ctx, timestamp) + if err := txn.SetFixedTimestamp(ctx, timestamp); err != nil { + return err + } var found bool var err error found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name) diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index d9b3c345c91e..24a56d8730c3 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -303,7 +303,9 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); update := func(catalog.MutableDescriptor) error { return nil } logEvent := func(txn *kv.Txn) error { txn2 := kvDB.NewTxn(ctx, "future-read") - txn2.SetFixedTimestamp(ctx, futureTime.Prev()) + if err := txn2.SetFixedTimestamp(ctx, futureTime.Prev()); err != nil { + return err + } if _, err := txn2.Get(ctx, "key"); err != nil { return errors.Wrap(err, "read from other txn in future") } diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 3b3a53e8c1b7..fb33df96758a 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -1685,7 +1685,7 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR); log.Infof(ctx, "checking version %d", table.GetVersion()) txn := kv.NewTxn(ctx, t.kvDB, roachpb.NodeID(0)) // Make the txn look back at the known modification timestamp. - txn.SetFixedTimestamp(ctx, table.GetModificationTime()) + require.NoError(t, txn.SetFixedTimestamp(ctx, table.GetModificationTime())) // Look up the descriptor. descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descID) diff --git a/pkg/sql/catalog/lease/storage.go b/pkg/sql/catalog/lease/storage.go index 4123bba4b6e5..00b0eb5b0a9f 100644 --- a/pkg/sql/catalog/lease/storage.go +++ b/pkg/sql/catalog/lease/storage.go @@ -211,9 +211,12 @@ func (s storage) getForExpiration( ctx context.Context, expiration hlc.Timestamp, id descpb.ID, ) (catalog.Descriptor, error) { var desc catalog.Descriptor - err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { prevTimestamp := expiration.Prev() - txn.SetFixedTimestamp(ctx, prevTimestamp) + err := txn.SetFixedTimestamp(ctx, prevTimestamp) + if err != nil { + return err + } desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id) if err != nil { return err diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index deaa0fb78da6..31e047be8fc5 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2215,7 +2215,9 @@ func (ex *connExecutor) setTransactionModes( return errors.AssertionFailedf("expected an evaluated AS OF timestamp") } if !asOfTs.IsEmpty() { - ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs) + if err := ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs); err != nil { + return err + } ex.state.sqlTimestamp = asOfTs.GoTime() if rwMode == tree.UnspecifiedReadWriteMode { rwMode = tree.ReadOnly diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 2a3a4eef6b3b..22676d158e6a 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -588,7 +588,9 @@ func (ex *connExecutor) execStmtInOpenState( if asOf != nil { p.extendedEvalCtx.AsOfSystemTime = asOf p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime()) - ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp) + if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil { + return makeErrEvent(err) + } } } else { // If we're in an explicit txn, we allow AOST but only if it matches with diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index b45b9b434d6b..bf20cedfd8fb 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -275,7 +275,9 @@ func (ex *connExecutor) populatePrepared( "bounded staleness queries do not yet work with prepared statements", ) } - txn.SetFixedTimestamp(ctx, asOf.Timestamp) + if err := txn.SetFixedTimestamp(ctx, asOf.Timestamp); err != nil { + return 0, err + } } // PREPARE has a limited subset of statements it can be run with. Postgres diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 3da365b71df2..560e0cf34a8b 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -543,7 +543,9 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er if details.AsOf != nil { p.ExtendedEvalContext().AsOfSystemTime = &tree.AsOfSystemTime{Timestamp: *details.AsOf} p.ExtendedEvalContext().SetTxnTimestamp(details.AsOf.GoTime()) - txn.SetFixedTimestamp(ctx, *details.AsOf) + if err := txn.SetFixedTimestamp(ctx, *details.AsOf); err != nil { + return err + } } planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, true /* distribute */) diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index 4658bc05e2c1..c11b537dc566 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -98,7 +98,9 @@ func (ib *IndexBackfillPlanner) scanTargetSpansToPushTimestampCache( ) error { const pageSize = 10000 return ib.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, backfillTimestamp) + if err := txn.SetFixedTimestamp(ctx, backfillTimestamp); err != nil { + return err + } for _, span := range targetSpans { // TODO(dt): a Count() request would be nice here if the target isn't // empty, since we don't need to drag all the results back just to diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 82c3b17f460a..bd4b89b86c33 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -629,7 +629,9 @@ func (rf *Fetcher) StartInconsistentScan( ) } txn := kv.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) - txn.SetFixedTimestamp(ctx, txnTimestamp) + if err := txn.SetFixedTimestamp(ctx, txnTimestamp); err != nil { + return err + } if log.V(1) { log.Infof(ctx, "starting inconsistent scan at timestamp %v", txnTimestamp) } @@ -644,7 +646,9 @@ func (rf *Fetcher) StartInconsistentScan( txnTimestamp = txnTimestamp.Add(now.Sub(txnStartTime).Nanoseconds(), 0 /* logical */) txnStartTime = now txn = kv.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) - txn.SetFixedTimestamp(ctx, txnTimestamp) + if err := txn.SetFixedTimestamp(ctx, txnTimestamp); err != nil { + return nil, err + } if log.V(1) { log.Infof(ctx, "bumped inconsistent scan timestamp to %v", txnTimestamp) diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 4f1a76c45e0a..1a33e4961e11 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -268,7 +268,9 @@ func (c *DatumRowConverter) getSequenceAnnotation( err := evalCtx.DB.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error { seqNameToMetadata = make(map[string]*SequenceMetadata) seqIDToMetadata = make(map[descpb.ID]*SequenceMetadata) - txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()}) + if err := txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()}); err != nil { + return err + } for seqID := range sequenceIDs { seqDesc, err := catalogkv.MustGetTableDescByID(ctx, txn, evalCtx.Codec, seqID) if err != nil { diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index b22ef934671f..902ea184d948 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -432,7 +432,9 @@ func (ib *indexBackfiller) buildIndexEntryBatch( start := timeutil.Now() var entries []rowenc.IndexEntry if err := ib.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, readAsOf) + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } // TODO(knz): do KV tracing in DistSQL processors. var err error diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index 1a4f029fa265..a8dad8a622a1 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -84,7 +84,9 @@ func (p *planner) WaitForDescriptorSchemaChanges( if err := p.ExecCfg().CollectionFactory.Txn( ctx, p.ExecCfg().InternalExecutor, p.ExecCfg().DB, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - txn.SetFixedTimestamp(ctx, now) + if err := txn.SetFixedTimestamp(ctx, now); err != nil { + return err + } table, err := descriptors.GetImmutableTableByID(ctx, txn, descID, tree.ObjectLookupFlags{ CommonLookupFlags: tree.CommonLookupFlags{ diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index bbb34fef730a..95c1fccdc41d 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -247,7 +247,9 @@ func (sc *SchemaChanger) backfillQueryIntoTable( } return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts) + if err := txn.SetFixedTimestamp(ctx, ts); err != nil { + return err + } // Create an internal planner as the planner used to serve the user query // would have committed by this point. diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 878f1d6572db..ff986599c537 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -202,7 +202,9 @@ func (ts *txnState) resetForNewSQLTxn( ts.mu.txnStart = timeutil.Now() ts.mu.Unlock() if historicalTimestamp != nil { - ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp) + if err := ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp); err != nil { + panic(err) + } } if err := ts.setReadOnlyMode(readOnly); err != nil { panic(err) @@ -262,11 +264,16 @@ func (ts *txnState) finishExternalTxn() { ts.mu.Unlock() } -func (ts *txnState) setHistoricalTimestamp(ctx context.Context, historicalTimestamp hlc.Timestamp) { +func (ts *txnState) setHistoricalTimestamp( + ctx context.Context, historicalTimestamp hlc.Timestamp, +) error { ts.mu.Lock() - ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp) - ts.mu.Unlock() + defer ts.mu.Unlock() + if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil { + return err + } ts.isHistorical = true + return nil } // getReadTimestamp returns the transaction's current read timestamp.