rangefeed: fix premature checkpoint due to intent resolution race
It was possible for a rangefeed to emit a premature checkpoint, before
all writes below its timestamp had been emitted. This in turn would
cause changefeeds to never emit these write events at all.

This could happen because `PushTxn` may return a false `ABORTED` status
for a transaction that has in fact been committed, if the transaction
record has been GCed (after resolving all intents). The timestamp cache
does not retain sufficient information to disambiguate a committed
transaction from an aborted one in this case, so it pessimistically
assumes an abort (see `Replica.CanCreateTxnRecord` and
`batcheval.SynthesizeTxnFromMeta`).
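
To make that ambiguity concrete, here is a self-contained, hypothetical
sketch of the status synthesis (the types and function are illustrative
stand-ins, not the actual `CanCreateTxnRecord`/`SynthesizeTxnFromMeta`
code):

package txnstatus

// TxnStatus is a stand-in for the KV transaction status enum.
type TxnStatus string

const (
	Pending   TxnStatus = "PENDING"
	Committed TxnStatus = "COMMITTED"
	Aborted   TxnStatus = "ABORTED"
)

// synthesizeStatus mirrors the shape of the logic described above: when the
// txn record is missing, the timestamp cache only says whether a record could
// still be created, not whether the txn committed before its record was GCed.
func synthesizeStatus(record *TxnStatus, canCreateRecord bool) TxnStatus {
	if record != nil {
		return *record // an existing record is authoritative
	}
	if canCreateRecord {
		return Pending // the record may yet be written; keep waiting
	}
	// No record, and one can no longer be created. A committed-then-GCed txn
	// and an aborted txn that never wrote a record look identical here, so
	// the push pessimistically assumes an abort.
	return Aborted
}

A push against a committed txn whose record has been GCed takes the final
branch: `synthesizeStatus(nil, false)` returns `ABORTED` even though the
txn committed.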

However, the rangefeed txn pusher trusted this `ABORTED` status,
ignoring the pending txn intents and allowing the resolved timestamp to
advance past them before their committed values had been emitted. This
could lead to the following scenario:

- A rangefeed is running on a lagging follower.
- A txn writes an intent, which is replicated to the follower.
- The closed timestamp advances past the intent.
- The txn commits and resolves the intent at the original write timestamp,
  then GCs its txn record. This is not yet applied on the follower.
- The rangefeed pushes the txn to advance its resolved timestamp.
- The txn is GCed, so the push returns ABORTED (it can't know whether the
  txn was committed or aborted after its record is GCed).
- The rangefeed disregards the "aborted" txn and advances the resolved
  timestamp, emitting a checkpoint.
- The follower applies the resolved intent and emits an event below
  the checkpoint, violating the checkpoint guarantee.
- The changefeed sees an event below its frontier and drops it, never
  emitting these events at all (see the frontier sketch below).
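
The frontier check behind that final step looks roughly like the
following. This is a minimal sketch with assumed names, not the
changefeed's actual code:

package frontier

// Timestamp is a simplified stand-in for hlc.Timestamp.
type Timestamp int64

type consumer struct {
	frontier Timestamp
	emit     func(Timestamp, string)
}

// onCheckpoint advances the frontier. The checkpoint guarantee promises that
// no further events at or below this timestamp will arrive.
func (c *consumer) onCheckpoint(ts Timestamp) {
	if c.frontier < ts {
		c.frontier = ts
	}
}

// onEvent drops events at or below the frontier as presumed duplicates. If
// the checkpoint was premature, a first-time event is silently discarded.
func (c *consumer) onEvent(ts Timestamp, value string) {
	if ts <= c.frontier {
		return // assumed replay; wrong if the checkpoint was premature
	}
	c.emit(ts, value)
}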

This patch fixes the bug by submitting a barrier command to the
leaseholder which waits for all past and ongoing writes (including
intent resolution) to complete and apply, and then waits for the local
replica to apply the barrier as well. This ensures any committed intent
resolution will be applied and emitted before the transaction is removed
from resolved timestamp tracking.
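
The shape of that fix, as a hedged sketch: the interfaces and method
names below are assumptions for illustration, not the actual CockroachDB
API.

package barrier

import "context"

// Span stands in for a roachpb key span.
type Span struct{ Key, EndKey []byte }

// leaseholder abstracts submitting a Barrier command through Raft. The
// barrier completes only once all past and ongoing writes on the span,
// including any intent resolution, have completed and applied on the
// leaseholder, and it reports the log index at which it applied.
type leaseholder interface {
	SendBarrier(ctx context.Context, span Span) (appliedIndex uint64, err error)
}

// localReplica abstracts the replica the rangefeed runs on, which may be a
// lagging follower.
type localReplica interface {
	WaitForAppliedIndex(ctx context.Context, index uint64) error
}

// barrierAndWait is invoked before dropping a pushed txn with an ambiguous
// ABORTED status from resolved timestamp tracking. Once it returns, any
// committed intent resolution has been applied (and thus emitted) locally,
// so advancing the resolved timestamp past the txn is safe.
func barrierAndWait(ctx context.Context, lh leaseholder, local localReplica, span Span) error {
	index, err := lh.SendBarrier(ctx, span)
	if err != nil {
		return err
	}
	return local.WaitForAppliedIndex(ctx, index)
}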

Epic: none
Release note (bug fix): fixed a bug where a changefeed could omit events
in rare cases, logging the error "cdc ux violation: detected timestamp
... that is less or equal to the local frontier". This can happen if a
rangefeed runs on a follower replica that lags significantly behind the
leaseholder, a transaction commits and removes its transaction record
before its intent resolution is applied on the follower, the follower's
closed timestamp has advanced past the transaction commit timestamp, and
the rangefeed attempts to push the transaction to a new timestamp (at
least 10 seconds after the transaction began). This may cause the
rangefeed to prematurely emit a checkpoint before emitting writes at
lower timestamps, which in turn may cause the changefeed to drop these
events entirely, never emitting them.
erikgrinaker committed Jan 30, 2024
1 parent 4816b01 commit f68bef8
Showing 8 changed files with 463 additions and 30 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
@@ -71,6 +71,7 @@ go_test(
        "//pkg/kv/kvpb",
        "//pkg/kv/kvserver",
        "//pkg/kv/kvserver/closedts",
        "//pkg/kv/kvserver/kvserverbase",
        "//pkg/roachpb",
        "//pkg/security/securityassets",
        "//pkg/security/securitytest",
@@ -82,12 +83,14 @@ go_test(
        "//pkg/storage",
        "//pkg/testutils",
        "//pkg/testutils/serverutils",
        "//pkg/testutils/skip",
        "//pkg/testutils/sqlutils",
        "//pkg/testutils/storageutils",
        "//pkg/testutils/testcluster",
        "//pkg/util",
        "//pkg/util/ctxgroup",
        "//pkg/util/encoding",
        "//pkg/util/future",
        "//pkg/util/hlc",
        "//pkg/util/leaktest",
        "//pkg/util/log",
323 changes: 323 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -15,6 +15,7 @@ import (
	"fmt"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"testing"
	"time"

@@ -26,17 +27,20 @@ import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/spanconfig"
	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
	"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
	"github.com/cockroachdb/cockroach/pkg/util/future"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1335,3 +1339,322 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) {
		}
	})
}

// TestRangeFeedIntentResolutionRace is a regression test for
// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the
// following scenario:
//
//   - A rangefeed is running on a lagging follower.
//   - A txn writes an intent, which is replicated to the follower.
//   - The closed timestamp advances past the intent.
//   - The txn commits and resolves the intent at the original write timestamp,
//     then GCs its txn record. This is not yet applied on the follower.
//   - The rangefeed pushes the txn to advance its resolved timestamp.
//   - The txn is GCed, so the push returns ABORTED (it can't know whether the
//     txn was committed or aborted after its record is GCed).
//   - The rangefeed disregards the "aborted" txn and advances the resolved
//     timestamp, emitting a checkpoint.
//   - The follower applies the resolved intent and emits an event below
//     the checkpoint, violating the checkpoint guarantee.
//
// This scenario is addressed by running a Barrier request through Raft and
// waiting for it to apply locally before removing the txn from resolved ts
// tracking. This ensures the pending intent resolution is applied before
// the resolved ts can advance.
func TestRangeFeedIntentResolutionRace(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderRace(t) // too slow, times out
	skip.UnderDeadlock(t)

	// Use a timeout, to prevent a hung test.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	// defer cancel() after Stopper.Stop(), so the context cancels first.
	// Otherwise, the stopper will hang waiting for a rangefeed whose context is
	// not yet cancelled.

	// Set up an apply filter that blocks Raft application on n3 (follower) for
	// the given range.
	var blockRangeOnN3 atomic.Int64
	unblockRangeC := make(chan struct{})
	applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
		if args.StoreID == 3 {
			if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) {
				t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID)
				select {
				case <-unblockRangeC:
					t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID)
				case <-ctx.Done():
					return 0, kvpb.NewError(ctx.Err())
				}
			}
		}
		return 0, nil
	}

	// Set up a request filter that blocks transaction pushes for a specific key.
	// This is used to prevent the rangefeed txn pusher from pushing the txn
	// timestamp above the closed timestamp before it commits, only allowing the
	// push to happen after the transaction has committed and GCed below the
	// closed timestamp.
	var blockPush atomic.Pointer[roachpb.Key]
	unblockPushC := make(chan struct{})
	reqFilter := func(ctx context.Context, br *kvpb.BatchRequest) *kvpb.Error {
		if br.IsSinglePushTxnRequest() {
			req := br.Requests[0].GetPushTxn()
			if key := blockPush.Load(); key != nil && req.Key.Equal(*key) {
				t.Logf("blocked push for txn %s", req.PusheeTxn)
				select {
				case <-unblockPushC:
					t.Logf("unblocked push for txn %s", req.PusheeTxn)
				case <-ctx.Done():
					return kvpb.NewError(ctx.Err())
				}
			}
		}
		return nil
	}

	// Speed up the test by reducing various closed/resolved timestamp intervals.
	const interval = 100 * time.Millisecond
	st := cluster.MakeTestingClusterSettings()
	kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval)
	closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval)
	closedts.TargetDuration.Override(ctx, &st.SV, interval)

	// Start a cluster with 3 nodes.
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs: base.TestServerArgs{
			Settings: st,
			Knobs: base.TestingKnobs{
				Store: &kvserver.StoreTestingKnobs{
					TestingRequestFilter:          reqFilter,
					TestingApplyCalledTwiceFilter: applyFilter,
					RangeFeedPushTxnsInterval:     interval,
					RangeFeedPushTxnsAge:          interval,
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)
	defer cancel()

	n1 := tc.Server(0)
	s3 := tc.GetFirstStoreFromServer(t, 2)
	clock := n1.ApplicationLayer().Clock()

	// Determine the key/value we're going to write.
	prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...)
	key := append(prefix.Clone(), []byte("/foo")...)
	value := []byte("value")

	// Split off a range and upreplicate it, with leaseholder on n1.
	_, _, err := n1.StorageLayer().SplitRange(prefix)
	require.NoError(t, err)
	desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
	t.Logf("split off range %s", desc)

	repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder
	repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower

	require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp()))

	// Block pushes of the txn, to ensure it can write at a fixed timestamp.
	// Otherwise, the rangefeed or someone else may push it beyond the closed
	// timestamp.
	blockPush.Store(&key)

	// We'll use n3 as our lagging follower. Start a rangefeed on it directly.
	req := kvpb.RangeFeedRequest{
		Header: kvpb.Header{
			RangeID: desc.RangeID,
		},
		Span: desc.RSpan().AsRawSpanWithNoLocals(),
	}
	eventC := make(chan *kvpb.RangeFeedEvent)
	sink := newChannelSink(ctx, eventC)
	fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink))
	require.NoError(t, fErr.Get()) // check if we've errored yet
	t.Logf("started rangefeed on %s", repl3)

	// Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC.
	// This uses a buffered channel of size 1, and empties it out before posting a
	// new update, such that it contains the latest known checkpoint and does not
	// block the rangefeed. It also posts emitted values for our key to valueC,
	// which should only happen once.
	checkpointC := make(chan *kvpb.RangeFeedCheckpoint, 1)
	valueC := make(chan *kvpb.RangeFeedValue, 1)
	go func() {
		defer close(checkpointC)
		defer close(valueC)
		for {
			select {
			case e := <-eventC:
				switch {
				case e.Checkpoint != nil:
					// Clear checkpointC such that it always contains the latest.
					select {
					case <-checkpointC:
					default:
					}
					checkpointC <- e.Checkpoint
				case e.Val != nil && e.Val.Key.Equal(key):
					select {
					case valueC <- e.Val:
					default:
						t.Errorf("duplicate value event for %s: %s", key, e)
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp {
		t.Helper()
		timeoutC := time.After(3 * time.Second)
		for {
			select {
			case c := <-checkpointC:
				require.NotNil(t, c, "nil checkpoint")
				if ts.LessEq(c.ResolvedTS) {
					t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts)
					return c.ResolvedTS
				}
			case <-timeoutC:
				require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts)
			}
		}
	}

	// Wait for the initial checkpoint.
	rts := waitForCheckpoint(t, clock.Now())
	t.Logf("initial checkpoint at %s", rts)

	// Start a new transaction on n1 with a fixed timestamp (to make sure it
	// remains below the closed timestamp). Write an intent, and read it back to
	// make sure it has applied.
	writeTS := clock.Now()
	txn := n1.ApplicationLayer().DB().NewTxn(ctx, "test")
	require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS))
	require.NoError(t, txn.Put(ctx, key, value))
	_, err = txn.Get(ctx, key)
	require.NoError(t, err)
	t.Logf("wrote %s", key)

	// Wait for both the leaseholder and the follower to close the transaction's
	// write timestamp.
	waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp {
		t.Helper()
		timeoutC := time.After(3 * time.Second)
		for {
			if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) {
				t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts)
				return closedTS
			}
			select {
			case <-time.After(10 * time.Millisecond):
			case <-timeoutC:
require.Fail(t, "timeout out waiting for closed timestamp", "wanted %s", ts)
			}
		}
	}
	cts := waitForClosedTimestamp(t, repl1, writeTS)
	_ = waitForClosedTimestamp(t, repl3, writeTS)
	t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS)

	// Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the
	// rangefeed's view of the closed timestamp has been updated, and is now only
	// blocked by the intent.
	waitTS := writeTS.Prev()
	waitTS.Logical = 0
	rts = waitForCheckpoint(t, waitTS)
	t.Logf("rangefeed caught up to txn write timestamp at %s", rts)

	// Block Raft application on repl3.
	blockRangeOnN3.Store(int64(desc.RangeID))

	// Commit the transaction, and check its commit timestamp.
	require.NoError(t, txn.Commit(ctx))
	commitTS, err := txn.CommitTimestamp()
	require.NoError(t, err)
	require.Equal(t, commitTS, writeTS)
	t.Logf("txn committed at %s", writeTS)

	// Unblock transaction pushes. Since repl3 won't apply the intent resolution
	// yet, the rangefeed will keep trying to push the transaction. Once the
	// transaction record is GCed (which happens async), the rangefeed will see an
	// ABORTED status.
	//
	// It may see the intermediate COMMITTED state too, but at the time of writing
	// that does not matter, since the rangefeed needs to keep tracking the
	// intent until it applies the resolution, and it will also see the ABORTED
	// status before that happens.
	blockPush.Store(nil)
	close(unblockPushC)

	// Make sure repl3 does not emit a checkpoint beyond the write timestamp. Its
	// closed timestamp has already advanced past it, but the unresolved intent
	// should prevent the resolved timestamp from advancing, despite the false
	// ABORTED state. We also make sure no value has been emitted.
	waitC := time.After(3 * time.Second)
	for done := false; !done; {
		select {
		case c := <-checkpointC:
			require.NotNil(t, c)
			require.False(t, commitTS.LessEq(c.ResolvedTS),
				"repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS)
		case v := <-valueC:
			require.Fail(t, "repl3 emitted premature value %s", v)
		case <-waitC:
			done = true
		}
	}
	t.Logf("checkpoint still below write timestamp")

	// Unblock application on repl3. Wait for the checkpoint to catch up to the
	// commit timestamp, and the committed value to be emitted.
	blockRangeOnN3.Store(0)
	close(unblockRangeC)

	rts = waitForCheckpoint(t, writeTS)
	t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS)

	select {
	case v := <-valueC:
		require.Equal(t, v.Key, key)
		require.Equal(t, v.Value.Timestamp, writeTS)
		t.Logf("received value %s", v)
	case <-time.After(3 * time.Second):
		require.Fail(t, "timed out waiting for event")
	}

	// The rangefeed should still be running.
	require.NoError(t, fErr.Get())
}

// channelSink is a rangefeed sink which posts events to a channel.
type channelSink struct {
	ctx context.Context
	ch  chan<- *kvpb.RangeFeedEvent
}

func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink {
	return &channelSink{ctx: ctx, ch: ch}
}

func (c *channelSink) Context() context.Context {
	return c.ctx
}

func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error {
	select {
	case c.ch <- e:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	}
}