Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: assert txn unused in SetFixedTimestamp #68426

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ func spansForAllTableIndexes(
checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) {
var foundKV bool
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, endTime)
if err := txn.SetFixedTimestamp(ctx, endTime); err != nil {
return err
}
res, err := txn.Scan(ctx, start, end, 1 /* maxRows */)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/backupccl/backupresolver/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,11 @@ func LoadAllDescs(
var allDescs []catalog.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *kv.Txn) (err error) {
txn.SetFixedTimestamp(ctx, asOf)
func(ctx context.Context, txn *kv.Txn) error {
err := txn.SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}
allDescs, err = catalogkv.GetAllDescriptors(
ctx, txn, codec, true, /* shouldRunPostDeserializationChanges */
)
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func fetchSpansForTargets(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
spans = nil
txn.SetFixedTimestamp(ctx, ts)
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (p *scanRequestScanner) exportSpan(
if log.V(2) {
log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts)
}
txn.SetFixedTimestamp(ctx, ts)
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
stopwatchStart := timeutil.Now()
var scanDuration, bufferDuration time.Duration
const targetBytesPerScan = 16 << 20 // 16 MiB
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ func (c *rowFetcherCache) TableDescForKey(
// descs.Collection directly here.
// TODO (SQL Schema): #53751.
if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts)
var err error
err := txn.SetFixedTimestamp(ctx, ts)
if err != nil {
return err
}
tableDesc, err = c.collection.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{})
return err
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error {
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
initialDescs = initialDescs[:0]
txn.SetFixedTimestamp(ctx, initialTableDescTs)
if err := txn.SetFixedTimestamp(ctx, initialTableDescTs); err != nil {
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range tf.targets {
flags := tree.ObjectLookupFlagsWithRequired()
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,11 @@ func TestOracle(t *testing.T) {

c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper)
staleTxn := kv.NewTxn(ctx, c, 0)
staleTxn.SetFixedTimestamp(ctx, stale)
require.NoError(t, staleTxn.SetFixedTimestamp(ctx, stale))
currentTxn := kv.NewTxn(ctx, c, 0)
currentTxn.SetFixedTimestamp(ctx, current)
require.NoError(t, currentTxn.SetFixedTimestamp(ctx, current))
futureTxn := kv.NewTxn(ctx, c, 0)
futureTxn.SetFixedTimestamp(ctx, future)
require.NoError(t, futureTxn.SetFixedTimestamp(ctx, future))

nodes := mockNodeStore{
{NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")},
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,19 @@ func (tc *TxnCoordSender) CommitTimestampFixed() bool {
}

// SetFixedTimestamp is part of the client.TxnSender interface.
func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
tc.mu.Lock()
defer tc.mu.Unlock()
// The transaction must not have already been used in this epoch.
if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx)
}
if tc.mu.txn.Sequence != 0 {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx)
}

tc.mu.txn.ReadTimestamp = ts
tc.mu.txn.WriteTimestamp = ts
tc.mu.txn.GlobalUncertaintyLimit = ts
Expand All @@ -1019,6 +1029,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
tc.mu.txn.MinTimestamp.Backward(ts)
return nil
}

// RequiredFrontier is part of the client.TxnSender interface.
Expand Down
88 changes: 88 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2672,3 +2672,91 @@ func TestTxnManualRefresh(t *testing.T) {
})
}
}

// TestTxnCoordSenderSetFixedTimestamp tests that SetFixedTimestamp cannot be
// called after a transaction has already been used in the current epoch to read
// or write.
func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

for _, test := range []struct {
name string
before func(*testing.T, *kv.Txn)
expErr string
}{
{
name: "nothing before",
before: func(t *testing.T, txn *kv.Txn) {},
},
{
name: "read before",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
},
expErr: "cannot set fixed timestamp, .* already performed reads",
},
{
name: "write before",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
},
expErr: "cannot set fixed timestamp, .* already performed writes",
},
{
name: "read and write before",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
},
expErr: "cannot set fixed timestamp, .* already performed reads",
},
{
name: "read before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
{
name: "write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
{
name: "read and write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
} {
t.Run(test.name, func(t *testing.T) {
s := createTestDB(t)
defer s.Stop()

txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
test.before(t, txn)

ts := s.Clock.Now()
err := txn.SetFixedTimestamp(ctx, ts)
if test.expErr != "" {
require.Error(t, err)
require.Regexp(t, test.expErr, err)
require.False(t, txn.CommitTimestampFixed())
} else {
require.NoError(t, err)
require.True(t, txn.CommitTimestampFixed())
require.Equal(t, ts, txn.CommitTimestamp())
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (dbc *dbAdapter) Scan(
ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue),
) error {
return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, asOf)
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
sp := span
var b kv.Batch
for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3174,7 +3174,7 @@ func TestStrictGCEnforcement(t *testing.T) {
}
mkStaleTxn = func() *kv.Txn {
txn := db.NewTxn(ctx, "foo")
txn.SetFixedTimestamp(ctx, tenSecondsAgo)
require.NoError(t, txn.SetFixedTimestamp(ctx, tenSecondsAgo))
return txn
}
getRejectedMsg = func() string {
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ func TestReplicaRangefeed(t *testing.T) {
// Insert a second key transactionally.
ts3 := initTime.Add(0, 3)
if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts3)
if err := txn.SetFixedTimestamp(ctx, ts3); err != nil {
return err
}
return txn.Put(ctx, roachpb.Key("m"), []byte("val3"))
}); err != nil {
t.Fatal(err)
Expand All @@ -239,7 +241,9 @@ func TestReplicaRangefeed(t *testing.T) {
// Update the originally incremented key transactionally.
ts5 := initTime.Add(0, 5)
if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts5)
if err := txn.SetFixedTimestamp(ctx, ts5); err != nil {
return err
}
_, err := txn.Inc(ctx, incArgs.Key, 7)
return err
}); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {

// CommitTimestampFixed is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestampFixed() bool {
panic("unimplemented")
return m.txn.CommitTimestampFixed
}

// SetFixedTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) {
func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) error {
m.txn.WriteTimestamp = ts
m.txn.ReadTimestamp = ts
m.txn.GlobalUncertaintyLimit = ts
Expand All @@ -122,6 +122,7 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
m.txn.MinTimestamp.Backward(ts)
return nil
}

// RequiredFrontier is part of the TxnSender interface.
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@ type TxnSender interface {
// such that the transaction can't be pushed to a different
// timestamp.
//
// This is used to support historical queries (AS OF SYSTEM TIME
// queries and backups). This method must be called on every
// transaction retry (but note that retries should be rare for
// read-only queries with no clock uncertainty).
SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp)
// This is used to support historical queries (AS OF SYSTEM TIME queries
// and backups). This method must be called on every transaction retry
// (but note that retries should be rare for read-only queries with no
// clock uncertainty). The method must not be called after the
// transaction has been used in the current epoch to read or write.
SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error

// ManualRestart bumps the transactions epoch, and can upgrade the
// timestamp and priority.
Expand Down
21 changes: 16 additions & 5 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp {
return txn.mu.sender.CommitTimestamp()
}

// CommitTimestampFixed returns true if the commit timestamp has
// been fixed to the start timestamp and cannot be pushed forward.
func (txn *Txn) CommitTimestampFixed() bool {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.CommitTimestampFixed()
}

// ProvisionalCommitTimestamp returns the transaction's provisional
// commit timestamp. This can evolve throughout a txn's lifecycle. See
// the comment on the WriteTimestamp field of TxnMeta for details.
Expand Down Expand Up @@ -1177,16 +1185,19 @@ func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) {
// This is used to support historical queries (AS OF SYSTEM TIME queries and
// backups). This method must be called on every transaction retry (but note
// that retries should be rare for read-only queries with no clock uncertainty).
func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
if txn.typ != RootTxn {
panic(errors.WithContextTags(
errors.AssertionFailedf("SetFixedTimestamp() called on leaf txn"), ctx))
return errors.WithContextTags(errors.AssertionFailedf(
"SetFixedTimestamp() called on leaf txn"), ctx)
}

if ts.IsEmpty() {
log.Fatalf(ctx, "empty timestamp is invalid for SetFixedTimestamp()")
return errors.WithContextTags(errors.AssertionFailedf(
"empty timestamp is invalid for SetFixedTimestamp()"), ctx)
}
txn.mu.sender.SetFixedTimestamp(ctx, ts)
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.SetFixedTimestamp(ctx, ts)
}

// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (sc *SchemaChanger) fixedTimestampTxn(
retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error,
) error {
return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
txn.SetFixedTimestamp(ctx, readAsOf)
if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil {
return err
}
return retryable(ctx, txn, descriptors)
})
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,9 @@ func (m *Manager) resolveName(
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
txn.SetFixedTimestamp(ctx, timestamp)
if err := txn.SetFixedTimestamp(ctx, timestamp); err != nil {
return err
}
var found bool
var err error
found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name)
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/catalog/lease/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
update := func(catalog.MutableDescriptor) error { return nil }
logEvent := func(txn *kv.Txn) error {
txn2 := kvDB.NewTxn(ctx, "future-read")
txn2.SetFixedTimestamp(ctx, futureTime.Prev())
if err := txn2.SetFixedTimestamp(ctx, futureTime.Prev()); err != nil {
return err
}
if _, err := txn2.Get(ctx, "key"); err != nil {
return errors.Wrap(err, "read from other txn in future")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,7 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR);
log.Infof(ctx, "checking version %d", table.GetVersion())
txn := kv.NewTxn(ctx, t.kvDB, roachpb.NodeID(0))
// Make the txn look back at the known modification timestamp.
txn.SetFixedTimestamp(ctx, table.GetModificationTime())
require.NoError(t, txn.SetFixedTimestamp(ctx, table.GetModificationTime()))

// Look up the descriptor.
descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descID)
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/catalog/lease/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,12 @@ func (s storage) getForExpiration(
ctx context.Context, expiration hlc.Timestamp, id descpb.ID,
) (catalog.Descriptor, error) {
var desc catalog.Descriptor
err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
prevTimestamp := expiration.Prev()
txn.SetFixedTimestamp(ctx, prevTimestamp)
err := txn.SetFixedTimestamp(ctx, prevTimestamp)
if err != nil {
return err
}
desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,7 +2215,9 @@ func (ex *connExecutor) setTransactionModes(
return errors.AssertionFailedf("expected an evaluated AS OF timestamp")
}
if !asOfTs.IsEmpty() {
ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs)
if err := ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs); err != nil {
return err
}
ex.state.sqlTimestamp = asOfTs.GoTime()
if rwMode == tree.UnspecifiedReadWriteMode {
rwMode = tree.ReadOnly
Expand Down
Loading