diff --git a/pkg/server/server.go b/pkg/server/server.go
index 3841fc384ba4..b13ef5e6dcc6 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -148,7 +148,7 @@ func (mux *safeServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 // Server is the cockroach server node.
 type Server struct {
-	nodeIDContainer base.NodeIDContainer
+	nodeIDContainer *base.NodeIDContainer
 
 	cfg        Config
 	st         *cluster.Settings
@@ -178,7 +178,7 @@ type Server struct {
 	// stores is already initialized.
 	initServer    *initServer
 	tsDB          *ts.DB
-	tsServer      ts.Server
+	tsServer      *ts.Server
 	raftTransport *kvserver.RaftTransport
 
 	stopper *stop.Stopper
@@ -235,14 +235,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	} else {
 		clock = hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
 	}
-	s := &Server{
-		st:       st,
-		clock:    clock,
-		stopper:  stopper,
-		cfg:      cfg,
-		registry: metric.NewRegistry(),
-	}
-
+	registry := metric.NewRegistry()
 	// If the tracer has a Close function, call it after the server stops.
 	if tr, ok := cfg.AmbientCtx.Tracer.(stop.Closer); ok {
 		stopper.AddCloser(tr)
@@ -253,7 +246,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		return nil, err
 	} else if certMgr != nil {
 		// The certificate manager is non-nil in secure mode.
-		s.registry.AddMetricStruct(certMgr.Metrics())
+		registry.AddMetricStruct(certMgr.Metrics())
 	}
 
 	// Add a dynamic log tag value for the node ID.
@@ -268,48 +261,50 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	// regular tag since it's just doing an (atomic) load when a log/trace message
 	// is constructed. The node ID is set by the Store if this host was
 	// bootstrapped; otherwise a new one is allocated in Node.
-	s.cfg.AmbientCtx.AddLogTag("n", &s.nodeIDContainer)
+	nodeIDContainer := &base.NodeIDContainer{}
+	cfg.AmbientCtx.AddLogTag("n", nodeIDContainer)
 
-	ctx := s.AnnotateCtx(context.Background())
+	ctx := cfg.AmbientCtx.AnnotateCtx(context.Background())
 
 	// Check the compatibility between the configured addresses and that
 	// provided in certificates. This also logs the certificate
 	// addresses in all cases to aid troubleshooting.
 	// This must be called after the certificate manager was initialized
 	// and after ValidateAddrs().
-	s.cfg.CheckCertificateAddrs(ctx)
+	cfg.CheckCertificateAddrs(ctx)
 
-	if knobs := s.cfg.TestingKnobs.Server; knobs != nil {
+	var rpcContext *rpc.Context
+	if knobs := cfg.TestingKnobs.Server; knobs != nil {
 		serverKnobs := knobs.(*TestingKnobs)
-		s.rpcContext = rpc.NewContextWithTestingKnobs(
-			s.cfg.AmbientCtx, s.cfg.Config, s.clock, s.stopper, cfg.Settings,
+		rpcContext = rpc.NewContextWithTestingKnobs(
+			cfg.AmbientCtx, cfg.Config, clock, stopper, cfg.Settings,
 			serverKnobs.ContextTestingKnobs,
 		)
 	} else {
-		s.rpcContext = rpc.NewContext(s.cfg.AmbientCtx, s.cfg.Config, s.clock, s.stopper,
+		rpcContext = rpc.NewContext(cfg.AmbientCtx, cfg.Config, clock, stopper,
 			cfg.Settings)
 	}
-	s.rpcContext.HeartbeatCB = func() {
-		if err := s.rpcContext.RemoteClocks.VerifyClockOffset(ctx); err != nil {
+	rpcContext.HeartbeatCB = func() {
+		if err := rpcContext.RemoteClocks.VerifyClockOffset(ctx); err != nil {
 			log.Fatal(ctx, err)
 		}
 	}
-	s.registry.AddMetricStruct(s.rpcContext.Metrics())
+	registry.AddMetricStruct(rpcContext.Metrics())
 
-	s.grpc = newGRPCServer(s.rpcContext)
+	grpcServer := newGRPCServer(rpcContext)
 
-	s.gossip = gossip.New(
-		s.cfg.AmbientCtx,
-		&s.rpcContext.ClusterID,
-		&s.nodeIDContainer,
-		s.rpcContext,
-		s.grpc.Server,
-		s.stopper,
-		s.registry,
-		s.cfg.Locality,
-		&s.cfg.DefaultZoneConfig,
+	g := gossip.New(
+		cfg.AmbientCtx,
+		&rpcContext.ClusterID,
+		nodeIDContainer,
+		rpcContext,
+		grpcServer.Server,
+		stopper,
+		registry,
+		cfg.Locality,
+		&cfg.DefaultZoneConfig,
 	)
-	s.nodeDialer = nodedialer.New(s.rpcContext, gossip.AddressResolver(s.gossip))
+	nodeDialer := nodedialer.New(rpcContext, gossip.AddressResolver(g))
 
 	bootstrapVersion := cfg.Settings.Version.BinaryVersion()
 	if knobs := cfg.TestingKnobs.Server; knobs != nil {
@@ -318,17 +313,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		}
 	}
 
-	s.initServer = newInitServer(
+	initServer := newInitServer(
 		cfg.Settings.Version.BinaryVersion(),
 		cfg.Settings.Version.BinaryMinSupportedVersion(),
 		clusterversion.ClusterVersion{Version: bootstrapVersion},
 		&cfg.DefaultZoneConfig,
 		&cfg.DefaultSystemZoneConfig,
 	)
-	serverpb.RegisterInitServer(s.grpc.Server, s.initServer)
+	serverpb.RegisterInitServer(grpcServer.Server, initServer)
 
-	s.runtime = status.NewRuntimeStatSampler(context.TODO(), s.clock)
-	s.registry.AddMetricStruct(s.runtime)
+	runtimeSampler := status.NewRuntimeStatSampler(context.TODO(), clock)
+	registry.AddMetricStruct(runtimeSampler)
 
 	// A custom RetryOptions is created which uses stopper.ShouldQuiesce() as
 	// the Closer. This prevents infinite retry loops from occurring during
@@ -342,79 +337,81 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	// succeed because the only server has been shut down; thus, the
 	// DistSender needs to know that it should not retry in this situation.
 	var clientTestingKnobs kvcoord.ClientTestingKnobs
-	if kvKnobs := s.cfg.TestingKnobs.KVClient; kvKnobs != nil {
+	if kvKnobs := cfg.TestingKnobs.KVClient; kvKnobs != nil {
 		clientTestingKnobs = *kvKnobs.(*kvcoord.ClientTestingKnobs)
 	}
-	retryOpts := s.cfg.RetryOptions
+	retryOpts := cfg.RetryOptions
 	if retryOpts == (retry.Options{}) {
 		retryOpts = base.DefaultRetryOptions()
 	}
-	retryOpts.Closer = s.stopper.ShouldQuiesce()
+	retryOpts.Closer = stopper.ShouldQuiesce()
 	distSenderCfg := kvcoord.DistSenderConfig{
-		AmbientCtx:      s.cfg.AmbientCtx,
+		AmbientCtx:      cfg.AmbientCtx,
 		Settings:        st,
-		Clock:           s.clock,
-		RPCContext:      s.rpcContext,
+		Clock:           clock,
+		RPCContext:      rpcContext,
 		RPCRetryOptions: &retryOpts,
 		TestingKnobs:    clientTestingKnobs,
-		NodeDialer:      s.nodeDialer,
+		NodeDialer:      nodeDialer,
 	}
-	s.distSender = kvcoord.NewDistSender(distSenderCfg, s.gossip)
-	s.registry.AddMetricStruct(s.distSender.Metrics())
+	distSender := kvcoord.NewDistSender(distSenderCfg, g)
+	registry.AddMetricStruct(distSender.Metrics())
 
-	txnMetrics := kvcoord.MakeTxnMetrics(s.cfg.HistogramWindowInterval())
-	s.registry.AddMetricStruct(txnMetrics)
+	txnMetrics := kvcoord.MakeTxnMetrics(cfg.HistogramWindowInterval())
+	registry.AddMetricStruct(txnMetrics)
 	txnCoordSenderFactoryCfg := kvcoord.TxnCoordSenderFactoryConfig{
-		AmbientCtx:   s.cfg.AmbientCtx,
+		AmbientCtx:   cfg.AmbientCtx,
 		Settings:     st,
-		Clock:        s.clock,
-		Stopper:      s.stopper,
-		Linearizable: s.cfg.Linearizable,
+		Clock:        clock,
+		Stopper:      stopper,
+		Linearizable: cfg.Linearizable,
 		Metrics:      txnMetrics,
 		TestingKnobs: clientTestingKnobs,
 	}
-	s.tcsFactory = kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, s.distSender)
+	tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender)
 
 	dbCtx := kv.DefaultDBContext()
-	dbCtx.NodeID = &s.nodeIDContainer
-	dbCtx.Stopper = s.stopper
-	s.db = kv.NewDBWithContext(s.cfg.AmbientCtx, s.tcsFactory, s.clock, dbCtx)
+	dbCtx.NodeID = nodeIDContainer
+	dbCtx.Stopper = stopper
+	db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx)
 
-	nlActive, nlRenewal := s.cfg.NodeLivenessDurations()
+	nlActive, nlRenewal := cfg.NodeLivenessDurations()
 
-	s.nodeLiveness = kvserver.NewNodeLiveness(
-		s.cfg.AmbientCtx,
-		s.clock,
-		s.db,
-		s.engines,
-		s.gossip,
+	nodeLiveness := kvserver.NewNodeLiveness(
+		cfg.AmbientCtx,
+		clock,
+		db,
+		// TODO(tbg): this is supposed to get the engines slice, which is only
+		// available at start time.
+		nil,
+		g,
 		nlActive,
 		nlRenewal,
-		s.st,
-		s.cfg.HistogramWindowInterval(),
+		st,
+		cfg.HistogramWindowInterval(),
 	)
-	s.registry.AddMetricStruct(s.nodeLiveness.Metrics())
+	registry.AddMetricStruct(nodeLiveness.Metrics())
 
-	s.storePool = kvserver.NewStorePool(
-		s.cfg.AmbientCtx,
-		s.st,
-		s.gossip,
-		s.clock,
-		s.nodeLiveness.GetNodeCount,
-		kvserver.MakeStorePoolNodeLivenessFunc(s.nodeLiveness),
+	storePool := kvserver.NewStorePool(
+		cfg.AmbientCtx,
+		st,
+		g,
+		clock,
+		nodeLiveness.GetNodeCount,
+		kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness),
 		/* deterministic */ false,
 	)
 
-	s.raftTransport = kvserver.NewRaftTransport(
-		s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc.Server, s.stopper,
+	raftTransport := kvserver.NewRaftTransport(
+		cfg.AmbientCtx, st, nodeDialer, grpcServer.Server, stopper,
 	)
 
-	s.tsDB = ts.NewDB(s.db, s.cfg.Settings)
-	s.registry.AddMetricStruct(s.tsDB.Metrics())
+	tsDB := ts.NewDB(db, cfg.Settings)
+	registry.AddMetricStruct(tsDB.Metrics())
 	nodeCountFn := func() int64 {
-		return s.nodeLiveness.Metrics().LiveNodes.Value()
+		return nodeLiveness.Metrics().LiveNodes.Value()
 	}
-	s.tsServer = ts.MakeServer(s.cfg.AmbientCtx, s.tsDB, nodeCountFn, s.cfg.TimeSeriesServerConfig, s.stopper)
+	tsServer := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)
 
 	// The InternalExecutor will be further initialized later, as we create more
 	// of the server's components. There's a circular dependency - many things
@@ -427,171 +424,211 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	// This function defines how ExternalStorage objects are created.
 	externalStorage := func(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) {
 		return cloud.MakeExternalStorage(
-			ctx, dest, s.cfg.ExternalIOConfig, st,
+			ctx, dest, cfg.ExternalIOConfig, st,
 			blobs.NewBlobClientFactory(
-				s.nodeIDContainer.Get(),
-				s.nodeDialer,
+				nodeIDContainer.Get(),
+				nodeDialer,
 				st.ExternalIODir,
 			),
 		)
 	}
 	externalStorageFromURI := func(ctx context.Context, uri string) (cloud.ExternalStorage, error) {
 		return cloud.ExternalStorageFromURI(
-			ctx, uri, s.cfg.ExternalIOConfig, st,
+			ctx, uri, cfg.ExternalIOConfig, st,
 			blobs.NewBlobClientFactory(
-				s.nodeIDContainer.Get(),
-				s.nodeDialer,
+				nodeIDContainer.Get(),
+				nodeDialer,
 				st.ExternalIODir,
 			),
 		)
 	}
 
-	var err error
-	if s.protectedtsProvider, err = ptprovider.New(ptprovider.Config{
-		DB:               s.db,
+	protectedtsProvider, err := ptprovider.New(ptprovider.Config{
+		DB:               db,
 		InternalExecutor: internalExecutor,
 		Settings:         st,
-	}); err != nil {
+	})
+	if err != nil {
 		return nil, err
 	}
 
-	// TODO(bdarnell): make StoreConfig configurable.
+	// Break a circular dependency: we need a Node to make a StoreConfig (for
+	// ClosedTimestamp), but the Node needs a StoreConfig to be made.
+	var lateBoundNode *Node
+
 	storeCfg := kvserver.StoreConfig{
-		DefaultZoneConfig:       &s.cfg.DefaultZoneConfig,
+		DefaultZoneConfig:       &cfg.DefaultZoneConfig,
 		Settings:                st,
-		AmbientCtx:              s.cfg.AmbientCtx,
-		RaftConfig:              s.cfg.RaftConfig,
-		Clock:                   s.clock,
-		DB:                      s.db,
-		Gossip:                  s.gossip,
-		NodeLiveness:            s.nodeLiveness,
-		Transport:               s.raftTransport,
-		NodeDialer:              s.nodeDialer,
-		RPCContext:              s.rpcContext,
-		ScanInterval:            s.cfg.ScanInterval,
-		ScanMinIdleTime:         s.cfg.ScanMinIdleTime,
-		ScanMaxIdleTime:         s.cfg.ScanMaxIdleTime,
-		TimestampCachePageSize:  s.cfg.TimestampCachePageSize,
-		HistogramWindowInterval: s.cfg.HistogramWindowInterval(),
-		StorePool:               s.storePool,
+		AmbientCtx:              cfg.AmbientCtx,
+		RaftConfig:              cfg.RaftConfig,
+		Clock:                   clock,
+		DB:                      db,
+		Gossip:                  g,
+		NodeLiveness:            nodeLiveness,
+		Transport:               raftTransport,
+		NodeDialer:              nodeDialer,
+		RPCContext:              rpcContext,
+		ScanInterval:            cfg.ScanInterval,
+		ScanMinIdleTime:         cfg.ScanMinIdleTime,
+		ScanMaxIdleTime:         cfg.ScanMaxIdleTime,
+		TimestampCachePageSize:  cfg.TimestampCachePageSize,
+		HistogramWindowInterval: cfg.HistogramWindowInterval(),
+		StorePool:               storePool,
 		SQLExecutor:             internalExecutor,
-		LogRangeEvents:          s.cfg.EventLogEnabled,
-		RangeDescriptorCache:    s.distSender.RangeDescriptorCache(),
-		TimeSeriesDataStore:     s.tsDB,
+		LogRangeEvents:          cfg.EventLogEnabled,
+		RangeDescriptorCache:    distSender.RangeDescriptorCache(),
+		TimeSeriesDataStore:     tsDB,
 
 		// Initialize the closed timestamp subsystem. Note that it won't
 		// be ready until it is .Start()ed, but the grpc server can be
 		// registered early.
 		ClosedTimestamp: container.NewContainer(container.Config{
 			Settings: st,
-			Stopper:  s.stopper,
-			Clock:    s.nodeLiveness.AsLiveClock(),
+			Stopper:  stopper,
+			Clock:    nodeLiveness.AsLiveClock(),
 			// NB: s.node is not defined at this point, but it will be
 			// before this is ever called.
 			Refresh: func(rangeIDs ...roachpb.RangeID) {
 				for _, rangeID := range rangeIDs {
-					repl, _, err := s.node.stores.GetReplicaForRangeID(rangeID)
+					repl, _, err := lateBoundNode.stores.GetReplicaForRangeID(rangeID)
 					if err != nil || repl == nil {
 						continue
 					}
 					repl.EmitMLAI()
 				}
 			},
-			Dialer: s.nodeDialer.CTDialer(),
+			Dialer: nodeDialer.CTDialer(),
 		}),
 		EnableEpochRangeLeases:  true,
 		ExternalStorage:         externalStorage,
 		ExternalStorageFromURI:  externalStorageFromURI,
-		ProtectedTimestampCache: s.protectedtsProvider,
+		ProtectedTimestampCache: protectedtsProvider,
 	}
-	if storeTestingKnobs := s.cfg.TestingKnobs.Store; storeTestingKnobs != nil {
+	if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil {
 		storeCfg.TestingKnobs = *storeTestingKnobs.(*kvserver.StoreTestingKnobs)
 	}
 
-	s.recorder = status.NewMetricsRecorder(s.clock, s.nodeLiveness, s.rpcContext, s.gossip, st)
-	s.registry.AddMetricStruct(s.rpcContext.RemoteClocks.Metrics())
-
-	s.node = NewNode(
-		storeCfg, s.recorder, s.registry, s.stopper,
-		txnMetrics, nil /* execCfg */, &s.rpcContext.ClusterID)
-	roachpb.RegisterInternalServer(s.grpc.Server, s.node)
-	kvserver.RegisterPerReplicaServer(s.grpc.Server, s.node.perReplicaServer)
-	s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc.Server)
-	s.replicationReporter = reports.NewReporter(
-		s.db, s.node.stores, s.storePool,
-		s.ClusterSettings(), s.nodeLiveness, internalExecutor)
-
-	s.protectedtsReconciler = ptreconcile.NewReconciler(ptreconcile.Config{
-		Settings: s.st,
-		Stores:   s.node.stores,
-		DB:       s.db,
-		Storage:  s.protectedtsProvider,
-		Cache:    s.protectedtsProvider,
+	recorder := status.NewMetricsRecorder(clock, nodeLiveness, rpcContext, g, st)
+	registry.AddMetricStruct(rpcContext.RemoteClocks.Metrics())
+
+	node := NewNode(
+		storeCfg, recorder, registry, stopper,
+		txnMetrics, nil /* execCfg */, &rpcContext.ClusterID)
+	lateBoundNode = node
+	roachpb.RegisterInternalServer(grpcServer.Server, node)
+	kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
+	node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(grpcServer.Server)
+	replicationReporter := reports.NewReporter(
+		db, node.stores, storePool, st, nodeLiveness, internalExecutor)
+
+	protectedtsReconciler := ptreconcile.NewReconciler(ptreconcile.Config{
+		Settings: st,
+		Stores:   node.stores,
+		DB:       db,
+		Storage:  protectedtsProvider,
+		Cache:    protectedtsProvider,
 		StatusFuncs: ptreconcile.StatusFuncs{
 			jobsprotectedts.MetaType: jobsprotectedts.MakeStatusFunc(jobRegistry),
 		},
 	})
-	s.registry.AddMetricStruct(s.protectedtsReconciler.Metrics())
+	registry.AddMetricStruct(protectedtsReconciler.Metrics())
 
-	s.admin = newAdminServer(s)
+	lateBoundServer := &Server{}
+	// TODO(tbg): don't pass the whole Server into lateBoundServer to avoid this
+	// hack.
+	adminServer := newAdminServer(lateBoundServer)
 	sessionRegistry := sql.NewSessionRegistry()
-	s.status = newStatusServer(
-		s.cfg.AmbientCtx,
+	statusServer := newStatusServer(
+		cfg.AmbientCtx,
 		st,
-		s.cfg.Config,
-		s.admin,
-		s.db,
-		s.gossip,
-		s.recorder,
-		s.nodeLiveness,
-		s.storePool,
-		s.rpcContext,
-		s.node.stores,
-		s.stopper,
+		cfg.Config,
+		adminServer,
+		db,
+		g,
+		recorder,
+		nodeLiveness,
+		storePool,
+		rpcContext,
+		node.stores,
+		stopper,
 		sessionRegistry,
 		internalExecutor,
 	)
-	s.authentication = newAuthenticationServer(s)
-	for i, gw := range []grpcGatewayServer{s.admin, s.status, s.authentication, &s.tsServer} {
+	// TODO(tbg): don't pass all of Server into this to avoid this hack.
+	authenticationServer := newAuthenticationServer(lateBoundServer)
+	for i, gw := range []grpcGatewayServer{adminServer, statusServer, authenticationServer, &tsServer} {
 		if reflect.ValueOf(gw).IsNil() {
 			return nil, errors.Errorf("%d: nil", i)
 		}
-		gw.RegisterService(s.grpc.Server)
+		gw.RegisterService(grpcServer.Server)
 	}
 
-	flowDB := kv.NewDB(cfg.AmbientCtx, s.tcsFactory, s.clock)
-	s.sqlServer, err = newSQLServer(sqlServerArgs{
-		Config:                 &s.cfg, // NB: s.cfg has a populated AmbientContext.
+	flowDB := kv.NewDB(cfg.AmbientCtx, tcsFactory, clock)
+	sqlServer, err := newSQLServer(sqlServerArgs{
+		Config:                 &cfg, // NB: s.cfg has a populated AmbientContext.
 		stopper:                stopper,
 		clock:                  clock,
-		rpcContext:             s.rpcContext,
-		distSender:             s.distSender,
-		status:                 s.status,
-		nodeLiveness:           s.nodeLiveness,
-		protectedtsProvider:    s.protectedtsProvider,
-		gossip:                 s.gossip,
-		nodeDialer:             s.nodeDialer,
-		grpcServer:             s.grpc.Server,
-		recorder:               s.recorder,
-		runtime:                s.runtime,
-		db:                     s.db,
-		registry:               s.registry,
+		rpcContext:             rpcContext,
+		distSender:             distSender,
+		status:                 statusServer,
+		nodeLiveness:           nodeLiveness,
+		protectedtsProvider:    protectedtsProvider,
+		gossip:                 g,
+		nodeDialer:             nodeDialer,
+		grpcServer:             grpcServer.Server,
+		recorder:               recorder,
+		runtime:                runtimeSampler,
+		db:                     db,
+		registry:               registry,
 		internalExecutor:       internalExecutor,
-		nodeIDContainer:        &s.nodeIDContainer,
+		nodeIDContainer:        nodeIDContainer,
 		flowDB:                 flowDB,
 		externalStorage:        externalStorage,
 		externalStorageFromURI: externalStorageFromURI,
 		jobRegistry:            jobRegistry,
-		isMeta1Leaseholder:     s.node.stores.IsMeta1Leaseholder,
+		isMeta1Leaseholder:     node.stores.IsMeta1Leaseholder,
 	})
 	if err != nil {
 		return nil, err
 	}
 
-	s.debug = debug.NewServer(s.ClusterSettings(), s.sqlServer.pgServer.HBADebugFn())
-	s.node.InitLogger(s.sqlServer.execCfg)
-	return s, err
+	debugServer := debug.NewServer(st, sqlServer.pgServer.HBADebugFn())
+	node.InitLogger(sqlServer.execCfg)
+
+	*lateBoundServer = Server{
+		nodeIDContainer:       nodeIDContainer,
+		cfg:                   cfg,
+		st:                    st,
+		clock:                 clock,
+		rpcContext:            rpcContext,
+		grpc:                  grpcServer,
+		gossip:                g,
+		nodeDialer:            nodeDialer,
+		nodeLiveness:          nodeLiveness,
+		storePool:             storePool,
+		tcsFactory:            tcsFactory,
+		distSender:            distSender,
+		db:                    db,
+		node:                  node,
+		registry:              registry,
+		recorder:              recorder,
+		runtime:               runtimeSampler,
+		admin:                 adminServer,
+		status:                statusServer,
+		authentication:        authenticationServer,
+		initServer:            initServer,
+		tsDB:                  tsDB,
+		tsServer:              &tsServer,
+		raftTransport:         raftTransport,
+		stopper:               stopper,
+		debug:                 debugServer,
+		replicationReporter:   replicationReporter,
+		protectedtsProvider:   protectedtsProvider,
+		protectedtsReconciler: protectedtsReconciler,
+		sqlServer:             sqlServer,
+	}
+	return lateBoundServer, err
 }
 
 type sqlServerArgs struct {
@@ -1521,7 +1558,7 @@ func (s *Server) Start(ctx context.Context) error {
 		}
 	})
 
-	for _, gw := range []grpcGatewayServer{s.admin, s.status, s.authentication, &s.tsServer} {
+	for _, gw := range []grpcGatewayServer{s.admin, s.status, s.authentication, s.tsServer} {
 		if err := gw.RegisterGateway(gwCtx, gwMux, conn); err != nil {
 			return err
 		}
@@ -1881,7 +1918,7 @@ func (s *Server) Start(ctx context.Context) error {
 	ui.Handler(ui.Config{
 		ExperimentalUseLogin: s.cfg.EnableWebSessionAuthentication,
 		LoginEnabled:         s.cfg.RequireWebSession(),
-		NodeID:               &s.nodeIDContainer,
+		NodeID:               s.nodeIDContainer,
 		GetUser: func(ctx context.Context) *string {
 			if u, ok := ctx.Value(webSessionUserKey{}).(string); ok {
 				return &u
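
The core pattern in this diff is late binding: every dependency of NewServer becomes a plain local variable, and pointers such as lateBoundNode and lateBoundServer are allocated up front so that closures (for example the ClosedTimestamp Refresh callback) and sub-servers (admin, authentication) can capture them before the real values exist; the struct behind the pointer is only populated once every component has been built. Below is a minimal, self-contained Go sketch of that pattern; the names in it (node, server, newServer, refresh) are illustrative stand-ins, not the CockroachDB types.

// Sketch of the late-binding pattern used in the diff (assumed names).
package main

import "fmt"

type node struct{ id int }

type server struct {
	node *node
	// refresh was built before the server was fully constructed.
	refresh func() string
}

func newServer() *server {
	// Late-bound pointers: declared/allocated now, populated at the end.
	var lateBoundNode *node
	lateBoundServer := &server{}

	// A component built early can close over the late-bound pointer; it must
	// only dereference it after construction has finished, mirroring the
	// "not defined at this point, but it will be before this is ever called"
	// contract in the diff.
	refresh := func() string {
		return fmt.Sprintf("node %d", lateBoundNode.id)
	}

	// Build the dependencies as ordinary locals.
	n := &node{id: 7}
	lateBoundNode = n

	// Finally, fill in the struct that the early components already point to.
	*lateBoundServer = server{node: n, refresh: refresh}
	return lateBoundServer
}

func main() {
	s := newServer()
	fmt.Println(s.refresh()) // prints "node 7" once construction is complete
}

The sketch preserves the invariant the diff relies on: nothing dereferences the late-bound pointer until NewServer-style construction has completed and the pointee has been assigned.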