diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 965e2f531566..a56c99b5c428 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -328,7 +328,9 @@ func spansForAllTableIndexes( checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) { var foundKV bool err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, endTime) + if err := txn.SetFixedTimestamp(ctx, endTime); err != nil { + return err + } res, err := txn.Scan(ctx, start, end, 1 /* maxRows */) if err != nil { return err diff --git a/pkg/ccl/backupccl/backupresolver/targets.go b/pkg/ccl/backupccl/backupresolver/targets.go index 03b64e66bfa3..e9ece2248c56 100644 --- a/pkg/ccl/backupccl/backupresolver/targets.go +++ b/pkg/ccl/backupccl/backupresolver/targets.go @@ -571,8 +571,11 @@ func LoadAllDescs( var allDescs []catalog.Descriptor if err := db.Txn( ctx, - func(ctx context.Context, txn *kv.Txn) (err error) { - txn.SetFixedTimestamp(ctx, asOf) + func(ctx context.Context, txn *kv.Txn) error { + err := txn.SetFixedTimestamp(ctx, asOf) + if err != nil { + return err + } allDescs, err = catalogkv.GetAllDescriptors( ctx, txn, codec, true, /* shouldRunPostDeserializationChanges */ ) diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 3e6b617309fb..ee5f6bfe1ed3 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -129,7 +129,9 @@ func fetchSpansForTargets( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { spans = nil - txn.SetFixedTimestamp(ctx, ts) + if err := txn.SetFixedTimestamp(ctx, ts); err != nil { + return err + } // Note that all targets are currently guaranteed to be tables. for tableID := range targets { flags := tree.ObjectLookupFlagsWithRequired() diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index cd42b11ef699..3c098202fefd 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -103,7 +103,9 @@ func (p *scanRequestScanner) exportSpan( if log.V(2) { log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts) } - txn.SetFixedTimestamp(ctx, ts) + if err := txn.SetFixedTimestamp(ctx, ts); err != nil { + return err + } stopwatchStart := timeutil.Now() var scanDuration, bufferDuration time.Duration const targetBytesPerScan = 16 << 20 // 16 MiB diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index 63d6d715a60f..6e8eb87b0c70 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -101,8 +101,10 @@ func (c *rowFetcherCache) TableDescForKey( // descs.Collection directly here. // TODO (SQL Schema): #53751. if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts) - var err error + err := txn.SetFixedTimestamp(ctx, ts) + if err != nil { + return err + } tableDesc, err = c.collection.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{}) return err }); err != nil { diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 43303029a2f4..041f94475016 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -274,7 +274,9 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error { ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { initialDescs = initialDescs[:0] - txn.SetFixedTimestamp(ctx, initialTableDescTs) + if err := txn.SetFixedTimestamp(ctx, initialTableDescTs); err != nil { + return err + } // Note that all targets are currently guaranteed to be tables. for tableID := range tf.targets { flags := tree.ObjectLookupFlagsWithRequired() diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 5a6fbc6a5023..cf11f95a3503 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -400,11 +400,11 @@ func TestOracle(t *testing.T) { c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper) staleTxn := kv.NewTxn(ctx, c, 0) - staleTxn.SetFixedTimestamp(ctx, stale) + require.NoError(t, staleTxn.SetFixedTimestamp(ctx, stale)) currentTxn := kv.NewTxn(ctx, c, 0) - currentTxn.SetFixedTimestamp(ctx, current) + require.NoError(t, currentTxn.SetFixedTimestamp(ctx, current)) futureTxn := kv.NewTxn(ctx, c, 0) - futureTxn.SetFixedTimestamp(ctx, future) + require.NoError(t, futureTxn.SetFixedTimestamp(ctx, future)) nodes := mockNodeStore{ {NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")}, diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 1525b79b8157..be30aa59d065 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1008,9 +1008,19 @@ func (tc *TxnCoordSender) CommitTimestampFixed() bool { } // SetFixedTimestamp is part of the client.TxnSender interface. -func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) { +func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error { tc.mu.Lock() defer tc.mu.Unlock() + // The transaction must not have already been used in this epoch. + if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() { + return errors.WithContextTags(errors.AssertionFailedf( + "cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx) + } + if tc.mu.txn.Sequence != 0 { + return errors.WithContextTags(errors.AssertionFailedf( + "cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx) + } + tc.mu.txn.ReadTimestamp = ts tc.mu.txn.WriteTimestamp = ts tc.mu.txn.GlobalUncertaintyLimit = ts @@ -1019,6 +1029,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam // Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed // timestamp. This ensures that the MinTimestamp is always <= the other timestamps. tc.mu.txn.MinTimestamp.Backward(ts) + return nil } // RequiredFrontier is part of the client.TxnSender interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index c19659d09041..3bc03b680640 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2672,3 +2672,91 @@ func TestTxnManualRefresh(t *testing.T) { }) } } + +// TestTxnCoordSenderSetFixedTimestamp tests that SetFixedTimestamp cannot be +// called after a transaction has already been used in the current epoch to read +// or write. +func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + for _, test := range []struct { + name string + before func(*testing.T, *kv.Txn) + expErr string + }{ + { + name: "nothing before", + before: func(t *testing.T, txn *kv.Txn) {}, + }, + { + name: "read before", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + }, + expErr: "cannot set fixed timestamp, .* already performed reads", + }, + { + name: "write before", + before: func(t *testing.T, txn *kv.Txn) { + require.NoError(t, txn.Put(ctx, "k", "v")) + }, + expErr: "cannot set fixed timestamp, .* already performed writes", + }, + { + name: "read and write before", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + require.NoError(t, txn.Put(ctx, "k", "v")) + }, + expErr: "cannot set fixed timestamp, .* already performed reads", + }, + { + name: "read before, in prior epoch", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + txn.ManualRestart(ctx, txn.ReadTimestamp().Next()) + }, + }, + { + name: "write before, in prior epoch", + before: func(t *testing.T, txn *kv.Txn) { + require.NoError(t, txn.Put(ctx, "k", "v")) + txn.ManualRestart(ctx, txn.ReadTimestamp().Next()) + }, + }, + { + name: "read and write before, in prior epoch", + before: func(t *testing.T, txn *kv.Txn) { + _, err := txn.Get(ctx, "k") + require.NoError(t, err) + require.NoError(t, txn.Put(ctx, "k", "v")) + txn.ManualRestart(ctx, txn.ReadTimestamp().Next()) + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + s := createTestDB(t) + defer s.Stop() + + txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */) + test.before(t, txn) + + ts := s.Clock.Now() + err := txn.SetFixedTimestamp(ctx, ts) + if test.expErr != "" { + require.Error(t, err) + require.Regexp(t, test.expErr, err) + require.False(t, txn.CommitTimestampFixed()) + } else { + require.NoError(t, err) + require.True(t, txn.CommitTimestampFixed()) + require.Equal(t, ts, txn.CommitTimestamp()) + } + }) + } +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go index 5e1cbfaad504..7bf47154392c 100644 --- a/pkg/kv/kvclient/rangefeed/db_adapter.go +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -74,7 +74,9 @@ func (dbc *dbAdapter) Scan( ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), ) error { return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, asOf) + if err := txn.SetFixedTimestamp(ctx, asOf); err != nil { + return err + } sp := span var b kv.Batch for { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 29d102407c1b..7dad74e8e880 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3174,7 +3174,7 @@ func TestStrictGCEnforcement(t *testing.T) { } mkStaleTxn = func() *kv.Txn { txn := db.NewTxn(ctx, "foo") - txn.SetFixedTimestamp(ctx, tenSecondsAgo) + require.NoError(t, txn.SetFixedTimestamp(ctx, tenSecondsAgo)) return txn } getRejectedMsg = func() string { diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 99b4ebbf4f87..906df5c836f5 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -219,7 +219,9 @@ func TestReplicaRangefeed(t *testing.T) { // Insert a second key transactionally. ts3 := initTime.Add(0, 3) if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts3) + if err := txn.SetFixedTimestamp(ctx, ts3); err != nil { + return err + } return txn.Put(ctx, roachpb.Key("m"), []byte("val3")) }); err != nil { t.Fatal(err) @@ -239,7 +241,9 @@ func TestReplicaRangefeed(t *testing.T) { // Update the originally incremented key transactionally. ts5 := initTime.Add(0, 5) if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts5) + if err := txn.SetFixedTimestamp(ctx, ts5); err != nil { + return err + } _, err := txn.Inc(ctx, incArgs.Key, 7) return err }); err != nil { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index abb99de4ab0b..eacffe47115b 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -109,11 +109,11 @@ func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp { // CommitTimestampFixed is part of the TxnSender interface. func (m *MockTransactionalSender) CommitTimestampFixed() bool { - panic("unimplemented") + return m.txn.CommitTimestampFixed } // SetFixedTimestamp is part of the TxnSender interface. -func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) { +func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) error { m.txn.WriteTimestamp = ts m.txn.ReadTimestamp = ts m.txn.GlobalUncertaintyLimit = ts @@ -122,6 +122,7 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti // Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed // timestamp. This ensures that the MinTimestamp is always <= the other timestamps. m.txn.MinTimestamp.Backward(ts) + return nil } // RequiredFrontier is part of the TxnSender interface. diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 5097cff213e1..9bf6cdf424a7 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -170,11 +170,12 @@ type TxnSender interface { // such that the transaction can't be pushed to a different // timestamp. // - // This is used to support historical queries (AS OF SYSTEM TIME - // queries and backups). This method must be called on every - // transaction retry (but note that retries should be rare for - // read-only queries with no clock uncertainty). - SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) + // This is used to support historical queries (AS OF SYSTEM TIME queries + // and backups). This method must be called on every transaction retry + // (but note that retries should be rare for read-only queries with no + // clock uncertainty). The method must not be called after the + // transaction has been used in the current epoch to read or write. + SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error // ManualRestart bumps the transactions epoch, and can upgrade the // timestamp and priority. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 1e4e3a449c6b..94cc3482df8f 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -328,6 +328,14 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp { return txn.mu.sender.CommitTimestamp() } +// CommitTimestampFixed returns true if the commit timestamp has +// been fixed to the start timestamp and cannot be pushed forward. +func (txn *Txn) CommitTimestampFixed() bool { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.CommitTimestampFixed() +} + // ProvisionalCommitTimestamp returns the transaction's provisional // commit timestamp. This can evolve throughout a txn's lifecycle. See // the comment on the WriteTimestamp field of TxnMeta for details. @@ -1177,16 +1185,19 @@ func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) { // This is used to support historical queries (AS OF SYSTEM TIME queries and // backups). This method must be called on every transaction retry (but note // that retries should be rare for read-only queries with no clock uncertainty). -func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) { +func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error { if txn.typ != RootTxn { - panic(errors.WithContextTags( - errors.AssertionFailedf("SetFixedTimestamp() called on leaf txn"), ctx)) + return errors.WithContextTags(errors.AssertionFailedf( + "SetFixedTimestamp() called on leaf txn"), ctx) } if ts.IsEmpty() { - log.Fatalf(ctx, "empty timestamp is invalid for SetFixedTimestamp()") + return errors.WithContextTags(errors.AssertionFailedf( + "empty timestamp is invalid for SetFixedTimestamp()"), ctx) } - txn.mu.sender.SetFixedTimestamp(ctx, ts) + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.SetFixedTimestamp(ctx, ts) } // GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 7c8f9d1129bc..76633df7d470 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -186,7 +186,9 @@ func (sc *SchemaChanger) fixedTimestampTxn( retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error, ) error { return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - txn.SetFixedTimestamp(ctx, readAsOf) + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } return retryable(ctx, txn, descriptors) }) } diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 5a608e71db41..ad1a66ac1d70 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -733,7 +733,9 @@ func (m *Manager) resolveName( if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil { return err } - txn.SetFixedTimestamp(ctx, timestamp) + if err := txn.SetFixedTimestamp(ctx, timestamp); err != nil { + return err + } var found bool var err error found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name) diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index d9b3c345c91e..24a56d8730c3 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -303,7 +303,9 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); update := func(catalog.MutableDescriptor) error { return nil } logEvent := func(txn *kv.Txn) error { txn2 := kvDB.NewTxn(ctx, "future-read") - txn2.SetFixedTimestamp(ctx, futureTime.Prev()) + if err := txn2.SetFixedTimestamp(ctx, futureTime.Prev()); err != nil { + return err + } if _, err := txn2.Get(ctx, "key"); err != nil { return errors.Wrap(err, "read from other txn in future") } diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 3b3a53e8c1b7..fb33df96758a 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -1685,7 +1685,7 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR); log.Infof(ctx, "checking version %d", table.GetVersion()) txn := kv.NewTxn(ctx, t.kvDB, roachpb.NodeID(0)) // Make the txn look back at the known modification timestamp. - txn.SetFixedTimestamp(ctx, table.GetModificationTime()) + require.NoError(t, txn.SetFixedTimestamp(ctx, table.GetModificationTime())) // Look up the descriptor. descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descID) diff --git a/pkg/sql/catalog/lease/storage.go b/pkg/sql/catalog/lease/storage.go index 4123bba4b6e5..00b0eb5b0a9f 100644 --- a/pkg/sql/catalog/lease/storage.go +++ b/pkg/sql/catalog/lease/storage.go @@ -211,9 +211,12 @@ func (s storage) getForExpiration( ctx context.Context, expiration hlc.Timestamp, id descpb.ID, ) (catalog.Descriptor, error) { var desc catalog.Descriptor - err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { prevTimestamp := expiration.Prev() - txn.SetFixedTimestamp(ctx, prevTimestamp) + err := txn.SetFixedTimestamp(ctx, prevTimestamp) + if err != nil { + return err + } desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id) if err != nil { return err diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index deaa0fb78da6..31e047be8fc5 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2215,7 +2215,9 @@ func (ex *connExecutor) setTransactionModes( return errors.AssertionFailedf("expected an evaluated AS OF timestamp") } if !asOfTs.IsEmpty() { - ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs) + if err := ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs); err != nil { + return err + } ex.state.sqlTimestamp = asOfTs.GoTime() if rwMode == tree.UnspecifiedReadWriteMode { rwMode = tree.ReadOnly diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 2a3a4eef6b3b..22676d158e6a 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -588,7 +588,9 @@ func (ex *connExecutor) execStmtInOpenState( if asOf != nil { p.extendedEvalCtx.AsOfSystemTime = asOf p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime()) - ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp) + if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil { + return makeErrEvent(err) + } } } else { // If we're in an explicit txn, we allow AOST but only if it matches with diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index b45b9b434d6b..bf20cedfd8fb 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -275,7 +275,9 @@ func (ex *connExecutor) populatePrepared( "bounded staleness queries do not yet work with prepared statements", ) } - txn.SetFixedTimestamp(ctx, asOf.Timestamp) + if err := txn.SetFixedTimestamp(ctx, asOf.Timestamp); err != nil { + return 0, err + } } // PREPARE has a limited subset of statements it can be run with. Postgres diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 3da365b71df2..560e0cf34a8b 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -543,7 +543,9 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er if details.AsOf != nil { p.ExtendedEvalContext().AsOfSystemTime = &tree.AsOfSystemTime{Timestamp: *details.AsOf} p.ExtendedEvalContext().SetTxnTimestamp(details.AsOf.GoTime()) - txn.SetFixedTimestamp(ctx, *details.AsOf) + if err := txn.SetFixedTimestamp(ctx, *details.AsOf); err != nil { + return err + } } planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, true /* distribute */) diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index 4658bc05e2c1..c11b537dc566 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -98,7 +98,9 @@ func (ib *IndexBackfillPlanner) scanTargetSpansToPushTimestampCache( ) error { const pageSize = 10000 return ib.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, backfillTimestamp) + if err := txn.SetFixedTimestamp(ctx, backfillTimestamp); err != nil { + return err + } for _, span := range targetSpans { // TODO(dt): a Count() request would be nice here if the target isn't // empty, since we don't need to drag all the results back just to diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 82c3b17f460a..bd4b89b86c33 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -629,7 +629,9 @@ func (rf *Fetcher) StartInconsistentScan( ) } txn := kv.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) - txn.SetFixedTimestamp(ctx, txnTimestamp) + if err := txn.SetFixedTimestamp(ctx, txnTimestamp); err != nil { + return err + } if log.V(1) { log.Infof(ctx, "starting inconsistent scan at timestamp %v", txnTimestamp) } @@ -644,7 +646,9 @@ func (rf *Fetcher) StartInconsistentScan( txnTimestamp = txnTimestamp.Add(now.Sub(txnStartTime).Nanoseconds(), 0 /* logical */) txnStartTime = now txn = kv.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */) - txn.SetFixedTimestamp(ctx, txnTimestamp) + if err := txn.SetFixedTimestamp(ctx, txnTimestamp); err != nil { + return nil, err + } if log.V(1) { log.Infof(ctx, "bumped inconsistent scan timestamp to %v", txnTimestamp) diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 4f1a76c45e0a..1a33e4961e11 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -268,7 +268,9 @@ func (c *DatumRowConverter) getSequenceAnnotation( err := evalCtx.DB.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error { seqNameToMetadata = make(map[string]*SequenceMetadata) seqIDToMetadata = make(map[descpb.ID]*SequenceMetadata) - txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()}) + if err := txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()}); err != nil { + return err + } for seqID := range sequenceIDs { seqDesc, err := catalogkv.MustGetTableDescByID(ctx, txn, evalCtx.Codec, seqID) if err != nil { diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index b22ef934671f..902ea184d948 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -432,7 +432,9 @@ func (ib *indexBackfiller) buildIndexEntryBatch( start := timeutil.Now() var entries []rowenc.IndexEntry if err := ib.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, readAsOf) + if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { + return err + } // TODO(knz): do KV tracing in DistSQL processors. var err error diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index 1a4f029fa265..a8dad8a622a1 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -84,7 +84,9 @@ func (p *planner) WaitForDescriptorSchemaChanges( if err := p.ExecCfg().CollectionFactory.Txn( ctx, p.ExecCfg().InternalExecutor, p.ExecCfg().DB, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - txn.SetFixedTimestamp(ctx, now) + if err := txn.SetFixedTimestamp(ctx, now); err != nil { + return err + } table, err := descriptors.GetImmutableTableByID(ctx, txn, descID, tree.ObjectLookupFlags{ CommonLookupFlags: tree.CommonLookupFlags{ diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index bbb34fef730a..95c1fccdc41d 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -247,7 +247,9 @@ func (sc *SchemaChanger) backfillQueryIntoTable( } return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts) + if err := txn.SetFixedTimestamp(ctx, ts); err != nil { + return err + } // Create an internal planner as the planner used to serve the user query // would have committed by this point. diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 878f1d6572db..ff986599c537 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -202,7 +202,9 @@ func (ts *txnState) resetForNewSQLTxn( ts.mu.txnStart = timeutil.Now() ts.mu.Unlock() if historicalTimestamp != nil { - ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp) + if err := ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp); err != nil { + panic(err) + } } if err := ts.setReadOnlyMode(readOnly); err != nil { panic(err) @@ -262,11 +264,16 @@ func (ts *txnState) finishExternalTxn() { ts.mu.Unlock() } -func (ts *txnState) setHistoricalTimestamp(ctx context.Context, historicalTimestamp hlc.Timestamp) { +func (ts *txnState) setHistoricalTimestamp( + ctx context.Context, historicalTimestamp hlc.Timestamp, +) error { ts.mu.Lock() - ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp) - ts.mu.Unlock() + defer ts.mu.Unlock() + if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil { + return err + } ts.isHistorical = true + return nil } // getReadTimestamp returns the transaction's current read timestamp.