From db124b39d425ded34be49967e0a95da8263b5f08 Mon Sep 17 00:00:00 2001 From: Bob Vawter Date: Wed, 12 Apr 2023 16:12:41 -0400 Subject: [PATCH] cdc: Use lease package when retrieving work The locks created by a SELECT FOR UPDATE command are unreplicated and fragile if the leaseholder for the associated range is moved. This change creates an explicit lease for the destination schema to work around the limitation. https://github.com/cockroachdb/cockroach/issues/100194 --- internal/source/cdc/provider.go | 2 + internal/source/cdc/resolved_stamp.go | 57 +++++++++++++++++++------- internal/source/cdc/resolver.go | 33 ++++++++++++--- internal/source/cdc/resolver_test.go | 1 + internal/source/cdc/test_fixture.go | 2 + internal/source/cdc/wire_gen.go | 13 +++++- internal/source/server/injector.go | 2 + internal/source/server/test_fixture.go | 2 + internal/source/server/wire_gen.go | 25 ++++++++++- internal/target/leases/leases.go | 48 ++++++++++++++++++++++ internal/target/leases/leases_test.go | 22 ++++++++++ internal/target/leases/provider.go | 45 ++++++++++++++++++++ internal/types/types.go | 27 ++++++++++-- internal/util/retry/retry.go | 14 +++---- 14 files changed, 259 insertions(+), 34 deletions(-) create mode 100644 internal/target/leases/provider.go diff --git a/internal/source/cdc/provider.go b/internal/source/cdc/provider.go index 25983f6ea..aa2cc5a51 100644 --- a/internal/source/cdc/provider.go +++ b/internal/source/cdc/provider.go @@ -44,6 +44,7 @@ func ProvideMetaTable(cfg *Config) MetaTable { func ProvideResolvers( ctx context.Context, cfg *Config, + leases types.Leases, metaTable MetaTable, pool *pgxpool.Pool, stagers types.Stagers, @@ -55,6 +56,7 @@ func ProvideResolvers( ret := &Resolvers{ cfg: cfg, + leases: leases, metaTable: metaTable.Table(), pool: pool, stagers: stagers, diff --git a/internal/source/cdc/resolved_stamp.go b/internal/source/cdc/resolved_stamp.go index 02ae57759..2fadef850 100644 --- a/internal/source/cdc/resolved_stamp.go +++ b/internal/source/cdc/resolved_stamp.go @@ -69,7 +69,8 @@ type resolvedStamp struct { mu struct { sync.Mutex - tx *txguard.Guard + lease types.Lease // See note in [resolver.readIntoOnce]. + tx *txguard.Guard } } @@ -87,12 +88,25 @@ func (s *resolvedStamp) Commit(ctx context.Context) error { s.mu.Lock() defer s.mu.Unlock() - tx := s.mu.tx - if tx == nil { - return nil + var err error + if tx := s.mu.tx; tx != nil { + s.mu.tx = nil + err = errors.WithStack(tx.Commit(ctx)) + } + + if lease := s.mu.lease; lease != nil { + s.mu.lease = nil + lease.Release() } - s.mu.tx = nil - return errors.WithStack(tx.Commit(ctx)) + + return err +} + +// Context returns a context whose lifetime is bound to the lease. +func (s *resolvedStamp) Context() context.Context { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.lease.Context() } // IsAlive returns any pending error from the transaction-keepalive @@ -105,7 +119,10 @@ func (s *resolvedStamp) IsAlive() error { if tx == nil { return errors.New("resolved-timestamp transaction owned by later stamp") } - return tx.IsAlive() + if err := tx.IsAlive(); err != nil { + return err + } + return s.mu.lease.Context().Err() } // Less implements stamp.Stamp. @@ -129,7 +146,9 @@ func (s *resolvedStamp) NewCommitted() (*resolvedStamp, error) { // NewProposed returns a new resolvedStamp that extends the existing // stamp with a later proposed timestamp. -func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*resolvedStamp, error) { +func (s *resolvedStamp) NewProposed( + tx *txguard.Guard, proposed hlc.Time, lease types.Lease, +) (*resolvedStamp, error) { if hlc.Compare(proposed, s.CommittedTime) < 0 { return nil, errors.Errorf("proposed cannot roll back committed time: %s vs %s", proposed, s.CommittedTime) @@ -151,6 +170,7 @@ func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*reso ProposedTime: proposed, } // We don't call handoff here, since we have a new transaction. + ret.mu.lease = lease ret.mu.tx = tx return ret, nil } @@ -191,12 +211,15 @@ func (s *resolvedStamp) Rollback() { } s.mu.Lock() defer s.mu.Unlock() - tx := s.mu.tx - if tx == nil { - return + + if tx := s.mu.tx; tx != nil { + tx.Rollback() + s.mu.tx = nil + } + if lease := s.mu.lease; lease != nil { + lease.Release() + s.mu.lease = nil } - s.mu.tx = nil - tx.Rollback() } // String is for debugging use only. @@ -213,8 +236,14 @@ func (s *resolvedStamp) handoff(next *resolvedStamp) *resolvedStamp { defer s.mu.Unlock() if tx := s.mu.tx; tx != nil { - s.mu.tx = nil next.mu.tx = tx + s.mu.tx = nil } + + if lease := s.mu.lease; lease != nil { + next.mu.lease = lease + s.mu.lease = nil + } + return next } diff --git a/internal/source/cdc/resolver.go b/internal/source/cdc/resolver.go index 17e3adbb7..b16785035 100644 --- a/internal/source/cdc/resolver.go +++ b/internal/source/cdc/resolver.go @@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS %[1]s ( type resolver struct { cfg *Config fastWakeup chan struct{} + leases types.Leases loop *logical.Loop // Reference to driving loop, for testing. pool *pgxpool.Pool retirements chan hlc.Time // Drives a goroutine to remove applied mutations. @@ -78,6 +79,7 @@ var ( func newResolver( ctx context.Context, cfg *Config, + leases types.Leases, pool *pgxpool.Pool, metaTable ident.Table, stagers types.Stagers, @@ -92,6 +94,7 @@ func newResolver( ret := &resolver{ cfg: cfg, fastWakeup: make(chan struct{}, 1), + leases: leases, pool: pool, retirements: make(chan hlc.Time, 16), stagers: stagers, @@ -119,6 +122,7 @@ not_before AS ( SELECT source_nanos, source_logical FROM %[1]s WHERE target_db=$1 AND target_schema=$2 ORDER BY source_nanos desc, source_logical desc + FOR UPDATE LIMIT 1), to_insert AS ( SELECT $1::STRING, $2::STRING, $3::INT, $4::INT @@ -227,6 +231,19 @@ func (r *resolver) readInto( func (r *resolver) readIntoOnce( ctx context.Context, ch chan<- logical.Message, prev *resolvedStamp, backfill bool, ) error { + // Acquire a lease for the destination schema. + // + // This exists as a workaround for SELECT FOR UPDATE only using + // un-replicated locks. When the linked issue is closed, this + // hack can be removed. + // + // https://github.com/cockroachdb/cockroach/issues/100194 + lease, err := r.leases.Acquire(ctx, r.target.Raw()) + if errors.Is(err, &types.LeaseBusyError{}) { + return errBlocked + } else if err != nil { + return err + } // This transaction will be used to hold a SELECT FOR UPDATE lock on // the next resolved timestamp to be processed. It will eventually @@ -247,6 +264,7 @@ func (r *resolver) readIntoOnce( defer func() { if !sentWork { guard.Rollback() + lease.Release() } }() @@ -261,7 +279,7 @@ func (r *resolver) readIntoOnce( } // Create a new marker that verifies that we're rolling forward. - work, err := prev.NewProposed(guard, nextResolved) + work, err := prev.NewProposed(guard, nextResolved, lease) // It's possible that the call to Begin() above occurs prior to the // previous resolved-timestamp transaction committing. @@ -311,7 +329,7 @@ func (r *resolver) Process( } var err error - rs, err = r.process(ctx, msg.(*resolvedStamp), events) + rs, err = r.process(msg.(*resolvedStamp), events) if err != nil { return err } @@ -377,9 +395,11 @@ func (r *resolver) ZeroStamp() stamp.Stamp { // process makes incremental progress in fulfilling the given // resolvedStamp. It returns the state to which the resolved timestamp // has been advanced. -func (r *resolver) process( - ctx context.Context, rs *resolvedStamp, events logical.Events, -) (*resolvedStamp, error) { +func (r *resolver) process(rs *resolvedStamp, events logical.Events) (*resolvedStamp, error) { + // Use a context that will be canceled if the associated lease + // cannot be renewed. + ctx := rs.Context() + start := time.Now() targets := r.watcher.Snapshot(r.target).Order @@ -654,6 +674,7 @@ func (r *resolver) wake() { // Resolvers is a factory for Resolver instances. type Resolvers struct { cfg *Config + leases types.Leases noStart bool // Set by test code to disable call to loop.Start() metaTable ident.Table pool *pgxpool.Pool @@ -692,7 +713,7 @@ func (r *Resolvers) get(ctx context.Context, target ident.Schema) (*resolver, er return found, nil } - ret, err := newResolver(ctx, r.cfg, r.pool, r.metaTable, r.stagers, target, r.watchers) + ret, err := newResolver(ctx, r.cfg, r.leases, r.pool, r.metaTable, r.stagers, target, r.watchers) if err != nil { return nil, err } diff --git a/internal/source/cdc/resolver_test.go b/internal/source/cdc/resolver_test.go index 98eb7a0ae..f45c220f6 100644 --- a/internal/source/cdc/resolver_test.go +++ b/internal/source/cdc/resolver_test.go @@ -56,6 +56,7 @@ func TestResolverDeQueue(t *testing.T) { for i := int64(0); i < rowCount; i++ { r.NoError(resolver.Mark(ctx, hlc.New(i+1, 0))) + r.NoError(resolver.Mark(ctx, hlc.New(i, 0))) } log.Info("marked") diff --git a/internal/source/cdc/test_fixture.go b/internal/source/cdc/test_fixture.go index 3fdf98fa6..c638606f8 100644 --- a/internal/source/cdc/test_fixture.go +++ b/internal/source/cdc/test_fixture.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/script" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target/auth/trust" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/target/sinktest" "github.com/google/wire" ) @@ -33,6 +34,7 @@ func newTestFixture(*sinktest.Fixture, *Config) (*testFixture, func(), error) { wire.FieldsOf(new(*sinktest.BaseFixture), "Context"), wire.FieldsOf(new(*sinktest.Fixture), "Appliers", "BaseFixture", "Stagers", "Watchers"), + leases.Set, logical.Set, script.Set, trust.New, // Is valid to use as a provider. diff --git a/internal/source/cdc/wire_gen.go b/internal/source/cdc/wire_gen.go index 4c429616f..fb0800303 100644 --- a/internal/source/cdc/wire_gen.go +++ b/internal/source/cdc/wire_gen.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/script" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target/auth/trust" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/target/sinktest" ) @@ -36,10 +37,20 @@ func newTestFixture(fixture *sinktest.Fixture, config *Config) (*testFixture, fu if err != nil { return nil, nil, err } + stagingDB, err := logical.ProvideStagingDB(baseConfig) + if err != nil { + cleanup() + return nil, nil, err + } + typesLeases, err := leases.ProvideLeases(context, pool, stagingDB) + if err != nil { + cleanup() + return nil, nil, err + } metaTable := ProvideMetaTable(config) stagers := fixture.Stagers watchers := fixture.Watchers - resolvers, cleanup2, err := ProvideResolvers(context, config, metaTable, pool, stagers, watchers) + resolvers, cleanup2, err := ProvideResolvers(context, config, typesLeases, metaTable, pool, stagers, watchers) if err != nil { cleanup() return nil, nil, err diff --git a/internal/source/server/injector.go b/internal/source/server/injector.go index 0551f347f..8524c0555 100644 --- a/internal/source/server/injector.go +++ b/internal/source/server/injector.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/cdc" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/google/wire" ) @@ -27,6 +28,7 @@ func NewServer(ctx context.Context, config *Config) (*Server, func(), error) { panic(wire.Build( Set, cdc.Set, + leases.Set, logical.Set, script.Set, target.Set, diff --git a/internal/source/server/test_fixture.go b/internal/source/server/test_fixture.go index 1470ad5af..c0ce75db3 100644 --- a/internal/source/server/test_fixture.go +++ b/internal/source/server/test_fixture.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/cdc" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/google/wire" @@ -43,6 +44,7 @@ func newTestFixture(context.Context, *Config) (*testFixture, func(), error) { panic(wire.Build( Set, cdc.Set, + leases.Set, logical.Set, script.Set, target.Set, diff --git a/internal/source/server/wire_gen.go b/internal/source/server/wire_gen.go index fe56e845b..268daa0fc 100644 --- a/internal/source/server/wire_gen.go +++ b/internal/source/server/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/cdc" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target/apply" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/target/schemawatch" "github.com/cockroachdb/cdc-sink/internal/target/stage" "github.com/cockroachdb/cdc-sink/internal/types" @@ -75,9 +76,19 @@ func NewServer(ctx context.Context, config *Config) (*Server, func(), error) { return nil, nil, err } cdcConfig := &config.CDC + typesLeases, err := leases.ProvideLeases(ctx, pool, stagingDB) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return nil, nil, err + } metaTable := cdc.ProvideMetaTable(cdcConfig) stagers := stage.ProvideFactory(pool, stagingDB) - resolvers, cleanup7, err := cdc.ProvideResolvers(ctx, cdcConfig, metaTable, pool, stagers, watchers) + resolvers, cleanup7, err := cdc.ProvideResolvers(ctx, cdcConfig, typesLeases, metaTable, pool, stagers, watchers) if err != nil { cleanup6() cleanup5() @@ -167,9 +178,19 @@ func newTestFixture(contextContext context.Context, config *Config) (*testFixtur watchers, cleanup5 := schemawatch.ProvideFactory(pool) appliers, cleanup6 := apply.ProvideFactory(configs, watchers) cdcConfig := &config.CDC + typesLeases, err := leases.ProvideLeases(contextContext, pool, stagingDB) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return nil, nil, err + } metaTable := cdc.ProvideMetaTable(cdcConfig) stagers := stage.ProvideFactory(pool, stagingDB) - resolvers, cleanup7, err := cdc.ProvideResolvers(contextContext, cdcConfig, metaTable, pool, stagers, watchers) + resolvers, cleanup7, err := cdc.ProvideResolvers(contextContext, cdcConfig, typesLeases, metaTable, pool, stagers, watchers) if err != nil { cleanup6() cleanup5() diff --git a/internal/target/leases/leases.go b/internal/target/leases/leases.go index 039899a29..34d26e136 100644 --- a/internal/target/leases/leases.go +++ b/internal/target/leases/leases.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math/rand" + "runtime" "time" "github.com/cockroachdb/cdc-sink/internal/types" @@ -88,6 +89,24 @@ type leases struct { } } +// leaseFacade implements the public types.Lease interface. +type leaseFacade struct { + cancel func() + ctx context.Context +} + +var _ types.Lease = (*leaseFacade)(nil) + +// Context implements types.Lease. +func (f *leaseFacade) Context() context.Context { + return f.ctx +} + +// Release implements types.Lease. +func (f *leaseFacade) Release() { + f.cancel() +} + var _ types.Leases = (*leases)(nil) const ( @@ -117,6 +136,35 @@ func New(ctx context.Context, cfg Config) (types.Leases, error) { return l, nil } +// Acquire the named lease, keep it alive, and return a facade. +func (l *leases) Acquire(ctx context.Context, name string) (types.Lease, error) { + acquired, ok, err := l.acquire(ctx, name) + if err != nil { + return nil, err + } + if !ok { + return nil, &types.LeaseBusyError{} + } + + ctx, cancel := context.WithCancel(ctx) + go func() { + l.keepRenewed(ctx, acquired) + cancel() + }() + + ret := &leaseFacade{ + cancel: func() { + _, _ = l.release(ctx, acquired) + cancel() + }, + ctx: ctx, + } + + runtime.SetFinalizer(ret, func(f *leaseFacade) { f.Release() }) + + return ret, nil +} + // Singleton executes a callback when the named lease is acquired. // // The lease will be released in the following circumstances: diff --git a/internal/target/leases/leases_test.go b/internal/target/leases/leases_test.go index 2381f6d22..66dd1ada0 100644 --- a/internal/target/leases/leases_test.go +++ b/internal/target/leases/leases_test.go @@ -269,6 +269,28 @@ func TestLeases(t *testing.T) { // Make sure the context has not timed out. a.NoError(ctx.Err()) }) + + t.Run("lease_facade", func(t *testing.T) { + a := assert.New(t) + + // Initial acquisition. + facade, err := l.Acquire(ctx, t.Name()) + a.NoError(err) + + // Verify that a duplicate fails. + _, err = l.Acquire(ctx, t.Name()) + a.ErrorIs(err, &types.LeaseBusyError{}) + + // Verify that releasing cancels the lease. + a.Nil(facade.Context().Err()) + facade.Release() + + a.ErrorIs(facade.Context().Err(), context.Canceled) + + // Re-acquisition should succeed. + _, err = l.Acquire(ctx, t.Name()) + a.NoError(err) + }) } func TestSanitize(t *testing.T) { diff --git a/internal/target/leases/provider.go b/internal/target/leases/provider.go new file mode 100644 index 000000000..440c1ee5d --- /dev/null +++ b/internal/target/leases/provider.go @@ -0,0 +1,45 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package leases + +import ( + "context" + "time" + + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/google/wire" + "github.com/jackc/pgx/v4/pgxpool" +) + +// Set is used by Wire. +var Set = wire.NewSet( + ProvideLeases, +) + +// ProvideLeases is called by Wire to configure the work-leasing strategy. +// +// This can be removed once stagingDB once SELECT FOR UPDATE uses +// replicated locks. +// +// https://github.com/cockroachdb/cockroach/issues/100194 +func ProvideLeases( + ctx context.Context, pool *pgxpool.Pool, stagingDB ident.StagingDB, +) (types.Leases, error) { + return New(ctx, Config{ + Guard: time.Second, + Lifetime: 5 * time.Second, + RetryDelay: time.Second, + Poll: time.Second, + Pool: pool, + Target: ident.NewTable(stagingDB.Ident(), ident.Public, ident.New("leases")), + }) +} diff --git a/internal/types/types.go b/internal/types/types.go index 08f2e543e..83b8ac810 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -48,12 +48,33 @@ type Authenticator interface { // Deadlines associate a column identifier with a duration. type Deadlines map[ident.Ident]time.Duration -// ErrCancelSingleton may be returned by callbacks passed to -// leases.Singleton to shut down cleanly. -var ErrCancelSingleton = errors.New("singleton requested cancellation") +var ( + // ErrCancelSingleton may be returned by callbacks passed to + // leases.Singleton to shut down cleanly. + ErrCancelSingleton = errors.New("singleton requested cancellation") +) + +// A Lease represents a time-based, exclusive lock. +type Lease interface { + // Context will be canceled when the lease has expired. + Context() context.Context + + // Release terminates the Lease. + Release() +} + +// LeaseBusyError is returned by [Leases.Acquire] if another caller +// holds the lease. +type LeaseBusyError struct{} + +func (e *LeaseBusyError) Error() string { return "lease is held by another caller" } // Leases coordinates behavior across multiple instances of cdc-sink. type Leases interface { + // Acquire the named lease. A [LeaseBusyError] will be returned if + // another caller has already acquired the lease. + Acquire(ctx context.Context, name string) (Lease, error) + // Singleton executes a callback when the named lease is acquired. // // The lease will be released in the following circumstances: diff --git a/internal/util/retry/retry.go b/internal/util/retry/retry.go index d01242fe5..01952ede8 100644 --- a/internal/util/retry/retry.go +++ b/internal/util/retry/retry.go @@ -15,10 +15,10 @@ package retry import ( "context" + "errors" "github.com/jackc/pgconn" "github.com/jackc/pgtype/pgxtype" - "github.com/pkg/errors" ) // Marker is a settable flag. @@ -50,7 +50,7 @@ func Retry(ctx context.Context, idempotent func(context.Context) error) error { } // inLoop is a key used by Loop to detect reentrant behavior. -var inLoop struct{} +type inLoop struct{} // Loop is a convenience wrapper to automatically retry idempotent // database operations that experience a transaction or a connection @@ -62,16 +62,14 @@ var inLoop struct{} // suppressed within an inner loop, allowing the retryable error to // percolate into the outer loop. func Loop(ctx context.Context, fn func(ctx context.Context, sideEffect *Marker) error) error { - top := ctx.Value(inLoop) == nil - if !top { - var sideEffect Marker - return fn(ctx, &sideEffect) + if outerMarker, ok := ctx.Value(inLoop{}).(*Marker); ok { + return fn(ctx, outerMarker) } - ctx = context.WithValue(ctx, inLoop, inLoop) + var sideEffect Marker + ctx = context.WithValue(ctx, inLoop{}, &sideEffect) actionsCount.Inc() for { - var sideEffect Marker err := fn(ctx, &sideEffect) if err == nil || sideEffect.Marked() { return err