
server: unexport Server #108010

Merged (1 commit) on Aug 2, 2023
Changes from all commits
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -1544,6 +1544,7 @@ GO_TARGETS = [
"//pkg/server/profiler:profiler",
"//pkg/server/profiler:profiler_test",
"//pkg/server/rangetestutils:rangetestutils",
"//pkg/server/serverctl:serverctl",
"//pkg/server/serverpb:serverpb",
"//pkg/server/serverpb:serverpb_test",
"//pkg/server/serverrules:serverrules",
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
@@ -148,6 +148,7 @@ go_library(
"//pkg/server/autoconfig/acprovider",
"//pkg/server/pgurl",
"//pkg/server/profiler",
"//pkg/server/serverctl",
"//pkg/server/serverpb",
"//pkg/server/status",
"//pkg/server/status/statuspb",
3 changes: 2 additions & 1 deletion pkg/cli/mt_start_sql.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverctl"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/redact"
"github.com/spf13/cobra"
@@ -78,7 +79,7 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
)
}

newServerFn := func(ctx context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverStartupInterface, error) {
newServerFn := func(ctx context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverctl.ServerStartupInterface, error) {
// Beware of not writing simply 'return server.NewServer()'. This is
// because it would cause the serverStartupInterface reference to
// always be non-nil, even if NewServer returns a nil pointer (and
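Editor's note: the "Beware" comment above refers to Go's typed-nil pitfall. Assigning a concrete nil pointer (such as the result of a failed constructor) directly to an interface-typed variable yields an interface value that compares as non-nil. A minimal, self-contained sketch with illustrative names (not CockroachDB code):

package main

import "errors"

type starter interface{ Start() error }

type srv struct{}

func (s *srv) Start() error { return nil }

// newSrv mimics a constructor that fails: a nil pointer plus an error.
func newSrv() (*srv, error) { return nil, errors.New("startup failed") }

func main() {
	// Anti-pattern: the nil *srv gets wrapped in a non-nil interface value.
	var bad starter
	bad, err := newSrv()
	println(bad == nil) // false, despite the nil pointer inside

	// Pattern used in the diff: assign only on success, so the interface
	// reference stays nil when the constructor errors out.
	var good starter
	if s, err2 := newSrv(); err2 == nil {
		good = s
	}
	println(good == nil, err != nil) // true true
}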
75 changes: 16 additions & 59 deletions pkg/cli/start.go
@@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/clientsecopts"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverctl"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -312,45 +313,7 @@ func initTempStorageConfig(
return tempStorageConfig, nil
}

type newServerFn func(ctx context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverStartupInterface, error)

type serverStartupInterface interface {
serverShutdownInterface

// ClusterSettings retrieves this server's settings.
ClusterSettings() *cluster.Settings

// LogicalClusterID retrieves this server's logical cluster ID.
LogicalClusterID() uuid.UUID

// PreStart starts the server on the specified port(s) and
// initializes subsystems.
// It does not activate the pgwire listener over the network / unix
// socket, which is done by the AcceptClients() method. The separation
// between the two exists so that SQL initialization can take place
// before the first client is accepted.
PreStart(ctx context.Context) error

// AcceptClients starts listening for incoming SQL clients over the network.
AcceptClients(ctx context.Context) error
// AcceptInternalClients starts listening for incoming internal SQL clients over the
// loopback interface.
AcceptInternalClients(ctx context.Context) error

// InitialStart returns whether this node is starting for the first time.
// This is (currently) used when displaying the server status report
// on the terminal & in logs. We know that some folk have automation
// that depend on certain strings displayed from this when orchestrating
// KV-only nodes.
InitialStart() bool

// RunInitialSQL runs the SQL initialization for brand new clusters,
// if the cluster is being started for the first time.
// The arguments are:
// - startSingleNode is used by 'demo' and 'start-single-node'.
// - adminUser/adminPassword is used for 'demo'.
RunInitialSQL(ctx context.Context, startSingleNode bool, adminUser, adminPassword string) error
}
type newServerFn func(ctx context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverctl.ServerStartupInterface, error)

var errCannotUseJoin = errors.New("cannot use --join with 'cockroach start-single-node' -- use 'cockroach start' instead")

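Editor's note: the interface deleted above now lives in pkg/server/serverctl as ServerStartupInterface. Per its doc comments, the methods are driven in a fixed order: PreStart before any SQL client is admitted, then first-cluster SQL initialization, then the client listeners. A hedged sketch of a caller honoring that contract (the real start.go flow is asynchronous and more involved):

func startAndServe(ctx context.Context, s serverctl.ServerStartupInterface) error {
	// Start subsystems and non-SQL listeners; pgwire stays closed for now.
	if err := s.PreStart(ctx); err != nil {
		return err
	}
	// First-time SQL initialization must finish before the first client.
	if err := s.RunInitialSQL(ctx, false /* startSingleNode */, "" /* adminUser */, "" /* adminPassword */); err != nil {
		return err
	}
	// Loopback (internal) clients first, then the network listener.
	if err := s.AcceptInternalClients(ctx); err != nil {
		return err
	}
	return s.AcceptClients(ctx)
}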
@@ -398,9 +361,9 @@ func runStartJoin(cmd *cobra.Command, args []string) error {
func runStart(cmd *cobra.Command, args []string, startSingleNode bool) error {
const serverType redact.SafeString = "node"

newServerFn := func(_ context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverStartupInterface, error) {
newServerFn := func(_ context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverctl.ServerStartupInterface, error) {
// Beware of not writing simply 'return server.NewServer()'. This is
// because it would cause the serverStartupInterface reference to
// because it would cause the serverctl.ServerStartupInterface reference to
// always be non-nil, even if NewServer returns a nil pointer (and
// an error). The code below is dependent on the interface
// reference remaining nil in case of error.
@@ -757,10 +720,10 @@ func createAndStartServerAsync(
newServerFn newServerFn,
startSingleNode bool,
serverType redact.SafeString,
) (srvStatus *serverStatus, serverShutdownReqC <-chan server.ShutdownRequest) {
) (srvStatus *serverStatus, serverShutdownReqC <-chan serverctl.ShutdownRequest) {
var serverStatusMu serverStatus
var s serverStartupInterface
shutdownReqC := make(chan server.ShutdownRequest, 1)
var s serverctl.ServerStartupInterface
shutdownReqC := make(chan serverctl.ShutdownRequest, 1)

log.Ops.Infof(ctx, "starting cockroach %s", serverType)

@@ -851,8 +814,8 @@ func createAndStartServerAsync(
return reportServerInfo(ctx, tBegin, serverCfg, s.ClusterSettings(),
serverType, s.InitialStart(), s.LogicalClusterID())
}(); err != nil {
shutdownReqC <- server.MakeShutdownRequest(
server.ShutdownReasonServerStartupError, errors.Wrapf(err, "server startup failed"))
shutdownReqC <- serverctl.MakeShutdownRequest(
serverctl.ShutdownReasonServerStartupError, errors.Wrapf(err, "server startup failed"))
} else {
// Start a goroutine that watches for shutdown requests and notifies
// errChan.
@@ -885,7 +848,7 @@ type serverStatus struct {
// s is a reference to the server, to be used by the shutdown process. This
// starts as nil, and is set by setStarted(). Once set, a graceful shutdown
// should use a soft drain.
s serverShutdownInterface
s serverctl.ServerShutdownInterface
// stopper is the server's stopper. This is set in setStarted(), together with
// `s`. The stopper is handed out to callers of startShutdown(), who will
// Stop() it.
@@ -904,7 +867,9 @@ type serverStatus struct {
// setStarted returns whether shutdown has been requested already. If it has,
// then the serverStatus does not take ownership of the stopper; the caller is
// responsible for calling stopper.Stop().
func (s *serverStatus) setStarted(server serverShutdownInterface, stopper *stop.Stopper) bool {
func (s *serverStatus) setStarted(
server serverctl.ServerShutdownInterface, stopper *stop.Stopper,
) bool {
s.Lock()
defer s.Unlock()
if s.shutdownRequested {
@@ -927,21 +892,13 @@ func (s *serverStatus) shutdownInProgress() bool {
// was started already. If the server started, a reference to the server is also
// returned, and a reference to the stopper that the caller needs to eventually
// Stop().
func (s *serverStatus) startShutdown() (bool, serverShutdownInterface, *stop.Stopper) {
func (s *serverStatus) startShutdown() (bool, serverctl.ServerShutdownInterface, *stop.Stopper) {
s.Lock()
defer s.Unlock()
s.shutdownRequested = true
return s.s != nil, s.s, s.stopper
}

// serverShutdownInterface is the subset of the APIs on a server
// object that's sufficient to run a server shutdown.
type serverShutdownInterface interface {
AnnotateCtx(context.Context) context.Context
Drain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
ShutdownRequested() <-chan server.ShutdownRequest
}

// waitForShutdown blocks until interrupted by a shutdown signal, which can come
// in several forms:
// - a shutdown request coming from an internal module being signaled on
@@ -953,7 +910,7 @@ type serverShutdownInterface interface {
// before shutting down.
func waitForShutdown(
stopper *stop.Stopper,
shutdownC <-chan server.ShutdownRequest,
shutdownC <-chan serverctl.ShutdownRequest,
signalCh <-chan os.Signal,
serverStatusMu *serverStatus,
) (returnErr error) {
@@ -966,7 +923,7 @@ func waitForShutdown(
case shutdownRequest := <-shutdownC:
returnErr = shutdownRequest.ShutdownCause()
// There's no point in draining if the server didn't even fully start.
drain := shutdownRequest.Reason != server.ShutdownReasonServerStartupError
drain := shutdownRequest.Reason != serverctl.ShutdownReasonServerStartupError
startShutdownAsync(serverStatusMu, stopWithoutDrain, drain)

case sig := <-signalCh:
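Editor's note: signalCh is fed by the process signal handler elsewhere in the cli package. A condensed sketch of how the two channels combine in a select loop like the one above, using only names visible in this diff plus assumed standard-library wiring (os/signal, syscall):

func waitForShutdownSketch(shutdownC <-chan serverctl.ShutdownRequest) error {
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	select {
	case req := <-shutdownC:
		// An internal module asked us to stop; skip draining if startup failed.
		drain := req.Reason != serverctl.ShutdownReasonServerStartupError
		_ = drain // the real code hands this to startShutdownAsync
		return req.ShutdownCause()
	case sig := <-signalCh:
		// External signal: the real code begins a graceful drain here.
		return errors.Newf("received signal %v", sig)
	}
}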
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -186,6 +186,7 @@ go_library(
"//pkg/server/pgurl",
"//pkg/server/privchecker",
"//pkg/server/profiler",
"//pkg/server/serverctl",
"//pkg/server/serverpb",
"//pkg/server/serverrules",
"//pkg/server/settingswatcher",
4 changes: 2 additions & 2 deletions pkg/server/admin.go
@@ -130,7 +130,7 @@ type systemAdminServer struct {
*adminServer

nodeLiveness *liveness.NodeLiveness
server *Server
server *topLevelServer
}

// noteworthyAdminMemoryUsageBytes is the minimum size tracked by the
@@ -161,7 +161,7 @@ func newSystemAdminServer(
distSender *kvcoord.DistSender,
grpc *grpcServer,
drainServer *drainServer,
s *Server,
s *topLevelServer,
) *systemAdminServer {
adminServer := newAdminServer(
sqlServer,
6 changes: 3 additions & 3 deletions pkg/server/auto_upgrade.go
@@ -24,7 +24,7 @@ import (
)

// startAttemptUpgrade attempts to upgrade cluster version.
func (s *Server) startAttemptUpgrade(ctx context.Context) error {
func (s *topLevelServer) startAttemptUpgrade(ctx context.Context) error {
return s.stopper.RunAsyncTask(ctx, "auto-upgrade", func(ctx context.Context) {
ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
defer cancel()
@@ -120,7 +120,7 @@ const (

// upgradeStatus lets the main checking loop know if we should do upgrade,
// keep checking upgrade status, or stop attempting upgrade.
func (s *Server) upgradeStatus(
func (s *topLevelServer) upgradeStatus(
ctx context.Context, clusterVersion string,
) (st upgradeStatus, err error) {
nodes, err := s.status.ListNodesInternal(ctx, nil)
@@ -205,7 +205,7 @@ func (s *Server) upgradeStatus(
// clusterVersion returns the current cluster version from the SQL subsystem
// (which returns the version from the KV store as opposed to the possibly
// lagging settings subsystem).
func (s *Server) clusterVersion(ctx context.Context) (string, error) {
func (s *topLevelServer) clusterVersion(ctx context.Context) (string, error) {
row, err := s.sqlServer.internalExecutor.QueryRowEx(
ctx, "show-version", nil, /* txn */
sessiondata.RootUserSessionDataOverride,
8 changes: 5 additions & 3 deletions pkg/server/clock_monotonicity.go
@@ -45,7 +45,7 @@ var (

// startMonitoringForwardClockJumps starts a background task to monitor forward
// clock jumps based on a cluster setting.
func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) error {
func (s *topLevelServer) startMonitoringForwardClockJumps(ctx context.Context) error {
forwardJumpCheckEnabled := make(chan bool, 1)
s.stopper.AddCloser(stop.CloserFn(func() { close(forwardJumpCheckEnabled) }))

@@ -69,7 +69,7 @@ func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) error {
// checkHLCUpperBoundExists determines whether there's an HLC
// upper bound that will need to be refreshed/persisted after
// the server has initialized.
func (s *Server) checkHLCUpperBoundExistsAndEnsureMonotonicity(
func (s *topLevelServer) checkHLCUpperBoundExistsAndEnsureMonotonicity(
ctx context.Context, initialStart bool,
) (hlcUpperBoundExists bool, err error) {
if initialStart {
@@ -238,7 +238,9 @@ func periodicallyPersistHLCUpperBound(
//
// tickCallback is called whenever persistHLCUpperBoundCh or a ticker tick is
// processed
func (s *Server) startPersistingHLCUpperBound(ctx context.Context, hlcUpperBoundExists bool) error {
func (s *topLevelServer) startPersistingHLCUpperBound(
ctx context.Context, hlcUpperBoundExists bool,
) error {
tickerFn := time.NewTicker
persistHLCUpperBoundFn := func(t int64) error { /* function to persist upper bound of HLC to all stores */
return s.node.SetHLCUpperBound(context.Background(), t)
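Editor's note: startPersistingHLCUpperBound and periodicallyPersistHLCUpperBound follow a stopper-aware ticker pattern. A reduced sketch of that loop shape (illustrative helper; only stop.Stopper and the persist callback are taken from this diff):

func persistLoop(ctx context.Context, stopper *stop.Stopper, interval time.Duration, persist func(int64) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Persist a wall time a safe margin ahead of the current clock.
			if err := persist(timeutil.Now().UnixNano() + interval.Nanoseconds()); err != nil {
				log.Ops.Warningf(ctx, "failed to persist HLC upper bound: %v", err)
			}
		case <-stopper.ShouldQuiesce():
			return
		}
	}
}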
6 changes: 3 additions & 3 deletions pkg/server/decommission.go
@@ -141,7 +141,7 @@ func getPingCheckDecommissionFn(
// or remove actions. If maxErrors >0, range checks will stop once maxError is
// reached.
// The error returned is a gRPC error.
func (s *Server) DecommissionPreCheck(
func (s *topLevelServer) DecommissionPreCheck(
ctx context.Context,
nodeIDs []roachpb.NodeID,
strictReadiness bool,
@@ -314,7 +314,7 @@ func evaluateRangeCheckResult(

// Decommission idempotently sets the decommissioning flag for specified nodes.
// The error return is a gRPC error.
func (s *Server) Decommission(
func (s *topLevelServer) Decommission(
ctx context.Context, targetStatus livenesspb.MembershipStatus, nodeIDs []roachpb.NodeID,
) error {
// If we're asked to decommission ourself we may lose access to cluster RPC,
@@ -396,7 +396,7 @@ func (s *Server) Decommission(

// DecommissioningNodeMap returns the set of node IDs that are decommissioning
// from the perspective of the server.
func (s *Server) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
func (s *topLevelServer) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
s.decomNodeMap.RLock()
defer s.decomNodeMap.RUnlock()
nodes := make(map[roachpb.NodeID]interface{})
3 changes: 2 additions & 1 deletion pkg/server/drain.go
@@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverctl"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -197,7 +198,7 @@ func (s *drainServer) maybeShutdownAfterDrain(
// away (and who knows whether gRPC-goroutines are tied up in some
// stopper task somewhere).
s.grpc.Stop()
s.stopTrigger.signalStop(ctx, MakeShutdownRequest(ShutdownReasonDrainRPC, nil /* err */))
s.stopTrigger.signalStop(ctx, serverctl.MakeShutdownRequest(serverctl.ShutdownReasonDrainRPC, nil /* err */))
}()

select {
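Editor's note: the drain hunk above shows the shutdown-request constructor after the move to serverctl. Based only on the call sites visible in this diff, a request couples a reason with an optional error cause:

func exampleDrainShutdown() {
	// A drain-initiated stop carries no error cause.
	req := serverctl.MakeShutdownRequest(serverctl.ShutdownReasonDrainRPC, nil /* err */)
	// Consumers decide whether to drain: startup failures skip it.
	drain := req.Reason != serverctl.ShutdownReasonServerStartupError
	_ = drain               // true for ShutdownReasonDrainRPC
	_ = req.ShutdownCause() // nil here; non-nil for startup errors
}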
2 changes: 1 addition & 1 deletion pkg/server/import_ts.go
@@ -38,7 +38,7 @@ import (
// to be written to the DB.
const maxBatchSize = 10000

func maybeImportTS(ctx context.Context, s *Server) (returnErr error) {
func maybeImportTS(ctx context.Context, s *topLevelServer) (returnErr error) {
// We don't want to do startup retries as this is not meant to be run in
// production.
ctx = startup.WithoutChecks(ctx)
8 changes: 5 additions & 3 deletions pkg/server/initial_sql.go
@@ -29,7 +29,7 @@ import (
// and `cockroach demo` with 2 nodes or fewer.
// If adminUser is non-empty, an admin user with that name is
// created upon initialization. Its password is then also returned.
func (s *Server) RunInitialSQL(
func (s *topLevelServer) RunInitialSQL(
ctx context.Context, startSingleNode bool, adminUser, adminPassword string,
) error {
if s.cfg.ObsServiceAddr == base.ObsServiceEmbedFlagValue {
@@ -75,7 +75,9 @@ func (s *SQLServerWrapper) RunInitialSQL(context.Context, bool, string, string)
}

// createAdminUser creates an admin user with the given name.
func (s *Server) createAdminUser(ctx context.Context, adminUser, adminPassword string) error {
func (s *topLevelServer) createAdminUser(
ctx context.Context, adminUser, adminPassword string,
) error {
ie := s.sqlServer.internalExecutor
_, err := ie.Exec(
ctx, "admin-user", nil,
@@ -97,7 +99,7 @@ func (s *Server) createAdminUser(ctx context.Context, adminUser, adminPassword s
//
// The change is effected using the internal SQL interface of the
// given server object.
func (s *Server) disableReplication(ctx context.Context) (retErr error) {
func (s *topLevelServer) disableReplication(ctx context.Context) (retErr error) {
ie := s.sqlServer.internalExecutor

it, err := ie.QueryIterator(ctx, "get-zones", nil,
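Editor's note: createAdminUser goes through the server's internal SQL executor, whose Exec call takes an operation name and an optional transaction before the statement. The statement itself is elided in this hunk; a hedged sketch of the call shape (the SQL text is assumed, not taken from the diff):

func createAdminUserSketch(ctx context.Context, s *topLevelServer, adminUser, adminPassword string) error {
	ie := s.sqlServer.internalExecutor
	_, err := ie.Exec(
		ctx, "admin-user" /* opName */, nil, /* txn */
		"CREATE USER $1 WITH PASSWORD $2", adminUser, adminPassword,
	)
	return err
}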
2 changes: 1 addition & 1 deletion pkg/server/loss_of_quorum.go
@@ -93,7 +93,7 @@ func maybeRunLossOfQuorumRecoveryCleanup(
ctx context.Context,
ie isql.Executor,
stores *kvserver.Stores,
server *Server,
server *topLevelServer,
stopper *stop.Stopper,
) {
publishCtx, publishCancel := stopper.WithCancelOnQuiesce(ctx)
2 changes: 1 addition & 1 deletion pkg/server/migration.go
@@ -29,7 +29,7 @@ import (
// migrationServer is an implementation of the Migration service. The RPCs here
// are used to power the upgrades infrastructure in pkg/upgrades.
type migrationServer struct {
server *Server
server *topLevelServer

// We use this mutex to serialize attempts to bump the cluster version.
syncutil.Mutex