Skip to content

Commit

Permalink
kv: assert txn unused in SetFixedTimestamp
Browse files Browse the repository at this point in the history
This commit asserts that a transaction has not been used to read or to
write by the time that `SetFixedTimestamp` is called on it.

This was extracted from #68194 and modified to return an error from
`SetFixedTimestamp` on misuse instead of fatal-ing. This provides a
sufficient, temporary backstop for #68216 until the conn executor logic
is fixed:

```
[email protected]:26257/movr> create table t (x int);
CREATE TABLE

[email protected]:26257/movr> insert into t values (1);
INSERT 1

[email protected]:26257/movr> select crdb_internal_mvcc_timestamp, * from t;
   crdb_internal_mvcc_timestamp  | x
---------------------------------+----
  1628094563935439000.0000000000 | 1
(1 row)

[email protected]:26257/movr> begin as of system time (1628094563935439000.0000000000-1)::string;
BEGIN

[email protected]:26257/movr  OPEN> select * from t;
  x
-----
(0 rows)

[email protected]:26257/movr  OPEN> prepare y as select * from t as of system time 1628094563935439000.0000000000;
ERROR: internal error: cannot set fixed timestamp, txn "sql txn" meta={id=e5e81c19 pri=0.01517572 epo=0 ts=1628094563.935438999,0 min=1628094563.935438999,0 seq=0} lock=false stat=PENDING rts=1628094563.935438999,0 wto=false gul=1628094563.935438999,0 already performed reads
SQLSTATE: XX000
DETAIL: stack trace:
github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord/txn_coord_sender.go:1016: SetFixedTimestamp()
github.com/cockroachdb/cockroach/pkg/kv/txn.go:1200: SetFixedTimestamp()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:278: populatePrepared()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:220: func1()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:226: prepare()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_prepare.go:112: addPreparedStmt()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_exec.go:570: execStmtInOpenState()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor_exec.go:126: execStmt()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1626: func1()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1628: execCmd()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:1550: run()
github.com/cockroachdb/cockroach/pkg/sql/conn_executor.go:627: ServeConn()
github.com/cockroachdb/cockroach/pkg/sql/pgwire/conn.go:645: func1()
runtime/asm_amd64.s:1371: goexit()

HINT: You have encountered an unexpected error.

Please check the public issue tracker to check whether this problem is
already tracked. If you cannot find it there, please report the error
with details by creating a new issue.

If you would rather not post publicly, please contact us directly
using the support form.

We appreciate your feedback.

[email protected]:26257/? ERROR>
```
  • Loading branch information
nvanbenschoten committed Aug 4, 2021
1 parent 9758387 commit b761eba
Show file tree
Hide file tree
Showing 31 changed files with 218 additions and 49 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ func spansForAllTableIndexes(
checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) {
var foundKV bool
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, endTime)
if err := txn.SetFixedTimestamp(ctx, endTime); err != nil {
return err
}
res, err := txn.Scan(ctx, start, end, 1 /* maxRows */)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/backupccl/backupresolver/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,11 @@ func LoadAllDescs(
var allDescs []catalog.Descriptor
if err := db.Txn(
ctx,
func(ctx context.Context, txn *kv.Txn) (err error) {
txn.SetFixedTimestamp(ctx, asOf)
func(ctx context.Context, txn *kv.Txn) error {
err := txn.SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}
allDescs, err = catalogkv.GetAllDescriptors(
ctx, txn, codec, true, /* shouldRunPostDeserializationChanges */
)
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func fetchSpansForTargets(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
spans = nil
txn.SetFixedTimestamp(ctx, ts)
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (p *scanRequestScanner) exportSpan(
if log.V(2) {
log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts)
}
txn.SetFixedTimestamp(ctx, ts)
if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
return err
}
stopwatchStart := timeutil.Now()
var scanDuration, bufferDuration time.Duration
const targetBytesPerScan = 16 << 20 // 16 MiB
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ func (c *rowFetcherCache) TableDescForKey(
// descs.Collection directly here.
// TODO (SQL Schema): #53751.
if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts)
var err error
err := txn.SetFixedTimestamp(ctx, ts)
if err != nil {
return err
}
tableDesc, err = c.collection.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{})
return err
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error {
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
initialDescs = initialDescs[:0]
txn.SetFixedTimestamp(ctx, initialTableDescTs)
if err := txn.SetFixedTimestamp(ctx, initialTableDescTs); err != nil {
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range tf.targets {
flags := tree.ObjectLookupFlagsWithRequired()
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,11 @@ func TestOracle(t *testing.T) {

c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper)
staleTxn := kv.NewTxn(ctx, c, 0)
staleTxn.SetFixedTimestamp(ctx, stale)
require.NoError(t, staleTxn.SetFixedTimestamp(ctx, stale))
currentTxn := kv.NewTxn(ctx, c, 0)
currentTxn.SetFixedTimestamp(ctx, current)
require.NoError(t, currentTxn.SetFixedTimestamp(ctx, current))
futureTxn := kv.NewTxn(ctx, c, 0)
futureTxn.SetFixedTimestamp(ctx, future)
require.NoError(t, futureTxn.SetFixedTimestamp(ctx, future))

nodes := mockNodeStore{
{NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")},
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,19 @@ func (tc *TxnCoordSender) CommitTimestampFixed() bool {
}

// SetFixedTimestamp is part of the client.TxnSender interface.
func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
tc.mu.Lock()
defer tc.mu.Unlock()
// The transaction must not have already been used in this epoch.
if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx)
}
if tc.mu.txn.Sequence != 0 {
return errors.WithContextTags(errors.AssertionFailedf(
"cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx)
}

tc.mu.txn.ReadTimestamp = ts
tc.mu.txn.WriteTimestamp = ts
tc.mu.txn.GlobalUncertaintyLimit = ts
Expand All @@ -1019,6 +1029,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
tc.mu.txn.MinTimestamp.Backward(ts)
return nil
}

// RequiredFrontier is part of the client.TxnSender interface.
Expand Down
88 changes: 88 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2672,3 +2672,91 @@ func TestTxnManualRefresh(t *testing.T) {
})
}
}

// TestTxnCoordSenderSetFixedTimestamp tests that SetFixedTimestamp cannot be
// called after a transaction has already been used in the current epoch to read
// or write.
func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

for _, test := range []struct {
name string
before func(*testing.T, *kv.Txn)
expErr string
}{
{
name: "nothing before",
before: func(t *testing.T, txn *kv.Txn) {},
},
{
name: "read before",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
},
expErr: "cannot set fixed timestamp, .* already performed reads",
},
{
name: "write before",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
},
expErr: "cannot set fixed timestamp, .* already performed writes",
},
{
name: "read and write before",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
},
expErr: "cannot set fixed timestamp, .* already performed reads",
},
{
name: "read before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
{
name: "write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
{
name: "read and write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
},
},
} {
t.Run(test.name, func(t *testing.T) {
s := createTestDB(t)
defer s.Stop()

txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
test.before(t, txn)

ts := s.Clock.Now()
err := txn.SetFixedTimestamp(ctx, ts)
if test.expErr != "" {
require.Error(t, err)
require.Regexp(t, test.expErr, err)
require.False(t, txn.CommitTimestampFixed())
} else {
require.NoError(t, err)
require.True(t, txn.CommitTimestampFixed())
require.Equal(t, ts, txn.CommitTimestamp())
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (dbc *dbAdapter) Scan(
ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue),
) error {
return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, asOf)
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
sp := span
var b kv.Batch
for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3174,7 +3174,7 @@ func TestStrictGCEnforcement(t *testing.T) {
}
mkStaleTxn = func() *kv.Txn {
txn := db.NewTxn(ctx, "foo")
txn.SetFixedTimestamp(ctx, tenSecondsAgo)
require.NoError(t, txn.SetFixedTimestamp(ctx, tenSecondsAgo))
return txn
}
getRejectedMsg = func() string {
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ func TestReplicaRangefeed(t *testing.T) {
// Insert a second key transactionally.
ts3 := initTime.Add(0, 3)
if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts3)
if err := txn.SetFixedTimestamp(ctx, ts3); err != nil {
return err
}
return txn.Put(ctx, roachpb.Key("m"), []byte("val3"))
}); err != nil {
t.Fatal(err)
Expand All @@ -239,7 +241,9 @@ func TestReplicaRangefeed(t *testing.T) {
// Update the originally incremented key transactionally.
ts5 := initTime.Add(0, 5)
if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, ts5)
if err := txn.SetFixedTimestamp(ctx, ts5); err != nil {
return err
}
_, err := txn.Inc(ctx, incArgs.Key, 7)
return err
}); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {

// CommitTimestampFixed is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestampFixed() bool {
panic("unimplemented")
return m.txn.CommitTimestampFixed
}

// SetFixedTimestamp is part of the TxnSender interface.
func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) {
func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) error {
m.txn.WriteTimestamp = ts
m.txn.ReadTimestamp = ts
m.txn.GlobalUncertaintyLimit = ts
Expand All @@ -122,6 +122,7 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
m.txn.MinTimestamp.Backward(ts)
return nil
}

// RequiredFrontier is part of the TxnSender interface.
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@ type TxnSender interface {
// such that the transaction can't be pushed to a different
// timestamp.
//
// This is used to support historical queries (AS OF SYSTEM TIME
// queries and backups). This method must be called on every
// transaction retry (but note that retries should be rare for
// read-only queries with no clock uncertainty).
SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp)
// This is used to support historical queries (AS OF SYSTEM TIME queries
// and backups). This method must be called on every transaction retry
// (but note that retries should be rare for read-only queries with no
// clock uncertainty). The method must not be called after the
// transaction has been used in the current epoch to read or write.
SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error

// ManualRestart bumps the transactions epoch, and can upgrade the
// timestamp and priority.
Expand Down
21 changes: 16 additions & 5 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp {
return txn.mu.sender.CommitTimestamp()
}

// CommitTimestampFixed returns true if the commit timestamp has
// been fixed to the start timestamp and cannot be pushed forward.
func (txn *Txn) CommitTimestampFixed() bool {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.CommitTimestampFixed()
}

// ProvisionalCommitTimestamp returns the transaction's provisional
// commit timestamp. This can evolve throughout a txn's lifecycle. See
// the comment on the WriteTimestamp field of TxnMeta for details.
Expand Down Expand Up @@ -1177,16 +1185,19 @@ func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) {
// This is used to support historical queries (AS OF SYSTEM TIME queries and
// backups). This method must be called on every transaction retry (but note
// that retries should be rare for read-only queries with no clock uncertainty).
func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
if txn.typ != RootTxn {
panic(errors.WithContextTags(
errors.AssertionFailedf("SetFixedTimestamp() called on leaf txn"), ctx))
return errors.WithContextTags(errors.AssertionFailedf(
"SetFixedTimestamp() called on leaf txn"), ctx)
}

if ts.IsEmpty() {
log.Fatalf(ctx, "empty timestamp is invalid for SetFixedTimestamp()")
return errors.WithContextTags(errors.AssertionFailedf(
"empty timestamp is invalid for SetFixedTimestamp()"), ctx)
}
txn.mu.sender.SetFixedTimestamp(ctx, ts)
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.SetFixedTimestamp(ctx, ts)
}

// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (sc *SchemaChanger) fixedTimestampTxn(
retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error,
) error {
return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
txn.SetFixedTimestamp(ctx, readAsOf)
if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil {
return err
}
return retryable(ctx, txn, descriptors)
})
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,9 @@ func (m *Manager) resolveName(
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
txn.SetFixedTimestamp(ctx, timestamp)
if err := txn.SetFixedTimestamp(ctx, timestamp); err != nil {
return err
}
var found bool
var err error
found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name)
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/catalog/lease/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
update := func(catalog.MutableDescriptor) error { return nil }
logEvent := func(txn *kv.Txn) error {
txn2 := kvDB.NewTxn(ctx, "future-read")
txn2.SetFixedTimestamp(ctx, futureTime.Prev())
if err := txn2.SetFixedTimestamp(ctx, futureTime.Prev()); err != nil {
return err
}
if _, err := txn2.Get(ctx, "key"); err != nil {
return errors.Wrap(err, "read from other txn in future")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,7 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR);
log.Infof(ctx, "checking version %d", table.GetVersion())
txn := kv.NewTxn(ctx, t.kvDB, roachpb.NodeID(0))
// Make the txn look back at the known modification timestamp.
txn.SetFixedTimestamp(ctx, table.GetModificationTime())
require.NoError(t, txn.SetFixedTimestamp(ctx, table.GetModificationTime()))

// Look up the descriptor.
descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descID)
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/catalog/lease/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,12 @@ func (s storage) getForExpiration(
ctx context.Context, expiration hlc.Timestamp, id descpb.ID,
) (catalog.Descriptor, error) {
var desc catalog.Descriptor
err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
prevTimestamp := expiration.Prev()
txn.SetFixedTimestamp(ctx, prevTimestamp)
err := txn.SetFixedTimestamp(ctx, prevTimestamp)
if err != nil {
return err
}
desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,7 +2215,9 @@ func (ex *connExecutor) setTransactionModes(
return errors.AssertionFailedf("expected an evaluated AS OF timestamp")
}
if !asOfTs.IsEmpty() {
ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs)
if err := ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs); err != nil {
return err
}
ex.state.sqlTimestamp = asOfTs.GoTime()
if rwMode == tree.UnspecifiedReadWriteMode {
rwMode = tree.ReadOnly
Expand Down
Loading

0 comments on commit b761eba

Please sign in to comment.