sql: lease acquisition of OFFLINE descs may starve bulk operations
Fixes: cockroachdb#61798

Previously, leases on offline descriptors were never cached; they
were released as soon as their reference count hit zero. This was
inadequate because, when attempting to bring these tables back
online, the lease acquisition could be pushed back indefinitely by
other operations, leading to starvation / livelock. To address this,
this patch allows the leases of offline descriptors to be cached.

Release note (bug fix): Previously, lease acquisitions of descriptors
in an offline state could starve out bulk operations (backup / restore).
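
To illustrate, here is a minimal sketch in Go of the caller-visible contract after this change (hypothetical caller code with imports elided; only AcquireByName's and Release's signatures are taken from this commit):

	// readOfflineTable sketches what a caller of the lease manager now
	// observes for a table named "t1" that may be OFFLINE.
	func readOfflineTable(
		ctx context.Context, leaseMgr *lease.Manager, ts hlc.Timestamp,
		parentID, parentSchemaID descpb.ID,
	) error {
		desc, _, err := leaseMgr.AcquireByName(ctx, ts, parentID, parentSchemaID, "t1")
		if err != nil {
			// For an offline table this is still the usual "relation is
			// offline" error, but the manager has already cached the lease
			// internally, so repeated reads no longer push back the
			// transaction bringing the table back online.
			return err
		}
		// Public descriptor: release the lease when done with it.
		defer func() { _ = leaseMgr.Release(desc) }()
		return nil
	}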
fqazi committed Apr 1, 2021
1 parent 7160798 commit 12fd474
Showing 5 changed files with 204 additions and 18 deletions.
16 changes: 15 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -2257,11 +2257,18 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor
}
return nil
})

if err != nil {
return err
}

// Wait for the tables to be public before completing.
for _, tbl := range details.Tables {
_, err := lm.WaitForOneVersion(ctx, tbl.Desc.ID, retry.Options{})
if err != nil {
return errors.Wrap(err, "publishing tables waiting for one version")
}
}

// Initiate a run of CREATE STATISTICS. We don't know the actual number of
// rows affected per table, so we use a large number because we want to make
// sure that stats always get created/refreshed here.
@@ -2309,6 +2316,13 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{})
}); err != nil {
return err
}
// Wait for the tables to become public before completing.
for _, tableDesc := range r.job.Details().(jobspb.ImportDetails).Tables {
_, err := cfg.LeaseManager.WaitForOneVersion(ctx, tableDesc.Desc.ID, retry.Options{})
if err != nil {
return errors.Wrap(err, "rolling back tables waiting for them to be public")
}
}

// Run any jobs which might have been queued when dropping the schemas.
// This would be a job to drop all the schemas, and a job to update the parent
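Both hunks above rely on the same wait-for-one-version pattern; a hedged sketch of it as a standalone helper (hypothetical; imports elided, with WaitForOneVersion and retry.Options as used in this diff):

	// waitForDescriptors blocks until every node has converged on a
	// single version of each descriptor, so no stale lease on the
	// previous descriptor state remains outstanding.
	func waitForDescriptors(ctx context.Context, lm *lease.Manager, ids []descpb.ID) error {
		for _, id := range ids {
			if _, err := lm.WaitForOneVersion(ctx, id, retry.Options{}); err != nil {
				return errors.Wrapf(err, "waiting for one version of descriptor %d", id)
			}
		}
		return nil
	}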
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/collection.go
@@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName(
// Read the descriptor from the store in the face of some specific errors
// because of a known limitation of AcquireByName. See the known
// limitations of AcquireByName for details.
if catalog.HasInactiveDescriptorError(err) ||
if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) ||
errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, true, nil
}
4 changes: 4 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
@@ -55,6 +55,7 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
@@ -68,7 +69,9 @@ go_test(
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltestutils",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
@@ -85,6 +88,7 @@
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
52 changes: 37 additions & 15 deletions pkg/sql/catalog/lease/lease.go
@@ -217,7 +217,7 @@ func (s storage) acquire(
return err
}
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{}, // filter all non-public state
desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only
); err != nil {
return err
}
@@ -985,7 +985,7 @@ func purgeOldVersions(
ctx context.Context,
db *kv.DB,
id descpb.ID,
takenOffline bool,
dropped bool,
minVersion descpb.DescriptorVersion,
m *Manager,
) error {
@@ -999,24 +999,24 @@
}
empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
t.mu.Unlock()
if empty && !takenOffline {
if empty && !dropped {
// We don't currently have a version on this descriptor, so no need to refresh
// anything.
return nil
}

removeInactives := func(takenOffline bool) {
removeInactives := func(dropped bool) {
t.mu.Lock()
t.mu.takenOffline = takenOffline
t.mu.takenOffline = dropped
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
}

if takenOffline {
removeInactives(true /* takenOffline */)
if dropped {
removeInactives(true /* dropped */)
return nil
}

@@ -1032,7 +1032,7 @@
return errRenewLease
}
newest.incRefcount()
removeInactives(false /* takenOffline */)
removeInactives(false /* dropped */)
s, err := t.release(newest.Descriptor, m.removeOnceDereferenced())
if err != nil {
return err
@@ -1402,6 +1402,28 @@ func (m *Manager) AcquireByName(
parentSchemaID descpb.ID,
name string,
) (catalog.Descriptor, hlc.Timestamp, error) {
// Before offline descriptor leases could be cached, an attempt
// to acquire a lease on an offline descriptor would generate a
// descriptor offline error. Offline descriptor leases can now be
// cached, but callers still expect the offline error to be
// generated. This logic releases the lease (the lease manager
// will still cache it) and returns the offline descriptor error.
validateDescriptorForReturn := func(desc catalog.Descriptor,
expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) {
if desc.Offline() {
if err := catalog.FilterDescriptorState(
desc, tree.CommonLookupFlags{},
); err != nil {
err2 := m.Release(desc)
if err2 != nil {
log.Warningf(ctx, "error releasing lease: %s", err2)
}
return nil, hlc.Timestamp{}, err
}
}
return desc, expiration, nil
}

// Check if we have cached an ID for this name.
descVersion := m.names.get(parentID, parentSchemaID, name, timestamp)
if descVersion != nil {
@@ -1416,7 +1438,7 @@
}
}
}
return descVersion.Descriptor, descVersion.expiration, nil
return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration)
}
if err := m.Release(descVersion); err != nil {
return nil, hlc.Timestamp{}, err
@@ -1426,7 +1448,7 @@
if err != nil {
return nil, hlc.Timestamp{}, err
}
return desc, expiration, nil
return validateDescriptorForReturn(desc, expiration)
}

// We failed to find something in the cache, or what we found is not
@@ -1495,7 +1517,7 @@
return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound
}
}
return desc, expiration, nil
return validateDescriptorForReturn(desc, expiration)
}

// resolveName resolves a descriptor name to a descriptor ID at a particular
@@ -1698,11 +1720,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB)
}

id, version, name, state := descpb.GetDescriptorMetadata(desc)
goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE
dropped := state == descpb.DescriptorState_DROP
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)",
id, version, goingOffline)
if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil {
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
}
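In short, the purge policy after this change, restated as a hypothetical helper (RefreshLeases computes this predicate inline; the function name is illustrative):

	// shouldPurgeLeases reports whether cached leases must be
	// force-released for a descriptor in the given state. Only dropped
	// descriptors are purged; offline descriptors keep their cached
	// leases, so bringing a table back online can no longer be starved
	// by concurrent readers re-acquiring leases.
	func shouldPurgeLeases(state descpb.DescriptorState) bool {
		return state == descpb.DescriptorState_DROP
	}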
148 changes: 147 additions & 1 deletion pkg/sql/catalog/lease/lease_test.go
@@ -27,7 +27,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -39,7 +41,9 @@
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -50,10 +54,12 @@
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/lib/pq"
@@ -2355,6 +2361,7 @@ func TestLeaseWithOfflineTables(t *testing.T) {
func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)
flags.CommonLookupFlags.IncludeOffline = true
flags.CommonLookupFlags.IncludeDropped = true
desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags)
require.NoError(t, err)
require.Equal(t, desc.State, expected)
@@ -2398,9 +2405,16 @@ checkLeaseState(true /* shouldBePresent */)
checkLeaseState(true /* shouldBePresent */)

// Take the table offline and back online again.
// This should relinquish the lease.
// This should no longer relinquish the lease, since
// leases on offline descriptors are now retained.
setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE)
setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC)
checkLeaseState(true /* shouldBePresent */)

// Move the table into the dropped state and back to public again.
// This should relinquish the lease.
setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP)
setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC)
checkLeaseState(false /* shouldBePresent */)

// Query the table, thereby acquiring a lease once again.
@@ -2703,3 +2717,135 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) {
return true
})
}

// TestOfflineLeaseRefresh validates that no livelock can occur
// after a table is brought offline. Specifically, a table will be
// brought offline, and then one transaction will attempt to bring it
// back online while another transaction attempts to read from it. The
// read transaction could previously push back the transaction trying
// to online the table perpetually (as seen in issue #61798).
func TestOfflineLeaseRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
waitForTxn := make(chan chan struct{})
waitForRqstFilter := make(chan chan struct{})
errorChan := make(chan error)
var txnID uuid.UUID
var mu syncutil.RWMutex

knobs := &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
mu.RLock()
checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID)
mu.RUnlock()
if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok {
notify := make(chan struct{})
waitForRqstFilter <- notify
<-notify
}
return nil
},
}
params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params})
s := tc.Server(0)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

// Create t1, which will be taken offline, and t2,
// which will serve inserts.
_, err := conn.Exec(`
CREATE DATABASE d1;
CREATE TABLE d1.t1 (name int);
INSERT INTO d1.t1 values(5);
INSERT INTO d1.t1 values(5);
INSERT INTO d1.t1 values(5);
CREATE TABLE d1.t2 (name int);
`)
require.NoError(t, err)

tableID := descpb.InvalidID

// Force the table descriptor into an offline state.
err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired())
if err != nil {
return err
}
tableDesc.SetOffline("For unit test")
err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
if err != nil {
return err
}
tableID = tableDesc.ID
return nil
})
require.NoError(t, err)

_, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{})
require.NoError(t, err)

go func() {
err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager),
s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(),
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
close(waitForRqstFilter)
mu.Lock()
waitForRqstFilter = make(chan chan struct{})
txnID = txn.ID()
mu.Unlock()

// Online the descriptor by making it public
_, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn,
tree.NewTableNameWithSchema("d1", "public", "t1"),
tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
RequireMutable: true,
IncludeOffline: true,
AvoidCached: true,
}})
if err != nil {
return err
}
tableDesc.SetPublic()
err = descriptors.WriteDesc(ctx, false, tableDesc, txn)
if err != nil {
return err
}
// Allow the select on the table to proceed,
// so that it waits on the channel at the appropriate
// moment.
notify := make(chan struct{})
waitForTxn <- notify
<-notify

// Insert into an unrelated table.
_, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
"insert into d1.t2 values (10);")
return err

})
close(waitForTxn)
close(waitForRqstFilter)
errorChan <- err
}()

for notify := range waitForTxn {
close(notify)
mu.RLock()
rqstFilterChannel := waitForRqstFilter
mu.RUnlock()
for notify2 := range rqstFilterChannel {
// Push out the transaction trying to online the table
// by acquiring a lease on the table again.
_, err = conn.Query("select * from d1.t1")
require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test",
"Table offline error was not generated as expected")
close(notify2)
}
}
require.NoError(t, <-errorChan)
close(errorChan)
}
