Skip to content

Commit

Permalink
Merge #113666
Browse files Browse the repository at this point in the history
113666: server: wait for active configuration profiles during startup r=yuzefovich a=stevendanna

Previously, we did not wait for active configuration profiles to complete at startup. As a result, the cluster's behaviour could change substantially a few moments after startup, which was confusing.

For instance the replication-source configuration profile runs

  SET CLUSTER SETTING server.controller.default_target_cluster = 'application'

If this is delayed until after we start accepting connections, then for a few moments new connections will go to the system tenant and then afterwards they will go to the application tenant.

Here, we narrow the window of confusion by waiting for the configuration profiles to be complete during the preStart sequence in SQL.

Note that this doesn't solve startup coordination. There are still at least two problems:

1. Async server startup still means the user may get an error for a few moments after startup until the server is started.

2. Async settings propagation means that the effect of the default_target_cluster setting can still be delayed.

Informs #111637

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Nov 6, 2023
2 parents c63719f + 0969935 commit 56cf731
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 3 deletions.
5 changes: 3 additions & 2 deletions pkg/server/autoconfig/acprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type Provider interface {
// initial event immediately.
EnvUpdate() <-chan struct{}

// ActiveEnvironments returns the IDs of environments that have
// tasks available for execution.
// ActiveEnvironments returns the IDs of environments that
// have tasks available for execution. A nil or empty array is
// returned when there are no more tasks to run.
ActiveEnvironments() []autoconfigpb.EnvironmentID

// Peek will block waiting for the first task the provider believes
Expand Down
130 changes: 129 additions & 1 deletion pkg/server/autoconfig/auto_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package autoconfig_test

import (
"context"
gosql "database/sql"
"math"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/autoconfig"
"github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider"
"github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb"
Expand Down Expand Up @@ -45,6 +49,15 @@ type testTask struct {
seen bool
}

// endProfileTask is the sentinel task placed as the last entry of the
// task lists used by the tests in this file. It uses the largest
// possible task ID (math.MaxUint64) and carries an empty SimpleSQL
// payload.
//
// NOTE(review): presumably this mirrors the "end task" that real
// configuration profiles include after all other tasks — confirm
// against the profile definitions.
var endProfileTask = autoconfigpb.Task{
TaskID: autoconfigpb.TaskID(math.MaxUint64),
Description: "end of configuration profile",
MinVersion: clusterversion.TestingBinaryVersion,
Payload: &autoconfigpb.Task_SimpleSQL{
SimpleSQL: &autoconfigpb.SimpleSQL{},
},
}

var testTasks = []testTask{
{task: autoconfigpb.Task{
TaskID: 123,
Expand Down Expand Up @@ -85,6 +98,7 @@ var testTasks = []testTask{
},
},
}},
{task: endProfileTask},
}

func (p *testProvider) EnvUpdate() <-chan struct{} {
Expand All @@ -93,7 +107,12 @@ func (p *testProvider) EnvUpdate() <-chan struct{} {
}

// ActiveEnvironments reports the environments that still have tasks
// to run. It returns the single test environment while the
// provider's task list is non-empty and nil once the list is empty,
// so callers polling for completion can observe the end of the
// profile.
//
// The task list is read under the provider's lock.
func (p *testProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID {
	p.Lock()
	defer p.Unlock()
	if len(p.tasks) == 0 {
		return nil
	}
	return []autoconfigpb.EnvironmentID{testEnvID}
}

func (p *testProvider) Pop(
Expand Down Expand Up @@ -161,6 +180,7 @@ func TestAutoConfig(t *testing.T) {
provider.notifyCh <- struct{}{}

ctx := context.Background()
noWait := time.Duration(0)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462),

Expand All @@ -169,6 +189,9 @@ func TestAutoConfig(t *testing.T) {
AutoConfig: &autoconfig.TestingKnobs{
Provider: provider,
},
Server: &server.TestingKnobs{
AutoConfigProfileStartupWaitTime: &noWait,
},
},
})
defer s.Stopper().Stop(ctx)
Expand Down Expand Up @@ -225,3 +248,108 @@ SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`,
err = sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.bar`).Scan(&unused)
require.NoError(t, err)
}

// TestAutoConfigWaitAtStartupTimesOut verifies that server startup
// does not block indefinitely on a slow configuration profile: with a
// short startup wait time configured, StartServer must return even
// though the profile's first task (a 300s sleep) has not completed.
func TestAutoConfigWaitAtStartupTimesOut(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Keep a handle on the slow task so that the assertion below does
	// not need to index into provider.tasks after the server has
	// started using the provider.
	slowTask := autoconfigpb.Task{
		TaskID:      123,
		Description: "test task that takes longer than the maxWait",
		MinVersion:  clusterversion.TestingBinaryVersion,
		Payload: &autoconfigpb.Task_SimpleSQL{
			SimpleSQL: &autoconfigpb.SimpleSQL{
				UsernameProto: username.NodeUserName().EncodeProto(),
				TransactionalStatements: []string{
					"SELECT pg_sleep(300)",
				},
			},
		},
	}

	provider := &testProvider{
		t:          t,
		notifyCh:   make(chan struct{}, 1),
		peekWaitCh: make(chan struct{}),
		tasks: []testTask{
			{task: slowTask},
			{task: endProfileTask},
		},
	}
	provider.notifyCh <- struct{}{}
	close(provider.peekWaitCh)

	ctx := context.Background()
	shortWait := 50 * time.Millisecond
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462),
		Knobs: base.TestingKnobs{
			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
			AutoConfig: &autoconfig.TestingKnobs{
				Provider: provider,
			},
			Server: &server.TestingKnobs{
				AutoConfigProfileStartupWaitTime: &shortWait,
			},
		},
	})
	defer s.Stopper().Stop(ctx)

	// To keep us honest, assert that the task hasn't been
	// completed even though server startup has returned: there must
	// be no completion marker row for it yet.
	taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnvID, Task: slowTask.TaskID}
	completionMarker := taskRef.EncodeCompletionMarkerKey()
	var result bool
	err := sqlDB.QueryRowContext(ctx, `SELECT true FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`,
		jobs.AutoConfigRunnerJobID,
		completionMarker).Scan(&result)
	// ErrorIs (rather than Error) so that only the "no rows" outcome
	// passes; any other query failure must fail the test.
	require.ErrorIs(t, err, gosql.ErrNoRows)
}

// TestAutoConfigWaitAtStartup checks that, when no startup wait
// override is configured, the effects of a configuration profile's
// tasks can be read back through SQL as soon as StartServer returns.
func TestAutoConfigWaitAtStartup(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// The task sleeps briefly before creating and populating the
	// table, so the table can only exist at the end of startup if
	// startup actually waited for the task.
	createTableTask := autoconfigpb.Task{
		TaskID:      123,
		Description: "create-test-table",
		MinVersion:  clusterversion.TestingBinaryVersion,
		Payload: &autoconfigpb.Task_SimpleSQL{
			SimpleSQL: &autoconfigpb.SimpleSQL{
				UsernameProto: username.NodeUserName().EncodeProto(),
				TransactionalStatements: []string{
					"SELECT pg_sleep(1)",
					"CREATE TABLE system.t (pk int primary key)",
					"INSERT INTO system.t VALUES (1)",
				},
			},
		},
	}

	provider := &testProvider{
		t:          t,
		notifyCh:   make(chan struct{}, 1),
		peekWaitCh: make(chan struct{}),
		tasks: []testTask{
			{task: createTableTask},
			{task: endProfileTask},
		},
	}
	provider.notifyCh <- struct{}{}
	close(provider.peekWaitCh)

	knobs := base.TestingKnobs{
		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
		AutoConfig: &autoconfig.TestingKnobs{
			Provider: provider,
		},
	}
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		DefaultTestTenant: base.TestIsForStuffThatShouldWorkWithSecondaryTenantsButDoesntYet(109462),
		Knobs:             knobs,
	})
	defer s.Stopper().Stop(ctx)

	// The side-effects of the profile execution should be
	// immediately visible.
	var pk int
	row := sqlDB.QueryRowContext(ctx, "SELECT pk FROM system.t")
	require.NoError(t, row.Scan(&pk))
	require.Equal(t, 1, pk)
}
46 changes: 46 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,9 +1744,55 @@ func (s *SQLServer) preStart(
}
}

s.waitForActiveAutoConfigEnvironments(ctx)

return nil
}

// waitForActiveAutoConfigEnvironments waits until the set of
// ActiveEnvironments is empty or a maximum wait time has elapsed,
// whichever comes first. ActiveEnvironments is empty once there are
// no more tasks to run.
//
// This is sufficient to ensure all configuration task jobs have
// completed because the environment runner only enqueues a task after
// the previous task has completed and configuration profiles include
// an "end task" that runs after all previous tasks.
//
// The maximum wait defaults to two minutes and can be overridden via
// the AutoConfigProfileStartupWaitTime server testing knob; a
// non-positive value disables the wait entirely. The wait is
// best-effort: if it times out (or ctx is canceled) a warning is
// logged and the function returns so that startup can proceed.
func (s *SQLServer) waitForActiveAutoConfigEnvironments(ctx context.Context) {
	maxWait := 2 * time.Minute
	// A single comma-ok assertion covers both an unset knobs interface
	// and a nil *TestingKnobs stored in it.
	if knobs, ok := s.cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs != nil &&
		knobs.AutoConfigProfileStartupWaitTime != nil {
		maxWait = *knobs.AutoConfigProfileStartupWaitTime
	}

	// A negative duration would produce an already-expired context
	// below and a misleading "still running" warning, so treat it the
	// same as zero.
	if maxWait <= 0 {
		log.Infof(ctx, "waiting for auto-configuration environments disabled")
		return
	}

	envs := s.execCfg.AutoConfigProvider.ActiveEnvironments()
	if len(envs) == 0 {
		log.Infof(ctx, "auto-configuration environments not set or already complete")
		return
	}

	log.Infof(ctx, "waiting up to %s for auto-configuration environments %v to complete", maxWait, envs)
	// The timeout context only bounds the polling loop; logging keeps
	// using the caller's ctx.
	waitCtx, cancel := context.WithTimeout(ctx, maxWait) // nolint:context
	defer cancel()
	retryOpts := retry.Options{
		InitialBackoff: 100 * time.Millisecond,
		MaxBackoff:     5 * time.Second,
	}
	waitStart := timeutil.Now()
	for r := retry.StartWithCtx(waitCtx, retryOpts); r.Next(); {
		if len(s.execCfg.AutoConfigProvider.ActiveEnvironments()) == 0 {
			log.Infof(ctx, "auto-configuration environments reported no active tasks after %s", timeutil.Since(waitStart))
			return
		}
	}
	// The retry loop ends only once waitCtx is done (timeout or parent
	// cancellation).
	log.Warningf(ctx, "auto-configuration environments still running after %s, moving on", timeutil.Since(waitStart))
}

// startCheckService verifies that the tenant has the right
// service mode initially, then starts an async checker
// to stop the server if the service mode changes.
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ type TestingKnobs struct {
// // TODO(ahmad/healthy-pod): Remove this once `v23.2` is cut and update `TestTenantAutoUpgrade`
// to reflect the changes.
AllowTenantAutoUpgradeOnInternalVersionChanges bool

// If non-nil, AutoConfigProfileStartupWaitTime is used when
// waiting for any active configuration environments to
// complete their tasks.
AutoConfigProfileStartupWaitTime *time.Duration
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 56cf731

Please sign in to comment.