Skip to content

Commit

Permalink
sql: Stop using txn.OrigTimestamp to reset TableCollection
Browse files Browse the repository at this point in the history
We use the transaction OrigTimestamp to figure out if a
transaction is being retried, triggering a reset of
TableCollection. This logic would get triggered when
running PREPARE within a transaction, because PREPARE
uses it's own transaction and thus has a different
OrigTimestamp.

Removed this logic because transaction retries trigger
resetting the TableCollection in the new conn-executor code.

fixes #24578

Release note sql: fix PREPARE hanging when run in the same transaction
as a CREATE TABLE
  • Loading branch information
vivekmenezes committed Apr 17, 2018
1 parent 482ab05 commit 2e8349f
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 32 deletions.
85 changes: 85 additions & 0 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,91 @@ SELECT EXISTS(SELECT * FROM t.foo);
}
}

// TestDescriptorRefreshOnRetry tests that all descriptors acquired by
// a query are properly released before the query is retried.
func TestDescriptorRefreshOnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()

fooAcquiredCount := int32(0)
fooReleaseCount := int32(0)

params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// Set this so we observe a release event from the cache
// when the API releases the descriptor.
RemoveOnceDereferenced: true,
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "foo" {
atomic.AddInt32(&fooAcquiredCount, 1)
}
},
LeaseReleasedEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "foo" {
atomic.AddInt32(&fooReleaseCount, 1)
}
},
},
},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.foo (v INT);
`); err != nil {
t.Fatal(err)
}

if atomic.LoadInt32(&fooAcquiredCount) > 0 {
t.Fatalf("CREATE TABLE has acquired a descriptor")
}

tx, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}

if _, err := tx.Exec(`
SELECT * FROM t.foo;
`); err != nil {
t.Fatal(err)
}

// Descriptor has been acquired.
if atomic.LoadInt32(&fooAcquiredCount) != 1 {
t.Fatal("descriptor not acquired")
}

// Descriptor has not been released.
if atomic.LoadInt32(&fooReleaseCount) > 0 {
t.Fatal("released descriptor")
}

if _, err := tx.Exec(
"SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL)"); !testutils.IsError(
err, `forced by crdb_internal\.force_retry\(\)`) {
t.Fatal(err)
}

if atomic.LoadInt32(&fooAcquiredCount) > 1 {
t.Fatal("descriptor reacquired")
}

testutils.SucceedsSoon(t, func() error {
if atomic.LoadInt32(&fooReleaseCount) == 0 {
return errors.Errorf("didnt release descriptor")
}
return nil
})

if err := tx.Rollback(); err != nil {
t.Fatal(err)
}
}

// Test that a transaction created way in the past will use the correct
// table descriptor and will thus obey the modififcation time of the
// table descriptor.
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/prepare
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,19 @@ PREPARE x21701c AS SELECT * FROM kv WHERE k IS NOT DISTINCT FROM $1
query II
EXECUTE x21701c(NULL)
----

# Test that a PREPARE statement after a CREATE TABLE in the same TRANSACTION
# doesn't hang.
subtest 24578

statement ok
BEGIN TRANSACTION

statement ok
create table bar (id integer)

statement ok
PREPARE forbar AS insert into bar (id) VALUES (1)

statement ok
COMMIT TRANSACTION
34 changes: 2 additions & 32 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -135,10 +134,6 @@ type uncommittedDatabase struct {
// end of each transaction on the session, or on hitting conditions such
// as errors, or retries that result in transaction timestamp changes.
type TableCollection struct {
// The timestamp used to pick tables. The timestamp falls within the
// validity window of every table in leasedTables.
timestamp hlc.Timestamp

// leaseMgr manages acquiring and releasing per-table leases.
leaseMgr *LeaseManager
// A collection of table descriptor valid for the timestamp.
Expand Down Expand Up @@ -186,17 +181,6 @@ type dbCacheSubscriber interface {
waitForCacheState(cond func(*databaseCache) (bool, error)) error
}

// Check if the timestamp used so far to pick tables has changed because
// of a transaction retry.
func (tc *TableCollection) resetForTxnRetry(ctx context.Context, txn *client.Txn) {
if tc.timestamp != (hlc.Timestamp{}) &&
tc.timestamp != txn.OrigTimestamp() {
if err := tc.releaseTables(ctx, dontBlockForDBCacheUpdate); err != nil {
log.Warningf(ctx, "error releasing tables")
}
}
}

// getTableVersion returns a table descriptor with a version suitable for
// the transaction: table.ModificationTime <= txn.Timestamp < expirationTime.
// The table must be released by calling tc.releaseTables().
Expand Down Expand Up @@ -259,10 +243,6 @@ func (tc *TableCollection) getTableVersion(
}
}

// If the txn has been pushed the table collection is released and
// txn deadline is reset.
tc.resetForTxnRetry(ctx, flags.txn)

if refuseFurtherLookup, table, err := tc.getUncommittedTable(
dbID, tn, flags.required); refuseFurtherLookup || err != nil {
return nil, nil, err
Expand Down Expand Up @@ -299,7 +279,7 @@ func (tc *TableCollection) getTableVersion(
// know how to deal with, so propagate the error.
return nil, nil, err
}
tc.timestamp = origTimestamp

tc.leasedTables = append(tc.leasedTables, table)
log.VEventf(ctx, 2, "added table '%s' to table collection", tn)

Expand Down Expand Up @@ -328,10 +308,6 @@ func (tc *TableCollection) getTableVersionByID(
return table, nil
}

// If the txn has been pushed the table collection is released and
// txn deadline is reset.
tc.resetForTxnRetry(ctx, txn)

for _, table := range tc.uncommittedTables {
if table.ID == tableID {
log.VEventf(ctx, 2, "found uncommitted table %d", tableID)
Expand Down Expand Up @@ -364,7 +340,7 @@ func (tc *TableCollection) getTableVersionByID(
}
return nil, err
}
tc.timestamp = origTimestamp

tc.leasedTables = append(tc.leasedTables, table)
log.VEventf(ctx, 2, "added table '%s' to table collection", table.Name)

Expand Down Expand Up @@ -403,7 +379,6 @@ func (tc *TableCollection) releaseLeases(ctx context.Context) {

// releaseTables releases all tables currently held by the TableCollection.
func (tc *TableCollection) releaseTables(ctx context.Context, opt releaseOpt) error {
tc.timestamp = hlc.Timestamp{}
if len(tc.leasedTables) > 0 {
log.VEventf(ctx, 2, "releasing %d tables", len(tc.leasedTables))
for _, table := range tc.leasedTables {
Expand Down Expand Up @@ -550,16 +525,11 @@ func (tc *TableCollection) getUncommittedTable(
func (tc *TableCollection) getAllDescriptors(
ctx context.Context, txn *client.Txn,
) ([]sqlbase.DescriptorProto, error) {
// If the txn has been pushed the table collection is released and txn
// deadline is reset.
tc.resetForTxnRetry(ctx, txn)

if tc.allDescriptors == nil {
descs, err := GetAllDescriptors(ctx, txn)
if err != nil {
return nil, err
}
tc.timestamp = txn.OrigTimestamp()
tc.allDescriptors = descs
}
return tc.allDescriptors, nil
Expand Down

0 comments on commit 2e8349f

Please sign in to comment.