server: unexport Server
Release note: None
knz committed Aug 2, 2023
1 parent eaf0f22 commit 2b29313
Showing 16 changed files with 83 additions and 79 deletions.
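
Nearly every change below is the same mechanical rename: the exported `Server` struct becomes the unexported `topLevelServer`, while exported entry points such as `NewServer` keep their names and now return the unexported type. A minimal sketch of this Go pattern, using invented names (`pkg`, `addr`, `Addr`) purely for illustration:

    package pkg

    // topLevelServer is unexported: code outside the package can no longer
    // name the type, embed it, or assert to it.
    type topLevelServer struct {
        addr string
    }

    // NewServer stays exported. External callers still receive a
    // *topLevelServer through type inference (s := pkg.NewServer(...));
    // they simply cannot spell out the type themselves anymore.
    func NewServer(addr string) *topLevelServer {
        return &topLevelServer{addr: addr}
    }

    // Exported methods on the unexported type remain callable from outside.
    func (s *topLevelServer) Addr() string { return s.addr }

Because the constructor keeps its exported name, external callers are unaffected at compile time so long as they never spelled out the type name.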
pkg/server/admin.go (4 changes: 2 additions & 2 deletions)

@@ -130,7 +130,7 @@ type systemAdminServer struct {
 	*adminServer

 	nodeLiveness *liveness.NodeLiveness
-	server       *Server
+	server       *topLevelServer
 }

 // noteworthyAdminMemoryUsageBytes is the minimum size tracked by the
@@ -161,7 +161,7 @@ func newSystemAdminServer(
 	distSender *kvcoord.DistSender,
 	grpc *grpcServer,
 	drainServer *drainServer,
-	s *Server,
+	s *topLevelServer,
 ) *systemAdminServer {
 	adminServer := newAdminServer(
 		sqlServer,
pkg/server/auto_upgrade.go (6 changes: 3 additions & 3 deletions)

@@ -24,7 +24,7 @@ import (
 )

 // startAttemptUpgrade attempts to upgrade cluster version.
-func (s *Server) startAttemptUpgrade(ctx context.Context) error {
+func (s *topLevelServer) startAttemptUpgrade(ctx context.Context) error {
 	return s.stopper.RunAsyncTask(ctx, "auto-upgrade", func(ctx context.Context) {
 		ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
@@ -120,7 +120,7 @@ const (

 // upgradeStatus lets the main checking loop know if we should do upgrade,
 // keep checking upgrade status, or stop attempting upgrade.
-func (s *Server) upgradeStatus(
+func (s *topLevelServer) upgradeStatus(
 	ctx context.Context, clusterVersion string,
 ) (st upgradeStatus, err error) {
 	nodes, err := s.status.ListNodesInternal(ctx, nil)
@@ -205,7 +205,7 @@ func (s *Server) upgradeStatus(
 // clusterVersion returns the current cluster version from the SQL subsystem
 // (which returns the version from the KV store as opposed to the possibly
 // lagging settings subsystem).
-func (s *Server) clusterVersion(ctx context.Context) (string, error) {
+func (s *topLevelServer) clusterVersion(ctx context.Context) (string, error) {
 	row, err := s.sqlServer.internalExecutor.QueryRowEx(
 		ctx, "show-version", nil, /* txn */
 		sessiondata.RootUserSessionDataOverride,
pkg/server/clock_monotonicity.go (8 changes: 5 additions & 3 deletions)

@@ -45,7 +45,7 @@ var (

 // startMonitoringForwardClockJumps starts a background task to monitor forward
 // clock jumps based on a cluster setting.
-func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) error {
+func (s *topLevelServer) startMonitoringForwardClockJumps(ctx context.Context) error {
 	forwardJumpCheckEnabled := make(chan bool, 1)
 	s.stopper.AddCloser(stop.CloserFn(func() { close(forwardJumpCheckEnabled) }))

@@ -69,7 +69,7 @@ func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) error {
 // checkHLCUpperBoundExists determines whether there's an HLC
 // upper bound that will need to refreshed/persisted after
 // the server has initialized.
-func (s *Server) checkHLCUpperBoundExistsAndEnsureMonotonicity(
+func (s *topLevelServer) checkHLCUpperBoundExistsAndEnsureMonotonicity(
 	ctx context.Context, initialStart bool,
 ) (hlcUpperBoundExists bool, err error) {
 	if initialStart {
@@ -238,7 +238,9 @@ func periodicallyPersistHLCUpperBound(
 //
 // tickCallback is called whenever persistHLCUpperBoundCh or a ticker tick is
 // processed
-func (s *Server) startPersistingHLCUpperBound(ctx context.Context, hlcUpperBoundExists bool) error {
+func (s *topLevelServer) startPersistingHLCUpperBound(
+	ctx context.Context, hlcUpperBoundExists bool,
+) error {
 	tickerFn := time.NewTicker
 	persistHLCUpperBoundFn := func(t int64) error { /* function to persist upper bound of HLC to all stores */
 		return s.node.SetHLCUpperBound(context.Background(), t)
pkg/server/decommission.go (6 changes: 3 additions & 3 deletions)

@@ -141,7 +141,7 @@ func getPingCheckDecommissionFn(
 // or remove actions. If maxErrors >0, range checks will stop once maxError is
 // reached.
 // The error returned is a gRPC error.
-func (s *Server) DecommissionPreCheck(
+func (s *topLevelServer) DecommissionPreCheck(
 	ctx context.Context,
 	nodeIDs []roachpb.NodeID,
 	strictReadiness bool,
@@ -314,7 +314,7 @@ func evaluateRangeCheckResult(

 // Decommission idempotently sets the decommissioning flag for specified nodes.
 // The error return is a gRPC error.
-func (s *Server) Decommission(
+func (s *topLevelServer) Decommission(
 	ctx context.Context, targetStatus livenesspb.MembershipStatus, nodeIDs []roachpb.NodeID,
 ) error {
 	// If we're asked to decommission ourself we may lose access to cluster RPC,
@@ -396,7 +396,7 @@ func (s *Server) Decommission(

 // DecommissioningNodeMap returns the set of node IDs that are decommissioning
 // from the perspective of the server.
-func (s *Server) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
+func (s *topLevelServer) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
 	s.decomNodeMap.RLock()
 	defer s.decomNodeMap.RUnlock()
 	nodes := make(map[roachpb.NodeID]interface{})
pkg/server/import_ts.go (2 changes: 1 addition & 1 deletion)

@@ -38,7 +38,7 @@ import (
 // to be written to the DB.
 const maxBatchSize = 10000

-func maybeImportTS(ctx context.Context, s *Server) (returnErr error) {
+func maybeImportTS(ctx context.Context, s *topLevelServer) (returnErr error) {
 	// We don't want to do startup retries as this is not meant to be run in
 	// production.
 	ctx = startup.WithoutChecks(ctx)
pkg/server/initial_sql.go (8 changes: 5 additions & 3 deletions)

@@ -29,7 +29,7 @@ import (
 // and `cockroach demo` with 2 nodes or fewer.
 // If adminUser is non-empty, an admin user with that name is
 // created upon initialization. Its password is then also returned.
-func (s *Server) RunInitialSQL(
+func (s *topLevelServer) RunInitialSQL(
 	ctx context.Context, startSingleNode bool, adminUser, adminPassword string,
 ) error {
 	if s.cfg.ObsServiceAddr == base.ObsServiceEmbedFlagValue {
@@ -75,7 +75,9 @@ func (s *SQLServerWrapper) RunInitialSQL(context.Context, bool, string, string)
 }

 // createAdminUser creates an admin user with the given name.
-func (s *Server) createAdminUser(ctx context.Context, adminUser, adminPassword string) error {
+func (s *topLevelServer) createAdminUser(
+	ctx context.Context, adminUser, adminPassword string,
+) error {
 	ie := s.sqlServer.internalExecutor
 	_, err := ie.Exec(
 		ctx, "admin-user", nil,
@@ -97,7 +99,7 @@ func (s *Server) createAdminUser(ctx context.Context, adminUser, adminPassword s
 //
 // The change is effected using the internal SQL interface of the
 // given server object.
-func (s *Server) disableReplication(ctx context.Context) (retErr error) {
+func (s *topLevelServer) disableReplication(ctx context.Context) (retErr error) {
 	ie := s.sqlServer.internalExecutor

 	it, err := ie.QueryIterator(ctx, "get-zones", nil,
pkg/server/loss_of_quorum.go (2 changes: 1 addition & 1 deletion)

@@ -93,7 +93,7 @@ func maybeRunLossOfQuorumRecoveryCleanup(
 	ctx context.Context,
 	ie isql.Executor,
 	stores *kvserver.Stores,
-	server *Server,
+	server *topLevelServer,
 	stopper *stop.Stopper,
 ) {
 	publishCtx, publishCancel := stopper.WithCancelOnQuiesce(ctx)
pkg/server/migration.go (2 changes: 1 addition & 1 deletion)

@@ -29,7 +29,7 @@ import (
 // migrationServer is an implementation of the Migration service. The RPCs here
 // are used to power the upgrades infrastructure in pkg/upgrades.
 type migrationServer struct {
-	server *Server
+	server *topLevelServer

 	// We use this mutex to serialize attempts to bump the cluster version.
 	syncutil.Mutex
pkg/server/server.go (46 changes: 23 additions & 23 deletions)

@@ -130,8 +130,8 @@ import (
 	"google.golang.org/grpc/codes"
 )

-// Server is the cockroach server node.
-type Server struct {
+// topLevelServer is the cockroach server node.
+type topLevelServer struct {
 	// The following fields are populated in NewServer.

 	nodeIDContainer *base.NodeIDContainer
@@ -225,7 +225,7 @@ type Server struct {
 //
 // The caller is responsible for listening on the server's ShutdownRequested()
 // channel and calling stopper.Stop().
-func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
+func NewServer(cfg Config, stopper *stop.Stopper) (*topLevelServer, error) {
 	ctx := cfg.AmbientCtx.AnnotateCtx(context.Background())

 	if err := cfg.ValidateAddrs(ctx); err != nil {
@@ -946,7 +946,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher,
 	)

-	lateBoundServer := &Server{}
+	lateBoundServer := &topLevelServer{}

 	// The following initialization is mirrored in NewTenantServer().
 	// Please keep them in sync.
@@ -1261,7 +1261,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		},
 	)

-	*lateBoundServer = Server{
+	*lateBoundServer = topLevelServer{
 		nodeIDContainer: nodeIDContainer,
 		cfg:             cfg,
 		st:              st,
@@ -1319,36 +1319,36 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 }

 // ClusterSettings returns the cluster settings.
-func (s *Server) ClusterSettings() *cluster.Settings {
+func (s *topLevelServer) ClusterSettings() *cluster.Settings {
 	return s.st
 }

 // AnnotateCtx is a convenience wrapper; see AmbientContext.
-func (s *Server) AnnotateCtx(ctx context.Context) context.Context {
+func (s *topLevelServer) AnnotateCtx(ctx context.Context) context.Context {
 	return s.cfg.AmbientCtx.AnnotateCtx(ctx)
 }

 // AnnotateCtxWithSpan is a convenience wrapper; see AmbientContext.
-func (s *Server) AnnotateCtxWithSpan(
+func (s *topLevelServer) AnnotateCtxWithSpan(
 	ctx context.Context, opName string,
 ) (context.Context, *tracing.Span) {
 	return s.cfg.AmbientCtx.AnnotateCtxWithSpan(ctx, opName)
 }

 // StorageClusterID returns the ID of the storage cluster this server is a part of.
-func (s *Server) StorageClusterID() uuid.UUID {
+func (s *topLevelServer) StorageClusterID() uuid.UUID {
 	return s.rpcContext.StorageClusterID.Get()
 }

 // NodeID returns the ID of this node within its cluster.
-func (s *Server) NodeID() roachpb.NodeID {
+func (s *topLevelServer) NodeID() roachpb.NodeID {
 	return s.node.Descriptor.NodeID
 }

 // InitialStart returns whether this is the first time the node has started (as
 // opposed to being restarted). Only intended to help print debugging info
 // during server startup.
-func (s *Server) InitialStart() bool {
+func (s *topLevelServer) InitialStart() bool {
 	return s.node.initialStart
 }

@@ -1401,7 +1401,7 @@ func (li listenerInfo) Iter() map[string]string {
 //
 // The passed context can be used to trace the server startup. The context
 // should represent the general startup operation.
-func (s *Server) PreStart(ctx context.Context) error {
+func (s *topLevelServer) PreStart(ctx context.Context) error {
 	ctx = s.AnnotateCtx(ctx)
 	done := startup.Begin(ctx)
 	defer done()
@@ -2173,7 +2173,7 @@ func (s *Server) PreStart(ctx context.Context) error {
 // AcceptClients starts listening for incoming SQL clients over the network.
 // This mirrors the implementation of (*SQLServerWrapper).AcceptClients.
 // TODO(knz): Find a way to implement this method only once for both.
-func (s *Server) AcceptClients(ctx context.Context) error {
+func (s *topLevelServer) AcceptClients(ctx context.Context) error {
 	workersCtx := s.AnnotateCtx(context.Background())

 	if err := startServeSQL(
@@ -2205,7 +2205,7 @@ func (s *Server) AcceptClients(ctx context.Context) error {

 // AcceptInternalClients starts listening for incoming SQL connections on the
 // internal loopback interface.
-func (s *Server) AcceptInternalClients(ctx context.Context) error {
+func (s *topLevelServer) AcceptInternalClients(ctx context.Context) error {
 	connManager := netutil.MakeTCPServer(ctx, s.stopper)

 	return s.stopper.RunAsyncTaskEx(ctx,
@@ -2231,34 +2231,34 @@ func (s *Server) AcceptInternalClients(ctx context.Context) error {

 // ShutdownRequested returns a channel that is signaled when a subsystem wants
 // the server to be shut down.
-func (s *Server) ShutdownRequested() <-chan ShutdownRequest {
+func (s *topLevelServer) ShutdownRequested() <-chan ShutdownRequest {
 	return s.stopTrigger.C()
 }

 // TempDir returns the filepath of the temporary directory used for temp storage.
 // It is empty for an in-memory temp storage.
-func (s *Server) TempDir() string {
+func (s *topLevelServer) TempDir() string {
 	return s.cfg.TempStorageConfig.Path
 }

 // PGServer exports the pgwire server. Used by tests.
-func (s *Server) PGServer() *pgwire.Server {
+func (s *topLevelServer) PGServer() *pgwire.Server {
 	return s.sqlServer.pgServer
 }

 // SpanConfigReporter returns the spanconfig.Reporter. Used by tests.
-func (s *Server) SpanConfigReporter() spanconfig.Reporter {
+func (s *topLevelServer) SpanConfigReporter() spanconfig.Reporter {
 	return s.spanConfigReporter
 }

 // LogicalClusterID implements cli.serverStartupInterface. This
 // implementation exports the logical cluster ID of the system tenant.
-func (s *Server) LogicalClusterID() uuid.UUID {
+func (s *topLevelServer) LogicalClusterID() uuid.UUID {
 	return s.sqlServer.LogicalClusterID()
 }

 // startDiagnostics starts periodic diagnostics reporting and update checking.
-func (s *Server) startDiagnostics(ctx context.Context) {
+func (s *topLevelServer) startDiagnostics(ctx context.Context) {
 	s.updates.PeriodicallyCheckForUpdates(ctx, s.stopper)
 	s.sqlServer.StartDiagnostics(ctx)
 }
@@ -2268,12 +2268,12 @@ func init() {
 }

 // Insecure returns true iff the server has security disabled.
-func (s *Server) Insecure() bool {
+func (s *topLevelServer) Insecure() bool {
 	return s.cfg.Insecure
 }

 // TenantCapabilitiesReader returns the Server's tenantcapabilities.Reader.
-func (s *Server) TenantCapabilitiesReader() tenantcapabilities.Reader {
+func (s *topLevelServer) TenantCapabilitiesReader() tenantcapabilities.Reader {
 	return s.tenantCapabilitiesWatcher
 }

@@ -2292,7 +2292,7 @@ func (s *Server) TenantCapabilitiesReader() tenantcapabilities.Reader {
 // TODO(knz): This method is currently exported for use by the
 // shutdown code in cli/start.go; however, this is a mis-design. The
 // start code should use the Drain() RPC like quit does.
-func (s *Server) Drain(
+func (s *topLevelServer) Drain(
 	ctx context.Context, verbose bool,
 ) (remaining uint64, info redact.RedactableString, err error) {
 	return s.drain.runDrain(ctx, verbose)
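
One non-obvious shape in server.go survives the rename: NewServer allocates an empty lateBoundServer := &topLevelServer{} early, builds many subsystems that capture that pointer, and only assigns the full struct value near the end (*lateBoundServer = topLevelServer{...}). A self-contained sketch of this late-binding pattern, with invented names (widget, newWidget) standing in for the real types:

    package main

    import "fmt"

    type widget struct {
        name     string
        describe func() string
    }

    func newWidget() *widget {
        // Allocate first so the helpers built below can capture the pointer
        // before the fields exist.
        lateBound := &widget{}

        // The closure captures the pointer now but reads the fields only
        // when called, i.e. after the assignment below has filled them in.
        describe := func() string { return "widget named " + lateBound.name }

        // Bind the real value last; everything holding the pointer sees it.
        *lateBound = widget{name: "w1", describe: describe}
        return lateBound
    }

    func main() {
        fmt.Println(newWidget().describe()) // widget named w1
    }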
pkg/server/server_controller.go (2 changes: 1 addition & 1 deletion)

@@ -196,7 +196,7 @@ func (s *tenantServerWrapper) gracefulDrain(
 // We do not implement the onDemandServer interface methods on *Server
 // directly so as to not add noise to its go documentation.
 type systemServerWrapper struct {
-	server *Server
+	server *topLevelServer
 }

 var _ onDemandServer = (*systemServerWrapper)(nil)
pkg/server/server_controller_new_server.go (8 changes: 4 additions & 4 deletions)

@@ -61,10 +61,10 @@ type tenantServerCreator interface {
 	) (onDemandServer, error)
 }

-var _ tenantServerCreator = &Server{}
+var _ tenantServerCreator = &topLevelServer{}

 // newTenantServer implements the tenantServerCreator interface.
-func (s *Server) newTenantServer(
+func (s *topLevelServer) newTenantServer(
 	ctx context.Context,
 	tenantNameContainer *roachpb.TenantNameContainer,
 	tenantStopper *stop.Stopper,
@@ -103,7 +103,7 @@ func (errInvalidTenantMarker) Error() string { return "invalid tenant" }
 // is not shared.
 var ErrInvalidTenant error = errInvalidTenantMarker{}

-func (s *Server) getTenantID(
+func (s *topLevelServer) getTenantID(
 	ctx context.Context, tenantName roachpb.TenantName,
 ) (roachpb.TenantID, error) {
 	var rec *mtinfopb.TenantInfo
@@ -149,7 +149,7 @@ func newTenantServerInternal(
 	return newSharedProcessTenantServer(newCtx, stopper, baseCfg, sqlCfg, tenantNameContainer)
 }

-func (s *Server) makeSharedProcessTenantConfig(
+func (s *topLevelServer) makeSharedProcessTenantConfig(
 	ctx context.Context, tenantID roachpb.TenantID, index int, stopper *stop.Stopper,
 ) (BaseConfig, SQLConfig, error) {
 	// Create a configuration for the new tenant.
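
The var _ tenantServerCreator = &topLevelServer{} line above is Go's standard compile-time interface check, and it keeps working after the unexport because the interface and the type live in the same package. A tiny self-contained example of the idiom (all names invented for illustration):

    package main

    type greeter interface {
        Greet() string
    }

    type quietGreeter struct{}

    func (quietGreeter) Greet() string { return "hi" }

    // Compile-time proof that quietGreeter satisfies greeter; the blank
    // identifier discards the value, so nothing happens at runtime.
    var _ greeter = quietGreeter{}

    func main() {}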
pkg/server/server_controller_test.go (2 changes: 1 addition & 1 deletion)

@@ -37,7 +37,7 @@ func TestServerController(t *testing.T) {

 	d, err := ts.serverController.getServer(ctx, "system")
 	require.NoError(t, err)
-	if d.(*systemServerWrapper).server != ts.Server {
+	if d.(*systemServerWrapper).server != ts.topLevelServer {
 		t.Fatal("expected wrapped system server")
 	}

pkg/server/server_obs_service.go (2 changes: 1 addition & 1 deletion)

@@ -31,7 +31,7 @@ import (
 // startEmbeddedObsService creates the schema for the Observability Service (if
 // it doesn't exist already), starts the internal RPC service for event
 // ingestion and hooks up the event exporter to talk to the local service.
-func (s *Server) startEmbeddedObsService(
+func (s *topLevelServer) startEmbeddedObsService(
 	ctx context.Context, knobs *obs.EventExporterTestingKnobs,
 ) error {
 	// Create the Obs Service schema.
pkg/server/server_special_test.go (2 changes: 1 addition & 1 deletion)

@@ -127,7 +127,7 @@ func TestInternalSQL(t *testing.T) {
 	conf.User = "root"
 	// Configure pgx to connect on the loopback listener.
 	conf.DialFunc = func(ctx context.Context, network, addr string) (net.Conn, error) {
-		return s.(*testServer).Server.loopbackPgL.Connect(ctx)
+		return s.(*testServer).topLevelServer.loopbackPgL.Connect(ctx)
 	}
 	conn, err := pgx.ConnectConfig(ctx, conf)
 	require.NoError(t, err)
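
The two test files can keep naming the renamed type directly (ts.topLevelServer, d.(*systemServerWrapper).server) because they are in-package tests: a *_test.go file that declares package server sees unexported identifiers, whereas an external package server_test would not. A short sketch of the distinction, assuming a hypothetical in-package test file:

    // in pkg/server/example_internal_test.go
    package server

    import "testing"

    func TestCanSeeUnexported(t *testing.T) {
        // Legal: in-package tests may name unexported types.
        var s *topLevelServer
        _ = s
    }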