Skip to content

Commit

Permalink
catalog/lease: fix test flake in schema_repair
Browse files Browse the repository at this point in the history
Previously, the schema_repair test would delete a descriptor forcefully
and re-inject it with the same version. When using unsafe repair
functions, there is no lease based waits to confirm if the deletion is
completed inside the system.lease table. This test would not flake in
the past because the expiry based model allowed both the deleted and
re-added descriptors to have rows for the same version of the
descriptor. With the session based model are more strict and only allow
one row per descriptor version on a node. As a result its crucical that
in such repair operations either the version is bumped or we wait for
the lease manager to clean up the old descriptor from storage. To
address this, this patch adds a safety inside unsafe_upsert_descriptor
to wait for any leases to expire if a descriptor is being added back
again (only if the force option is not used). This ensures that
transactions running after this point only use the new version.

Fixes: #120445

Release note: None
  • Loading branch information
fqazi committed Mar 21, 2024
1 parent 860ba6f commit 48ebfd1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pkg/sql/catalog/lease/kv_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func leaseTableWithID(id descpb.ID, table systemschema.SystemTable) catalog.Tabl
}

func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, func(b *kv.Batch) error {
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
// We support writing both session based and expiry based leases within
// the KV writer. To be able to support a migration between the two types
// of writer will in some cases need to be able to write both types of leases.
Expand All @@ -91,11 +91,14 @@ func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields)
}
}
return nil
})
}); err != nil {
return errors.Wrapf(err, "failed to insert lease %v", l)
}
return nil
}

func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, func(b *kv.Batch) error {
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
// We support deleting both session based and expiry based leases within
// the KV writer. To be able to support a migration between the two types
// of writer will in some cases need to be able to delete both types of leases.
Expand All @@ -118,7 +121,10 @@ func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields)
}
}
return nil
})
}); err != nil {
return errors.Wrapf(err, "failed to delete lease: %v", l)
}
return nil
}

type addToBatchFunc = func(*kv.Batch) error
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -174,6 +176,22 @@ func (p *planner) UnsafeUpsertDescriptor(
p.Descriptors().SkipValidationOnWrite()
}

// If we are pushing out a brand new descriptor confirm that no leases
// exist before we publish it. This could happen if we did an unsafe delete,
// since we will not wait for all leases to expire. So, as a safety force the
// unsafe upserts to wait for no leases to exist on this descriptor.
if !force &&
mut.GetVersion() == 1 {
execCfg := p.execCfg
regionCache, err := regions.NewCachedDatabaseRegions(ctx, execCfg.DB, execCfg.LeaseManager)
if err != nil {
return err
}
if err := execCfg.LeaseManager.WaitForNoVersion(ctx, mut.GetID(), regionCache, retry.Options{}); err != nil {
return err
}
}

{
b := p.txn.NewBatch()
if err := p.Descriptors().WriteDescToBatch(
Expand Down

0 comments on commit 48ebfd1

Please sign in to comment.