Skip to content

Commit

Permalink
c2c: remove TestTenantStreamingUnavailableAddress
Browse files Browse the repository at this point in the history
All this test does is flake and essentially tests that dsp.PartitionsSpans()
excludes the shutdown node. We already have coverage for this in our roachtest
-- if dsp.PartitionsSpans() did include a shutdown node, our shutdown
tests would never complete.

Fixes cockroachdb#112917

Release note: none
  • Loading branch information
msbutler committed Oct 25, 2023
1 parent 447f59d commit a8333a5
Showing 1 changed file with 0 additions and 80 deletions.
80 changes: 0 additions & 80 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,86 +477,6 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
})
}

// TestTenantStreamingUnavailableStreamAddress verifies that after a
// pause/resume (replan) we will not use a dead server as a source.
func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")
skip.UnderStress(t, "multi node test times out under stress")

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.MultitenantSingleClusterNumNodes = 4

c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

replicationtestutils.CreateScatteredTable(t, c, 4)
srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

srcTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// We should've persisted the original topology
progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
streamAddresses := progress.GetStreamIngest().StreamAddresses
require.Greater(t, len(streamAddresses), 1)

// Write something to the source cluster, note that the job is paused - and
// therefore not replicated for now.
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
c.SrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)

// Stop a server on the source cluster. Note that in this test we are trying
// to avoid using the source cluster after this point because if we do the
// test flakes, see #107499 for more info.
destroyedAddress := c.SrcURL.String()
require.NoError(t, c.SrcTenantConn.Close())
c.SrcTenantServer.AppStopper().Stop(ctx)
c.SrcCluster.StopServer(0)

// Switch the SQL connection to a new node, as node 0 has shutdown-- recall that
// the source and destination tenant are on the same cluster.
c.DestSysSQL = sqlutils.MakeSQLRunner(c.DestCluster.Conns[1])
c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cleanUpTenant := c.StartDestTenant(ctx, nil, 1)
defer func() {
require.NoError(t, cleanUpTenant())
}()

// The destroyed address should have been removed from the topology.
progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
newStreamAddresses := progress.GetStreamIngest().StreamAddresses
require.Contains(t, streamAddresses, destroyedAddress)
require.NotContains(t, newStreamAddresses, destroyedAddress)

// Verify the destination tenant is fully replicated.
destData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.x")
require.Equal(c.T, [][]string{{"3", "NULL"}}, destData)
dstScatteredData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
require.Equal(t, srcScatteredData, dstScatteredData)
}

func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit a8333a5

Please sign in to comment.