Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: initialize sql instance during instance provider start #83358

Merged
merged 1 commit into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -40,6 +41,9 @@ type partitionToEvent map[string][]streamingccl.Event

func TestStreamIngestionFrontierProcessor(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s
func TestTenantStreaming(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 82706)
skip.WithIssue(t, 83697)

skip.UnderRace(t, "slow under race")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -157,6 +158,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down Expand Up @@ -409,6 +412,8 @@ func TestRandomClientGeneration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)
skip.UnderStressRace(t, "slow under stressrace")

dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
22 changes: 15 additions & 7 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.AmbientCtx,
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs,
)
cfg.sqlInstanceProvider = instanceprovider.New(
cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock,
)
// If the node id is already populated, we only need to create a placeholder
// instance provider without initializing the instance, since this is not a
// SQL pod server.
_, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID()
if isNotSQLPod {
cfg.sqlInstanceProvider = sqlinstance.NewFakeSQLProvider()
} else {
cfg.sqlInstanceProvider = instanceprovider.New(
cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock,
)
}

if !codec.ForSystemTenant() {
// In a multi-tenant environment, use the sqlInstanceProvider to resolve
Expand Down Expand Up @@ -1150,7 +1158,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er
return nil
}

func (s *SQLServer) initInstanceID(ctx context.Context) error {
func (s *SQLServer) setInstanceID(ctx context.Context) error {
if _, ok := s.sqlIDContainer.OptionalNodeID(); ok {
// sqlIDContainer has already been initialized with a node ID,
// we don't need to initialize a SQL instance ID in this case
Expand Down Expand Up @@ -1179,8 +1187,8 @@ func (s *SQLServer) preStart(
socketFile string,
orphanedLeasesTimeThresholdNanos int64,
) error {
// The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is
// initialized prior to any other systems that need it.
// The sqlliveness and sqlinstance subsystem should be started first to ensure
// the instance ID is initialized prior to any other systems that need it.
if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil {
return err
}
Expand All @@ -1191,7 +1199,7 @@ func (s *SQLServer) preStart(
if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil {
return err
}
if err := s.initInstanceID(ctx); err != nil {
if err := s.setInstanceID(ctx); err != nil {
return err
}
s.connManager = connManager
Expand Down
25 changes: 21 additions & 4 deletions pkg/sql/sqlinstance/instanceprovider/instanceprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type writer interface {
ReleaseInstanceID(ctx context.Context, instanceID base.SQLInstanceID) error
}

// provider implements the sqlinstance.Provider interface for access to the sqlinstance subsystem.
// provider implements the sqlinstance.Provider interface for access to the
// sqlinstance subsystem.
type provider struct {
*instancestorage.Reader
storage writer
Expand Down Expand Up @@ -85,6 +86,12 @@ func (p *provider) Start(ctx context.Context) error {
if p.started() {
return p.initError
}
// Initialize the instance. We need to do this before starting the reader, so
// that the reader sees the instance.
if err := p.initAndWait(ctx); err != nil {
return err
}

if err := p.Reader.Start(ctx); err != nil {
p.initOnce.Do(func() {
p.initError = err
Expand All @@ -110,21 +117,31 @@ func (p *provider) Instance(
if !p.started() {
return base.SQLInstanceID(0), "", sqlinstance.NotStartedError
}

p.maybeInitialize()
select {
case <-ctx.Done():
return base.SQLInstanceID(0), "", ctx.Err()
case <-p.stopper.ShouldQuiesce():
return base.SQLInstanceID(0), "", stop.ErrUnavailable
case <-p.initialized:
return p.instanceID, p.sessionID, p.initError
}
}

func (p *provider) initAndWait(ctx context.Context) error {
p.maybeInitialize()
select {
case <-ctx.Done():
return ctx.Err()
case <-p.stopper.ShouldQuiesce():
return stop.ErrUnavailable
case <-p.initialized:
if p.initError == nil {
log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID)
} else {
log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError)
}
return p.instanceID, p.sessionID, p.initError
}
return p.initError
}

func (p *provider) maybeInitialize() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) {
defer stopper.Stop(ctx)
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr)
slInstance.Start(ctx)
instanceProvider.InitAndWaitForTest(ctx)
instanceID, sessionID, err := instanceProvider.Instance(ctx)
require.NoError(t, err)
require.Equal(t, expectedInstanceID, instanceID)
Expand Down Expand Up @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) {
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr")
slInstance.Start(ctx)
instanceProvider.ShutdownSQLInstanceForTest(ctx)
instanceProvider.InitAndWaitForTest(ctx)
_, _, err := instanceProvider.Instance(ctx)
require.Error(t, err)
require.Equal(t, "instance never initialized", err.Error())
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/sqlinstance/instanceprovider/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
)

// TestInstanceProvider exposes ShutdownSQLInstanceForTest
// method for testing purposes.
// and InitAndWaitForTest methods for testing purposes.
type TestInstanceProvider interface {
sqlinstance.Provider
InitAndWaitForTest(context.Context)
ShutdownSQLInstanceForTest(context.Context)
}

Expand All @@ -43,6 +44,11 @@ func NewTestInstanceProvider(
return p
}

// InitAndWaitForTest explicitly calls initAndWait for testing purposes.
func (p *provider) InitAndWaitForTest(ctx context.Context) {
// The error is intentionally discarded: this helper has no return value.
// NOTE(review): callers appear to observe initialization failures through
// Instance afterwards — confirm.
_ = p.initAndWait(ctx)
}

// ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes.
func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) {
p.shutdownSQLInstance(ctx)
Expand Down
41 changes: 39 additions & 2 deletions pkg/sql/sqlinstance/sqlinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,50 @@ type Provider interface {
// Instance returns the instance ID and sqlliveness.SessionID for the
// current SQL instance.
Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error)
// Start starts the instanceprovider. This will block until
// the underlying instance data reader has been started.
// Start starts the instanceprovider and initializes the current SQL instance.
// This will block until the underlying instance data reader has been started.
Start(context.Context) error
}

// fakeSQLProvider implements the sqlinstance.Provider interface as a
// placeholder for an instance provider, when an instance provider must be
// instantiated for a non-SQL instance. It holds no state and starts nothing:
// Start is a no-op, and Instance, GetInstance and GetAllInstances all return
// NotASQLInstanceError.
type fakeSQLProvider struct{}

// NewFakeSQLProvider constructs the stateless placeholder Provider that stands
// in for a real instance provider on a non-SQL instance.
func NewFakeSQLProvider() Provider {
	return new(fakeSQLProvider)
}

// Start implements the sqlinstance.Provider interface. The placeholder has
// nothing to start, so it always reports success.
func (p *fakeSQLProvider) Start(context.Context) error { return nil }

// Instance implements the sqlinstance.Provider interface. The placeholder has
// no instance ID or session to report, so it returns the zero values together
// with NotASQLInstanceError.
func (p *fakeSQLProvider) Instance(
	context.Context,
) (base.SQLInstanceID, sqlliveness.SessionID, error) {
	var (
		noInstance base.SQLInstanceID
		noSession  sqlliveness.SessionID
	)
	return noInstance, noSession, NotASQLInstanceError
}

// GetInstance implements the AddressResolver interface. The placeholder cannot
// resolve any instance; it returns an empty InstanceInfo and
// NotASQLInstanceError.
func (p *fakeSQLProvider) GetInstance(
	_ context.Context, _ base.SQLInstanceID,
) (InstanceInfo, error) {
	var none InstanceInfo
	return none, NotASQLInstanceError
}

// GetAllInstances implements the AddressResolver interface. The placeholder
// has no instances to list; it returns a nil slice and NotASQLInstanceError.
func (p *fakeSQLProvider) GetAllInstances(_ context.Context) ([]InstanceInfo, error) {
	var none []InstanceInfo
	return none, NotASQLInstanceError
}

// NonExistentInstanceError can be returned if a SQL instance does not exist.
var NonExistentInstanceError = errors.Errorf("non existent SQL instance")

// NotStartedError can be returned if the sqlinstance subsystem has not been started yet.
var NotStartedError = errors.Errorf("sqlinstance subsystem not started")

// NotASQLInstanceError can be returned if a function is not supported for
// non-SQL instances.
var NotASQLInstanceError = errors.Errorf("not supported for non-SQL instance")
1 change: 1 addition & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/retry",
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -123,6 +124,7 @@ type Instance struct {
ttl func() time.Duration
hb func() time.Duration
testKnobs sqlliveness.TestingKnobs
startErr error
mu struct {
started bool
syncutil.Mutex
Expand Down Expand Up @@ -188,6 +190,11 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) {
if everySecond.ShouldLog() {
log.Errorf(ctx, "failed to create a session at %d-th attempt: %v", i, err)
}
// Unauthenticated errors are unrecoverable, so we break instead of
// retrying.
if grpcutil.IsAuthError(err) {
break
}
continue
}
break
Expand Down Expand Up @@ -248,6 +255,15 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
if s == nil {
newSession, err := l.createSession(ctx)
if err != nil {
func() {
l.mu.Lock()
defer l.mu.Unlock()
l.startErr = err
// There was an unrecoverable error when trying to
// create the session. Notify all calls to Session that
// the session failed.
close(l.mu.blockCh)
}()
return
}
l.setSession(newSession)
Expand Down Expand Up @@ -341,6 +357,15 @@ func (l *Instance) Session(ctx context.Context) (sqlliveness.Session, error) {
case <-ctx.Done():
return nil, ctx.Err()
case <-ch:
var err error
func() {
l.mu.Lock()
defer l.mu.Unlock()
err = l.startErr
}()
if err != nil {
return nil, err
}
}
}
}