Skip to content

Commit

Permalink
cdc: Use lease package when retrieving work
Browse files Browse the repository at this point in the history
The locks created by a SELECT FOR UPDATE command are unreplicated and fragile
if the leaseholder for the associated range is moved. This change creates an
explicit lease for the destination schema to work around the limitation.

cockroachdb/cockroach#100194
  • Loading branch information
bobvawter committed Apr 12, 2023
1 parent 0cf97a1 commit c9871e2
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 30 deletions.
2 changes: 2 additions & 0 deletions internal/source/cdc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func ProvideMetaTable(cfg *Config) MetaTable {
func ProvideResolvers(
ctx context.Context,
cfg *Config,
leases types.Leases,
metaTable MetaTable,
pool *pgxpool.Pool,
stagers types.Stagers,
Expand All @@ -55,6 +56,7 @@ func ProvideResolvers(

ret := &Resolvers{
cfg: cfg,
leases: leases,
metaTable: metaTable.Table(),
pool: pool,
stagers: stagers,
Expand Down
50 changes: 36 additions & 14 deletions internal/source/cdc/resolved_stamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type resolvedStamp struct {

mu struct {
sync.Mutex
tx *txguard.Guard
lease types.Lease // See note in [resolver.readIntoOnce].
tx *txguard.Guard
}
}

Expand All @@ -87,12 +88,18 @@ func (s *resolvedStamp) Commit(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

tx := s.mu.tx
if tx == nil {
return nil
var err error
if tx := s.mu.tx; tx != nil {
s.mu.tx = nil
err = errors.WithStack(tx.Commit(ctx))
}

if lease := s.mu.lease; lease != nil {
s.mu.lease = nil
lease.Release()
}
s.mu.tx = nil
return errors.WithStack(tx.Commit(ctx))

return err
}

// IsAlive returns any pending error from the transaction-keepalive
Expand All @@ -105,7 +112,10 @@ func (s *resolvedStamp) IsAlive() error {
if tx == nil {
return errors.New("resolved-timestamp transaction owned by later stamp")
}
return tx.IsAlive()
if err := tx.IsAlive(); err != nil {
return err
}
return s.mu.lease.Context().Err()
}

// Less implements stamp.Stamp.
Expand All @@ -129,7 +139,9 @@ func (s *resolvedStamp) NewCommitted() (*resolvedStamp, error) {

// NewProposed returns a new resolvedStamp that extends the existing
// stamp with a later proposed timestamp.
func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*resolvedStamp, error) {
func (s *resolvedStamp) NewProposed(
tx *txguard.Guard, proposed hlc.Time, lease types.Lease,
) (*resolvedStamp, error) {
if hlc.Compare(proposed, s.CommittedTime) < 0 {
return nil, errors.Errorf("proposed cannot roll back committed time: %s vs %s",
proposed, s.CommittedTime)
Expand All @@ -151,6 +163,7 @@ func (s *resolvedStamp) NewProposed(tx *txguard.Guard, proposed hlc.Time) (*reso
ProposedTime: proposed,
}
// We don't call handoff here, since we have a new transaction.
ret.mu.lease = lease
ret.mu.tx = tx
return ret, nil
}
Expand Down Expand Up @@ -191,12 +204,15 @@ func (s *resolvedStamp) Rollback() {
}
s.mu.Lock()
defer s.mu.Unlock()
tx := s.mu.tx
if tx == nil {
return

if tx := s.mu.tx; tx != nil {
tx.Rollback()
s.mu.tx = nil
}
if lease := s.mu.lease; lease != nil {
lease.Release()
s.mu.lease = nil
}
s.mu.tx = nil
tx.Rollback()
}

// String is for debugging use only.
Expand All @@ -213,8 +229,14 @@ func (s *resolvedStamp) handoff(next *resolvedStamp) *resolvedStamp {
defer s.mu.Unlock()

if tx := s.mu.tx; tx != nil {
s.mu.tx = nil
next.mu.tx = tx
s.mu.tx = nil
}

if lease := s.mu.lease; lease != nil {
next.mu.lease = lease
s.mu.lease = nil
}

return next
}
23 changes: 21 additions & 2 deletions internal/source/cdc/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS %[1]s (
type resolver struct {
cfg *Config
fastWakeup chan struct{}
leases types.Leases
loop *logical.Loop // Reference to driving loop, for testing.
pool *pgxpool.Pool
retirements chan hlc.Time // Drives a goroutine to remove applied mutations.
Expand All @@ -78,6 +79,7 @@ var (
func newResolver(
ctx context.Context,
cfg *Config,
leases types.Leases,
pool *pgxpool.Pool,
metaTable ident.Table,
stagers types.Stagers,
Expand All @@ -92,6 +94,7 @@ func newResolver(
ret := &resolver{
cfg: cfg,
fastWakeup: make(chan struct{}, 1),
leases: leases,
pool: pool,
retirements: make(chan hlc.Time, 16),
stagers: stagers,
Expand Down Expand Up @@ -119,6 +122,7 @@ not_before AS (
SELECT source_nanos, source_logical FROM %[1]s
WHERE target_db=$1 AND target_schema=$2
ORDER BY source_nanos desc, source_logical desc
FOR UPDATE
LIMIT 1),
to_insert AS (
SELECT $1::STRING, $2::STRING, $3::INT, $4::INT
Expand Down Expand Up @@ -227,6 +231,19 @@ func (r *resolver) readInto(
func (r *resolver) readIntoOnce(
ctx context.Context, ch chan<- logical.Message, prev *resolvedStamp, backfill bool,
) error {
// Acquire a lease for the destination schema.
//
// This exists as a workaround for SELECT FOR UPDATE only using
// un-replicated locks. When the linked issue is closed, this
// hack can be removed.
//
// https://github.com/cockroachdb/cockroach/issues/100194
lease, err := r.leases.Acquire(ctx, r.target.Raw())
if errors.Is(err, &types.LeaseBusyError{}) {
return errBlocked
} else if err != nil {
return err
}

// This transaction will be used to hold a SELECT FOR UPDATE lock on
// the next resolved timestamp to be processed. It will eventually
Expand All @@ -247,6 +264,7 @@ func (r *resolver) readIntoOnce(
defer func() {
if !sentWork {
guard.Rollback()
lease.Release()
}
}()

Expand All @@ -261,7 +279,7 @@ func (r *resolver) readIntoOnce(
}

// Create a new marker that verifies that we're rolling forward.
work, err := prev.NewProposed(guard, nextResolved)
work, err := prev.NewProposed(guard, nextResolved, lease)

// It's possible that the call to Begin() above occurs prior to the
// previous resolved-timestamp transaction committing.
Expand Down Expand Up @@ -654,6 +672,7 @@ func (r *resolver) wake() {
// Resolvers is a factory for Resolver instances.
type Resolvers struct {
cfg *Config
leases types.Leases
noStart bool // Set by test code to disable call to loop.Start()
metaTable ident.Table
pool *pgxpool.Pool
Expand Down Expand Up @@ -692,7 +711,7 @@ func (r *Resolvers) get(ctx context.Context, target ident.Schema) (*resolver, er
return found, nil
}

ret, err := newResolver(ctx, r.cfg, r.pool, r.metaTable, r.stagers, target, r.watchers)
ret, err := newResolver(ctx, r.cfg, r.leases, r.pool, r.metaTable, r.stagers, target, r.watchers)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/source/cdc/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestResolverDeQueue(t *testing.T) {

for i := int64(0); i < rowCount; i++ {
r.NoError(resolver.Mark(ctx, hlc.New(i+1, 0)))
r.NoError(resolver.Mark(ctx, hlc.New(i, 0)))
}

log.Info("marked")
Expand Down
1 change: 1 addition & 0 deletions internal/source/cdc/test_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type testFixture struct {
func newTestFixture(*sinktest.Fixture, *Config) (*testFixture, func(), error) {
panic(wire.Build(
Set,
sinktest.ProvideLeases,
wire.FieldsOf(new(*sinktest.BaseFixture), "Context"),
wire.FieldsOf(new(*sinktest.Fixture),
"Appliers", "BaseFixture", "Stagers", "Watchers"),
Expand Down
12 changes: 11 additions & 1 deletion internal/source/cdc/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions internal/source/server/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/target/apply"
"github.com/cockroachdb/cdc-sink/internal/target/auth/jwt"
"github.com/cockroachdb/cdc-sink/internal/target/auth/trust"
"github.com/cockroachdb/cdc-sink/internal/target/leases"
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/google/wire"
Expand All @@ -42,6 +43,7 @@ import (
// Set is used by Wire.
var Set = wire.NewSet(
ProvideAuthenticator,
ProvideLeases,
ProvideListener,
ProvideMux,
ProvideServer,
Expand All @@ -61,6 +63,25 @@ func ProvideAuthenticator(
return jwt.ProvideAuth(ctx, pool, stagingDB)
}

// ProvideLeases is called by Wire to configure the work-leasing strategy.
//
// This can be removed once stagingDB once SELECT FOR UPDATE uses
// replicated locks.
//
// https://github.com/cockroachdb/cockroach/issues/100194
func ProvideLeases(
ctx context.Context, pool *pgxpool.Pool, stagingDB ident.StagingDB,
) (types.Leases, error) {
return leases.New(ctx, leases.Config{
Guard: time.Second,
Lifetime: 5 * time.Second,
RetryDelay: time.Second,
Poll: time.Second,
Pool: pool,
Target: ident.NewTable(stagingDB.Ident(), ident.Public, ident.New("leases")),
})
}

// ProvideListener is called by Wire to construct the incoming network
// socket for the server.
func ProvideListener(config *Config) (net.Listener, func(), error) {
Expand Down
24 changes: 22 additions & 2 deletions internal/source/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions internal/target/leases/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math/rand"
"runtime"
"time"

"github.com/cockroachdb/cdc-sink/internal/types"
Expand Down Expand Up @@ -88,6 +89,24 @@ type leases struct {
}
}

// leaseFacade implements the public types.Lease interface.
type leaseFacade struct {
cancel func()
ctx context.Context
}

var _ types.Lease = (*leaseFacade)(nil)

// Context implements types.Lease.
func (f *leaseFacade) Context() context.Context {
return f.ctx
}

// Release implements types.Lease.
func (f *leaseFacade) Release() {
f.cancel()
}

var _ types.Leases = (*leases)(nil)

const (
Expand Down Expand Up @@ -117,6 +136,35 @@ func New(ctx context.Context, cfg Config) (types.Leases, error) {
return l, nil
}

// Acquire the named lease, keep it alive, and return a facade.
func (l *leases) Acquire(ctx context.Context, name string) (types.Lease, error) {
acquired, ok, err := l.acquire(ctx, name)
if err != nil {
return nil, err
}
if !ok {
return nil, &types.LeaseBusyError{}
}

ctx, cancel := context.WithCancel(ctx)
go func() {
l.keepRenewed(ctx, acquired)
cancel()
}()

ret := &leaseFacade{
cancel: func() {
_, _ = l.release(ctx, acquired)
cancel()
},
ctx: ctx,
}

runtime.SetFinalizer(ret, func(f *leaseFacade) { f.Release() })

return ret, nil
}

// Singleton executes a callback when the named lease is acquired.
//
// The lease will be released in the following circumstances:
Expand Down
Loading

0 comments on commit c9871e2

Please sign in to comment.