Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
112992: kvclient: draining not started SQL r=JeffSwenson a=andrewbaptist

Previously a drain would assume the SQL instance had been started on a node prior to draining. This would result in a failure if the node attempting to drain had never started SQL.

Epic: none

Release note: None

112995: backupccl: deflake TestCleanupIntentsDuringBackupPerformanceRegression r=msbutler a=miraradeva

The test was counting batches of pushes for the entire duration of the test, not just the back itself. This patch resets the counters to do a more targeted assertion.

Fixes: #112812

Release note: None

113038: c2c: remove TestTenantStreamingUnavailableAddress r=stevendanna a=msbutler

All this test does is flake and essentially tests that dsp.PartitionsSpans() excludes the shutdown node. We already have coverage for this in our roachtest -- if dsp.PartitionsSpans() did include a shutdown node, our shutdown tests would never complete.

Fixes #112917

Release note: none

Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
4 people committed Oct 25, 2023
4 parents e7a8669 + d667f86 + 7147c1f + a8333a5 commit fb63e6c
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 92 deletions.
18 changes: 12 additions & 6 deletions pkg/ccl/backupccl/backup_intents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@ func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) {
}
}

// Reset the counters to avoid counting pushes and intent resolutions not
// part of the backup.
numIntentResolveBatches.Store(0)
numPushBatches.Store(0)

_, err = sqlDb.Exec("backup table foo to 'userfile:///test.foo'")
require.NoError(t, err, "Failed to run backup")

if !abort {
for _, tx := range transactions {
require.NoError(t, tx.Commit())
}
}

// We expect each group of 10 intents to take 2 intent resolution batches:
// - One intent gets discovered and added to the lock table, which forces the
// abandoned txn to be pushed and added to the txnStatusCache.
Expand All @@ -120,5 +119,12 @@ func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) {
// Each of the 1,000 transactions is expected to get pushed once, but in an
// actual run of the test we might see more pushes (e.g. of other transactions).
require.GreaterOrEqual(t, 1100, int(numPushBatches.Load()))

if !abort {
for _, tx := range transactions {
// Ensure the long-running transactions can commit.
require.NoError(t, tx.Commit())
}
}
})
}
80 changes: 0 additions & 80 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,86 +477,6 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
})
}

// TestTenantStreamingUnavailableStreamAddress verifies that after a
// pause/resume (replan) we will not use a dead server as a source.
func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")
skip.UnderStress(t, "multi node test times out under stress")

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.MultitenantSingleClusterNumNodes = 4

c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

replicationtestutils.CreateScatteredTable(t, c, 4)
srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

srcTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// We should've persisted the original topology
progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
streamAddresses := progress.GetStreamIngest().StreamAddresses
require.Greater(t, len(streamAddresses), 1)

// Write something to the source cluster, note that the job is paused - and
// therefore not replicated for now.
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
c.SrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)

// Stop a server on the source cluster. Note that in this test we are trying
// to avoid using the source cluster after this point because if we do the
// test flakes, see #107499 for more info.
destroyedAddress := c.SrcURL.String()
require.NoError(t, c.SrcTenantConn.Close())
c.SrcTenantServer.AppStopper().Stop(ctx)
c.SrcCluster.StopServer(0)

// Switch the SQL connection to a new node, as node 0 has shutdown-- recall that
// the source and destination tenant are on the same cluster.
c.DestSysSQL = sqlutils.MakeSQLRunner(c.DestCluster.Conns[1])
c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cleanUpTenant := c.StartDestTenant(ctx, nil, 1)
defer func() {
require.NoError(t, cleanUpTenant())
}()

// The destroyed address should have been removed from the topology.
progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
newStreamAddresses := progress.GetStreamIngest().StreamAddresses
require.Contains(t, streamAddresses, destroyedAddress)
require.NotContains(t, newStreamAddresses, destroyedAddress)

// Verify the destination tenant is fully replicated.
destData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.x")
require.Equal(c.T, [][]string{{"3", "NULL"}}, destData)
dstScatteredData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
require.Equal(t, srcScatteredData, dstScatteredData)
}

func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
12 changes: 7 additions & 5 deletions pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,13 @@ func (s *drainServer) drainClients(
if err != nil {
return err
}

instanceID := s.sqlServer.sqlIDContainer.SQLInstanceID()
err = s.sqlServer.sqlInstanceStorage.ReleaseInstance(ctx, session, instanceID)
if err != nil {
return err
// If we started a sql session on this node.
if session != "" {
instanceID := s.sqlServer.sqlIDContainer.SQLInstanceID()
err = s.sqlServer.sqlInstanceStorage.ReleaseInstance(ctx, session, instanceID)
if err != nil {
return err
}
}

// Mark the node as fully drained.
Expand Down
24 changes: 24 additions & 0 deletions pkg/server/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,27 @@ func TestServerShutdownReleasesSession(t *testing.T) {
require.False(t, sessionExists(*session), "expected session %s to be deleted from the sqlliveness table, but it still exists", *session)
require.Nil(t, queryOwner(tmpSQLInstance), "expected sql_instance %d to have no owning session_id", tmpSQLInstance)
}

// Verify that drain works correctly even if we don't start the sql instance.
func TestNoSQLServer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2,
base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
DisableSQLServer: true,
},
})

defer tc.Stopper().Stop(ctx)
req := serverpb.DrainRequest{Shutdown: false, DoDrain: true, NodeId: "2"}
drainStream, err := tc.Server(0).ApplicationLayer().GetAdminClient(t).Drain(ctx, &req)
require.NoError(t, err)
// When we get this next response the drain has started - check the error.
drainResp, err := drainStream.Recv()
require.NoError(t, err)
require.True(t, drainResp.IsDraining)
}
2 changes: 1 addition & 1 deletion pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (l *Instance) Release(ctx context.Context) (sqlliveness.SessionID, error) {
}()

if session == nil {
return sqlliveness.SessionID(""), errors.New("no session to release")
return "", nil
}

if err := l.storage.Delete(ctx, session.ID()); err != nil {
Expand Down

0 comments on commit fb63e6c

Please sign in to comment.