Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport-2.0: sql: stop using txn.OrigTimestamp to reset TableCollection #24874

Merged
merged 1 commit into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,91 @@ SELECT EXISTS(SELECT * FROM t.foo);
}
}

// TestDescriptorRefreshOnRetry tests that all descriptors acquired by
// a query are properly released before the query is retried.
func TestDescriptorRefreshOnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := tests.CreateTestServerParams()

fooAcquiredCount := int32(0)
fooReleaseCount := int32(0)

params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// Set this so we observe a release event from the cache
// when the API releases the descriptor.
RemoveOnceDereferenced: true,
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "foo" {
atomic.AddInt32(&fooAcquiredCount, 1)
}
},
LeaseReleasedEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.Name == "foo" {
atomic.AddInt32(&fooReleaseCount, 1)
}
},
},
},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.foo (v INT);
`); err != nil {
t.Fatal(err)
}

if atomic.LoadInt32(&fooAcquiredCount) > 0 {
t.Fatalf("CREATE TABLE has acquired a descriptor")
}

tx, err := sqlDB.Begin()
if err != nil {
t.Fatal(err)
}

if _, err := tx.Exec(`
SELECT * FROM t.foo;
`); err != nil {
t.Fatal(err)
}

// Descriptor has been acquired.
if atomic.LoadInt32(&fooAcquiredCount) != 1 {
t.Fatal("descriptor not acquired")
}

// Descriptor has not been released.
if atomic.LoadInt32(&fooReleaseCount) > 0 {
t.Fatal("released descriptor")
}

if _, err := tx.Exec(
"SELECT CRDB_INTERNAL.FORCE_RETRY('1s':::INTERVAL)"); !testutils.IsError(
err, `forced by crdb_internal\.force_retry\(\)`) {
t.Fatal(err)
}

if atomic.LoadInt32(&fooAcquiredCount) > 1 {
t.Fatal("descriptor reacquired")
}

testutils.SucceedsSoon(t, func() error {
if atomic.LoadInt32(&fooReleaseCount) == 0 {
return errors.Errorf("didnt release descriptor")
}
return nil
})

if err := tx.Rollback(); err != nil {
t.Fatal(err)
}
}

// Test that a transaction created way in the past will use the correct
// table descriptor and will thus obey the modififcation time of the
// table descriptor.
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/prepare
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,19 @@ PREPARE x21701c AS SELECT * FROM kv WHERE k IS NOT DISTINCT FROM $1
query II
EXECUTE x21701c(NULL)
----

# Test that a PREPARE statement after a CREATE TABLE in the same TRANSACTION
# doesn't hang.
subtest 24578

statement ok
BEGIN TRANSACTION

statement ok
create table bar (id integer)

statement ok
PREPARE forbar AS insert into bar (id) VALUES (1)

statement ok
COMMIT TRANSACTION
34 changes: 2 additions & 32 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -135,10 +134,6 @@ type uncommittedDatabase struct {
// end of each transaction on the session, or on hitting conditions such
// as errors, or retries that result in transaction timestamp changes.
type TableCollection struct {
// The timestamp used to pick tables. The timestamp falls within the
// validity window of every table in leasedTables.
timestamp hlc.Timestamp

// leaseMgr manages acquiring and releasing per-table leases.
leaseMgr *LeaseManager
// A collection of table descriptor valid for the timestamp.
Expand Down Expand Up @@ -186,17 +181,6 @@ type dbCacheSubscriber interface {
waitForCacheState(cond func(*databaseCache) (bool, error)) error
}

// Check if the timestamp used so far to pick tables has changed because
// of a transaction retry.
func (tc *TableCollection) resetForTxnRetry(ctx context.Context, txn *client.Txn) {
if tc.timestamp != (hlc.Timestamp{}) &&
tc.timestamp != txn.OrigTimestamp() {
if err := tc.releaseTables(ctx, dontBlockForDBCacheUpdate); err != nil {
log.Warningf(ctx, "error releasing tables")
}
}
}

// getTableVersion returns a table descriptor with a version suitable for
// the transaction: table.ModificationTime <= txn.Timestamp < expirationTime.
// The table must be released by calling tc.releaseTables().
Expand Down Expand Up @@ -259,10 +243,6 @@ func (tc *TableCollection) getTableVersion(
}
}

// If the txn has been pushed the table collection is released and
// txn deadline is reset.
tc.resetForTxnRetry(ctx, flags.txn)

if refuseFurtherLookup, table, err := tc.getUncommittedTable(
dbID, tn, flags.required); refuseFurtherLookup || err != nil {
return nil, nil, err
Expand Down Expand Up @@ -299,7 +279,7 @@ func (tc *TableCollection) getTableVersion(
// know how to deal with, so propagate the error.
return nil, nil, err
}
tc.timestamp = origTimestamp

tc.leasedTables = append(tc.leasedTables, table)
log.VEventf(ctx, 2, "added table '%s' to table collection", tn)

Expand Down Expand Up @@ -328,10 +308,6 @@ func (tc *TableCollection) getTableVersionByID(
return table, nil
}

// If the txn has been pushed the table collection is released and
// txn deadline is reset.
tc.resetForTxnRetry(ctx, txn)

for _, table := range tc.uncommittedTables {
if table.ID == tableID {
log.VEventf(ctx, 2, "found uncommitted table %d", tableID)
Expand Down Expand Up @@ -364,7 +340,7 @@ func (tc *TableCollection) getTableVersionByID(
}
return nil, err
}
tc.timestamp = origTimestamp

tc.leasedTables = append(tc.leasedTables, table)
log.VEventf(ctx, 2, "added table '%s' to table collection", table.Name)

Expand Down Expand Up @@ -403,7 +379,6 @@ func (tc *TableCollection) releaseLeases(ctx context.Context) {

// releaseTables releases all tables currently held by the TableCollection.
func (tc *TableCollection) releaseTables(ctx context.Context, opt releaseOpt) error {
tc.timestamp = hlc.Timestamp{}
if len(tc.leasedTables) > 0 {
log.VEventf(ctx, 2, "releasing %d tables", len(tc.leasedTables))
for _, table := range tc.leasedTables {
Expand Down Expand Up @@ -550,16 +525,11 @@ func (tc *TableCollection) getUncommittedTable(
func (tc *TableCollection) getAllDescriptors(
ctx context.Context, txn *client.Txn,
) ([]sqlbase.DescriptorProto, error) {
// If the txn has been pushed the table collection is released and txn
// deadline is reset.
tc.resetForTxnRetry(ctx, txn)

if tc.allDescriptors == nil {
descs, err := GetAllDescriptors(ctx, txn)
if err != nil {
return nil, err
}
tc.timestamp = txn.OrigTimestamp()
tc.allDescriptors = descs
}
return tc.allDescriptors, nil
Expand Down