Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeed: fix premature checkpoint due to intent resolution race #117612

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_test(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand All @@ -85,12 +86,14 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
323 changes: 323 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"runtime/pprof"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -25,17 +26,20 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1229,3 +1233,322 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) {
t.Fatal("timed out waiting for event")
}
}

// TestRangeFeedIntentResolutionRace is a regression test for
// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the
// following scenario:
//
// - A rangefeed is running on a lagging follower.
// - A txn writes an intent, which is replicated to the follower.
// - The closed timestamp advances past the intent.
// - The txn commits and resolves the intent at the original write timestamp,
// then GCs its txn record. This is not yet applied on the follower.
// - The rangefeed pushes the txn to advance its resolved timestamp.
// - The txn is GCed, so the push returns ABORTED (it can't know whether the
// txn was committed or aborted after its record is GCed).
// - The rangefeed disregards the "aborted" txn and advances the resolved
// timestamp, emitting a checkpoint.
// - The follower applies the resolved intent and emits an event below
// the checkpoint, violating the checkpoint guarantee.
//
// This scenario is addressed by running a Barrier request through Raft and
// waiting for it to apply locally before removing the txn from resolved ts
// tracking. This ensures the pending intent resolution is applied before
// the resolved ts can advance.
func TestRangeFeedIntentResolutionRace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // too slow, times out
skip.UnderDeadlock(t)

// Use a timeout, to prevent a hung test.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
// defer cancel() after Stopper.Stop(), so the context cancels first.
// Otherwise, the stopper will hang waiting for a rangefeed whose context is
// not yet cancelled.

// Set up an apply filter that blocks Raft application on n3 (follower) for
// the given range.
var blockRangeOnN3 atomic.Int64
unblockRangeC := make(chan struct{})
applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
if args.StoreID == 3 {
if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) {
t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID)
select {
case <-unblockRangeC:
t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID)
case <-ctx.Done():
return 0, kvpb.NewError(ctx.Err())
}
}
}
return 0, nil
}

// Set up a request filter that blocks transaction pushes for a specific key.
// This is used to prevent the rangefeed txn pusher from pushing the txn
// timestamp above the closed timestamp before it commits, only allowing the
// push to happen after the transaction has committed and GCed below the
// closed timestamp.
var blockPush atomic.Pointer[roachpb.Key]
unblockPushC := make(chan struct{})
reqFilter := func(ctx context.Context, br *kvpb.BatchRequest) *kvpb.Error {
if br.IsSinglePushTxnRequest() {
req := br.Requests[0].GetPushTxn()
if key := blockPush.Load(); key != nil && req.Key.Equal(*key) {
t.Logf("blocked push for txn %s", req.PusheeTxn)
select {
case <-unblockPushC:
t.Logf("unblocked push for txn %s", req.PusheeTxn)
case <-ctx.Done():
return kvpb.NewError(ctx.Err())
}
}
}
return nil
}

// Speed up the test by reducing various closed/resolved timestamp intervals.
const interval = 100 * time.Millisecond
st := cluster.MakeClusterSettings()
kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval)
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval)
closedts.TargetDuration.Override(ctx, &st.SV, interval)

// Start a cluster with 3 nodes.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: reqFilter,
TestingApplyCalledTwiceFilter: applyFilter,
RangeFeedPushTxnsInterval: interval,
RangeFeedPushTxnsAge: interval,
},
},
},
})
defer tc.Stopper().Stop(ctx)
defer cancel()

n1 := tc.Server(0)
s3 := tc.GetFirstStoreFromServer(t, 2)
clock := n1.ApplicationLayer().Clock()

// Determine the key/value we're going to write.
prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...)
key := append(prefix.Clone(), []byte("/foo")...)
value := []byte("value")

// Split off a range and upreplicate it, with leaseholder on n1.
_, _, err := n1.StorageLayer().SplitRange(prefix)
require.NoError(t, err)
desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
t.Logf("split off range %s", desc)

repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder
repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower

require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp()))

// Block pushes of the txn, to ensure it can write at a fixed timestamp.
// Otherwise, the rangefeed or someone else may push it beyond the closed
// timestamp.
blockPush.Store(&key)

// We'll use n3 as our lagging follower. Start a rangefeed on it directly.
req := kvpb.RangeFeedRequest{
Header: kvpb.Header{
RangeID: desc.RangeID,
},
Span: desc.RSpan().AsRawSpanWithNoLocals(),
}
eventC := make(chan *kvpb.RangeFeedEvent)
sink := newChannelSink(ctx, eventC)
fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink))
require.NoError(t, fErr.Get()) // check if we've errored yet
t.Logf("started rangefeed on %s", repl3)

// Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC.
// This uses a buffered channel of size 1, and empties it out before posting a
// new update, such that it contains the latest known checkpoint and does not
// block the rangefeed. It also posts emitted values for our key to valueC,
// which should only happen once.
checkpointC := make(chan *kvpb.RangeFeedCheckpoint, 1)
valueC := make(chan *kvpb.RangeFeedValue, 1)
go func() {
defer close(checkpointC)
defer close(valueC)
for {
select {
case e := <-eventC:
switch {
case e.Checkpoint != nil:
// Clear checkpointC such that it always contains the latest.
select {
case <-checkpointC:
default:
}
checkpointC <- e.Checkpoint
case e.Val != nil && e.Val.Key.Equal(key):
select {
case valueC <- e.Val:
default:
t.Errorf("duplicate value event for %s: %s", key, e)
}
}
case <-ctx.Done():
return
}
}
}()

waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp {
t.Helper()
timeoutC := time.After(3 * time.Second)
for {
select {
case c := <-checkpointC:
require.NotNil(t, c, "nil checkpoint")
if ts.LessEq(c.ResolvedTS) {
t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts)
return c.ResolvedTS
}
case <-timeoutC:
require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts)
}
}
}

// Wait for the initial checkpoint.
rts := waitForCheckpoint(t, clock.Now())
t.Logf("initial checkpoint at %s", rts)

// Start a new transaction on n1 with a fixed timestamp (to make sure it
// remains below the closed timestamp). Write an intent, and read it back to
// make sure it has applied.
writeTS := clock.Now()
txn := n1.ApplicationLayer().DB().NewTxn(ctx, "test")
require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS))
require.NoError(t, txn.Put(ctx, key, value))
_, err = txn.Get(ctx, key)
require.NoError(t, err)
t.Logf("wrote %s", key)

// Wait for both the leaseholder and the follower to close the transaction's
// write timestamp.
waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp {
t.Helper()
timeoutC := time.After(3 * time.Second)
for {
if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) {
t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts)
return closedTS
}
select {
case <-time.After(10 * time.Millisecond):
case <-timeoutC:
require.Fail(t, "timeout out waiting for closed timestamp", "wanted %s", ts)
}
}
}
cts := waitForClosedTimestamp(t, repl1, writeTS)
_ = waitForClosedTimestamp(t, repl3, writeTS)
t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS)

// Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the
// rangefeed's view of the closed timestamp has been updated, and is now only
// blocked by the intent.
waitTS := writeTS.Prev()
waitTS.Logical = 0
rts = waitForCheckpoint(t, waitTS)
t.Logf("rangefeed caught up to txn write timestamp at %s", rts)

// Block Raft application on repl3.
blockRangeOnN3.Store(int64(desc.RangeID))

// Commit the transaction, and check its commit timestamp.
require.NoError(t, txn.Commit(ctx))
commitTS, err := txn.CommitTimestamp()
require.NoError(t, err)
require.Equal(t, commitTS, writeTS)
t.Logf("txn committed at %s", writeTS)

// Unblock transaction pushes. Since repl3 won't apply the intent resolution
// yet, the rangefeed will keep trying to push the transaction. Once the
// transaction record is GCed (which happens async), the rangefeed will see an
// ABORTED status.
//
// It may see the intermediate COMMITTED state too, but at the time of writing
// that does not matter, since the rangefeed needs to keep tracking the
// intent until it applies the resolution, and it will also see the ABORTED
// status before that happens.
blockPush.Store(nil)
close(unblockPushC)

// Make sure repl3 does not emit a checkpoint beyond the write timestamp. Its
// closed timestamp has already advanced past it, but the unresolved intent
// should prevent the resolved timestamp from advancing, despite the false
// ABORTED state. We also make sure no value has been emitted.
waitC := time.After(3 * time.Second)
for done := false; !done; {
select {
case c := <-checkpointC:
require.NotNil(t, c)
require.False(t, commitTS.LessEq(c.ResolvedTS),
"repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS)
case v := <-valueC:
require.Fail(t, "repl3 emitted premature value %s", v)
case <-waitC:
done = true
}
}
t.Logf("checkpoint still below write timestamp")

// Unblock application on repl3. Wait for the checkpoint to catch up to the
// commit timestamp, and the committed value to be emitted.
blockRangeOnN3.Store(0)
close(unblockRangeC)

rts = waitForCheckpoint(t, writeTS)
t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS)

select {
case v := <-valueC:
require.Equal(t, v.Key, key)
require.Equal(t, v.Value.Timestamp, writeTS)
t.Logf("received value %s", v)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for event")
}

// The rangefeed should still be running.
require.NoError(t, fErr.Get())
}

// channelSink is a rangefeed sink which posts events to a channel.
type channelSink struct {
ctx context.Context
ch chan<- *kvpb.RangeFeedEvent
}

func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink {
return &channelSink{ctx: ctx, ch: ch}
}

func (c *channelSink) Context() context.Context {
return c.ctx
}

func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error {
select {
case c.ch <- e:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
}
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
Expand Down
Loading