Skip to content

Commit

Permalink
cdc: Use lease package when retrieving work
Browse files Browse the repository at this point in the history
The locks created by a SELECT FOR UPDATE command are unreplicated, so they can
be lost if the leaseholder for the associated range moves. This change acquires
an explicit lease on the destination schema to work around that limitation.

cockroachdb/cockroach#100194
  • Loading branch information
bobvawter committed Apr 12, 2023
1 parent 0cf97a1 commit 6c68eec
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 34 deletions.
2 changes: 2 additions & 0 deletions internal/source/cdc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func ProvideMetaTable(cfg *Config) MetaTable {
func ProvideResolvers(
ctx context.Context,
cfg *Config,
leases types.Leases,
metaTable MetaTable,
pool *pgxpool.Pool,
stagers types.Stagers,
Expand All @@ -55,6 +56,7 @@ func ProvideResolvers(

ret := &Resolvers{
cfg: cfg,
leases: leases,
metaTable: metaTable.Table(),
pool: pool,
stagers: stagers,
Expand Down
57 changes: 43 additions & 14 deletions internal/source/cdc/resolved_stamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type resolvedStamp struct {

mu struct {
sync.Mutex
tx *txguard.Guard
lease types.Lease // See note in [resolver.readIntoOnce].
tx *txguard.Guard
}
}

Expand All @@ -87,12 +88,25 @@ func (s *resolvedStamp) Commit(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

tx := s.mu.tx
if tx == nil {
return nil
var err error
if tx := s.mu.tx; tx != nil {
s.mu.tx = nil
err = errors.WithStack(tx.Commit(ctx))
}

if lease := s.mu.lease; lease != nil {
s.mu.lease = nil
lease.Release()
}
s.mu.tx = nil
return errors.WithStack(tx.Commit(ctx))

return err
}

// Context exposes the context of the lease currently held by this
// stamp, so that callers can bind their work to the lease's lifetime.
//
// NOTE(review): Commit, Rollback, and handoff all reset s.mu.lease to
// nil; calling Context on a stamp that no longer holds a lease would
// call through a nil lease. Confirm that callers only use this on a
// stamp that still owns its lease.
func (s *resolvedStamp) Context() context.Context {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Read the lease's context while holding the mutex that guards
	// the lease field.
	leaseCtx := s.mu.lease.Context()
	return leaseCtx
}

// IsAlive returns any pending error from the transaction-keepalive
Expand All @@ -105,7 +119,10 @@ func (s *resolvedStamp) IsAlive() error {
if tx == nil {
return errors.New("resolved-timestamp transaction owned by later stamp")
}
return tx.IsAlive()
if err := tx.IsAlive(); err != nil {
return err
}
return s.mu.lease.Context().Err()
}

// Less implements stamp.Stamp.
Expand All @@ -129,7 +146,9 @@ func (s *resolvedStamp) NewCommitted() (*resolvedStamp, error) {

// NewProposed returns a new resolvedStamp that extends the existing
// stamp with a later proposed timestamp.
func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*resolvedStamp, error) {
func (s *resolvedStamp) NewProposed(
tx *txguard.Guard, proposed hlc.Time, lease types.Lease,
) (*resolvedStamp, error) {
if hlc.Compare(proposed, s.CommittedTime) < 0 {
return nil, errors.Errorf("proposed cannot roll back committed time: %s vs %s",
proposed, s.CommittedTime)
Expand All @@ -151,6 +170,7 @@ func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*reso
ProposedTime: proposed,
}
// We don't call handoff here, since we have a new transaction.
ret.mu.lease = lease
ret.mu.tx = tx
return ret, nil
}
Expand Down Expand Up @@ -191,12 +211,15 @@ func (s *resolvedStamp) Rollback() {
}
s.mu.Lock()
defer s.mu.Unlock()
tx := s.mu.tx
if tx == nil {
return

if tx := s.mu.tx; tx != nil {
tx.Rollback()
s.mu.tx = nil
}
if lease := s.mu.lease; lease != nil {
lease.Release()
s.mu.lease = nil
}
s.mu.tx = nil
tx.Rollback()
}

// String is for debugging use only.
Expand All @@ -213,8 +236,14 @@ func (s *resolvedStamp) handoff(next *resolvedStamp) *resolvedStamp {
defer s.mu.Unlock()

if tx := s.mu.tx; tx != nil {
s.mu.tx = nil
next.mu.tx = tx
s.mu.tx = nil
}

if lease := s.mu.lease; lease != nil {
next.mu.lease = lease
s.mu.lease = nil
}

return next
}
33 changes: 27 additions & 6 deletions internal/source/cdc/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS %[1]s (
type resolver struct {
cfg *Config
fastWakeup chan struct{}
leases types.Leases
loop *logical.Loop // Reference to driving loop, for testing.
pool *pgxpool.Pool
retirements chan hlc.Time // Drives a goroutine to remove applied mutations.
Expand All @@ -78,6 +79,7 @@ var (
func newResolver(
ctx context.Context,
cfg *Config,
leases types.Leases,
pool *pgxpool.Pool,
metaTable ident.Table,
stagers types.Stagers,
Expand All @@ -92,6 +94,7 @@ func newResolver(
ret := &resolver{
cfg: cfg,
fastWakeup: make(chan struct{}, 1),
leases: leases,
pool: pool,
retirements: make(chan hlc.Time, 16),
stagers: stagers,
Expand Down Expand Up @@ -119,6 +122,7 @@ not_before AS (
SELECT source_nanos, source_logical FROM %[1]s
WHERE target_db=$1 AND target_schema=$2
ORDER BY source_nanos desc, source_logical desc
FOR UPDATE
LIMIT 1),
to_insert AS (
SELECT $1::STRING, $2::STRING, $3::INT, $4::INT
Expand Down Expand Up @@ -227,6 +231,19 @@ func (r *resolver) readInto(
func (r *resolver) readIntoOnce(
ctx context.Context, ch chan<- logical.Message, prev *resolvedStamp, backfill bool,
) error {
// Acquire a lease for the destination schema.
//
// This exists as a workaround for SELECT FOR UPDATE only using
// un-replicated locks. When the linked issue is closed, this
// hack can be removed.
//
// https://github.com/cockroachdb/cockroach/issues/100194
lease, err := r.leases.Acquire(ctx, r.target.Raw())
if errors.Is(err, &types.LeaseBusyError{}) {
return errBlocked
} else if err != nil {
return err
}

// This transaction will be used to hold a SELECT FOR UPDATE lock on
// the next resolved timestamp to be processed. It will eventually
Expand All @@ -247,6 +264,7 @@ func (r *resolver) readIntoOnce(
defer func() {
if !sentWork {
guard.Rollback()
lease.Release()
}
}()

Expand All @@ -261,7 +279,7 @@ func (r *resolver) readIntoOnce(
}

// Create a new marker that verifies that we're rolling forward.
work, err := prev.NewProposed(guard, nextResolved)
work, err := prev.NewProposed(guard, nextResolved, lease)

// It's possible that the call to Begin() above occurs prior to the
// previous resolved-timestamp transaction committing.
Expand Down Expand Up @@ -311,7 +329,7 @@ func (r *resolver) Process(
}

var err error
rs, err = r.process(ctx, msg.(*resolvedStamp), events)
rs, err = r.process(msg.(*resolvedStamp), events)
if err != nil {
return err
}
Expand Down Expand Up @@ -377,9 +395,11 @@ func (r *resolver) ZeroStamp() stamp.Stamp {
// process makes incremental progress in fulfilling the given
// resolvedStamp. It returns the state to which the resolved timestamp
// has been advanced.
func (r *resolver) process(
ctx context.Context, rs *resolvedStamp, events logical.Events,
) (*resolvedStamp, error) {
func (r *resolver) process(rs *resolvedStamp, events logical.Events) (*resolvedStamp, error) {
// Use a context that will be canceled if the associated lease
// cannot be renewed.
ctx := rs.Context()

start := time.Now()
targets := r.watcher.Snapshot(r.target).Order

Expand Down Expand Up @@ -654,6 +674,7 @@ func (r *resolver) wake() {
// Resolvers is a factory for Resolver instances.
type Resolvers struct {
cfg *Config
leases types.Leases
noStart bool // Set by test code to disable call to loop.Start()
metaTable ident.Table
pool *pgxpool.Pool
Expand Down Expand Up @@ -692,7 +713,7 @@ func (r *Resolvers) get(ctx context.Context, target ident.Schema) (*resolver, er
return found, nil
}

ret, err := newResolver(ctx, r.cfg, r.pool, r.metaTable, r.stagers, target, r.watchers)
ret, err := newResolver(ctx, r.cfg, r.leases, r.pool, r.metaTable, r.stagers, target, r.watchers)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/source/cdc/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestResolverDeQueue(t *testing.T) {

for i := int64(0); i < rowCount; i++ {
r.NoError(resolver.Mark(ctx, hlc.New(i+1, 0)))
r.NoError(resolver.Mark(ctx, hlc.New(i, 0)))
}

log.Info("marked")
Expand Down
2 changes: 2 additions & 0 deletions internal/source/cdc/test_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/script"
"github.com/cockroachdb/cdc-sink/internal/source/logical"
"github.com/cockroachdb/cdc-sink/internal/target/auth/trust"
"github.com/cockroachdb/cdc-sink/internal/target/leases"
"github.com/cockroachdb/cdc-sink/internal/target/sinktest"
"github.com/google/wire"
)
Expand All @@ -33,6 +34,7 @@ func newTestFixture(*sinktest.Fixture, *Config) (*testFixture, func(), error) {
wire.FieldsOf(new(*sinktest.BaseFixture), "Context"),
wire.FieldsOf(new(*sinktest.Fixture),
"Appliers", "BaseFixture", "Stagers", "Watchers"),
leases.Set,
logical.Set,
script.Set,
trust.New, // Is valid to use as a provider.
Expand Down
13 changes: 12 additions & 1 deletion internal/source/cdc/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/source/server/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"github.com/cockroachdb/cdc-sink/internal/source/cdc"
"github.com/cockroachdb/cdc-sink/internal/source/logical"
"github.com/cockroachdb/cdc-sink/internal/target"
"github.com/cockroachdb/cdc-sink/internal/target/leases"
"github.com/google/wire"
)

func NewServer(ctx context.Context, config *Config) (*Server, func(), error) {
panic(wire.Build(
Set,
cdc.Set,
leases.Set,
logical.Set,
script.Set,
target.Set,
Expand Down
2 changes: 2 additions & 0 deletions internal/source/server/test_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/source/cdc"
"github.com/cockroachdb/cdc-sink/internal/source/logical"
"github.com/cockroachdb/cdc-sink/internal/target"
"github.com/cockroachdb/cdc-sink/internal/target/leases"
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/google/wire"
Expand All @@ -43,6 +44,7 @@ func newTestFixture(context.Context, *Config) (*testFixture, func(), error) {
panic(wire.Build(
Set,
cdc.Set,
leases.Set,
logical.Set,
script.Set,
target.Set,
Expand Down
25 changes: 23 additions & 2 deletions internal/source/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6c68eec

Please sign in to comment.