Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient/rangefeed: emit checkpoint events #70317

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_golang_mock//gomock",
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package rangefeed
import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
)

Expand All @@ -27,6 +28,7 @@ type config struct {
withInitialScan bool
withDiff bool
onInitialScanError OnInitialScanError
onCheckpoint OnCheckpoint
}

type optionFunc func(*config)
Expand Down Expand Up @@ -80,6 +82,17 @@ func WithRetry(options retry.Options) Option {
})
}

// OnCheckpoint is called when a rangefeed checkpoint occurs.
type OnCheckpoint func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint)

// WithOnCheckpoint sets up a callback that's invoked whenever a check point
// event is emitted.
func WithOnCheckpoint(f OnCheckpoint) Option {
return optionFunc(func(c *config) {
c.onCheckpoint = f
})
}

func initConfig(c *config, options []Option) {
*c = config{} // the default config is its zero value
for _, o := range options {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ func (f *RangeFeed) processEvents(
if _, err := frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS); err != nil {
return err
}
if f.onCheckpoint != nil {
f.onCheckpoint(ctx, ev.Checkpoint)
}
case ev.Error != nil:
// Intentionally do nothing, we'll get an error returned from the
// call to RangeFeed.
Expand Down
124 changes: 114 additions & 10 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@ package rangefeed_test

import (
"context"
"errors"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -64,16 +69,18 @@ func TestRangeFeedIntegration(t *testing.T) {
require.NoError(t, err)
rows := make(chan *roachpb.RangeFeedValue)
initialScanDone := make(chan struct{})
r, err := f.RangeFeed(ctx, "test", sp, afterB, func(
ctx context.Context, value *roachpb.RangeFeedValue,
) {
select {
case rows <- value:
case <-ctx.Done():
}
}, rangefeed.WithDiff(), rangefeed.WithInitialScan(func(ctx context.Context) {
close(initialScanDone)
}))
r, err := f.RangeFeed(ctx, "test", sp, afterB,
func(ctx context.Context, value *roachpb.RangeFeedValue) {
select {
case rows <- value:
case <-ctx.Done():
}
},
rangefeed.WithDiff(),
rangefeed.WithInitialScan(func(ctx context.Context) {
close(initialScanDone)
}),
)
require.NoError(t, err)
defer r.Close()
{
Expand Down Expand Up @@ -106,3 +113,100 @@ func TestRangeFeedIntegration(t *testing.T) {
require.Equal(t, int64(4), updated)
}
}

// TestWithOnCheckpoint verifies that we correctly emit rangefeed checkpoint
// events.
func TestWithOnCheckpoint(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
scratchKey := tc.ScratchRange(t)
scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
mkKey := func(k string) roachpb.Key {
return encoding.EncodeStringAscending(scratchKey, k)
}

sp := roachpb.Span{
Key: scratchKey,
EndKey: scratchKey.PrefixEnd(),
}
{
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)
}
{
// Lower the closed timestamp target duration to speed up the test.
_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
require.NoError(t, err)
}

f, err := rangefeed.NewFactory(tc.Stopper(), db, nil)
require.NoError(t, err)

var mu syncutil.RWMutex
var afterWriteTS hlc.Timestamp
checkpoints := make(chan *roachpb.RangeFeedCheckpoint)

// We need to start a goroutine that reads of the checkpoints channel, so to
// not block the rangefeed itself.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// We should expect a checkpoint event covering the key we just wrote, at a
// timestamp greater than when we wrote it.
testutils.SucceedsSoon(t, func() error {
for {
select {
case c := <-checkpoints:
mu.RLock()
writeTSUnset := afterWriteTS.IsEmpty()
mu.RUnlock()
if writeTSUnset {
return errors.New("write to key hasn't gone through yet")
}

if afterWriteTS.LessEq(c.ResolvedTS) && c.Span.ContainsKey(mkKey("a")) {
return nil
}
default:
return errors.New("no valid checkpoints found")
}
}
})
}()

rows := make(chan *roachpb.RangeFeedValue)
r, err := f.RangeFeed(ctx, "test", sp, db.Clock().Now(),
func(ctx context.Context, value *roachpb.RangeFeedValue) {
select {
case rows <- value:
case <-ctx.Done():
}
},
rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) {
select {
case checkpoints <- checkpoint:
case <-ctx.Done():
}
}),
)
require.NoError(t, err)
defer r.Close()

require.NoError(t, db.Put(ctx, mkKey("a"), 1))
mu.Lock()
afterWriteTS = db.Clock().Now()
mu.Unlock()
{
v := <-rows
require.Equal(t, mkKey("a"), v.Key)
}

wg.Wait()
}