diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index 06c2ea91ffb2..ce3c6de28f3f 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -462,8 +462,8 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) { func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 105956) + skip.UnderDeadlock(t, "multi-node may time out under deadlock") skip.UnderRace(t, "takes too long with multiple nodes") ctx := context.Background() @@ -510,11 +510,6 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { defer alternateSrcTenantConn.Close() alternateSrcTenantSQL := sqlutils.MakeSQLRunner(alternateSrcTenantConn) - cleanUpTenant := c.StartDestTenant(ctx) - defer func() { - require.NoError(t, cleanUpTenant()) - }() - alternateCompareResult := func(query string) { sourceData := alternateSrcTenantSQL.QueryStr(c.T, query) destData := c.DestTenantSQL.QueryStr(c.T, query) @@ -537,6 +532,11 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { require.Equal(c.T, cutoverTime, cutoverOutput.GoTime()) jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) + cleanUpTenant := c.StartDestTenant(ctx) + defer func() { + require.NoError(t, cleanUpTenant()) + }() + // The destroyed address should have been removed from the topology progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) newStreamAddresses := progress.GetStreamIngest().StreamAddresses @@ -652,6 +652,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderDeadlock(t, "multi-node may time out under deadlock") skip.UnderRace(t, "takes too long with multiple nodes") ctx := context.Background()