From 00d78c7db0595415eedd2396ef6b280d2a318ba8 Mon Sep 17 00:00:00 2001
From: Xiang Gu
Date: Wed, 12 Jul 2023 17:27:59 -0700
Subject: [PATCH 1/5] logic: skip_on_retry works when errors are expected

Previously, the `skip_on_retry` directive for logic tests, when set, skipped
the rest of the test if a statement failed with a TransactionRetryError.
However, it would not skip if the statement was expected to fail with a
certain error message. This PR ensures that whenever we hit a
TransactionRetryError and `skip_on_retry` is set, we always skip the rest of
the test, even if the statement is expected to fail.

Informs #104464

Release note: None
---
 pkg/ccl/logictestccl/testdata/logic_test/schema_change_in_txn | 4 ++++
 pkg/sql/logictest/logic.go                                    | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/pkg/ccl/logictestccl/testdata/logic_test/schema_change_in_txn b/pkg/ccl/logictestccl/testdata/logic_test/schema_change_in_txn
index e89155928a90..702ad5075e02 100644
--- a/pkg/ccl/logictestccl/testdata/logic_test/schema_change_in_txn
+++ b/pkg/ccl/logictestccl/testdata/logic_test/schema_change_in_txn
@@ -1,3 +1,7 @@
+# Skip the rest of the test if a retry occurs. They can happen and are fine
+# but there's no way to encapsulate that in logictests.
+skip_on_retry
+
 # Backing up and restoring a descriptor will increment the version of the
 # descriptor before restoring it so we cannot achieve the expected behaviour in
 # this test.
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index 5e1980993ff9..06527ddd2f48 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -3130,8 +3130,8 @@ func (t *logicTest) maybeSkipOnRetry(err error) {
 func (t *logicTest) verifyError(
 	sql, pos, expectNotice, expectErr, expectErrCode string, err error,
 ) (bool, error) {
+	t.maybeSkipOnRetry(err)
 	if expectErr == "" && expectErrCode == "" && err != nil {
-		t.maybeSkipOnRetry(err)
 		return t.unexpectedError(sql, pos, err)
 	}
 	if expectNotice != "" {

From a3ecaaa0db95243514583f052daf2ddaa00ccf01 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Fri, 14 Jul 2023 11:40:55 +0100
Subject: [PATCH 2/5] testutils: add helper to target transactions for retries

This helper makes it a little quicker to write a test that checks whether a
particular transaction is retry safe.
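For illustration, a minimal sketch of the intended usage (the server setup
and the "my txn" debug name here are hypothetical; the real wiring is in the
TestStartableJobTxnRetry and TestTxnClearsCollectionOnRetry changes in this
patch):

    ctx := context.Background()
    // Inject one retry error into transactions whose debug name starts with "my txn".
    filterFunc, verifyFunc := testutils.TestingRequestFilterRetryTxnWithPrefix(t, "my txn", 1)
    var params base.TestServerArgs
    params.Knobs.Store = &kvserver.StoreTestingKnobs{
        TestingRequestFilter: filterFunc,
    }
    s, _, kvDB := serverutils.StartServer(t, params)
    defer s.Stopper().Stop(ctx)
    require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
        txn.SetDebugName("my txn")
        // ... the work whose retry safety is under test ...
        return nil
    }))
    // Fails the test if no transaction matching the prefix was seen.
    verifyFunc()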
Informs #106417 Epic: none Release note: none --- pkg/jobs/BUILD.bazel | 1 - pkg/jobs/jobs_test.go | 19 +------ pkg/sql/catalog/descs/BUILD.bazel | 2 +- pkg/sql/catalog/descs/collection_test.go | 18 +----- pkg/testutils/BUILD.bazel | 1 + pkg/testutils/txn_restart.go | 70 ++++++++++++++++++++++++ 6 files changed, 78 insertions(+), 33 deletions(-) create mode 100644 pkg/testutils/txn_restart.go diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index aa0bc7e1dff7..13f387fb12be 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -114,7 +114,6 @@ go_test( "//pkg/keys", "//pkg/keyvisualizer", "//pkg/kv", - "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index b4f4e654611c..b82a38206a7f 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/keyvisualizer" - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -2758,22 +2757,10 @@ func TestStartableJobTxnRetry(t *testing.T) { ctx := context.Background() const txnName = "create job" - haveInjectedRetry := false params := base.TestServerArgs{} + requestFilter, verifyFunc := testutils.TestingRequestFilterRetryTxnWithPrefix(t, txnName, 1) params.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingRequestFilter: func(ctx context.Context, r *kvpb.BatchRequest) *kvpb.Error { - if r.Txn == nil || r.Txn.Name != txnName { - return nil - } - if _, ok := r.GetArg(kvpb.EndTxn); ok { - if !haveInjectedRetry { - haveInjectedRetry = true - // Force a retry error the first time. 
- return kvpb.NewError(kvpb.NewTransactionRetryError(kvpb.RETRY_REASON_UNKNOWN, "injected error")) - } - } - return nil - }, + TestingRequestFilter: requestFilter, } s, _, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -2794,7 +2781,7 @@ func TestStartableJobTxnRetry(t *testing.T) { txn.KV().SetDebugName(txnName) return jr.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, rec) })) - require.True(t, haveInjectedRetry) + verifyFunc() require.NoError(t, sj.Start(ctx)) } diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 58e46fc62ef0..659a405ba025 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -93,7 +93,6 @@ go_test( "//pkg/base", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -123,6 +122,7 @@ go_test( "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", "//pkg/sql/types", + "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index e21a05ec982a..ba2bc5f675ff 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -46,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -152,23 +152,11 @@ func TestTxnClearsCollectionOnRetry(t *testing.T) { ctx := context.Background() const txnName = "descriptor update" - haveInjectedRetry := false var serverArgs base.TestServerArgs params := base.TestClusterArgs{ServerArgs: serverArgs} + filterFunc, _ := testutils.TestingRequestFilterRetryTxnWithPrefix(t, txnName, 1) params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingRequestFilter: func(ctx context.Context, r *kvpb.BatchRequest) *kvpb.Error { - if r.Txn == nil || r.Txn.Name != txnName { - return nil - } - if _, ok := r.GetArg(kvpb.EndTxn); ok { - if !haveInjectedRetry { - haveInjectedRetry = true - // Force a retry error the first time. 
- return kvpb.NewError(kvpb.NewTransactionRetryError(kvpb.RETRY_REASON_UNKNOWN, "injected error")) - } - } - return nil - }, + TestingRequestFilter: filterFunc, } tc := testcluster.StartTestCluster(t, 1, params) defer tc.Stopper().Stop(ctx) diff --git a/pkg/testutils/BUILD.bazel b/pkg/testutils/BUILD.bazel index 9e3caaff30af..26c9b8dbdfb0 100644 --- a/pkg/testutils/BUILD.bazel +++ b/pkg/testutils/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "subtest.go", "tb.go", "trace.go", + "txn_restart.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/testutils", visibility = ["//visibility:public"], diff --git a/pkg/testutils/txn_restart.go b/pkg/testutils/txn_restart.go new file mode 100644 index 000000000000..85420e30326a --- /dev/null +++ b/pkg/testutils/txn_restart.go @@ -0,0 +1,70 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package testutils + +import ( + "context" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// TestingRequestFilterRetryTxnWithPrefix returns a TestingRequestFilter that +// forces maxCount retry errors for each transaction whose debug name +// begins with the given prefix. +// +// The second return value can be used to verify that at least one +// transaction was retried. +// +// Example: +// +// filterFunc, verifyFunc := testutils.TestingRequestFilterRetryTxnWithPrefix("ttljob-", 1) +// base.TestingKnobs{ +// Store: &kvserver.StoreTestingKnobs{ +// TestingRequestFilter: filterFunc, +// }, +// } +func TestingRequestFilterRetryTxnWithPrefix( + t testing.TB, prefix string, maxCount int, +) (func(ctx context.Context, r *kvpb.BatchRequest) *kvpb.Error, func()) { + txnTracker := struct { + syncutil.Mutex + retryCounts map[string]int + }{ + retryCounts: make(map[string]int), + } + verifyFunc := func() { + txnTracker.Lock() + defer txnTracker.Unlock() + if len(txnTracker.retryCounts) == 0 { + t.Errorf("expected at least 1 transaction to match prefix %q", prefix) + } + } + filterFunc := func(ctx context.Context, r *kvpb.BatchRequest) *kvpb.Error { + if r.Txn == nil || !strings.HasPrefix(r.Txn.Name, prefix) { + return nil + } + + if _, ok := r.GetArg(kvpb.EndTxn); ok { + txnTracker.Lock() + defer txnTracker.Unlock() + if txnTracker.retryCounts[r.Txn.Name] < maxCount { + txnTracker.retryCounts[r.Txn.Name]++ + return kvpb.NewError(kvpb.NewTransactionRetryError(kvpb.RETRY_REASON_UNKNOWN, "injected error")) + } + } + return nil + } + + return filterFunc, verifyFunc +} From 68ca3ec7fd7a1f2076536fede653e801532d373f Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 Jul 2023 16:02:36 +0200 Subject: [PATCH 3/5] spanconfigccl: remove uses of `TODOTestTenantDisabled` There's a mix of tests that control their tenants directly, and tests that should really work with virtualization enabled but don't. Followup issues: #106821 and #106818. 
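For illustration, the two replacement patterns applied in the diffs below (a
sketch only; the option names are taken from this patch, and the surrounding
test setup is omitted):

    // Tests that create and manage their own tenants:
    args := base.TestServerArgs{
        DefaultTestTenant: base.TestControlsTenantsExplicitly,
    }

    // Tests that should work with virtualization enabled but currently do
    // not, with the tracking issue recorded inline:
    args = base.TestServerArgs{
        DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106818),
    }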
Release note: None --- .../kvaccessor_test.go | 3 +-- .../datadriven_test.go | 4 +-- .../spanconfigsplitterccl/datadriven_test.go | 4 +-- .../datadriven_test.go | 5 ++-- .../sqlwatcher_test.go | 25 ++++++++----------- 5 files changed, 16 insertions(+), 25 deletions(-) diff --git a/pkg/ccl/spanconfigccl/spanconfigkvaccessorccl/kvaccessor_test.go b/pkg/ccl/spanconfigccl/spanconfigkvaccessorccl/kvaccessor_test.go index 7eac54478954..6d45200e95ae 100644 --- a/pkg/ccl/spanconfigccl/spanconfigkvaccessorccl/kvaccessor_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigkvaccessorccl/kvaccessor_test.go @@ -38,8 +38,7 @@ func TestCommitTSIntervals(t *testing.T) { var i interceptor ts, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - // Manually starts a tenant below. No need to start one here. - DefaultTestTenant: base.TODOTestTenantDisabled, + DefaultTestTenant: base.TestControlsTenantsExplicitly, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ WallClock: manual, diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go index 6dc86e486a64..692355532506 100644 --- a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go @@ -99,9 +99,7 @@ func TestDataDriven(t *testing.T) { } tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Test fails when run under the default test tenant. More - // investigation is required. - DefaultTestTenant: base.TODOTestTenantDisabled, + DefaultTestTenant: base.TestControlsTenantsExplicitly, Knobs: base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test SpanConfig: scKnobs, diff --git a/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go index 4ebf55724dd2..09317a1016c9 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go @@ -67,8 +67,8 @@ func TestDataDriven(t *testing.T) { datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Fails with nil pointer dereference. Tracked with #76378. - DefaultTestTenant: base.TODOTestTenantDisabled, + // Fails with nil pointer dereference. Tracked with #76378 and #106818. + DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106818), Knobs: base.TestingKnobs{ SpanConfig: scKnobs, }, diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go index 9d5e807df58c..143720a6ea8f 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go @@ -109,9 +109,8 @@ func TestDataDriven(t *testing.T) { datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Test fails when run within a tenant. More investigation - // is required. Tracked with #76378. - DefaultTestTenant: base.TODOTestTenantDisabled, + // Fails with nil pointer dereference. Tracked with #76378 and #106818. 
+ DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106818), Knobs: base.TestingKnobs{ GCJob: gcTestingKnobs, SpanConfig: scKnobs, diff --git a/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go b/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go index d7022b0999cf..926c58ab5452 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go @@ -60,9 +60,8 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) { defer dirCleanupFn() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - ExternalIODir: dir, - // Test already runs from a tenant. - DefaultTestTenant: base.TODOTestTenantDisabled, + ExternalIODir: dir, + DefaultTestTenant: base.TestControlsTenantsExplicitly, Knobs: base.TestingKnobs{ SpanConfig: &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, // disable the automatic job creation. @@ -288,8 +287,7 @@ func TestSQLWatcherMultiple(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Test already runs from a tenant. - DefaultTestTenant: base.TODOTestTenantDisabled, + DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821), Knobs: base.TestingKnobs{ SpanConfig: &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, // disable the automatic job creation. @@ -307,7 +305,7 @@ func TestSQLWatcherMultiple(t *testing.T) { noopCheckpointDuration := 100 * time.Millisecond sqlWatcher := spanconfigsqlwatcher.New( - keys.SystemSQLCodec, + ts.Codec(), ts.ClusterSettings(), ts.RangeFeedFactory().(*rangefeed.Factory), 1<<20, /* 1 MB, bufferMemLimit */ @@ -420,8 +418,7 @@ func TestSQLWatcherOnEventError(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Test already runs from a tenant. - DefaultTestTenant: base.TODOTestTenantDisabled, + DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821), Knobs: base.TestingKnobs{ SpanConfig: &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, // disable the automatic job creation. @@ -438,7 +435,7 @@ func TestSQLWatcherOnEventError(t *testing.T) { tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) sqlWatcher := spanconfigsqlwatcher.New( - keys.SystemSQLCodec, + ts.Codec(), ts.ClusterSettings(), ts.RangeFeedFactory().(*rangefeed.Factory), 1<<20, /* 1 MB, bufferMemLimit */ @@ -471,8 +468,7 @@ func TestSQLWatcherHandlerError(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Test already runs from a tenant. - DefaultTestTenant: base.TODOTestTenantDisabled, + DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821), Knobs: base.TestingKnobs{ SpanConfig: &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, // disable the automatic job creation. @@ -490,7 +486,7 @@ func TestSQLWatcherHandlerError(t *testing.T) { noopCheckpointDuration := 100 * time.Millisecond sqlWatcher := spanconfigsqlwatcher.New( - keys.SystemSQLCodec, + ts.Codec(), ts.ClusterSettings(), ts.RangeFeedFactory().(*rangefeed.Factory), 1<<20, /* 1 MB, bufferMemLimit */ @@ -549,8 +545,7 @@ func TestWatcherReceivesNoopCheckpoints(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ - // Test already runs from a tenant. 
-			DefaultTestTenant: base.TODOTestTenantDisabled,
+			DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821),
 			Knobs: base.TestingKnobs{
 				SpanConfig: &spanconfig.TestingKnobs{
 					ManagerDisableJobCreation: true, // disable the automatic job creation.
@@ -568,7 +563,7 @@ func TestWatcherReceivesNoopCheckpoints(t *testing.T) {

 	noopCheckpointDuration := 25 * time.Millisecond
 	sqlWatcher := spanconfigsqlwatcher.New(
-		keys.SystemSQLCodec,
+		ts.Codec(),
 		ts.ClusterSettings(),
 		ts.RangeFeedFactory().(*rangefeed.Factory),
 		1<<20, /* 1 MB, bufferMemLimit */

From ab581e58e376c3bd98843bcd7640ee80eeefddc9 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Thu, 13 Jul 2023 16:25:36 +0100
Subject: [PATCH 4/5] streamingccl: unskip TestStreamDeleteRange

This test had previously timed out. The timeout we saw was the result of a
couple of issues.

When waiting for all delete ranges, our loop exit invariant was very strict:
we would only stop looking for rows if the number of delete ranges was
exactly 3. If, however, we got 4 delete ranges, with 2 coming in a single
batch, we would never hit this condition.

How would we hit that condition, though? One possibility is rangefeed
duplicates. Another, and what appears to have been happening in this test, is
that the representation of the range deletes observed by the rangefeed
consumer is slightly different depending on whether the range delete is
delivered as part of a catch-up scan or as part of the rangefeed's steady
state. I believe this is because the range deletes overlap but are issued at
different time points. When we get them as part of the steady state, we get a
trimmed version of the original event. When we get them as part of the
catch-up scan, we get them broken up at the point of overlap.

Fixes #93568

Epic: none
Release note: None
---
 .../streamingccl/streamproducer/BUILD.bazel   |   1 +
 .../streamproducer/replication_stream_test.go | 122 +++++++++++-------
 2 files changed, 79 insertions(+), 44 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
index ee72cd5a00e4..d7c868706420 100644
--- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
@@ -111,6 +111,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
+        "//pkg/util/span",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
index c28caae6d94a..1b2907aa41b6 100644
--- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
+++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
@@ -13,7 +13,6 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httptest"
-	"sort"
 	"strings"
 	"testing"
 	"time"
@@ -44,6 +43,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/span"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/jackc/pgx/v4"
@@ -297,6 +297,16 @@ func TestReplicationStreamInitialization(t *testing.T) {
 	})
 }

+func spansForTables(db *kv.DB, codec keys.SQLCodec, tables ...string) []roachpb.Span {
+	spans := make([]roachpb.Span, 0, len(tables))
+	for _, table := range tables {
+		desc := desctestutils.TestingGetPublicTableDescriptor(
+			db, codec, "d", table)
spans = append(spans, desc.PrimaryIndexSpan(codec)) + } + return spans +} + func encodeSpec( t *testing.T, h *replicationtestutils.ReplicationHelper, @@ -305,13 +315,16 @@ func encodeSpec( previousReplicatedTime hlc.Timestamp, tables ...string, ) []byte { - var spans []roachpb.Span - for _, table := range tables { - desc := desctestutils.TestingGetPublicTableDescriptor( - h.SysServer.DB(), srcTenant.Codec, "d", table) - spans = append(spans, desc.PrimaryIndexSpan(srcTenant.Codec)) - } + spans := spansForTables(h.SysServer.DB(), srcTenant.Codec, tables...) + return encodeSpecForSpans(t, initialScanTime, previousReplicatedTime, spans) +} +func encodeSpecForSpans( + t *testing.T, + initialScanTime hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, + spans []roachpb.Span, +) []byte { spec := &streampb.StreamPartitionSpec{ InitialScanTimestamp: initialScanTime, PreviousReplicatedTimestamp: previousReplicatedTime, @@ -632,23 +645,10 @@ func TestCompleteStreamReplication(t *testing.T) { } } -func sortDelRanges(receivedDelRanges []kvpb.RangeFeedDeleteRange) { - sort.Slice(receivedDelRanges, func(i, j int) bool { - if !receivedDelRanges[i].Timestamp.Equal(receivedDelRanges[j].Timestamp) { - return receivedDelRanges[i].Timestamp.Compare(receivedDelRanges[j].Timestamp) < 0 - } - if !receivedDelRanges[i].Span.Key.Equal(receivedDelRanges[j].Span.Key) { - return receivedDelRanges[i].Span.Key.Compare(receivedDelRanges[j].Span.Key) < 0 - } - return receivedDelRanges[i].Span.EndKey.Compare(receivedDelRanges[j].Span.EndKey) < 0 - }) -} - func TestStreamDeleteRange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 93568) skip.UnderStressRace(t, "disabled under stress and race") h, cleanup := replicationtestutils.NewReplicationHelper(t, base.TestServerArgs{ @@ -676,15 +676,46 @@ USE d; replicationProducerSpec := h.StartReplicationStream(t, testTenantName) streamID := replicationProducerSpec.StreamID initialScanTimestamp := replicationProducerSpec.ReplicationStartTime + streamResumeTimestamp := h.SysServer.Clock().Now() const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` // Only subscribe to table t1 and t2, not t3. + // We start the stream at a resume timestamp to avoid any initial scan. + spans := spansForTables(h.SysServer.DB(), srcTenant.Codec, "t1", "t2") + spec := encodeSpecForSpans(t, initialScanTimestamp, streamResumeTimestamp, spans) + source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, - hlc.Timestamp{}, "t1", "t2")) + streamPartitionQuery, streamID, spec) defer feed.Close(ctx) + codec := source.mu.codec.(*partitionStreamDecoder) + + // We wait for the frontier to advance because we want to + // ensure that we encounter the range deletes during the + // rangefeed's steady state rather than the catchup scan. + // + // The representation of the range deletes we send is slightly + // different if we encounter them during the catchup scan. + // + // NB: It is _still_ possible that we encounter the range + // deletes during a catchup scan if we hit a rangefeed restart + // during the test. + f, err := span.MakeFrontier(spans...) 
+ require.NoError(t, err) + for f.Frontier().IsEmpty() { + t.Logf("waiting for frontier to advance to a non-zero timestamp") + source.mu.Lock() + source.mu.rows.Next() + source.mu.codec.decode() + if codec.e.Checkpoint != nil { + for _, rs := range codec.e.Checkpoint.ResolvedSpans { + _, err := f.Forward(rs.Span, rs.Timestamp) + require.NoError(t, err) + } + } + source.mu.Unlock() + } + t.Logf("frontier advanced to a %s", f.Frontier()) - // TODO(casper): Replace with DROP TABLE once drop table uses the MVCC-compatible DelRange t1Span, t2Span, t3Span := h.TableSpan(srcTenant.Codec, "t1"), h.TableSpan(srcTenant.Codec, "t2"), h.TableSpan(srcTenant.Codec, "t3") // Range deleted is outside the subscribed spans @@ -694,30 +725,31 @@ USE d; // Range is t1e - t2sn, emitting t2s - t2sn. require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t1Span.EndKey, t2Span.Key.Next())) - // Expected DelRange spans after sorting. - expectedDelRangeSpan1 := roachpb.Span{Key: t1Span.Key, EndKey: t1Span.EndKey} - expectedDelRangeSpan2 := roachpb.Span{Key: t2Span.Key, EndKey: t2Span.EndKey} - expectedDelRangeSpan3 := roachpb.Span{Key: t2Span.Key, EndKey: t2Span.Key.Next()} + // Expected DelRange events. We store these and the received + // del ranges in maps to account for possible duplicate + // delivery. + expectedDelRanges := make(map[string]struct{}) + expectedDelRanges[roachpb.Span{Key: t1Span.Key, EndKey: t1Span.EndKey}.String()] = struct{}{} + expectedDelRanges[roachpb.Span{Key: t2Span.Key, EndKey: t2Span.EndKey}.String()] = struct{}{} + expectedDelRanges[roachpb.Span{Key: t2Span.Key, EndKey: t2Span.Key.Next()}.String()] = struct{}{} - codec := source.mu.codec.(*partitionStreamDecoder) - receivedDelRanges := make([]kvpb.RangeFeedDeleteRange, 0, 3) + receivedDelRanges := make(map[string]struct{}) for { source.mu.Lock() require.True(t, source.mu.rows.Next()) source.mu.codec.decode() if codec.e.Batch != nil { - receivedDelRanges = append(receivedDelRanges, codec.e.Batch.DelRanges...) + for _, dr := range codec.e.Batch.DelRanges { + receivedDelRanges[dr.Span.String()] = struct{}{} + } } source.mu.Unlock() - if len(receivedDelRanges) == 3 { + if len(receivedDelRanges) >= 3 { break } } - sortDelRanges(receivedDelRanges) - require.Equal(t, expectedDelRangeSpan1, receivedDelRanges[0].Span) - require.Equal(t, expectedDelRangeSpan2, receivedDelRanges[1].Span) - require.Equal(t, expectedDelRangeSpan3, receivedDelRanges[2].Span) + require.Equal(t, expectedDelRanges, receivedDelRanges) // Adding a SSTable that contains DeleteRange batchHLCTime := h.SysServer.Clock().Now() @@ -734,17 +766,19 @@ USE d; // Delete range for t3s - t3e, emitting nothing. storageutils.RangeKV(string(t3Span.Key), string(t3Span.EndKey), ts, ""), }) - expectedDelRange1 := kvpb.RangeFeedDeleteRange{Span: t1Span, Timestamp: batchHLCTime} - expectedDelRange2 := kvpb.RangeFeedDeleteRange{Span: t2Span, Timestamp: batchHLCTime} require.Equal(t, t1Span.Key, start) require.Equal(t, t3Span.EndKey, end) + expectedDelRanges = make(map[string]struct{}) + expectedDelRanges[t1Span.String()] = struct{}{} + expectedDelRanges[t2Span.String()] = struct{}{} + // Using same batch ts so that this SST can be emitted through rangefeed. 
- _, _, _, err := h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false, + _, _, _, err = h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false, false, hlc.Timestamp{}, nil, false, batchHLCTime) require.NoError(t, err) - receivedDelRanges = receivedDelRanges[:0] + receivedDelRanges = make(map[string]struct{}) receivedKVs := make([]roachpb.KeyValue, 0) for { source.mu.Lock() @@ -753,18 +787,18 @@ USE d; if codec.e.Batch != nil { require.Empty(t, codec.e.Batch.Ssts) receivedKVs = append(receivedKVs, codec.e.Batch.KeyValues...) - receivedDelRanges = append(receivedDelRanges, codec.e.Batch.DelRanges...) + for _, dr := range codec.e.Batch.DelRanges { + receivedDelRanges[dr.Span.String()] = struct{}{} + } } source.mu.Unlock() - if len(receivedDelRanges) == 2 && len(receivedKVs) == 1 { + if len(receivedDelRanges) >= 2 && len(receivedKVs) >= 1 { break } } - sortDelRanges(receivedDelRanges) require.Equal(t, t2Span.Key, receivedKVs[0].Key) require.Equal(t, batchHLCTime, receivedKVs[0].Value.Timestamp) - require.Equal(t, expectedDelRange1, receivedDelRanges[0]) - require.Equal(t, expectedDelRange2, receivedDelRanges[1]) + require.Equal(t, expectedDelRanges, receivedDelRanges) } From e2eb31e52e9f3ff18aa96e3ac5af198cef53af14 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 14 Jul 2023 17:39:49 +0200 Subject: [PATCH 5/5] server: bark loudly if the test tenant cannot be created For context, the automatic test tenant machinery is currently dependent on a CCL enterprise license check. (This is fundamentally not necessary - see #103772 - but sadly this is the way it is for now) Prior to this patch, if the user or a test selected the creation of a test tenant, but the test code forgot to import the required CCL go package, the framework would announce that "a test tenant was created" but it was actually silently failing to do so. This led to confusing investigations where a test tenant was expected, a test was appearing to succeed, but with a release build the same condition would fail. This commit enhances the situation by ensuring we have clear logging output when the test tenant cannot be created due to the missing CCL import. 
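For reference, a sketch of the test-side fix this message points at (the
blank CCL import is the usual way to register the enterprise init hooks;
exactly where it goes depends on the test package, so treat this as an
assumed example):

    // e.g. in the test package's main_test.go (hypothetical placement):
    import (
        _ "github.com/cockroachdb/cockroach/pkg/ccl" // register CCL init hooks so the test tenant can be created
    )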
Release note: None --- pkg/server/BUILD.bazel | 1 + pkg/server/testserver.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index fd0f6f672dee..f6ac69d75461 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -306,6 +306,7 @@ go_library( "//pkg/util/log/eventpb", "//pkg/util/log/logcrash", "//pkg/util/log/logpb", + "//pkg/util/log/severity", "//pkg/util/metric", "//pkg/util/mon", "//pkg/util/netutil", diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 97f5d6ccfa2d..37f6a1c018ff 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -61,6 +61,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/severity" "github.com/cockroachdb/cockroach/pkg/util/metric" addrutil "github.com/cockroachdb/cockroach/pkg/util/netutil/addr" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -545,6 +546,7 @@ func (ts *TestServer) TestTenants() []serverutils.TestTenantInterface { func (ts *TestServer) maybeStartDefaultTestTenant(ctx context.Context) error { clusterID := ts.sqlServer.execCfg.NodeInfo.LogicalClusterID if err := base.CheckEnterpriseEnabled(ts.st, clusterID(), "SQL servers"); err != nil { + log.Shoutf(ctx, severity.ERROR, "test tenant requested by configuration, but code organization prevents start!\n%v", err) // If not enterprise enabled, we won't be able to use SQL Servers so eat // the error and return without creating/starting a SQL server. ts.cfg.DisableDefaultTestTenant = true