Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeed: fix client side tests flakiness #111282

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 81 additions & 29 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -92,7 +93,14 @@ func TestRangeFeedIntegration(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{})
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
defer srv.Stopper().Stop(ctx)
ts := srv.ApplicationLayer()

Expand Down Expand Up @@ -126,8 +134,6 @@ func TestRangeFeedIntegration(t *testing.T) {
// storage layer and the application layer.
kvserver.RangefeedEnabled.Override(ctx, &l.ClusterSettings().SV, true)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(ts.AppStopper(), db, ts.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -190,8 +196,14 @@ func TestWithOnFrontierAdvance(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Settings: settings},
})
defer tc.Stopper().Stop(ctx)

Expand All @@ -217,8 +229,6 @@ func TestWithOnFrontierAdvance(t *testing.T) {
// Lower the closed timestamp target duration to speed up the test.
closedts.TargetDuration.Override(ctx, &l.ClusterSettings().SV, 100*time.Millisecond)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

// Split the range into two so we know the frontier has more than one span to
// track for certain. We later write to both these ranges.
Expand Down Expand Up @@ -318,7 +328,14 @@ func TestWithOnCheckpoint(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{})
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
defer srv.Stopper().Stop(ctx)
ts := srv.ApplicationLayer()

Expand All @@ -340,8 +357,6 @@ func TestWithOnCheckpoint(t *testing.T) {
// Lower the closed timestamp target duration to speed up the test.
closedts.TargetDuration.Override(ctx, &l.ClusterSettings().SV, 100*time.Millisecond)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(ts.AppStopper(), db, ts.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -420,7 +435,14 @@ func TestRangefeedValueTimestamps(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{})
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
defer srv.Stopper().Stop(ctx)
ts := srv.ApplicationLayer()

Expand All @@ -442,8 +464,6 @@ func TestRangefeedValueTimestamps(t *testing.T) {
// Lower the closed timestamp target duration to speed up the test.
closedts.TargetDuration.Override(ctx, &l.ClusterSettings().SV, 100*time.Millisecond)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(ts.AppStopper(), db, ts.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -536,7 +556,13 @@ func TestWithOnSSTable(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109473),
})
defer srv.Stopper().Stop(ctx)
Expand All @@ -549,8 +575,6 @@ func TestWithOnSSTable(t *testing.T) {
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
kvserver.RangefeedEnabled.Override(ctx, &l.ClusterSettings().SV, true)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(tsrv.AppStopper(), db, tsrv.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -638,13 +662,20 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109473), Knobs: base.TestingKnobs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109473),
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
SmallEngineBlocks: smallEngineBlocks,
},
},
Settings: settings,
},
})
defer tc.Stopper().Stop(ctx)
Expand All @@ -660,8 +691,6 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) {
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
kvserver.RangefeedEnabled.Override(ctx, &l.ClusterSettings().SV, true)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &tsrv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(srv.AppStopper(), db, srv.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -746,8 +775,14 @@ func TestWithOnDeleteRange(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
SmallEngineBlocks: smallEngineBlocks,
Expand All @@ -768,8 +803,6 @@ func TestWithOnDeleteRange(t *testing.T) {
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
kvserver.RangefeedEnabled.Override(ctx, &l.ClusterSettings().SV, true)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &tsrv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(srv.AppStopper(), db, srv.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -930,13 +963,19 @@ func TestUnrecoverableErrors(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109472),
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ConfigureScratchRange: true,
},
},
Settings: settings,
})

defer srv.Stopper().Stop(ctx)
Expand All @@ -957,8 +996,6 @@ func TestUnrecoverableErrors(t *testing.T) {
// Lower the closed timestamp target duration to speed up the test.
closedts.TargetDuration.Override(ctx, &l.ClusterSettings().SV, 100*time.Millisecond)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
Expand Down Expand Up @@ -1029,7 +1066,14 @@ func TestMVCCHistoryMutationError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

srv, _, db := serverutils.StartServer(t, base.TestServerArgs{})
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
defer srv.Stopper().Stop(ctx)
ts := srv.ApplicationLayer()

Expand All @@ -1047,8 +1091,6 @@ func TestMVCCHistoryMutationError(t *testing.T) {
// Lower the closed timestamp target duration to speed up the test.
closedts.TargetDuration.Override(ctx, &l.ClusterSettings().SV, 100*time.Millisecond)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

// Set up a rangefeed.
f, err := rangefeed.NewFactory(ts.AppStopper(), db, ts.ClusterSettings(), nil)
Expand Down Expand Up @@ -1113,7 +1155,14 @@ func TestRangefeedWithLabelsOption(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{})
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
defer srv.Stopper().Stop(ctx)
ts := srv.ApplicationLayer()

Expand Down Expand Up @@ -1144,8 +1193,6 @@ func TestRangefeedWithLabelsOption(t *testing.T) {
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
kvserver.RangefeedEnabled.Override(ctx, &l.ClusterSettings().SV, true)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

const rangefeedName = "test-feed"
type label struct {
Expand Down Expand Up @@ -1228,7 +1275,14 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) {

testutils.RunValues(t, "feed_type", procTypes, func(t *testing.T, s feedProcessorType) {
ctx := context.Background()
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{})
settings := cluster.MakeClusterSettings()
// We must enable desired scheduler settings before we start cluster,
// otherwise we will trigger processor restarts later and this test can't
// handle duplicated events.
kvserver.RangeFeedUseScheduler.Override(ctx, &settings.SV, s.useScheduler)
srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
})
defer srv.Stopper().Stop(ctx)
ts := srv.ApplicationLayer()

Expand All @@ -1253,8 +1307,6 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) {
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
kvserver.RangefeedEnabled.Override(ctx, &l.ClusterSettings().SV, true)
}
// Scheduler use is a property of the storage layer.
kvserver.RangeFeedUseScheduler.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, s.useScheduler)

f, err := rangefeed.NewFactory(ts.AppStopper(), db, ts.ClusterSettings(), nil)
require.NoError(t, err)
Expand Down