diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index 6b875c10bc09..e66ff901df9f 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -477,86 +477,6 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) { }) } -// TestTenantStreamingUnavailableStreamAddress verifies that after a -// pause/resume (replan) we will not use a dead server as a source. -func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderDeadlock(t, "multi-node may time out under deadlock") - skip.UnderRace(t, "takes too long with multiple nodes") - skip.UnderStress(t, "multi node test times out under stress") - - ctx := context.Background() - args := replicationtestutils.DefaultTenantStreamingClustersArgs - args.MultitenantSingleClusterNumNodes = 4 - - c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args) - defer cleanup() - - replicationtestutils.CreateScatteredTable(t, c, 4) - srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key") - - producerJobID, ingestionJobID := c.StartStreamReplication(ctx) - jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) - jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - - srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) - - c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID) - jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - - // We should've persisted the original topology - progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - streamAddresses := progress.GetStreamIngest().StreamAddresses - require.Greater(t, len(streamAddresses), 1) - - // Write something to the source cluster, note that the job is paused - and - // therefore not replicated for now. - c.SrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)") - c.SrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`) - - // Stop a server on the source cluster. Note that in this test we are trying - // to avoid using the source cluster after this point because if we do the - // test flakes, see #107499 for more info. - destroyedAddress := c.SrcURL.String() - require.NoError(t, c.SrcTenantConn.Close()) - c.SrcTenantServer.AppStopper().Stop(ctx) - c.SrcCluster.StopServer(0) - - // Switch the SQL connection to a new node, as node 0 has shutdown-- recall that - // the source and destination tenant are on the same cluster. - c.DestSysSQL = sqlutils.MakeSQLRunner(c.DestCluster.Conns[1]) - c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID) - jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - - cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime() - var cutoverStr string - c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, - c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr) - cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) - require.Equal(c.T, cutoverTime, cutoverOutput.GoTime()) - jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - - cleanUpTenant := c.StartDestTenant(ctx, nil, 1) - defer func() { - require.NoError(t, cleanUpTenant()) - }() - - // The destroyed address should have been removed from the topology. - progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - newStreamAddresses := progress.GetStreamIngest().StreamAddresses - require.Contains(t, streamAddresses, destroyedAddress) - require.NotContains(t, newStreamAddresses, destroyedAddress) - - // Verify the destination tenant is fully replicated. - destData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.x") - require.Equal(c.T, [][]string{{"3", "NULL"}}, destData) - dstScatteredData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key") - require.Equal(t, srcScatteredData, dstScatteredData) -} - func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)