Skip to content

Commit

Permalink
Merge pull request #102642 from cockroachdb/blathers/backport-release-23.1-102593
Browse files Browse the repository at this point in the history

release-23.1: c2c: skip and deflake a few unit tests
  • Loading branch information
msbutler authored May 3, 2023
2 parents c44b4bd + 39d36a8 commit fbea48d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
func TestTenantStreamingMultipleNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 86206)

skip.UnderRace(t, "takes too long with multiple nodes")

Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (f *pgConnReplicationFeedSource) Error() error {

// startReplication starts replication stream, specified as query and its args.
func startReplication(
ctx context.Context,
t *testing.T,
r *replicationtestutils.ReplicationHelper,
codecFactory eventDecoderFactory,
Expand All @@ -173,7 +174,7 @@ func startReplication(
pgxConfig, err := pgx.ParseConfig(sink.String())
require.NoError(t, err)

queryCtx, cancel := context.WithCancel(context.Background())
queryCtx, cancel := context.WithCancel(ctx)
conn, err := pgx.ConnectConfig(queryCtx, pgxConfig)
require.NoError(t, err)

Expand Down Expand Up @@ -357,7 +358,8 @@ USE d;
t2Descr := desctestutils.TestingGetPublicTableDescriptor(h.SysServer.DB(), srcTenant.Codec, "d", "t2")

t.Run("stream-table-cursor-error", func(t *testing.T) {
_, feed := startReplication(t, h, makePartitionStreamDecoder,
skip.WithIssue(t, 102286)
_, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, hlc.Timestamp{}, "t2"))
defer feed.Close(ctx)

Expand All @@ -384,7 +386,7 @@ USE d;
})

t.Run("stream-table", func(t *testing.T) {
_, feed := startReplication(t, h, makePartitionStreamDecoder,
_, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp,
hlc.Timestamp{}, "t1"))
defer feed.Close(ctx)
Expand Down Expand Up @@ -414,7 +416,7 @@ USE d;
srcTenant.SQL.Exec(t, `UPDATE d.t1 SET a = 'привет' WHERE i = 42`)
srcTenant.SQL.Exec(t, `UPDATE d.t1 SET b = 'мир' WHERE i = 42`)

_, feed := startReplication(t, h, makePartitionStreamDecoder,
_, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp,
beforeUpdateTS, "t1"))
defer feed.Close(ctx)
Expand Down Expand Up @@ -451,7 +453,7 @@ CREATE TABLE t3(
// Add few rows.
addRows(0, 10)

source, feed := startReplication(t, h, makePartitionStreamDecoder,
source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp,
hlc.Timestamp{}, "t1"))
defer feed.Close(ctx)
Expand Down Expand Up @@ -526,7 +528,7 @@ USE d;
if addSSTableBeforeRangefeed {
srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL)
}
source, feed := startReplication(t, h, makePartitionStreamDecoder,
source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp,
previousHighWater, table))
defer feed.Close(ctx)
Expand Down Expand Up @@ -679,7 +681,7 @@ USE d;

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
// Only subscribe to table t1 and t2, not t3.
source, feed := startReplication(t, h, makePartitionStreamDecoder,
source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder,
streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp,
hlc.Timestamp{}, "t1", "t2"))
defer feed.Close(ctx)
Expand Down

0 comments on commit fbea48d

Please sign in to comment.