Skip to content

Commit

Permalink
sql: lease acquisition of OFFLINE descs may starve bulk operations
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#61798

Previously, offline descriptors would never have their leases
cached and they would be released once the reference count hit zero.
This was inadequate because when attempting to online these tables
again the lease acquisition could be pushed back by other operations,
leading to starvation / live locks. To address this, this patch will
allow the leases of offline descriptors to be cached.

Release note (bug fix): Lease acquisitions of descriptor in a
offline state may starve out bulk operations (backup / restore)
  • Loading branch information
fqazi committed Mar 29, 2021
1 parent ff01389 commit 21bba87
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName(
// Read the descriptor from the store in the face of some specific errors
// because of a known limitation of AcquireByName. See the known
// limitations of AcquireByName for details.
if catalog.HasInactiveDescriptorError(err) ||
if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, true, nil
}
Expand Down
56 changes: 44 additions & 12 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s storage) acquire(
return err
}
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{}, // filter all non-public state
desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
); err != nil {
return err
}
Expand Down Expand Up @@ -984,7 +984,7 @@ func purgeOldVersions(
ctx context.Context,
db *kv.DB,
id descpb.ID,
takenOffline bool,
dropped bool,
minVersion descpb.DescriptorVersion,
m *Manager,
) error {
Expand All @@ -998,24 +998,24 @@ func purgeOldVersions(
}
empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
t.mu.Unlock()
if empty && !takenOffline {
if empty && !dropped {
// We don't currently have a version on this descriptor, so no need to refresh
// anything.
return nil
}

removeInactives := func(takenOffline bool) {
removeInactives := func(dropped bool) {
t.mu.Lock()
t.mu.takenOffline = takenOffline
t.mu.takenOffline = dropped
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
}

if takenOffline {
removeInactives(true /* takenOffline */)
if dropped {
removeInactives(true /* dropped */)
return nil
}

Expand All @@ -1031,7 +1031,7 @@ func purgeOldVersions(
return errRenewLease
}
newest.incRefcount()
removeInactives(false /* takenOffline */)
removeInactives(false /* dropped */)
s, err := t.release(newest.Descriptor, m.removeOnceDereferenced())
if err != nil {
return err
Expand Down Expand Up @@ -1419,6 +1419,16 @@ func (m *Manager) AcquireByName(
}
}
}
// For offline descriptors acquire the lease, but return
// back that the descriptor is offline.
if descVersion.Offline() {
if err := catalog.FilterDescriptorState(
descVersion, tree.CommonLookupFlags{},
); err != nil {
m.Release(descVersion)
return nil, hlc.Timestamp{}, err
}
}
return descVersion.Descriptor, descVersion.expiration, nil
}
if err := m.Release(descVersion); err != nil {
Expand All @@ -1429,6 +1439,17 @@ func (m *Manager) AcquireByName(
if err != nil {
return nil, hlc.Timestamp{}, err
}

// For offline descriptors acquire the lease, but return
// back that the descriptor is offline.
if desc.Offline() {
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{},
); err != nil {
m.Release(descVersion)
return nil, hlc.Timestamp{}, err
}
}
return desc, expiration, nil
}

Expand Down Expand Up @@ -1498,6 +1519,17 @@ func (m *Manager) AcquireByName(
return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
}
}

// For offline descriptors acquire the lease, but return
// back that the descriptor is offline.
if desc.Offline() {
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{},
); err != nil {
m.Release(descVersion)
return nil, hlc.Timestamp{}, err
}
}
return desc, expiration, nil
}

Expand Down Expand Up @@ -1713,11 +1745,11 @@ func (m *Manager) refreshLeases(
}

id, version, name, state := descpb.GetDescriptorMetadata(desc)
goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
dropped := state == descpb.DescriptorState_DROP
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
id, version, goingOffline)
if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil {
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
}
Expand Down
143 changes: 142 additions & 1 deletion pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand All @@ -39,7 +41,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -50,10 +54,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/lib/pq"
Expand Down Expand Up @@ -2361,6 +2367,7 @@ func TestLeaseWithOfflineTables(t *testing.T) {
func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)
flags.CommonLookupFlags.IncludeOffline = true
flags.CommonLookupFlags.IncludeDropped = true
desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags)
require.NoError(t, err)
require.Equal(t, desc.State, expected)
Expand Down Expand Up @@ -2404,9 +2411,16 @@ func TestLeaseWithOfflineTables(t *testing.T) {
checkLeaseState(true /* shouldBePresent */)

// Take the table offline and back online again.
// This should relinquish the lease.
// This should not relinquish the lease anymore
// and offline ones will now be held.
setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE)
setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC)
checkLeaseState(true /* shouldBePresent */)

// Take the table dropped and back online again.
// This should relinquish the lease.
setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP)
setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC)
checkLeaseState(false /* shouldBePresent */)

// Query the table, thereby acquiring a lease once again.
Expand Down Expand Up @@ -2709,3 +2723,130 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) {
return true
})
}

// TestOfflineLeaseRefresh validates that no live lock can occur,
// after a table is brought offline. Specifically a table a will be
// brought offline, and then one transaction will attempt to bring it
// online while another transaction will attempt to do a read. The read
// transaction could previously push back the lease of transaction
// trying to online the table perpetually (as seen in issue #61798).
func TestOfflineLeaseRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
waitForTxn := make(chan chan struct{})
waitForRqstFilter := make(chan chan struct{})
errorChan := make(chan error)
var txnID uuid.UUID
var mu syncutil.RWMutex

knobs := &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
mu.RLock()
checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID)
mu.RUnlock()
if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok {
notify := make(chan struct{})
waitForRqstFilter <- notify
<-notify
}
return nil
},
}
params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
s := tc.Server(0)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

// Create t1 that will be offline, and t2,
// that will serve inserts.
_, err := conn.Exec(`
CREATE DATABASE d1;
CREATE TABLE d1.t1 (name int);
INSERT INTO d1.t1 values(5);
INSERT INTO d1.t1 values(5);
INSERT INTO d1.t1 values(5);
CREATE TABLE d1.t2 (name int);
`)
require.NoError(t, err)

tableID := descpb.InvalidID

// Force the table descriptor into a offline state
err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired())
if err != nil {
return err
}
tableDesc.SetOffline("For unit test")
descriptors.WriteDesc(ctx, false, tableDesc, txn)
tableID = tableDesc.ID
return nil
})
require.NoError(t, err)

_, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{})
require.NoError(t, err)

go func() {
err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
close(waitForRqstFilter)
mu.Lock()
waitForRqstFilter = make(chan chan struct{})
txnID = txn.ID()
mu.Unlock()

// Online the descriptor by making it public
_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn,
tree.NewTableNameWithSchema("d1", "public", "t1"),
tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
RequireMutable: true,
IncludeOffline: true,
AvoidCached: true,
}})
if err != nil {
return err
}
tableDesc.SetPublic()
descriptors.WriteDesc(ctx, false, tableDesc, txn)

// Allow the select on the table to proceed,
// so that it waits on the channel at the appropriate
// moment.
notify := make(chan struct{})
waitForTxn <- notify
<-notify

// Select from an unrelated table
_, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
"insert into d1.t2 values (10);")
return err

})
close(waitForTxn)
close(waitForRqstFilter)
errorChan <- err
}()

for notify := range waitForTxn {
close(notify)
mu.RLock()
rqstFilterChannel := waitForRqstFilter
mu.RUnlock()
for notify2 := range rqstFilterChannel {
// Push the query trying to online the table out by
// leasing out the table again
_, err = conn.Query("select * from d1.t1")
require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test",
"Table offline error was not generated as expected")
close(notify2)
}
}
require.NoError(t, <-errorChan)
close(errorChan)
}

0 comments on commit 21bba87

Please sign in to comment.