Skip to content

Commit

Permalink
server: wait for active configuration profiles during startup
Browse files Browse the repository at this point in the history
Before, we did not wait for these configuration profiles at
startup. This produces confusing behaviour where the behaviour of the
cluster changes substantially a few moments after startup.

For instance the replication-source configuration profile runs

  SET CLUSTER SETT server.controller.default_target_cluster = 'application'

If this is delayed until after we start accepting connections, then
for a few moments new connections will go to the system tenant and
then afterwards they will go to the application tenant.

Here, we narrow the window of confusion by waiting for the
configuration profiles to be complete during the preStart sequence in
SQL.

Note that this doesn't solve startup coordination. There are still at
least two problems:

1. Async server startup still means the user may get an error for a
   few moments after startup until the server is started.

2. Async settings propagation still means that the
  default_target_cluster setting can still be delayed.

Informs #111637

Release note: None
  • Loading branch information
stevendanna committed Nov 6, 2023
1 parent d131065 commit 0969935
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 3 deletions.
5 changes: 3 additions & 2 deletions pkg/server/autoconfig/acprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type Provider interface {
// initial event immediately.
EnvUpdate() <-chan struct{}

// ActiveEnvironments returns the IDs of environments that have
// tasks available for execution.
// ActiveEnvironments returns the IDs of environments that
// have tasks available for execution. A nil or empty array is
// returned when there are no more tasks to run.
ActiveEnvironments() []autoconfigpb.EnvironmentID

// Peek will block waiting for the first task the provider believes
Expand Down
130 changes: 129 additions & 1 deletion pkg/server/autoconfig/auto_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package autoconfig_test

import (
"context"
gosql "database/sql"
"math"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/autoconfig"
"github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider"
"github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb"
Expand Down Expand Up @@ -45,6 +49,15 @@ type testTask struct {
seen bool
}

var endProfileTask = autoconfigpb.Task{
TaskID: autoconfigpb.TaskID(math.MaxUint64),
Description: "end of configuration profile",
MinVersion: clusterversion.TestingBinaryVersion,
Payload: &autoconfigpb.Task_SimpleSQL{
SimpleSQL: &autoconfigpb.SimpleSQL{},
},
}

var testTasks = []testTask{
{task: autoconfigpb.Task{
TaskID: 123,
Expand Down Expand Up @@ -85,6 +98,7 @@ var testTasks = []testTask{
},
},
}},
{task: endProfileTask},
}

func (p *testProvider) EnvUpdate() <-chan struct{} {
Expand All @@ -93,7 +107,12 @@ func (p *testProvider) EnvUpdate() <-chan struct{} {
}

func (p *testProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID {
return []autoconfigpb.EnvironmentID{testEnvID}
p.Lock()
defer p.Unlock()
if len(p.tasks) > 0 {
return []autoconfigpb.EnvironmentID{testEnvID}
}
return nil
}

func (p *testProvider) Pop(
Expand Down Expand Up @@ -161,6 +180,7 @@ func TestAutoConfig(t *testing.T) {
provider.notifyCh <- struct{}{}

ctx := context.Background()
noWait := time.Duration(0)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462),

Expand All @@ -169,6 +189,9 @@ func TestAutoConfig(t *testing.T) {
AutoConfig: &autoconfig.TestingKnobs{
Provider: provider,
},
Server: &server.TestingKnobs{
AutoConfigProfileStartupWaitTime: &noWait,
},
},
})
defer s.Stopper().Stop(ctx)
Expand Down Expand Up @@ -225,3 +248,108 @@ SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`,
err = sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.bar`).Scan(&unused)
require.NoError(t, err)
}

func TestAutoConfigWaitAtStartupTimesOut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

provider := &testProvider{
t: t,
notifyCh: make(chan struct{}, 1),
peekWaitCh: make(chan struct{}),
tasks: []testTask{
{task: autoconfigpb.Task{
TaskID: 123,
Description: "test task that takes longer than the maxWait",
MinVersion: clusterversion.TestingBinaryVersion,
Payload: &autoconfigpb.Task_SimpleSQL{
SimpleSQL: &autoconfigpb.SimpleSQL{
UsernameProto: username.NodeUserName().EncodeProto(),
TransactionalStatements: []string{
"SELECT pg_sleep(300)",
},
},
},
}},
{task: endProfileTask},
},
}
provider.notifyCh <- struct{}{}
close(provider.peekWaitCh)

ctx := context.Background()
shortWait := 50 * time.Millisecond
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
AutoConfig: &autoconfig.TestingKnobs{
Provider: provider,
},
Server: &server.TestingKnobs{
AutoConfigProfileStartupWaitTime: &shortWait,
},
},
})
defer s.Stopper().Stop(ctx)

// To keep us honest, assert that the task hasn't been
// completed even though server startup has returned.
taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnvID, Task: provider.tasks[0].task.TaskID}
completionMarker := taskRef.EncodeCompletionMarkerKey()
var result bool
err := sqlDB.QueryRowContext(ctx, `SELECT true FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`,
jobs.AutoConfigRunnerJobID,
completionMarker).Scan(&result)
require.Error(t, err, gosql.ErrNoRows)
}

func TestAutoConfigWaitAtStartup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

provider := &testProvider{
t: t,
notifyCh: make(chan struct{}, 1),
peekWaitCh: make(chan struct{}),
tasks: []testTask{
{task: autoconfigpb.Task{
TaskID: 123,
Description: "create-test-table",
MinVersion: clusterversion.TestingBinaryVersion,
Payload: &autoconfigpb.Task_SimpleSQL{
SimpleSQL: &autoconfigpb.SimpleSQL{
UsernameProto: username.NodeUserName().EncodeProto(),
TransactionalStatements: []string{
"SELECT pg_sleep(1)",
"CREATE TABLE system.t (pk int primary key)",
"INSERT INTO system.t VALUES (1)",
},
},
},
}},
{task: endProfileTask},
},
}
provider.notifyCh <- struct{}{}
close(provider.peekWaitCh)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
AutoConfig: &autoconfig.TestingKnobs{
Provider: provider,
},
},
})
defer s.Stopper().Stop(ctx)

// The side-effects of the profile execution should be
// immediately visible.
var value int
err := sqlDB.QueryRow("SELECT pk FROM system.t").Scan(&value)
require.NoError(t, err)
require.Equal(t, 1, value)
}
46 changes: 46 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,9 +1744,55 @@ func (s *SQLServer) preStart(
}
}

s.waitForActiveAutoConfigEnvironments(ctx)

return nil
}

// waitForActiveAutoConfigEnvironments waits until the set of
// ActiveEnvironments is empty. ActiveEnvironments is empty once there
// are no more tasks to run.
//
// This is sufficient to ensure all configuration task jobs have
// completed becuase the environment runner only enqueues a task after
// the previous task has completed and configuration profiles include
// an "end task" that runs after all previous tasks.
func (s *SQLServer) waitForActiveAutoConfigEnvironments(ctx context.Context) {
maxWait := 2 * time.Minute
serverKnobs := s.cfg.TestingKnobs.Server
if serverKnobs != nil && serverKnobs.(*TestingKnobs).AutoConfigProfileStartupWaitTime != nil {
maxWait = *serverKnobs.(*TestingKnobs).AutoConfigProfileStartupWaitTime
}

if maxWait == 0 {
log.Infof(ctx, "waiting for auto-configuration environments disabled")
return
}

envs := s.execCfg.AutoConfigProvider.ActiveEnvironments()
if len(envs) == 0 {
log.Infof(ctx, "auto-configuration environments not set or already complete")
return
}

log.Infof(ctx, "waiting up to %s for auto-configuration environments %v to complete", maxWait, envs)
ctx, cancel := context.WithTimeout(ctx, maxWait) // nolint:context
defer cancel()
retryCfg := retry.Options{
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 5 * time.Second,
}
waitStart := timeutil.Now()
for i := retry.StartWithCtx(ctx, retryCfg); i.Next(); {
envs := s.execCfg.AutoConfigProvider.ActiveEnvironments()
if len(envs) == 0 {
log.Infof(ctx, "auto-configuration environments reported no active tasks after %s", timeutil.Since(waitStart))
return
}
}
log.Warningf(ctx, "auto-configuration environments still running after %s, moving on", timeutil.Since(waitStart))
}

// startCheckService verifies that the tenant has the right
// service mode initially, then starts an async checker
// to stop the server if the service mode changes.
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ type TestingKnobs struct {
// // TODO(ahmad/healthy-pod): Remove this once `v23.2` is cut and update `TestTenantAutoUpgrade`
// to reflect the changes.
AllowTenantAutoUpgradeOnInternalVersionChanges bool

// If non-nil, AutoConfigProfileStartupWaitTime is used when
// waiting for any active configuration environments to
// complete their tasks.
AutoConfigProfileStartupWaitTime *time.Duration
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 0969935

Please sign in to comment.