Skip to content

Commit

Permalink
Revert "sql: lease acquisition of OFFLINE descs may starve bulk opera…
Browse files Browse the repository at this point in the history
…tions"

Fixes: cockroachdb#62864, cockroachdb#62853, cockroachdb#62849, cockroachdb#62844

Reverts the offline descriptor leasing change to fix
the intermittent failures it introduced in the
importccl tests.

Release note: None
  • Loading branch information
fqazi committed Mar 31, 2021
1 parent 99c29ef commit 3e1c9f1
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 189 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName(
// Read the descriptor from the store in the face of some specific errors
// because of a known limitation of AcquireByName. See the known
// limitations of AcquireByName for details.
if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
if catalog.HasInactiveDescriptorError(err) ||
errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, true, nil
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -68,9 +67,7 @@ go_test(
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltestutils",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand All @@ -87,7 +84,6 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
52 changes: 15 additions & 37 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (s storage) acquire(
return err
}
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
desc, tree.CommonLookupFlags{}, // filter all non-public state
); err != nil {
return err
}
Expand Down Expand Up @@ -981,7 +981,7 @@ func purgeOldVersions(
ctx context.Context,
db *kv.DB,
id descpb.ID,
dropped bool,
takenOffline bool,
minVersion descpb.DescriptorVersion,
m *Manager,
) error {
Expand All @@ -995,24 +995,24 @@ func purgeOldVersions(
}
empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
t.mu.Unlock()
if empty && !dropped {
if empty && !takenOffline {
// We don't currently have a version on this descriptor, so no need to refresh
// anything.
return nil
}

removeInactives := func(dropped bool) {
removeInactives := func(takenOffline bool) {
t.mu.Lock()
t.mu.takenOffline = dropped
t.mu.takenOffline = takenOffline
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
}

if dropped {
removeInactives(true /* dropped */)
if takenOffline {
removeInactives(true /* takenOffline */)
return nil
}

Expand All @@ -1028,7 +1028,7 @@ func purgeOldVersions(
return errRenewLease
}
newest.incRefcount()
removeInactives(false /* dropped */)
removeInactives(false /* takenOffline */)
s, err := t.release(newest.Descriptor, m.removeOnceDereferenced())
if err != nil {
return err
Expand Down Expand Up @@ -1398,28 +1398,6 @@ func (m *Manager) AcquireByName(
parentSchemaID descpb.ID,
name string,
) (catalog.Descriptor, hlc.Timestamp, error) {
// When offline descriptor leases were not allowed to be cached,
// attempts to acquire a lease on them would generate a descriptor
// offline error. Recent changes allow offline descriptor leases
// to be cached, but callers still need the offline error generated.
// This logic will release the lease (the lease manager will still
// cache it), and generate the offline descriptor error.
validateDescriptorForReturn := func(desc catalog.Descriptor,
expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) {
if desc.Offline() {
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{},
); err != nil {
err2 := m.Release(desc)
if err2 != nil {
log.Warningf(ctx, "error releasing lease: %s", err2)
}
return nil, hlc.Timestamp{}, err
}
}
return desc, expiration, nil
}

// Check if we have cached an ID for this name.
descVersion := m.names.get(parentID, parentSchemaID, name, timestamp)
if descVersion != nil {
Expand All @@ -1434,7 +1412,7 @@ func (m *Manager) AcquireByName(
}
}
}
return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration)
return descVersion.Descriptor, descVersion.expiration, nil
}
if err := m.Release(descVersion); err != nil {
return nil, hlc.Timestamp{}, err
Expand All @@ -1444,7 +1422,7 @@ func (m *Manager) AcquireByName(
if err != nil {
return nil, hlc.Timestamp{}, err
}
return validateDescriptorForReturn(desc, expiration)
return desc, expiration, nil
}

// We failed to find something in the cache, or what we found is not
Expand Down Expand Up @@ -1513,7 +1491,7 @@ func (m *Manager) AcquireByName(
return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
}
}
return validateDescriptorForReturn(desc, expiration)
return desc, expiration, nil
}

// resolveName resolves a descriptor name to a descriptor ID at a particular
Expand Down Expand Up @@ -1716,11 +1694,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)
}

id, version, name, state := descpb.GetDescriptorMetadata(desc)
dropped := state == descpb.DescriptorState_DROP
goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
id, version, goingOffline)
if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
}
Expand Down
148 changes: 1 addition & 147 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand All @@ -41,9 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -54,12 +50,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/lib/pq"
Expand Down Expand Up @@ -2361,7 +2355,6 @@ func TestLeaseWithOfflineTables(t *testing.T) {
func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)
flags.CommonLookupFlags.IncludeOffline = true
flags.CommonLookupFlags.IncludeDropped = true
desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags)
require.NoError(t, err)
require.Equal(t, desc.State, expected)
Expand Down Expand Up @@ -2405,16 +2398,9 @@ func TestLeaseWithOfflineTables(t *testing.T) {
checkLeaseState(true /* shouldBePresent */)

// Take the table offline and back online again.
// This should not relinquish the lease anymore
// and offline ones will now be held.
// This should relinquish the lease.
setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE)
setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC)
checkLeaseState(true /* shouldBePresent */)

// Take the table dropped and back online again.
// This should relinquish the lease.
setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP)
setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC)
checkLeaseState(false /* shouldBePresent */)

// Query the table, thereby acquiring a lease once again.
Expand Down Expand Up @@ -2717,135 +2703,3 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) {
return true
})
}

// TestOfflineLeaseRefresh validates that no livelock can occur
// after a table is brought offline. Specifically, a table will be
// brought offline, and then one transaction will attempt to bring it
// back online while another transaction attempts to read from it. The
// read transaction could previously push back the lease of the
// transaction trying to online the table perpetually (as seen in
// issue #61798).
//
// The test coordinates three parties with channels:
//   - the request filter, which pauses the EndTxn of the tracked
//     transaction and hands a notify channel over waitForRqstFilter;
//   - the goroutine running the transaction that sets the table public,
//     which hands a notify channel over waitForTxn after writing the
//     descriptor;
//   - the main loop at the bottom, which releases both by closing the
//     notify channels it receives.
func TestOfflineLeaseRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
// waitForTxn carries a notify channel from the onlining transaction to
// the main loop; waitForRqstFilter carries one from the request filter.
waitForTxn := make(chan chan struct{})
waitForRqstFilter := make(chan chan struct{})
// errorChan delivers the result of the onlining transaction.
errorChan := make(chan error)
// txnID identifies the transaction whose EndTxn should be intercepted.
// mu is held when txnID is read or written and when waitForRqstFilter
// is replaced.
var txnID uuid.UUID
var mu syncutil.RWMutex

knobs := &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
mu.RLock()
checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID)
mu.RUnlock()
// Only batches from the tracked transaction that carry an EndTxn are
// paused: hand a fresh notify channel to the main loop and block
// until it is closed.
// NOTE(review): waitForRqstFilter is read here without holding mu,
// while it is reassigned under mu.Lock in the transaction closure —
// confirm this is race-free.
if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok {
notify := make(chan struct{})
waitForRqstFilter <- notify
<-notify
}
return nil
},
}
params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
s := tc.Server(0)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

// Create t1 that will be offline, and t2,
// that will serve inserts.
_, err := conn.Exec(`
CREATE DATABASE d1;
CREATE TABLE d1.t1 (name int);
INSERT INTO d1.t1 values(5);
INSERT INTO d1.t1 values(5);
INSERT INTO d1.t1 values(5);
CREATE TABLE d1.t2 (name int);
`)
require.NoError(t, err)

// tableID is filled in with t1's descriptor ID by the transaction below.
tableID := descpb.InvalidID

// Force the table descriptor into an offline state
err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired())
if err != nil {
return err
}
tableDesc.SetOffline("For unit test")
err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
if err != nil {
return err
}
tableID = tableDesc.ID
return nil
})
require.NoError(t, err)

// NOTE(review): presumably this blocks until leases on t1 have converged
// on a single descriptor version — confirm against WaitForOneVersion.
_, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{})
require.NoError(t, err)

// Run the transaction that brings t1 back online in the background; its
// result is reported on errorChan.
go func() {
err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
// Close the existing filter channel (ending any range over it) and
// install a fresh one together with this transaction's ID, so the
// request filter starts tracking this transaction.
// NOTE(review): the close happens before mu is taken — confirm no
// concurrent sender can still be using the old channel.
close(waitForRqstFilter)
mu.Lock()
waitForRqstFilter = make(chan chan struct{})
txnID = txn.ID()
mu.Unlock()

// Online the descriptor by making it public
_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn,
tree.NewTableNameWithSchema("d1", "public", "t1"),
tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
RequireMutable: true,
IncludeOffline: true,
AvoidCached: true,
}})
if err != nil {
return err
}
tableDesc.SetPublic()
err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
if err != nil {
return err
}
// Allow the select on the table to proceed,
// so that it waits on the channel at the appropriate
// moment.
notify := make(chan struct{})
waitForTxn <- notify
<-notify

// Insert into an unrelated table
_, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
"insert into d1.t2 values (10);")
return err

})
// The transaction is finished: close both channels so the range loops
// in the main goroutine terminate, then report the outcome.
close(waitForTxn)
close(waitForRqstFilter)
errorChan <- err
}()

// Each value received on waitForTxn means the onlining transaction has
// written the public descriptor and is waiting; closing notify lets it
// continue to the insert.
for notify := range waitForTxn {
close(notify)
// Snapshot the current filter channel under the lock, since the
// transaction closure replaces it.
mu.RLock()
rqstFilterChannel := waitForRqstFilter
mu.RUnlock()
// Each value received here means the request filter is holding back an
// EndTxn of the onlining transaction.
for notify2 := range rqstFilterChannel {
// Push the query trying to online the table out by
// leasing out the table again
_, err = conn.Query("select * from d1.t1")
require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test",
"Table offline error was not generated as expected")
// Release the paused EndTxn.
close(notify2)
}
}
// The onlining transaction must eventually succeed.
require.NoError(t, <-errorChan)
close(errorChan)
}

0 comments on commit 3e1c9f1

Please sign in to comment.