109343: streamingccl: attempt to deflake TestStreamDeleteRange r=dt a=stevendanna

Previously, we re-enabled this test with the understanding that it could still flake if something caused us to see the range deletes during a catchup scan rather than as part of the caught-up rangefeed.

Here, we add code to normalize the representation of the delete ranges. We also improve the consumer code to ensure that we are always looking at a complete set of range feed updates.
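A condensed sketch of the normalization idea (the helper name normalizeSpans, the explicit tenantSpan parameter, and the fixed timestamps are illustrative only; the change below does the same thing inside the test against the source tenant's span, using the new KeysFromSST helper added in this commit): the possibly-fragmented delete-range spans are written as MVCC range keys into a throwaway in-memory engine, exported to an SST, and read back, so that two logically equivalent sets of fragments normalize to the same spans.

    import (
    	"bytes"
    	"context"
    	"testing"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
    	"github.com/cockroachdb/cockroach/pkg/storage"
    	"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
    	"github.com/cockroachdb/cockroach/pkg/util/hlc"
    	"github.com/stretchr/testify/require"
    )

    // normalizeSpans rewrites a set of delete-range spans into a canonical form
    // by round-tripping them through an in-memory engine and an exported SST.
    func normalizeSpans(
    	ctx context.Context, t *testing.T, tenantSpan roachpb.Span, in []roachpb.Span,
    ) []roachpb.Span {
    	engine := storage.NewDefaultInMemForTesting()
    	defer engine.Close()

    	// Timestamps are not under test here; write at an arbitrary known time.
    	writeTS := hlc.Timestamp{WallTime: 2}
    	for _, sp := range in {
    		require.NoError(t, engine.PutRawMVCCRangeKey(storage.MVCCRangeKey{
    			StartKey: sp.Key, EndKey: sp.EndKey, Timestamp: writeTS,
    		}, []byte{}))
    	}
    	require.NoError(t, engine.Flush())

    	// Export everything in the tenant span; the SST carries the merged
    	// range-key fragments.
    	var sst bytes.Buffer
    	_, _, err := storage.MVCCExportToSST(ctx,
    		cluster.MakeTestingClusterSettings(), engine,
    		storage.MVCCExportOptions{
    			StartKey:           storage.MVCCKey{Key: tenantSpan.Key, Timestamp: hlc.Timestamp{WallTime: 1}},
    			EndKey:             tenantSpan.EndKey,
    			StartTS:            hlc.Timestamp{WallTime: 1},
    			EndTS:              hlc.Timestamp{WallTime: 10},
    			ExportAllRevisions: true,
    		}, &sst)
    	require.NoError(t, err)

    	// The bounds of the exported range keys are the normalized spans.
    	_, rangeKeys := storageutils.KeysFromSST(t, sst.Bytes())
    	out := make([]roachpb.Span, 0, len(rangeKeys))
    	for _, rk := range rangeKeys {
    		out = append(out, rk.Bounds)
    	}
    	return out
    }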

Possibly Fixes cockroachdb#109312

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>
craig[bot] and stevendanna committed Aug 24, 2023
2 parents 1b933fa + 6c496c2 commit 2274ab3
Showing 5 changed files with 140 additions and 81 deletions.
21 changes: 13 additions & 8 deletions pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go
@@ -14,7 +14,6 @@ import (
"math/rand"
"net/url"
"os"
"strings"
"testing"
"time"

@@ -218,14 +217,20 @@ func NewReplicationHelper(
// Start server
s, db, _ := serverutils.StartServer(t, serverArgs)

// Set required cluster settings.
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING cross_cluster_replication.enabled = true;
SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '10ms'
`, `;`)...)
sqlDB.ExecMultiple(t,
// Required for replication streams to work.
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING cross_cluster_replication.enabled = true`,

// Speeds up the tests a bit.
`SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'`,
`ALTER TENANT ALL SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'`,
`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'`,
`ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'`,
`SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms'`,
`ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms'`,
`SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '10ms'`)

// Sink to read data from.
sink, cleanupSink := sqlutils.PGUrl(t, s.AdvSQLAddr(), t.Name(), url.User(username.RootUser))
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
@@ -115,6 +115,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
158 changes: 85 additions & 73 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
@@ -9,6 +9,7 @@
package streamproducer_test

import (
"bytes"
"context"
"fmt"
"net/http"
@@ -32,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -40,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -695,32 +698,83 @@ USE d;
defer feed.Close(ctx)
codec := source.mu.codec.(*partitionStreamDecoder)

// We wait for the frontier to advance because we want to
// ensure that we encounter the range deletes during the
// rangefeed's steady state rather than the catchup scan.
//
// The representation of the range deletes we send is slightly
// different if we encounter them during the catchup scan.
//
// NB: It is _still_ possible that we encounter the range
// deletes during a catchup scan if we hit a rangefeed restart
// during the test.
f, err := span.MakeFrontier(spans...)
require.NoError(t, err)
for f.Frontier().IsEmpty() {
t.Logf("waiting for frontier to advance to a non-zero timestamp")
source.mu.Lock()
source.mu.rows.Next()
source.mu.codec.decode()
if codec.e.Checkpoint != nil {
for _, rs := range codec.e.Checkpoint.ResolvedSpans {
_, err := f.Forward(rs.Span, rs.Timestamp)
require.NoError(t, err)
// normalizeRangeKeys pushes all range keys into an in-memory pebble
// engine and then reads them back out. The goal here is to account for
// the fact that we may see a logical equivalent set of range keys on
// the replication stream that doesn't exactly match the keys above if
// we happen to get the range keys during the catchup scan.
normalizeRangeKeys := func(in []roachpb.Span) []roachpb.Span {
var (
// We don't currently care about timestamps in this
// test, so we write at a known ts.
startTS = hlc.Timestamp{WallTime: 1}
writeTS = hlc.Timestamp{WallTime: 2}
endTS = hlc.Timestamp{WallTime: 10}
sp = srcTenant.Codec.TenantSpan()
)
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()
for _, key := range in {
rk := storage.MVCCRangeKey{
StartKey: key.Key,
EndKey: key.EndKey,
Timestamp: writeTS,
}
require.NoError(t, engine.PutRawMVCCRangeKey(rk, []byte{}))
}
source.mu.Unlock()
require.NoError(t, engine.Flush())

var sstFile bytes.Buffer
_, _, err := storage.MVCCExportToSST(ctx,
cluster.MakeTestingClusterSettings(), engine,
storage.MVCCExportOptions{
StartKey: storage.MVCCKey{Key: sp.Key, Timestamp: startTS},
EndKey: sp.EndKey,
StartTS: startTS,
EndTS: endTS,
ExportAllRevisions: true,
}, &sstFile)
require.NoError(t, err, "failed to export expected data")
keys, rKeys := storageutils.KeysFromSST(t, sstFile.Bytes())
require.Equal(t, 0, len(keys), "unexpected point keys")
rKeySpans := make([]roachpb.Span, 0, len(rKeys))
for _, rk := range rKeys {
rKeySpans = append(rKeySpans, rk.Bounds)
require.Equal(t, 1, len(rk.Versions))
}
return rKeySpans
}

consumeUntilTimestamp := func(ts hlc.Timestamp) ([]roachpb.KeyValue, []roachpb.Span) {
t.Logf("consuming until %s", ts)
receivedKVs := make([]roachpb.KeyValue, 0)
receivedDelRangeSpans := make([]roachpb.Span, 0)
f, err := span.MakeFrontier(spans...)
require.NoError(t, err)
for f.Frontier().Less(ts) {
source.mu.Lock()
source.mu.rows.Next()
source.mu.codec.decode()
if codec.e.Checkpoint != nil {
for _, rs := range codec.e.Checkpoint.ResolvedSpans {
_, err := f.Forward(rs.Span, rs.Timestamp)
if err != nil {
source.mu.Unlock()
}
require.NoError(t, err)
t.Logf("%s current frontier %s", timeutil.Now(), f.Frontier())
}
} else if codec.e.Batch != nil {
receivedKVs = append(receivedKVs, codec.e.Batch.KeyValues...)
for _, dr := range codec.e.Batch.DelRanges {
receivedDelRangeSpans = append(receivedDelRangeSpans, dr.Span)
}
}
source.mu.Unlock()
}
t.Logf("consumer done")
return receivedKVs, receivedDelRangeSpans
}
t.Logf("frontier advanced to a %s", f.Frontier())

t1Span, t2Span, t3Span := h.TableSpan(srcTenant.Codec, "t1"),
h.TableSpan(srcTenant.Codec, "t2"), h.TableSpan(srcTenant.Codec, "t3")
@@ -731,31 +785,12 @@ USE d;
// Range is t1e - t2sn, emitting t2s - t2sn.
require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t1Span.EndKey, t2Span.Key.Next()))

// Expected DelRange events. We store these and the received
// del ranges in maps to account for possible duplicate
// delivery.
expectedDelRanges := make(map[string]struct{})
expectedDelRanges[roachpb.Span{Key: t1Span.Key, EndKey: t1Span.EndKey}.String()] = struct{}{}
expectedDelRanges[roachpb.Span{Key: t2Span.Key, EndKey: t2Span.EndKey}.String()] = struct{}{}
expectedDelRanges[roachpb.Span{Key: t2Span.Key, EndKey: t2Span.Key.Next()}.String()] = struct{}{}
// NB: We won't see this in our normalized form.
// {Key: t2Span.Key, EndKey: t2Span.Key.Next()},
expectedDelRanges := []roachpb.Span{t1Span, t2Span}

receivedDelRanges := make(map[string]struct{})
for {
source.mu.Lock()
require.True(t, source.mu.rows.Next())
source.mu.codec.decode()
if codec.e.Batch != nil {
for _, dr := range codec.e.Batch.DelRanges {
receivedDelRanges[dr.Span.String()] = struct{}{}
}
}
source.mu.Unlock()
if len(receivedDelRanges) >= 3 {
break
}
}

require.Equal(t, expectedDelRanges, receivedDelRanges)
_, actualDelRangeSpans := consumeUntilTimestamp(h.SysServer.Clock().Now())
require.Equal(t, expectedDelRanges, normalizeRangeKeys(actualDelRangeSpans))

// Adding a SSTable that contains DeleteRange
batchHLCTime := h.SysServer.Clock().Now()
@@ -775,38 +810,15 @@ USE d;
require.Equal(t, t1Span.Key, start)
require.Equal(t, t3Span.EndKey, end)

expectedDelRanges = make(map[string]struct{})
expectedDelRanges[t1Span.String()] = struct{}{}
expectedDelRanges[t2Span.String()] = struct{}{}

// Using same batch ts so that this SST can be emitted through rangefeed.
_, _, _, err = h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false,
_, _, _, err := h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false,
false, hlc.Timestamp{}, nil, false, batchHLCTime)
require.NoError(t, err)

receivedDelRanges = make(map[string]struct{})
receivedKVs := make([]roachpb.KeyValue, 0)
for {
source.mu.Lock()
require.True(t, source.mu.rows.Next())
source.mu.codec.decode()
if codec.e.Batch != nil {
require.Empty(t, codec.e.Batch.Ssts)
receivedKVs = append(receivedKVs, codec.e.Batch.KeyValues...)
for _, dr := range codec.e.Batch.DelRanges {
receivedDelRanges[dr.Span.String()] = struct{}{}
}
}
source.mu.Unlock()

if len(receivedDelRanges) >= 2 && len(receivedKVs) >= 1 {
break
}
}

receivedKVs, receivedDelRangeSpans := consumeUntilTimestamp(batchHLCTime)
require.Equal(t, t2Span.Key, receivedKVs[0].Key)
require.Equal(t, batchHLCTime, receivedKVs[0].Value.Timestamp)
require.Equal(t, expectedDelRanges, receivedDelRanges)
require.Equal(t, expectedDelRanges, normalizeRangeKeys(receivedDelRangeSpans))
}

func TestStreamSpanConfigs(t *testing.T) {
1 change: 1 addition & 0 deletions pkg/testutils/storageutils/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/syncutil/singleflight",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_stretchr_testify//require",
],
)
40 changes: 40 additions & 0 deletions pkg/testutils/storageutils/sst.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/require"
)

@@ -71,3 +72,42 @@ func MakeSST(

return sstFile.Data(), start, end
}

// KeysFromSST takes an SST as a byte slice and returns all point
// and range keys in the SST.
func KeysFromSST(t *testing.T, data []byte) ([]storage.MVCCKey, []storage.MVCCRangeKeyStack) {
var results []storage.MVCCKey
var rangeKeyRes []storage.MVCCRangeKeyStack
it, err := storage.NewMemSSTIterator(data, false, storage.IterOptions{
KeyTypes: pebble.IterKeyTypePointsAndRanges,
LowerBound: keys.MinKey,
UpperBound: keys.MaxKey,
})
require.NoError(t, err, "Failed to read exported data")
defer it.Close()
for it.SeekGE(storage.MVCCKey{Key: []byte{}}); ; {
ok, err := it.Valid()
require.NoError(t, err, "Failed to advance iterator while preparing data")
if !ok {
break
}

if it.RangeKeyChanged() {
hasPoint, hasRange := it.HasPointAndRange()
if hasRange {
rangeKeyRes = append(rangeKeyRes, it.RangeKeys().Clone())
}
if !hasPoint {
it.Next()
continue
}
}

results = append(results, storage.MVCCKey{
Key: append(roachpb.Key(nil), it.UnsafeKey().Key...),
Timestamp: it.UnsafeKey().Timestamp,
})
it.Next()
}
return results, rangeKeyRes
}
