diff --git a/internal/source/cdc/provider.go b/internal/source/cdc/provider.go index 25983f6ea..aa2cc5a51 100644 --- a/internal/source/cdc/provider.go +++ b/internal/source/cdc/provider.go @@ -44,6 +44,7 @@ func ProvideMetaTable(cfg *Config) MetaTable { func ProvideResolvers( ctx context.Context, cfg *Config, + leases types.Leases, metaTable MetaTable, pool *pgxpool.Pool, stagers types.Stagers, @@ -55,6 +56,7 @@ func ProvideResolvers( ret := &Resolvers{ cfg: cfg, + leases: leases, metaTable: metaTable.Table(), pool: pool, stagers: stagers, diff --git a/internal/source/cdc/resolved_stamp.go b/internal/source/cdc/resolved_stamp.go index 02ae57759..2fadef850 100644 --- a/internal/source/cdc/resolved_stamp.go +++ b/internal/source/cdc/resolved_stamp.go @@ -69,7 +69,8 @@ type resolvedStamp struct { mu struct { sync.Mutex - tx *txguard.Guard + lease types.Lease // See note in [resolver.readIntoOnce]. + tx *txguard.Guard } } @@ -87,12 +88,25 @@ func (s *resolvedStamp) Commit(ctx context.Context) error { s.mu.Lock() defer s.mu.Unlock() - tx := s.mu.tx - if tx == nil { - return nil + var err error + if tx := s.mu.tx; tx != nil { + s.mu.tx = nil + err = errors.WithStack(tx.Commit(ctx)) + } + + if lease := s.mu.lease; lease != nil { + s.mu.lease = nil + lease.Release() } - s.mu.tx = nil - return errors.WithStack(tx.Commit(ctx)) + + return err +} + +// Context returns a context whose lifetime is bound to the lease. +func (s *resolvedStamp) Context() context.Context { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.lease.Context() } // IsAlive returns any pending error from the transaction-keepalive @@ -105,7 +119,10 @@ func (s *resolvedStamp) IsAlive() error { if tx == nil { return errors.New("resolved-timestamp transaction owned by later stamp") } - return tx.IsAlive() + if err := tx.IsAlive(); err != nil { + return err + } + return s.mu.lease.Context().Err() } // Less implements stamp.Stamp. 
@@ -129,7 +146,9 @@ func (s *resolvedStamp) NewCommitted() (*resolvedStamp, error) { // NewProposed returns a new resolvedStamp that extends the existing // stamp with a later proposed timestamp. -func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*resolvedStamp, error) { +func (s *resolvedStamp) NewProposed( + tx *txguard.Guard, proposed hlc.Time, lease types.Lease, +) (*resolvedStamp, error) { if hlc.Compare(proposed, s.CommittedTime) < 0 { return nil, errors.Errorf("proposed cannot roll back committed time: %s vs %s", proposed, s.CommittedTime) @@ -151,6 +170,7 @@ func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*reso ProposedTime: proposed, } // We don't call handoff here, since we have a new transaction. + ret.mu.lease = lease ret.mu.tx = tx return ret, nil } @@ -191,12 +211,15 @@ func (s *resolvedStamp) Rollback() { } s.mu.Lock() defer s.mu.Unlock() - tx := s.mu.tx - if tx == nil { - return + + if tx := s.mu.tx; tx != nil { + tx.Rollback() + s.mu.tx = nil + } + if lease := s.mu.lease; lease != nil { + lease.Release() + s.mu.lease = nil } - s.mu.tx = nil - tx.Rollback() } // String is for debugging use only. @@ -213,8 +236,14 @@ func (s *resolvedStamp) handoff(next *resolvedStamp) *resolvedStamp { defer s.mu.Unlock() if tx := s.mu.tx; tx != nil { - s.mu.tx = nil next.mu.tx = tx + s.mu.tx = nil } + + if lease := s.mu.lease; lease != nil { + next.mu.lease = lease + s.mu.lease = nil + } + return next } diff --git a/internal/source/cdc/resolver.go b/internal/source/cdc/resolver.go index 17e3adbb7..b16785035 100644 --- a/internal/source/cdc/resolver.go +++ b/internal/source/cdc/resolver.go @@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS %[1]s ( type resolver struct { cfg *Config fastWakeup chan struct{} + leases types.Leases loop *logical.Loop // Reference to driving loop, for testing. pool *pgxpool.Pool retirements chan hlc.Time // Drives a goroutine to remove applied mutations. 
@@ -78,6 +79,7 @@ var ( func newResolver( ctx context.Context, cfg *Config, + leases types.Leases, pool *pgxpool.Pool, metaTable ident.Table, stagers types.Stagers, @@ -92,6 +94,7 @@ func newResolver( ret := &resolver{ cfg: cfg, fastWakeup: make(chan struct{}, 1), + leases: leases, pool: pool, retirements: make(chan hlc.Time, 16), stagers: stagers, @@ -119,6 +122,7 @@ not_before AS ( SELECT source_nanos, source_logical FROM %[1]s WHERE target_db=$1 AND target_schema=$2 ORDER BY source_nanos desc, source_logical desc + FOR UPDATE LIMIT 1), to_insert AS ( SELECT $1::STRING, $2::STRING, $3::INT, $4::INT @@ -227,6 +231,19 @@ func (r *resolver) readInto( func (r *resolver) readIntoOnce( ctx context.Context, ch chan<- logical.Message, prev *resolvedStamp, backfill bool, ) error { + // Acquire a lease for the destination schema. + // + // This exists as a workaround for SELECT FOR UPDATE only using + // un-replicated locks. When the linked issue is closed, this + // hack can be removed. + // + // https://github.com/cockroachdb/cockroach/issues/100194 + lease, err := r.leases.Acquire(ctx, r.target.Raw()) + if errors.Is(err, &types.LeaseBusyError{}) { + return errBlocked + } else if err != nil { + return err + } // This transaction will be used to hold a SELECT FOR UPDATE lock on // the next resolved timestamp to be processed. It will eventually @@ -247,6 +264,7 @@ func (r *resolver) readIntoOnce( defer func() { if !sentWork { guard.Rollback() + lease.Release() } }() @@ -261,7 +279,7 @@ func (r *resolver) readIntoOnce( } // Create a new marker that verifies that we're rolling forward. - work, err := prev.NewProposed(guard, nextResolved) + work, err := prev.NewProposed(guard, nextResolved, lease) // It's possible that the call to Begin() above occurs prior to the // previous resolved-timestamp transaction committing. 
@@ -311,7 +329,7 @@ func (r *resolver) Process( } var err error - rs, err = r.process(ctx, msg.(*resolvedStamp), events) + rs, err = r.process(msg.(*resolvedStamp), events) if err != nil { return err } @@ -377,9 +395,11 @@ func (r *resolver) ZeroStamp() stamp.Stamp { // process makes incremental progress in fulfilling the given // resolvedStamp. It returns the state to which the resolved timestamp // has been advanced. -func (r *resolver) process( - ctx context.Context, rs *resolvedStamp, events logical.Events, -) (*resolvedStamp, error) { +func (r *resolver) process(rs *resolvedStamp, events logical.Events) (*resolvedStamp, error) { + // Use a context that will be canceled if the associated lease + // cannot be renewed. + ctx := rs.Context() + start := time.Now() targets := r.watcher.Snapshot(r.target).Order @@ -654,6 +674,7 @@ func (r *resolver) wake() { // Resolvers is a factory for Resolver instances. type Resolvers struct { cfg *Config + leases types.Leases noStart bool // Set by test code to disable call to loop.Start() metaTable ident.Table pool *pgxpool.Pool @@ -692,7 +713,7 @@ func (r *Resolvers) get(ctx context.Context, target ident.Schema) (*resolver, er return found, nil } - ret, err := newResolver(ctx, r.cfg, r.pool, r.metaTable, r.stagers, target, r.watchers) + ret, err := newResolver(ctx, r.cfg, r.leases, r.pool, r.metaTable, r.stagers, target, r.watchers) if err != nil { return nil, err } diff --git a/internal/source/cdc/resolver_test.go b/internal/source/cdc/resolver_test.go index 98eb7a0ae..f45c220f6 100644 --- a/internal/source/cdc/resolver_test.go +++ b/internal/source/cdc/resolver_test.go @@ -56,6 +56,7 @@ func TestResolverDeQueue(t *testing.T) { for i := int64(0); i < rowCount; i++ { r.NoError(resolver.Mark(ctx, hlc.New(i+1, 0))) + r.NoError(resolver.Mark(ctx, hlc.New(i, 0))) } log.Info("marked") diff --git a/internal/source/cdc/test_fixture.go b/internal/source/cdc/test_fixture.go index 3fdf98fa6..c638606f8 100644 --- 
a/internal/source/cdc/test_fixture.go +++ b/internal/source/cdc/test_fixture.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/script" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target/auth/trust" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/target/sinktest" "github.com/google/wire" ) @@ -33,6 +34,7 @@ func newTestFixture(*sinktest.Fixture, *Config) (*testFixture, func(), error) { wire.FieldsOf(new(*sinktest.BaseFixture), "Context"), wire.FieldsOf(new(*sinktest.Fixture), "Appliers", "BaseFixture", "Stagers", "Watchers"), + leases.Set, logical.Set, script.Set, trust.New, // Is valid to use as a provider. diff --git a/internal/source/cdc/wire_gen.go b/internal/source/cdc/wire_gen.go index 4c429616f..fb0800303 100644 --- a/internal/source/cdc/wire_gen.go +++ b/internal/source/cdc/wire_gen.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/script" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target/auth/trust" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/target/sinktest" ) @@ -36,10 +37,20 @@ func newTestFixture(fixture *sinktest.Fixture, config *Config) (*testFixture, fu if err != nil { return nil, nil, err } + stagingDB, err := logical.ProvideStagingDB(baseConfig) + if err != nil { + cleanup() + return nil, nil, err + } + typesLeases, err := leases.ProvideLeases(context, pool, stagingDB) + if err != nil { + cleanup() + return nil, nil, err + } metaTable := ProvideMetaTable(config) stagers := fixture.Stagers watchers := fixture.Watchers - resolvers, cleanup2, err := ProvideResolvers(context, config, metaTable, pool, stagers, watchers) + resolvers, cleanup2, err := ProvideResolvers(context, config, typesLeases, metaTable, pool, stagers, watchers) if err != nil { cleanup() return nil, nil, err diff --git 
a/internal/source/server/injector.go b/internal/source/server/injector.go index 0551f347f..8524c0555 100644 --- a/internal/source/server/injector.go +++ b/internal/source/server/injector.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/cdc" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/google/wire" ) @@ -27,6 +28,7 @@ func NewServer(ctx context.Context, config *Config) (*Server, func(), error) { panic(wire.Build( Set, cdc.Set, + leases.Set, logical.Set, script.Set, target.Set, diff --git a/internal/source/server/test_fixture.go b/internal/source/server/test_fixture.go index 1470ad5af..c0ce75db3 100644 --- a/internal/source/server/test_fixture.go +++ b/internal/source/server/test_fixture.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/cdc" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/google/wire" @@ -43,6 +44,7 @@ func newTestFixture(context.Context, *Config) (*testFixture, func(), error) { panic(wire.Build( Set, cdc.Set, + leases.Set, logical.Set, script.Set, target.Set, diff --git a/internal/source/server/wire_gen.go b/internal/source/server/wire_gen.go index fe56e845b..268daa0fc 100644 --- a/internal/source/server/wire_gen.go +++ b/internal/source/server/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/cdc" "github.com/cockroachdb/cdc-sink/internal/source/logical" "github.com/cockroachdb/cdc-sink/internal/target/apply" + "github.com/cockroachdb/cdc-sink/internal/target/leases" "github.com/cockroachdb/cdc-sink/internal/target/schemawatch" "github.com/cockroachdb/cdc-sink/internal/target/stage" 
"github.com/cockroachdb/cdc-sink/internal/types" @@ -75,9 +76,19 @@ func NewServer(ctx context.Context, config *Config) (*Server, func(), error) { return nil, nil, err } cdcConfig := &config.CDC + typesLeases, err := leases.ProvideLeases(ctx, pool, stagingDB) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return nil, nil, err + } metaTable := cdc.ProvideMetaTable(cdcConfig) stagers := stage.ProvideFactory(pool, stagingDB) - resolvers, cleanup7, err := cdc.ProvideResolvers(ctx, cdcConfig, metaTable, pool, stagers, watchers) + resolvers, cleanup7, err := cdc.ProvideResolvers(ctx, cdcConfig, typesLeases, metaTable, pool, stagers, watchers) if err != nil { cleanup6() cleanup5() @@ -167,9 +178,19 @@ func newTestFixture(contextContext context.Context, config *Config) (*testFixtur watchers, cleanup5 := schemawatch.ProvideFactory(pool) appliers, cleanup6 := apply.ProvideFactory(configs, watchers) cdcConfig := &config.CDC + typesLeases, err := leases.ProvideLeases(contextContext, pool, stagingDB) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return nil, nil, err + } metaTable := cdc.ProvideMetaTable(cdcConfig) stagers := stage.ProvideFactory(pool, stagingDB) - resolvers, cleanup7, err := cdc.ProvideResolvers(contextContext, cdcConfig, metaTable, pool, stagers, watchers) + resolvers, cleanup7, err := cdc.ProvideResolvers(contextContext, cdcConfig, typesLeases, metaTable, pool, stagers, watchers) if err != nil { cleanup6() cleanup5() diff --git a/internal/target/leases/leases.go b/internal/target/leases/leases.go index 039899a29..34d26e136 100644 --- a/internal/target/leases/leases.go +++ b/internal/target/leases/leases.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math/rand" + "runtime" "time" "github.com/cockroachdb/cdc-sink/internal/types" @@ -88,6 +89,24 @@ type leases struct { } } +// leaseFacade implements the public types.Lease interface. 
+type leaseFacade struct { + cancel func() + ctx context.Context +} + +var _ types.Lease = (*leaseFacade)(nil) + +// Context implements types.Lease. +func (f *leaseFacade) Context() context.Context { + return f.ctx +} + +// Release implements types.Lease. +func (f *leaseFacade) Release() { + f.cancel() +} + var _ types.Leases = (*leases)(nil) const ( @@ -117,6 +136,35 @@ func New(ctx context.Context, cfg Config) (types.Leases, error) { return l, nil } +// Acquire the named lease, keep it alive, and return a facade. +func (l *leases) Acquire(ctx context.Context, name string) (types.Lease, error) { + acquired, ok, err := l.acquire(ctx, name) + if err != nil { + return nil, err + } + if !ok { + return nil, &types.LeaseBusyError{} + } + + ctx, cancel := context.WithCancel(ctx) + go func() { + l.keepRenewed(ctx, acquired) + cancel() + }() + + ret := &leaseFacade{ + cancel: func() { + _, _ = l.release(ctx, acquired) + cancel() + }, + ctx: ctx, + } + + runtime.SetFinalizer(ret, func(f *leaseFacade) { f.Release() }) + + return ret, nil +} + // Singleton executes a callback when the named lease is acquired. // // The lease will be released in the following circumstances: diff --git a/internal/target/leases/leases_test.go b/internal/target/leases/leases_test.go index 2381f6d22..66dd1ada0 100644 --- a/internal/target/leases/leases_test.go +++ b/internal/target/leases/leases_test.go @@ -269,6 +269,28 @@ func TestLeases(t *testing.T) { // Make sure the context has not timed out. a.NoError(ctx.Err()) }) + + t.Run("lease_facade", func(t *testing.T) { + a := assert.New(t) + + // Initial acquisition. + facade, err := l.Acquire(ctx, t.Name()) + a.NoError(err) + + // Verify that a duplicate fails. + _, err = l.Acquire(ctx, t.Name()) + a.ErrorIs(err, &types.LeaseBusyError{}) + + // Verify that releasing cancels the lease. + a.Nil(facade.Context().Err()) + facade.Release() + + a.ErrorIs(facade.Context().Err(), context.Canceled) + + // Re-acquisition should succeed. 
+ _, err = l.Acquire(ctx, t.Name()) + a.NoError(err) + }) } func TestSanitize(t *testing.T) { diff --git a/internal/target/leases/provider.go b/internal/target/leases/provider.go new file mode 100644 index 000000000..440c1ee5d --- /dev/null +++ b/internal/target/leases/provider.go @@ -0,0 +1,45 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package leases + +import ( + "context" + "time" + + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/google/wire" + "github.com/jackc/pgx/v4/pgxpool" +) + +// Set is used by Wire. +var Set = wire.NewSet( + ProvideLeases, +) + +// ProvideLeases is called by Wire to configure the work-leasing strategy. +// +// This can be removed once SELECT FOR UPDATE uses +// replicated locks. +// +// https://github.com/cockroachdb/cockroach/issues/100194 +func ProvideLeases( + ctx context.Context, pool *pgxpool.Pool, stagingDB ident.StagingDB, +) (types.Leases, error) { + return New(ctx, Config{ + Guard: time.Second, + Lifetime: 5 * time.Second, + RetryDelay: time.Second, + Poll: time.Second, + Pool: pool, + Target: ident.NewTable(stagingDB.Ident(), ident.Public, ident.New("leases")), + }) +} diff --git a/internal/types/types.go b/internal/types/types.go index 08f2e543e..83b8ac810 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -48,12 +48,33 @@ type Authenticator interface { // Deadlines associate a column identifier with a duration. type Deadlines map[ident.Ident]time.Duration -// ErrCancelSingleton may be returned by callbacks passed to -// leases.Singleton to shut down cleanly. 
-var ErrCancelSingleton = errors.New("singleton requested cancellation") +var ( + // ErrCancelSingleton may be returned by callbacks passed to + // leases.Singleton to shut down cleanly. + ErrCancelSingleton = errors.New("singleton requested cancellation") +) + +// A Lease represents a time-based, exclusive lock. +type Lease interface { + // Context will be canceled when the lease has expired. + Context() context.Context + + // Release terminates the Lease. + Release() +} + +// LeaseBusyError is returned by [Leases.Acquire] if another caller +// holds the lease. +type LeaseBusyError struct{} + +func (e *LeaseBusyError) Error() string { return "lease is held by another caller" } // Leases coordinates behavior across multiple instances of cdc-sink. type Leases interface { + // Acquire the named lease. A [LeaseBusyError] will be returned if + // another caller has already acquired the lease. + Acquire(ctx context.Context, name string) (Lease, error) + // Singleton executes a callback when the named lease is acquired. // // The lease will be released in the following circumstances: diff --git a/internal/util/retry/retry.go b/internal/util/retry/retry.go index d01242fe5..01952ede8 100644 --- a/internal/util/retry/retry.go +++ b/internal/util/retry/retry.go @@ -15,10 +15,10 @@ package retry import ( "context" + "errors" "github.com/jackc/pgconn" "github.com/jackc/pgtype/pgxtype" - "github.com/pkg/errors" ) // Marker is a settable flag. @@ -50,7 +50,7 @@ func Retry(ctx context.Context, idempotent func(context.Context) error) error { } // inLoop is a key used by Loop to detect reentrant behavior. -var inLoop struct{} +type inLoop struct{} // Loop is a convenience wrapper to automatically retry idempotent // database operations that experience a transaction or a connection @@ -62,16 +62,14 @@ var inLoop struct{} // suppressed within an inner loop, allowing the retryable error to // percolate into the outer loop. 
func Loop(ctx context.Context, fn func(ctx context.Context, sideEffect *Marker) error) error { - top := ctx.Value(inLoop) == nil - if !top { - var sideEffect Marker - return fn(ctx, &sideEffect) + if outerMarker, ok := ctx.Value(inLoop{}).(*Marker); ok { + return fn(ctx, outerMarker) } - ctx = context.WithValue(ctx, inLoop, inLoop) + var sideEffect Marker + ctx = context.WithValue(ctx, inLoop{}, &sideEffect) actionsCount.Inc() for { - var sideEffect Marker err := fn(ctx, &sideEffect) if err == nil || sideEffect.Marked() { return err