106738: logic: skip_on_retry works when errors are expected r=Xiang-Gu a=Xiang-Gu

Previously, the `skip_on_retry` directive for logic tests would, when set, skip the rest of the test if a statement failed with a TransactionRetryError. However, it would not skip if the statement was expected to fail with a certain error message. This PR ensures that whenever a TransactionRetryError occurs and `skip_on_retry` is set, we always skip the rest of the test, even if the statement is expected to fail.
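As a minimal illustration of the ordering the directive now implies — this is a hypothetical Go sketch with made-up names, not the actual logictest runner code — the retry check has to run before the expected-error comparison:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// shouldSkipOnRetry is a hypothetical stand-in for the logictest check:
// when skip_on_retry is set, a retry error short-circuits the rest of
// the test file even if the statement declared an expected error.
func shouldSkipOnRetry(skipOnRetry bool, err error) bool {
	return skipOnRetry && err != nil &&
		strings.Contains(err.Error(), "restart transaction")
}

func main() {
	err := errors.New("restart transaction: TransactionRetryWithProtoRefreshError")
	// The statement may have expected some other error, but the retry
	// takes precedence and the remainder of the file is skipped.
	fmt.Println(shouldSkipOnRetry(true, err)) // true
}
```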

fixes #104464

Release note: None

106759: streamingccl: unskip TestStreamDeleteRange r=msbutler a=stevendanna

This test had previously timed out. The timeout we saw was the result of a couple of issues.

When waiting for all delete ranges, our loop exit condition was very strict. We would only stop looking for rows if the number of delete ranges was exactly 3. If, however, we got 4 delete ranges, with 2 coming in a single batch, we would never hit this condition.

How would that happen, though? One possibility is rangefeed duplicates. Another, and what appears to have been happening in this test, is that the representation of the range deletes observed by the rangefeed consumer differs slightly depending on whether a range delete is delivered as part of a catch-up scan or as part of the rangefeed's steady state. I believe this is because the range deletes overlap but are issued at different points in time. When we get them as part of the steady state, we get a trimmed version of the original event. When we get them as part of the catch-up scan, we get them broken up at the point of overlap.
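The fix below therefore collects the received delete ranges into a set keyed by span and exits once it has seen at least the expected number of distinct spans. Here is a self-contained Go sketch of that relaxed exit condition, using toy types rather than the test's actual rangefeed decoder:

```go
package main

import "fmt"

// delRange is a toy stand-in for kvpb.RangeFeedDeleteRange.
type delRange struct{ span string }

func main() {
	// Batches as a rangefeed consumer might observe them: duplicates and
	// differently sliced spans are both possible.
	batches := [][]delRange{
		{{span: "/Table/104"}, {span: "/Table/104"}}, // duplicate delivery
		{{span: "/Table/105"}},
		{{span: "/Table/105/1"}},
	}
	received := make(map[string]struct{})
	for _, b := range batches {
		for _, dr := range b {
			received[dr.span] = struct{}{}
		}
		// ">= 3" rather than "== 3": extra or duplicate events can no
		// longer wedge the loop.
		if len(received) >= 3 {
			break
		}
	}
	fmt.Printf("saw %d distinct delete ranges\n", len(received))
}
```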

Fixes #93568

Epic: none

Release note: None

106814: testutils: add helper to target transactions for retries r=lidorcarmel a=stevendanna

This helper makes it a little quicker to write a test that checks whether a particular transaction is retry-safe.
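The helper's exact API isn't reproduced here, so the Go sketch below only illustrates the general idea with assumed names: a filter that injects one forced retry for transactions carrying a chosen name prefix, which lets a test assert that the targeted transaction completes correctly after retrying.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// retryTargeter forces a single retry for transactions whose name starts
// with a chosen prefix. The type and method names are illustrative only.
type retryTargeter struct {
	mu      sync.Mutex
	prefix  string
	tripped map[string]bool
}

// maybeInjectRetry reports whether a retry error should be injected; it
// returns true only the first time a matching transaction is seen.
func (r *retryTargeter) maybeInjectRetry(txnName string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !strings.HasPrefix(txnName, r.prefix) || r.tripped[txnName] {
		return false
	}
	r.tripped[txnName] = true
	return true
}

func main() {
	rt := &retryTargeter{prefix: "retry-me", tripped: map[string]bool{}}
	fmt.Println(rt.maybeInjectRetry("retry-me-import")) // true: force one retry
	fmt.Println(rt.maybeInjectRetry("retry-me-import")) // false: already retried
	fmt.Println(rt.maybeInjectRetry("unrelated-txn"))   // false: not targeted
}
```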

Informs #106417

Epic: none

Release note: none

106822: spanconfigccl: remove uses of `TODOTestTenantDisabled` r=stevendanna a=knz

Informs #76378.
Epic: CRDB-18499

There's a mix of tests that control their tenants directly, and tests that should really work with virtualization enabled but don't.
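For reference, the two replacements used in the diffs below look like this (simplified from the hunks that follow; the issue number is just the one cited in this PR):

```go
package spanconfigccl_test

import "github.com/cockroachdb/cockroach/pkg/base"

func exampleServerArgs() (base.TestServerArgs, base.TestServerArgs) {
	// The test starts and controls its tenants directly, so the framework
	// should not create one implicitly.
	explicit := base.TestServerArgs{
		DefaultTestTenant: base.TestControlsTenantsExplicitly,
	}
	// The test should work with virtualization enabled but currently does
	// not; the issue number records the open investigation.
	broken := base.TestServerArgs{
		DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106818),
	}
	return explicit, broken
}
```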

Followup issues: #106821 and #106818.

Release note: None

106832: server: bark loudly if the test tenant cannot be created r=herkolategan a=knz

Informs #76378.
Informs #103772. 
Epic: CRDB-18499

For context, the automatic test tenant machinery is currently dependent on a CCL enterprise license check.
(This is fundamentally not necessary - see #103772 - but sadly this is the way it is for now)

Prior to this patch, if the user or a test selected the creation of a test tenant, but the test code forgot to import the required CCL Go package, the framework would announce that "a test tenant was created" while actually silently failing to do so.

This led to confusing investigations where a test tenant was expected and the test appeared to succeed, but the same condition would fail under a release build.

This commit improves the situation by ensuring there is clear logging output when the test tenant cannot be created due to the missing CCL import.
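A minimal Go sketch of the intended behavior — hypothetical names, not the actual server code: if a test tenant was requested but the hook that enables tenant creation was never registered (i.e., the CCL package was not imported), the framework now says so loudly instead of proceeding silently without a tenant.

```go
package main

import "log"

// newTestTenantHook is normally registered as a side effect of importing
// the CCL package; nil models a test binary that forgot the import.
var newTestTenantHook func() error

func maybeStartTestTenant(wantTenant bool) {
	if !wantTenant {
		return
	}
	if newTestTenantHook == nil {
		// Bark loudly: the test asked for a tenant but cannot get one.
		log.Print("WARNING: test tenant requested, but the CCL package that " +
			"enables tenant creation was not imported; no test tenant will be created")
		return
	}
	if err := newTestTenantHook(); err != nil {
		log.Fatalf("failed to create test tenant: %v", err)
	}
}

func main() {
	maybeStartTestTenant(true) // logs the warning because the hook is nil
}
```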

Release note: None

Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
4 people committed Jul 14, 2023
6 parents b30d31a + 00d78c7 + ab581e5 + a3ecaaa + 68ca3ec + e2eb31e commit b5e9cfd
Showing 17 changed files with 181 additions and 103 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/schema_change_in_txn
@@ -1,3 +1,7 @@
# Skip the rest of the test if a retry occurs. They can happen and are fine
# but there's no way to encapsulate that in logictests.
skip_on_retry

# Backing up and restoring a descriptor will increment the version of the
# descriptor before restoring it so we cannot achieve the expected behaviour in
# this test.
@@ -38,8 +38,7 @@ func TestCommitTSIntervals(t *testing.T) {

var i interceptor
ts, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// Manually starts a tenant below. No need to start one here.
DefaultTestTenant: base.TODOTestTenantDisabled,
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manual,
@@ -99,9 +99,7 @@ func TestDataDriven(t *testing.T) {
}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test fails when run under the default test tenant. More
// investigation is required.
DefaultTestTenant: base.TODOTestTenantDisabled,
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
SpanConfig: scKnobs,
@@ -67,8 +67,8 @@ func TestDataDriven(t *testing.T) {
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Fails with nil pointer dereference. Tracked with #76378.
DefaultTestTenant: base.TODOTestTenantDisabled,
// Fails with nil pointer dereference. Tracked with #76378 and #106818.
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106818),
Knobs: base.TestingKnobs{
SpanConfig: scKnobs,
},
@@ -109,9 +109,8 @@ func TestDataDriven(t *testing.T) {
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test fails when run within a tenant. More investigation
// is required. Tracked with #76378.
DefaultTestTenant: base.TODOTestTenantDisabled,
// Fails with nil pointer dereference. Tracked with #76378 and #106818.
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106818),
Knobs: base.TestingKnobs{
GCJob: gcTestingKnobs,
SpanConfig: scKnobs,
25 changes: 10 additions & 15 deletions pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go
@@ -60,9 +60,8 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
defer dirCleanupFn()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
ExternalIODir: dir,
// Test already runs from a tenant.
DefaultTestTenant: base.TODOTestTenantDisabled,
ExternalIODir: dir,
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true, // disable the automatic job creation.
@@ -288,8 +287,7 @@ func TestSQLWatcherMultiple(t *testing.T) {

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test already runs from a tenant.
DefaultTestTenant: base.TODOTestTenantDisabled,
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821),
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true, // disable the automatic job creation.
@@ -307,7 +305,7 @@

noopCheckpointDuration := 100 * time.Millisecond
sqlWatcher := spanconfigsqlwatcher.New(
keys.SystemSQLCodec,
ts.Codec(),
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
@@ -420,8 +418,7 @@ func TestSQLWatcherOnEventError(t *testing.T) {

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test already runs from a tenant.
DefaultTestTenant: base.TODOTestTenantDisabled,
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821),
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true, // disable the automatic job creation.
@@ -438,7 +435,7 @@
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

sqlWatcher := spanconfigsqlwatcher.New(
keys.SystemSQLCodec,
ts.Codec(),
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
@@ -471,8 +468,7 @@ func TestSQLWatcherHandlerError(t *testing.T) {

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test already runs from a tenant.
DefaultTestTenant: base.TODOTestTenantDisabled,
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821),
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true, // disable the automatic job creation.
@@ -490,7 +486,7 @@

noopCheckpointDuration := 100 * time.Millisecond
sqlWatcher := spanconfigsqlwatcher.New(
keys.SystemSQLCodec,
ts.Codec(),
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
@@ -549,8 +545,7 @@ func TestWatcherReceivesNoopCheckpoints(t *testing.T) {

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test already runs from a tenant.
DefaultTestTenant: base.TODOTestTenantDisabled,
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(106821),
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true, // disable the automatic job creation.
@@ -568,7 +563,7 @@

noopCheckpointDuration := 25 * time.Millisecond
sqlWatcher := spanconfigsqlwatcher.New(
keys.SystemSQLCodec,
ts.Codec(),
ts.ClusterSettings(),
ts.RangeFeedFactory().(*rangefeed.Factory),
1<<20, /* 1 MB, bufferMemLimit */
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
@@ -111,6 +111,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
122 changes: 78 additions & 44 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
@@ -13,7 +13,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sort"
"strings"
"testing"
"time"
@@ -44,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/jackc/pgx/v4"
@@ -297,6 +297,16 @@ func TestReplicationStreamInitialization(t *testing.T) {
})
}

func spansForTables(db *kv.DB, codec keys.SQLCodec, tables ...string) []roachpb.Span {
spans := make([]roachpb.Span, 0, len(tables))
for _, table := range tables {
desc := desctestutils.TestingGetPublicTableDescriptor(
db, codec, "d", table)
spans = append(spans, desc.PrimaryIndexSpan(codec))
}
return spans
}

func encodeSpec(
t *testing.T,
h *replicationtestutils.ReplicationHelper,
@@ -305,13 +315,16 @@
previousReplicatedTime hlc.Timestamp,
tables ...string,
) []byte {
var spans []roachpb.Span
for _, table := range tables {
desc := desctestutils.TestingGetPublicTableDescriptor(
h.SysServer.DB(), srcTenant.Codec, "d", table)
spans = append(spans, desc.PrimaryIndexSpan(srcTenant.Codec))
}
spans := spansForTables(h.SysServer.DB(), srcTenant.Codec, tables...)
return encodeSpecForSpans(t, initialScanTime, previousReplicatedTime, spans)
}

func encodeSpecForSpans(
t *testing.T,
initialScanTime hlc.Timestamp,
previousReplicatedTime hlc.Timestamp,
spans []roachpb.Span,
) []byte {
spec := &streampb.StreamPartitionSpec{
InitialScanTimestamp: initialScanTime,
PreviousReplicatedTimestamp: previousReplicatedTime,
@@ -632,23 +645,10 @@ func TestCompleteStreamReplication(t *testing.T) {
}
}

func sortDelRanges(receivedDelRanges []kvpb.RangeFeedDeleteRange) {
sort.Slice(receivedDelRanges, func(i, j int) bool {
if !receivedDelRanges[i].Timestamp.Equal(receivedDelRanges[j].Timestamp) {
return receivedDelRanges[i].Timestamp.Compare(receivedDelRanges[j].Timestamp) < 0
}
if !receivedDelRanges[i].Span.Key.Equal(receivedDelRanges[j].Span.Key) {
return receivedDelRanges[i].Span.Key.Compare(receivedDelRanges[j].Span.Key) < 0
}
return receivedDelRanges[i].Span.EndKey.Compare(receivedDelRanges[j].Span.EndKey) < 0
})
}

func TestStreamDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 93568)
skip.UnderStressRace(t, "disabled under stress and race")

h, cleanup := replicationtestutils.NewReplicationHelper(t, base.TestServerArgs{
@@ -676,15 +676,46 @@ USE d;
replicationProducerSpec := h.StartReplicationStream(t, testTenantName)
streamID := replicationProducerSpec.StreamID
initialScanTimestamp := replicationProducerSpec.ReplicationStartTime
streamResumeTimestamp := h.SysServer.Clock().Now()

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
// Only subscribe to table t1 and t2, not t3.
// We start the stream at a resume timestamp to avoid any initial scan.
spans := spansForTables(h.SysServer.DB(), srcTenant.Codec, "t1", "t2")
spec := encodeSpecForSpans(t, initialScanTimestamp, streamResumeTimestamp, spans)

source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp,
hlc.Timestamp{}, "t1", "t2"))
streamPartitionQuery, streamID, spec)
defer feed.Close(ctx)
codec := source.mu.codec.(*partitionStreamDecoder)

// We wait for the frontier to advance because we want to
// ensure that we encounter the range deletes during the
// rangefeed's steady state rather than the catchup scan.
//
// The representation of the range deletes we send is slightly
// different if we encounter them during the catchup scan.
//
// NB: It is _still_ possible that we encounter the range
// deletes during a catchup scan if we hit a rangefeed restart
// during the test.
f, err := span.MakeFrontier(spans...)
require.NoError(t, err)
for f.Frontier().IsEmpty() {
t.Logf("waiting for frontier to advance to a non-zero timestamp")
source.mu.Lock()
source.mu.rows.Next()
source.mu.codec.decode()
if codec.e.Checkpoint != nil {
for _, rs := range codec.e.Checkpoint.ResolvedSpans {
_, err := f.Forward(rs.Span, rs.Timestamp)
require.NoError(t, err)
}
}
source.mu.Unlock()
}
t.Logf("frontier advanced to a %s", f.Frontier())

// TODO(casper): Replace with DROP TABLE once drop table uses the MVCC-compatible DelRange
t1Span, t2Span, t3Span := h.TableSpan(srcTenant.Codec, "t1"),
h.TableSpan(srcTenant.Codec, "t2"), h.TableSpan(srcTenant.Codec, "t3")
// Range deleted is outside the subscribed spans
@@ -694,30 +725,31 @@
// Range is t1e - t2sn, emitting t2s - t2sn.
require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t1Span.EndKey, t2Span.Key.Next()))

// Expected DelRange spans after sorting.
expectedDelRangeSpan1 := roachpb.Span{Key: t1Span.Key, EndKey: t1Span.EndKey}
expectedDelRangeSpan2 := roachpb.Span{Key: t2Span.Key, EndKey: t2Span.EndKey}
expectedDelRangeSpan3 := roachpb.Span{Key: t2Span.Key, EndKey: t2Span.Key.Next()}
// Expected DelRange events. We store these and the received
// del ranges in maps to account for possible duplicate
// delivery.
expectedDelRanges := make(map[string]struct{})
expectedDelRanges[roachpb.Span{Key: t1Span.Key, EndKey: t1Span.EndKey}.String()] = struct{}{}
expectedDelRanges[roachpb.Span{Key: t2Span.Key, EndKey: t2Span.EndKey}.String()] = struct{}{}
expectedDelRanges[roachpb.Span{Key: t2Span.Key, EndKey: t2Span.Key.Next()}.String()] = struct{}{}

codec := source.mu.codec.(*partitionStreamDecoder)
receivedDelRanges := make([]kvpb.RangeFeedDeleteRange, 0, 3)
receivedDelRanges := make(map[string]struct{})
for {
source.mu.Lock()
require.True(t, source.mu.rows.Next())
source.mu.codec.decode()
if codec.e.Batch != nil {
receivedDelRanges = append(receivedDelRanges, codec.e.Batch.DelRanges...)
for _, dr := range codec.e.Batch.DelRanges {
receivedDelRanges[dr.Span.String()] = struct{}{}
}
}
source.mu.Unlock()
if len(receivedDelRanges) == 3 {
if len(receivedDelRanges) >= 3 {
break
}
}

sortDelRanges(receivedDelRanges)
require.Equal(t, expectedDelRangeSpan1, receivedDelRanges[0].Span)
require.Equal(t, expectedDelRangeSpan2, receivedDelRanges[1].Span)
require.Equal(t, expectedDelRangeSpan3, receivedDelRanges[2].Span)
require.Equal(t, expectedDelRanges, receivedDelRanges)

// Adding a SSTable that contains DeleteRange
batchHLCTime := h.SysServer.Clock().Now()
@@ -734,17 +766,19 @@
// Delete range for t3s - t3e, emitting nothing.
storageutils.RangeKV(string(t3Span.Key), string(t3Span.EndKey), ts, ""),
})
expectedDelRange1 := kvpb.RangeFeedDeleteRange{Span: t1Span, Timestamp: batchHLCTime}
expectedDelRange2 := kvpb.RangeFeedDeleteRange{Span: t2Span, Timestamp: batchHLCTime}
require.Equal(t, t1Span.Key, start)
require.Equal(t, t3Span.EndKey, end)

expectedDelRanges = make(map[string]struct{})
expectedDelRanges[t1Span.String()] = struct{}{}
expectedDelRanges[t2Span.String()] = struct{}{}

// Using same batch ts so that this SST can be emitted through rangefeed.
_, _, _, err := h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false,
_, _, _, err = h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false,
false, hlc.Timestamp{}, nil, false, batchHLCTime)
require.NoError(t, err)

receivedDelRanges = receivedDelRanges[:0]
receivedDelRanges = make(map[string]struct{})
receivedKVs := make([]roachpb.KeyValue, 0)
for {
source.mu.Lock()
Expand All @@ -753,18 +787,18 @@ USE d;
if codec.e.Batch != nil {
require.Empty(t, codec.e.Batch.Ssts)
receivedKVs = append(receivedKVs, codec.e.Batch.KeyValues...)
receivedDelRanges = append(receivedDelRanges, codec.e.Batch.DelRanges...)
for _, dr := range codec.e.Batch.DelRanges {
receivedDelRanges[dr.Span.String()] = struct{}{}
}
}
source.mu.Unlock()

if len(receivedDelRanges) == 2 && len(receivedKVs) == 1 {
if len(receivedDelRanges) >= 2 && len(receivedKVs) >= 1 {
break
}
}

sortDelRanges(receivedDelRanges)
require.Equal(t, t2Span.Key, receivedKVs[0].Key)
require.Equal(t, batchHLCTime, receivedKVs[0].Value.Timestamp)
require.Equal(t, expectedDelRange1, receivedDelRanges[0])
require.Equal(t, expectedDelRange2, receivedDelRanges[1])
require.Equal(t, expectedDelRanges, receivedDelRanges)
}
1 change: 0 additions & 1 deletion pkg/jobs/BUILD.bazel
@@ -114,7 +114,6 @@ go_test(
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",