Skip to content

Commit

Permalink
server: clean up sql session and instance on shutdown
Browse files Browse the repository at this point in the history
Remove the SQL server's system.sql_instances and system.sqlliveness
rows on SQL server shutdown. The original motivation for this change is
that it improves the behavior of DistSQL for serverless tenants.
Previously, SQL servers would attempt to schedule DistSQL flows on
sql_instances rows that belonged to servers that had already shut down.

Cleaning up the session and instance has a handful of small benefits:
- The sql_instances pre-allocated in a region are less likely to run out
  and trigger the slow cold start path.
- Resources leased by the session, like jobs, may be re-claimed more
  quickly after the server shuts down.

Fixes: CC-9095

Release note: none
  • Loading branch information
jeffswenson committed Apr 3, 2023
1 parent cc9e0c6 commit ab2a286
Show file tree
Hide file tree
Showing 12 changed files with 325 additions and 182 deletions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ go_test(
"//pkg/base",
"//pkg/base/serverident",
"//pkg/build",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/cli/exit",
"//pkg/clusterversion",
"//pkg/config",
Expand Down
12 changes: 10 additions & 2 deletions pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,16 @@ func (s *drainServer) drainClients(
// errors/warnings, if any.
log.Infof(ctx, "SQL server drained successfully; SQL queries cannot execute any more")

// FIXME(Jeff): Add code here to remove the sql_instances row or
// something similar.
session, err := s.sqlServer.sqlLivenessProvider.Release(ctx)
if err != nil {
return err
}

instanceID := s.sqlServer.sqlIDContainer.SQLInstanceID()
err = s.sqlServer.sqlInstanceStorage.ReleaseInstance(ctx, session, instanceID)
if err != nil {
return err
}

// Mark the node as fully drained.
s.sqlServer.gracefulDrainComplete.Set(true)
Expand Down
46 changes: 46 additions & 0 deletions pkg/server/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -307,3 +308,48 @@ func getAdminClientForServer(
_ = conn.Close() // nolint:grpcconnclose
}, nil
}

// TestServerShutdownReleasesSession starts two SQL servers for the same test
// tenant, drains the second one via DrainClients, and then uses the first one
// to check that the drained server's session row is gone from
// system.sqlliveness and that its system.sql_instances row no longer has a
// session_id.
func TestServerShutdownReleasesSession(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
		DisableDefaultTestTenant: true,
	})
	defer s.Stopper().Stop(ctx)

	tenantArgs := base.TestTenantArgs{
		TenantID: serverutils.TestTenantID(),
	}

	// Ensure the tenant connector is linked into the test binary.
	var _ = kvtenantccl.Connector{}

	// The first tenant server stays up for the whole test and is only used to
	// run the verification queries.
	observer, observerDB := serverutils.StartTenant(t, s, tenantArgs)
	defer observer.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(observerDB)

	// queryOwner returns the session_id stored for the given instance, or nil
	// when the column is NULL.
	queryOwner := func(id base.SQLInstanceID) *string {
		var owner *string
		runner.QueryRow(t, "SELECT session_id FROM system.sql_instances WHERE id = $1", id).Scan(&owner)
		return owner
	}

	// sessionExists reports whether system.sqlliveness has a row for session.
	sessionExists := func(session string) bool {
		return len(runner.QueryStr(t, "SELECT session_id FROM system.sqlliveness WHERE session_id = $1", session)) > 0
	}

	// The second tenant server is the one that gets drained.
	drained, err := s.StartTenant(ctx, tenantArgs)
	require.NoError(t, err)

	drainedInstanceID := drained.SQLInstanceID()
	session := queryOwner(drainedInstanceID)
	require.NotNil(t, session)
	require.True(t, sessionExists(*session))

	require.NoError(t, drained.DrainClients(ctx))

	require.False(t, sessionExists(*session), "expected session %s to be deleted from the sqlliveness table, but it still exists", *session)
	require.Nil(t, queryOwner(drainedInstanceID), "expected sql_instance %d to have no owning session_id", drainedInstanceID)
}
7 changes: 2 additions & 5 deletions pkg/sql/sqlinstance/instancestorage/instancereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
Expand Down Expand Up @@ -145,7 +144,6 @@ func TestReader(t *testing.T) {
require.NoError(t, reader.WaitForStarted(ctx))

// Set up expected test data.
region := enum.One
instanceIDs := []base.SQLInstanceID{1, 2, 3}
rpcAddresses := []string{"addr1", "addr2", "addr3"}
sqlAddresses := []string{"addr4", "addr5", "addr6"}
Expand Down Expand Up @@ -259,7 +257,7 @@ func TestReader(t *testing.T) {

// Release an instance and verify only active instances are returned.
{
err := storage.ReleaseInstanceID(ctx, region, instanceIDs[0])
err := storage.ReleaseInstance(ctx, sessionIDs[0], instanceIDs[0])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -314,7 +312,6 @@ func TestReader(t *testing.T) {
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
// Create three instances and release one.
region := enum.One
instanceIDs := [...]base.SQLInstanceID{1, 2, 3}
rpcAddresses := [...]string{"addr1", "addr2", "addr3"}
sqlAddresses := [...]string{"addr4", "addr5", "addr6"}
Expand Down Expand Up @@ -367,7 +364,7 @@ func TestReader(t *testing.T) {

// Verify request for released instance data results in an error.
{
err := storage.ReleaseInstanceID(ctx, region, instanceIDs[0])
err := storage.ReleaseInstance(ctx, sessionIDs[0], instanceIDs[0])
if err != nil {
t.Fatal(err)
}
Expand Down
86 changes: 56 additions & 30 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,62 @@ func (s *Storage) CreateInstance(
return s.createInstanceRow(ctx, sessionID, sessionExpiration, rpcAddr, sqlAddr, locality, binaryVersion, noNodeID)
}

// ReleaseInstance deallocates instanceID, but only if its row is currently
// owned by sessionID. When the stored session differs, the row is left
// untouched and nil is returned.
//
// The region used to build the row key is decoded from sessionID. The read,
// the ownership check and the write all happen inside a single transaction.
func (s *Storage) ReleaseInstance(
	ctx context.Context, sessionID sqlliveness.SessionID, instanceID base.SQLInstanceID,
) error {
	release := func(ctx context.Context, txn *kv.Txn) error {
		version, err := s.versionGuard(ctx, txn)
		if err != nil {
			return err
		}

		region, _, err := slstorage.UnsafeDecodeSessionID(sessionID)
		if err != nil {
			return errors.Wrap(err, "unable to determine region for sql_instance")
		}

		codec := s.getReadCodec(&version)
		rowKey := codec.encodeKey(region, instanceID)

		// Read the current row so ownership can be checked before writing.
		row, err := txn.Get(ctx, rowKey)
		if err != nil {
			return err
		}
		current, err := codec.decodeRow(row.Key, row.Value)
		if err != nil {
			return err
		}
		if current.sessionID != sessionID {
			// Nothing to do: the instance was already released, or it was
			// released and then claimed by another server.
			return nil
		}

		b := txn.NewBatch()

		// Overwrite the row with the codec's "available" value.
		available, err := codec.encodeAvailableValue()
		if err != nil {
			return err
		}
		b.Put(rowKey, available)

		// When a dual-write codec is active, write the same update through it.
		dual := s.getDualWriteCodec(&version)
		if dual != nil {
			dualAvailable, err := dual.encodeAvailableValue()
			if err != nil {
				return err
			}
			b.Put(dual.encodeKey(region, instanceID), dualAvailable)
		}

		return txn.CommitInBatch(ctx, b)
	}
	return s.db.Txn(ctx, release)
}

func (s *Storage) createInstanceRow(
ctx context.Context,
sessionID sqlliveness.SessionID,
Expand Down Expand Up @@ -507,36 +563,6 @@ func (s *Storage) getInstanceRows(
return instances, nil
}

// ReleaseInstanceID deletes the record for the instance ID in the given
// region, making the ID available for reuse by another SQL pod of the same
// tenant. The delete is unconditional: ownership of the row is not checked.
// TODO(jeffswenson): delete this, it is unused.
func (s *Storage) ReleaseInstanceID(
	ctx context.Context, region []byte, id base.SQLInstanceID,
) error {
	// TODO(andrei): Ensure that we do not delete an instance ID that we no longer
	// own, instead of deleting blindly.
	ctx = multitenant.WithTenantCostControlExemption(ctx)

	deleteRow := func(ctx context.Context, txn *kv.Txn) error {
		version, err := s.versionGuard(ctx, txn)
		if err != nil {
			return err
		}

		batch := txn.NewBatch()

		// Delete via the read codec, and via the dual-write codec too when
		// one is active.
		batch.Del(s.getReadCodec(&version).encodeKey(region, id))
		if dual := s.getDualWriteCodec(&version); dual != nil {
			batch.Del(dual.encodeKey(region, id))
		}
		return txn.CommitInBatch(ctx, batch)
	}

	if err := s.db.Txn(ctx, deleteRow); err != nil {
		return errors.Wrapf(err, "could not delete instance %d", id)
	}
	return nil
}

// RunInstanceIDReclaimLoop runs a background task that allocates available
// instance IDs and reclaim expired ones within the sql_instances table.
func (s *Storage) RunInstanceIDReclaimLoop(
Expand Down
Loading

0 comments on commit ab2a286

Please sign in to comment.