Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

c2c: remove TestTenantStreamingUnavailableAddress #113038

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 0 additions & 80 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,86 +477,6 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
})
}

// TestTenantStreamingUnavailableStreamAddress verifies that after a
// pause/resume (replan) we will not use a dead server as a source.
//
// Outline of the test:
//  1. Start replication with MultitenantSingleClusterNumNodes set to 4 and
//     wait for it to catch up to the source's current clock time.
//  2. Pause the ingestion job and record the stream addresses persisted in
//     its progress.
//  3. Write new rows on the source, then stop source server 0.
//  4. Resume the job through a different node and cut over.
//  5. Check that the stopped server's address is absent from the stream
//     addresses now in the job progress, and that the destination tenant
//     holds the same data as the source.
func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The multi-node setup makes this test too slow for the deadlock, race and
// stress configurations; see the reason passed to each skip call.
skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")
skip.UnderStress(t, "multi node test times out under stress")

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.MultitenantSingleClusterNumNodes = 4

c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

// NOTE(review): CreateScatteredTable presumably creates d.scattered and
// spreads it over the 4 nodes -- confirm in replicationtestutils.
replicationtestutils.CreateScatteredTable(t, c, 4)
// Snapshot the source rows now; they are compared against the destination
// tenant at the very end of the test.
srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// Read the clock of source server 0 and wait for the ingestion job's
// replicated time to reach it before pausing.
srcTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// We should've persisted the original topology
progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
streamAddresses := progress.GetStreamIngest().StreamAddresses
// More than one address is required so that the job still has a source
// address left once one server is stopped below.
require.Greater(t, len(streamAddresses), 1)

// Write something to the source cluster, note that the job is paused - and
// therefore not replicated for now.
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
c.SrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)

// Stop a server on the source cluster. Note that in this test we are trying
// to avoid using the source cluster after this point because if we do the
// test flakes, see #107499 for more info.
// The address is captured before shutdown so it can be looked up in the
// job's stream addresses afterwards.
destroyedAddress := c.SrcURL.String()
require.NoError(t, c.SrcTenantConn.Close())
c.SrcTenantServer.AppStopper().Stop(ctx)
c.SrcCluster.StopServer(0)

// Switch the SQL connection to a new node, as node 0 has shut down; recall
// that the source and destination tenant are on the same cluster.
c.DestSysSQL = sqlutils.MakeSQLRunner(c.DestCluster.Conns[1])
c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// Server 0 was stopped above, so the cutover time is taken from server 1's
// clock instead.
cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
var cutoverStr string
// The statement's single result column is scanned into cutoverStr, decoded
// with DecimalTimeToHLC, and must equal the cutover time that was requested.
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// NOTE(review): the trailing 1 is presumably the index of the server to
// start the destination tenant on (node 0 is down) -- confirm against
// StartDestTenant.
cleanUpTenant := c.StartDestTenant(ctx, nil, 1)
defer func() {
require.NoError(t, cleanUpTenant())
}()

// The destroyed address should have been removed from the topology.
// streamAddresses is the list recorded while the job was paused (before the
// server was stopped); newStreamAddresses is the list after resume/cutover.
progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
newStreamAddresses := progress.GetStreamIngest().StreamAddresses
require.Contains(t, streamAddresses, destroyedAddress)
require.NotContains(t, newStreamAddresses, destroyedAddress)

// Verify the destination tenant is fully replicated.
// Only the id column was supplied in the INSERT above, so n reads back as
// NULL.
destData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.x")
require.Equal(c.T, [][]string{{"3", "NULL"}}, destData)
dstScatteredData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
require.Equal(t, srcScatteredData, dstScatteredData)
}

func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down