Skip to content

Commit

Permalink
Merge #63057
Browse files Browse the repository at this point in the history
63057: cli/demo: refactor the server initialization code r=otan a=knz

First commit from #63031. 
This work was requested by @ajstorm to support completion of #62435

The latency simulation code needs to be injected as a latency map
while the servers are initialized, i.e. concurrently with server
startup.

The previous initialization code for `cockroach demo` to achieve this
was exceedly difficult to understand and was, in fact, incorrect.

This commit reworks this code by exposing the overall workings
as a comment and then ensuring the structure of the comment follows
the explanation. It also add logging.

Additionally, this change ensures that the same initialization code is
used regardless of whether latency simulation is requested or not.

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Apr 5, 2021
2 parents 6e5dfc4 + 19e670f commit 1b8dcba
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 177 deletions.
575 changes: 422 additions & 153 deletions pkg/cli/demo_cluster.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cli/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (c *cliState) handleDemoAddNode(cmd []string, nextState, errState cliStateE
return nextState
}

if err := demoCtx.transientCluster.AddNode(cmd[1]); err != nil {
if err := demoCtx.transientCluster.AddNode(context.Background(), cmd[1]); err != nil {
return c.internalServerError(errState, err)
}
addedNodeID := len(demoCtx.transientCluster.servers)
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ func TestJoinVersionGate(t *testing.T) {
}
defer serv.Stop()

if err := serv.Start(); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) {
ctx := context.Background()
if err := serv.Start(ctx); !errors.Is(errors.Cause(err), server.ErrIncompatibleBinaryVersion) {
t.Fatalf("expected error %s, got %v", server.ErrIncompatibleBinaryVersion.Error(), err.Error())
}
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,10 +1281,26 @@ func (s *Server) PreStart(ctx context.Context) error {
if s.cfg.TestingKnobs.Server != nil {
knobs := s.cfg.TestingKnobs.Server.(*TestingKnobs)
if knobs.SignalAfterGettingRPCAddress != nil {
log.Infof(ctx, "signaling caller that RPC address is ready")
close(knobs.SignalAfterGettingRPCAddress)
}
if knobs.PauseAfterGettingRPCAddress != nil {
<-knobs.PauseAfterGettingRPCAddress
log.Infof(ctx, "waiting for signal from caller to proceed with initialization")
select {
case <-knobs.PauseAfterGettingRPCAddress:
// Normal case. Just continue below.

case <-ctx.Done():
// Test timeout or some other condition in the caller, by which
// we are instructed to stop.
return errors.CombineErrors(errors.New("server stopping prematurely from context shutdown"), ctx.Err())

case <-s.stopper.ShouldQuiesce():
// The server is instructed to stop before it even finished
// starting up.
return errors.New("server stopping prematurely")
}
log.Infof(ctx, "caller is letting us proceed with initialization")
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ type TestingKnobs struct {
DefaultZoneConfigOverride *zonepb.ZoneConfig
// DefaultSystemZoneConfigOverride, if set, overrides the default system zone config defined in `pkg/config/zone.go`
DefaultSystemZoneConfigOverride *zonepb.ZoneConfig
// PauseAfterGettingRPCAddress, if non-nil, instructs the server to wait until
// the channel is closed after getting an RPC serving address.
PauseAfterGettingRPCAddress chan struct{}
// SignalAfterGettingRPCAddress, if non-nil, is closed after the server gets
// an RPC server address.
// an RPC server address, and prior to waiting on PauseAfterGettingRPCAddress below.
SignalAfterGettingRPCAddress chan struct{}
// PauseAfterGettingRPCAddress, if non-nil, instructs the server to wait until
// the channel is closed after determining its RPC serving address, and after
// closing SignalAfterGettingRPCAddress.
PauseAfterGettingRPCAddress chan struct{}
// ContextTestingKnobs allows customization of the RPC context testing knobs.
ContextTestingKnobs rpc.ContextTestingKnobs
// DiagnosticsTestingKnobs allows customization of diagnostics testing knobs.
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,7 @@ func (ts *TestServer) NodeDialer() *nodedialer.Dialer {
// TestServer.ServingRPCAddr() after Start() for client connections.
// Use TestServer.Stopper().Stop() to shutdown the server after the test
// completes.
func (ts *TestServer) Start() error {
ctx := context.Background()
func (ts *TestServer) Start(ctx context.Context) error {
return ts.Server.Start(ctx)
}

Expand Down Expand Up @@ -844,6 +843,12 @@ func (ts *TestServer) DrainClients(ctx context.Context) error {
return ts.drainClients(ctx, nil /* reporter */)
}

// Readiness returns nil when the server's health probe reports
// readiness, a readiness error otherwise.
func (ts *TestServer) Readiness(ctx context.Context) error {
return ts.admin.checkReadinessForHealthCheck(ctx)
}

// WriteSummaries implements TestServerInterface.
func (ts *TestServer) WriteSummaries() error {
return ts.node.writeNodeStatus(context.TODO(), time.Hour, false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/reduce/reducesql/reducesql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func isInterestingSQL(contains string) reduce.InterestingFn {
}
serv := ts.(*server.TestServer)
defer serv.Stopper().Stop(ctx)
if err := serv.Start(); err != nil {
if err := serv.Start(context.Background()); err != nil {
panic(err)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
type TestServerInterface interface {
Stopper() *stop.Stopper

Start() error
Start(context.Context) error

// Node returns the server.Node as an interface{}.
Node() interface{}
Expand Down Expand Up @@ -260,7 +260,7 @@ func StartServer(
if err != nil {
t.Fatalf("%+v", err)
}
if err := server.Start(); err != nil {
if err := server.Start(context.Background()); err != nil {
t.Fatalf("%+v", err)
}
goDB := OpenDBConn(
Expand Down Expand Up @@ -328,7 +328,7 @@ func StartServerRaw(args base.TestServerArgs) (TestServerInterface, error) {
if err != nil {
return nil, err
}
if err := server.Start(); err != nil {
if err := server.Start(context.Background()); err != nil {
return nil, err
}
return server, nil
Expand Down
27 changes: 16 additions & 11 deletions pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe
// actually starting the server.
func (tc *TestCluster) startServer(idx int, serverArgs base.TestServerArgs) error {
server := tc.Servers[idx]
if err := server.Start(); err != nil {
if err := server.Start(context.Background()); err != nil {
return err
}

Expand Down Expand Up @@ -1327,17 +1327,22 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server.
}
s := srv.(*server.TestServer)

ctx := context.Background()
if err := func() error {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.Servers[idx] = s
tc.mu.serverStoppers[idx] = s.Stopper()

if inspect != nil {
inspect(s)
}
func() {
// Only lock the assignment of the server and the stopper and the call to the inspect function.
// This ensures that the stopper's Stop() method can abort an async Start() call.
tc.mu.Lock()
defer tc.mu.Unlock()
tc.Servers[idx] = s
tc.mu.serverStoppers[idx] = s.Stopper()

if inspect != nil {
inspect(s)
}
}()

if err := srv.Start(); err != nil {
if err := srv.Start(ctx); err != nil {
return err
}

Expand All @@ -1357,7 +1362,7 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server.
// different port, and a cycle of gossip is necessary to make all other nodes
// aware.
return contextutil.RunWithTimeout(
context.Background(), "check-conn", 15*time.Second,
ctx, "check-conn", 15*time.Second,
func(ctx context.Context) error {
r := retry.StartWithCtx(ctx, retry.Options{
InitialBackoff: 1 * time.Millisecond,
Expand Down

0 comments on commit 1b8dcba

Please sign in to comment.