server: avoid missing service mode changes

In cockroachdb#112001 we introduced a bug and an unintended behaviour change.

The bug is that if we receive a notification of a state change from
none to shared when the server is still shutting down, that state
change will be ignored. Namely, the following can happen:

1. ALTER VIRTUAL CLUSTER a STOP SERVICE
2. Watcher gets notification of the shutdown and notifies the virtual
   cluster's SQL server.
3. Tenant "a" starts shutdown but does not fully complete it.
4. ALTER VIRTUAL CLUSTER a START SERVICE SHARED
5. Watcher notifies the server orchestrator; but, since the SQL server has
   not finished stopping from the previous stop request, it appears as if
   it is already started.
6. Tenant "a" finishes shutdown.
7. Server orchestrator never again tries to start the virtual cluster.

The newly added test reveals this under stress.
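
To make the race concrete, here is a deliberately simplified, self-contained
Go sketch (hypothetical types and names, not the actual orchestrator code) of
the notification-only behaviour that loses the start request:

package main

import "fmt"

// orchestrator tracks which tenants currently have a server entry,
// including servers that are still in the middle of shutting down.
type orchestrator struct {
    servers map[string]bool
}

// onStartService mirrors the pre-fix, notification-only behaviour: if a
// server entry still exists because shutdown has not completed, the start
// request is dropped and nothing ever retries it.
func (o *orchestrator) onStartService(name string) {
    if o.servers[name] {
        fmt.Printf("tenant %q appears started; ignoring start request\n", name)
        return
    }
    o.servers[name] = true
    fmt.Printf("starting tenant %q\n", name)
}

func main() {
    o := &orchestrator{servers: map[string]bool{"a": true}} // "a" is still stopping
    o.onStartService("a")  // START SERVICE SHARED arrives; the request is lost
    delete(o.servers, "a") // shutdown finally completes
    // No further notification arrives, so tenant "a" is never started again.
}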

The behaviour change is that previously, if a SQL server for a virtual
cluster failed to start up, it would be restarted.

Here, we fix both of these by re-introducing periodic polling of the
service state. Unlike the previous polling, we poll the watcher state
so we are not generating a SQL query every second.
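
As a rough illustration of the pattern (a minimal sketch with made-up names
such as getState and reconcile, not the real serverController API), the loop
below wakes up both on a watcher notification and on a fixed tick, so a change
that arrived while a server was still stopping, or a startup that failed, is
picked up on the next tick:

package orchestration // hypothetical package for this sketch

import "time"

// reconcileLoop reacts to change notifications from an in-memory watcher,
// but also re-reads the cached watcher state once per interval instead of
// issuing a SQL query each time.
func reconcileLoop(
    getState func() (tenants []string, changed <-chan struct{}),
    reconcile func(tenants []string),
    stop <-chan struct{},
) {
    const watchInterval = time.Second
    for {
        tenants, changed := getState()
        reconcile(tenants) // start or stop servers to match the desired state

        select {
        case <-changed: // a tenant record changed
        case <-time.After(watchInterval): // periodic re-check; retries anything missed
        case <-stop:
            return
        }
    }
}

The periodic case is what makes the loop self-healing: even if a notification
is observed at an inconvenient moment, or a start attempt fails, the next tick
re-reads the state and reconciles again.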

Further, since we are now calling the tenantcapabilities watcher's
GetAllTenants method every second in addition to on every update, I've
moved the allocation of the list of all tenants into the update
handler.
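
A hedged sketch of that allocation move (simplified stand-in types; the real
code uses tenantcapabilities.Entry and syncutil.RWMutex): the slice is rebuilt
once per update while holding the write lock, and GetAllTenants hands the
cached slice back, so a once-per-second caller does not allocate. Callers must
treat the returned slice as read-only, which is the caveat the TODO in the
diff below calls out.

package tenantwatch // hypothetical package name for this sketch

import "sync"

// entry stands in for tenantcapabilities.Entry.
type entry struct{ Name string }

type watcher struct {
    mu struct {
        sync.RWMutex
        store       map[int64]*entry
        allTenants  []entry
        anyChangeCh chan struct{}
    }
}

func newWatcher() *watcher {
    w := &watcher{}
    w.mu.store = make(map[int64]*entry)
    w.mu.anyChangeCh = make(chan struct{})
    return w
}

// onAnyChange rebuilds the cached slice once per update, under the write
// lock, then signals readers by closing and replacing the change channel.
func (w *watcher) onAnyChange() {
    w.mu.Lock()
    defer w.mu.Unlock()

    all := make([]entry, 0, len(w.mu.store))
    for _, e := range w.mu.store {
        all = append(all, *e)
    }
    w.mu.allTenants = all

    close(w.mu.anyChangeCh)
    w.mu.anyChangeCh = make(chan struct{})
}

// GetAllTenants returns the cached slice plus a channel that is closed on
// the next change; it performs no per-call allocation.
func (w *watcher) GetAllTenants() ([]entry, <-chan struct{}) {
    w.mu.RLock()
    defer w.mu.RUnlock()
    return w.mu.allTenants, w.mu.anyChangeCh
}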

An alternative here would be to revert cockroachdb#112001 completely. I think
there are still advantages to using the watcher: we avoid generating a SQL
query on every node once per second, and server startup becomes more
responsive after the integration of cockroachdb#112094.

Fixes cockroachdb#112077

Release note: None
stevendanna committed Oct 17, 2023
1 parent 97b5615 commit 0b0689a
Showing 4 changed files with 114 additions and 27 deletions.
@@ -740,6 +740,7 @@ func waitUntilTenantServerStopped(
 if err != nil {
 return err
 }
+defer func() { _ = db.Close() }()
 if err := db.Ping(); err == nil {
 t.Logf("tenant %q is still accepting connections", tenantName)
 return errors.Newf("tenant %q still accepting connections")
@@ -46,10 +46,11 @@ type Watcher struct {
 mu struct {
 syncutil.RWMutex
 
-store map[roachpb.TenantID]*watcherEntry
-byName map[roachpb.TenantName]roachpb.TenantID
+store      map[roachpb.TenantID]*watcherEntry
+byName     map[roachpb.TenantName]roachpb.TenantID
+allTenants []tenantcapabilities.Entry
 
-// anyChangeChs is closed on any change to the set of
+// anyChangeCh is closed on any change to the set of
 // tenants.
 anyChangeCh chan struct{}
 }
@@ -166,18 +167,18 @@ func (w *Watcher) GetCapabilities(
 // GetAllTenants returns all known tenant entries and a channel that
 // is closed if any of the tenants change or any tenants are added or
 // removed.
+//
+// TODO(ssd): Memory ownership could be a bit cleaner here. To avoid
+// needlessly allocating the returned slice every time this is called,
+// we return the same slice to all callers and assume they don't do
+// anything problematic with it. At the moment, there is only one
+// caller of this function so this isn't particularly problematic, but
+// it could be if we add more callers.
 func (w *Watcher) GetAllTenants() ([]tenantcapabilities.Entry, <-chan struct{}) {
 w.mu.RLock()
 defer w.mu.RUnlock()
 
-entries := make([]tenantcapabilities.Entry, 0, len(w.mu.store))
-for _, v := range w.mu.store {
-if v.Entry != nil {
-entries = append(entries, *v.Entry)
-}
-}
-
-return entries, w.mu.anyChangeCh
+return w.mu.allTenants, w.mu.anyChangeCh
 }
 
 // GetGlobalCapabilityState implements the tenantcapabilities.Reader interface.
@@ -302,13 +303,22 @@ func (w *Watcher) handleUpdate(ctx context.Context, u rangefeedcache.Update) {
 }
 
 if len(updates) > 0 {
-w.closeAnyChangeCh()
+w.onAnyChange()
 }
 }
 
-func (w *Watcher) closeAnyChangeCh() {
+func (w *Watcher) onAnyChange() {
 w.mu.Lock()
 defer w.mu.Unlock()
+
+entries := make([]tenantcapabilities.Entry, 0, len(w.mu.store))
+for _, v := range w.mu.store {
+if v.Entry != nil {
+entries = append(entries, *v.Entry)
+}
+}
+w.mu.allTenants = entries
+
 close(w.mu.anyChangeCh)
 w.mu.anyChangeCh = make(chan struct{})
 }
49 changes: 35 additions & 14 deletions pkg/server/server_controller_orchestration.go
@@ -12,10 +12,12 @@ package server
 
 import (
 "context"
+"time"
 
 "github.com/cockroachdb/cockroach/pkg/base"
 "github.com/cockroachdb/cockroach/pkg/clusterversion"
 "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
+"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
 "github.com/cockroachdb/cockroach/pkg/roachpb"
 "github.com/cockroachdb/cockroach/pkg/sql/isql"
 "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
@@ -25,6 +27,7 @@ import (
 "github.com/cockroachdb/cockroach/pkg/util/log/logpb"
 "github.com/cockroachdb/cockroach/pkg/util/startup"
 "github.com/cockroachdb/cockroach/pkg/util/stop"
+"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 "github.com/cockroachdb/errors"
 "github.com/cockroachdb/redact"
 )
@@ -41,28 +44,33 @@ func (c *serverController) start(ctx context.Context, ie isql.Executor) error {
 
 // Run the detection of which servers should be started or stopped.
 return c.stopper.RunAsyncTask(ctx, "mark-tenant-services", func(ctx context.Context) {
+// We receive updates from the tenantcapabilities
+// watcher, but we also refresh our state at a fixed
+// interval to account for:
+//
+// - A rapid stop & start in which the start is
+//   initially ignored because the server is still
+//   stopping.
+//
+// - Startup failures that we want to retry.
+const watchInterval = 1 * time.Second
 ctx, cancel := c.stopper.WithCancelOnQuiesce(ctx)
 defer cancel()
 
+timer := timeutil.NewTimer()
+defer timer.Stop()
+
 for {
 allTenants, updateCh := c.watcher.GetAllTenants()
-tenantsToStart := make([]roachpb.TenantName, 0, len(allTenants))
-for _, e := range allTenants {
-if e.Name == "" {
-continue
-}
-
-if e.DataState == mtinfopb.DataStateReady && e.ServiceMode == mtinfopb.ServiceModeShared {
-tenantsToStart = append(tenantsToStart, e.Name)
-}
-}
-
-if err := c.startMissingServers(ctx, tenantsToStart); err != nil {
+if err := c.startMissingServers(ctx, allTenants); err != nil {
 log.Warningf(ctx, "cannot update running tenant services: %v", err)
 }
 
+timer.Reset(watchInterval)
 select {
 case <-updateCh:
+case <-timer.C:
+timer.Read = true
 case <-c.stopper.ShouldQuiesce():
 // Expedited server shutdown of outer server.
 return
@@ -103,11 +111,24 @@ func (c *serverController) startInitialSecondaryTenantServers(
 }
 
 func (c *serverController) startMissingServers(
-ctx context.Context, tenants []roachpb.TenantName,
+ctx context.Context, tenants []tenantcapabilities.Entry,
 ) error {
 c.mu.Lock()
 defer c.mu.Unlock()
-for _, name := range tenants {
+for _, t := range tenants {
+if t.Name == "" {
+continue
+}
+
+if t.DataState != mtinfopb.DataStateReady {
+continue
+}
+
+if t.ServiceMode != mtinfopb.ServiceModeShared {
+continue
+}
+
+name := t.Name
 if _, ok := c.mu.servers[name]; !ok {
 log.Infof(ctx, "tenant %q has changed service mode, should now start", name)
 // Mark the server for async creation.
55 changes: 55 additions & 0 deletions pkg/server/server_controller_test.go
@@ -19,9 +19,12 @@ import (
 "github.com/cockroachdb/cockroach/pkg/roachpb"
 "github.com/cockroachdb/cockroach/pkg/settings/cluster"
 "github.com/cockroachdb/cockroach/pkg/sql/isql"
+"github.com/cockroachdb/cockroach/pkg/testutils"
 "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
 "github.com/cockroachdb/cockroach/pkg/util/log"
+"github.com/cockroachdb/errors"
 "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
 )
@@ -69,6 +72,58 @@ func TestServerController(t *testing.T) {
 // TODO(knz): test something about d.
 }
 
+// TestServerControllerStopStart is, when run under stress, a
+// regression test for #112077, a bug in which we would fail to
+// respond to a service start request that occurred while a server was
+// shutting down.
+func TestServerControllerStopStart(t *testing.T) {
+defer leaktest.AfterTest(t)()
+defer log.Scope(t).Close(t)
+
+ctx := context.Background()
+
+s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
+DefaultTestTenant: base.TestControlsTenantsExplicitly,
+})
+defer s.Stopper().Stop(ctx)
+
+sqlRunner := sqlutils.MakeSQLRunner(db)
+// Speed up the tenant capabilities watcher to increase chance of hitting race.
+sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
+sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval ='100ms'")
+sqlRunner.Exec(t, "SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval ='100ms'")
+
+tryConnect := func() error {
+conn, err := s.SystemLayer().SQLConnE("cluster:hello")
+if err != nil {
+return err
+}
+defer func() { _ = conn.Close() }()
+return conn.Ping()
+}
+
+shouldConnectSoon := func() {
+testutils.SucceedsSoon(t, tryConnect)
+}
+
+shouldFailToConnectSoon := func() {
+testutils.SucceedsSoon(t, func() error {
+if err := tryConnect(); err == nil {
+return errors.Newf("still accepting connections")
+}
+return nil
+})
+}
+
+sqlRunner.Exec(t, "CREATE TENANT hello")
+sqlRunner.Exec(t, "ALTER VIRTUAL CLUSTER hello START SERVICE SHARED")
+shouldConnectSoon()
+sqlRunner.Exec(t, "ALTER VIRTUAL CLUSTER hello STOP SERVICE")
+shouldFailToConnectSoon()
+sqlRunner.Exec(t, "ALTER VIRTUAL CLUSTER hello START SERVICE SHARED")
+shouldConnectSoon()
+}
+
 func TestSQLErrorUponInvalidTenant(t *testing.T) {
 defer leaktest.AfterTest(t)()
 defer log.Scope(t).Close(t)