Skip to content

Commit

Permalink
server: refactor the grpc server
Browse files Browse the repository at this point in the history
This patch creates the grpcServer object, to encapsulate the grpc.Server
object, the serveMode object and the interceptor that our server.Server
uses to filter out RPCs.
The idea is to work towards decoupling the grpc.Server from our
server.Server object. I'd like to have the grpc server be created before
Server.Start(): I'd like the cluster id and node id to be available by
the time Server.Start() is called so that we can get rid of the
nodeIDContainer that everybody and their dog uses. But for getting
these ids a grpc server needs to be running early to handle the "init"
rpc which bootstraps a cluster.
This patch doesn't accomplish too much - it doesn't do anything about
actually starting to serve any requests without a Server (i.e. create
the needed listeners), but I think the patch stands on its own too as
good refactoring.

Release note: None
  • Loading branch information
andreimatei committed Jan 14, 2019
1 parent af6f64c commit 9287008
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 46 deletions.
70 changes: 46 additions & 24 deletions pkg/server/servemode.go → pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,31 @@ import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// grpcServer is a wrapper on top of a grpc.Server that includes an interceptor
// and a mode of operation that can instruct the interceptor to refuse certain
// RPCs.
type grpcServer struct {
// Server is the wrapped gRPC server. It is embedded, so its methods are
// promoted onto grpcServer; callers that need the raw *grpc.Server reach it
// through this field.
*grpc.Server
// mode is the current mode of operation. It is written through setMode and
// read by operational, which intercept uses to decide whether an RPC must be
// refused.
mode serveMode
}

// newGRPCServer returns a grpcServer that starts out in modeInitializing. The
// wrapped grpc.Server is built by rpc.NewServerWithInterceptor, with the new
// object's intercept method installed as the interceptor.
func newGRPCServer(rpcCtx *rpc.Context) *grpcServer {
	srv := new(grpcServer)
	// Establish the initial mode before the interceptor is handed out.
	srv.mode.set(modeInitializing)
	// The method value is bound to srv, so the interceptor always consults this
	// server's current mode.
	srv.Server = rpc.NewServerWithInterceptor(rpcCtx, srv.intercept)
	return srv
}

// serveMode is the mode of operation of the server. It is an int32 so that it
// can be accessed through the sync/atomic functions (see get).
type serveMode int32

// A list of the server states for bootstrap process.
const (
// modeInitializing is intended for server initialization process.
Expand All @@ -36,25 +57,31 @@ const (
modeDraining
)

type serveMode int32
// setMode changes the server's mode of operation by delegating to
// serveMode.set. The new mode is what operational, and therefore intercept,
// will observe on subsequent calls.
func (s *grpcServer) setMode(mode serveMode) {
s.mode.set(mode)
}

// Intercept implements filtering rules for each server state.
func (s *Server) Intercept() func(string) error {
interceptors := map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Status/Details": {},
}
return func(fullName string) error {
if s.serveMode.operational() {
return nil
}
if _, allowed := interceptors[fullName]; !allowed {
return WaitingForInitError(fullName)
}
// operational reports whether the server's current mode is modeOperational or
// modeDraining. In either of these modes intercept lets every RPC through.
func (s *grpcServer) operational() bool {
	switch s.mode.get() {
	case modeOperational, modeDraining:
		return true
	default:
		return false
	}
}

// rpcsAllowedWhileBootstrapping is the set of full gRPC method names that
// intercept lets through while the server is not yet operational. Any method
// not listed here is rejected with a "waiting for init" error in that state.
var rpcsAllowedWhileBootstrapping = map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Status/Details": {},
}

// intercept decides whether the RPC identified by fullName may proceed given
// the server's current mode. It returns nil when the RPC is allowed and a
// "waiting for init" error otherwise.
func (s *grpcServer) intercept(fullName string) error {
	if !s.operational() {
		// Not operational yet: only the allow-listed RPCs may go through.
		if _, ok := rpcsAllowedWhileBootstrapping[fullName]; !ok {
			return s.waitingForInitError(fullName)
		}
	}
	return nil
}

func (s *serveMode) set(mode serveMode) {
Expand All @@ -65,14 +92,9 @@ func (s *serveMode) get() serveMode {
return serveMode(atomic.LoadInt32((*int32)(s)))
}

// operational reports whether the mode is modeOperational or modeDraining.
func (s *serveMode) operational() bool {
	current := s.get()
	if current == modeDraining {
		return true
	}
	return current == modeOperational
}

// WaitingForInitError indicates that the server cannot run the specified
// method until the node has been initialized.
func WaitingForInitError(methodName string) error {
// waitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized.
//
// The returned error is a gRPC status error with code codes.Unavailable whose
// message embeds methodName. The receiver is not used by the body.
func (s *grpcServer) waitingForInitError(methodName string) error {
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/server/servemode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (

func TestWaitingForInitError(t *testing.T) {
defer leaktest.AfterTest(t)()
if err := WaitingForInitError("foo"); !IsWaitingForInit(err) {
s := &grpcServer{}
if err := s.waitingForInitError("foo"); !IsWaitingForInit(err) {
t.Errorf("WaitingForInitError() not recognized by IsWaitingForInit(): %v", err)
}
if err := grpcstatus.Errorf(codes.Unavailable, "foo"); IsWaitingForInit(err) {
Expand Down
39 changes: 19 additions & 20 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@ func (mux *safeServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type Server struct {
nodeIDContainer base.NodeIDContainer

cfg Config
st *cluster.Settings
mux safeServeMux
clock *hlc.Clock
rpcContext *rpc.Context
grpc *grpc.Server
cfg Config
st *cluster.Settings
mux safeServeMux
clock *hlc.Clock
rpcContext *rpc.Context
// The gRPC server on which the different RPC handlers will be registered.
grpc *grpcServer
gossip *gossip.Gossip
nodeDialer *nodedialer.Dialer
nodeLiveness *storage.NodeLiveness
Expand Down Expand Up @@ -189,7 +190,6 @@ type Server struct {
adminMemMetrics sql.MemoryMetrics
// sqlMemMetrics are used to track memory usage of sql sessions.
sqlMemMetrics sql.MemoryMetrics
serveMode
}

// NewServer creates a Server from a server.Config.
Expand All @@ -212,7 +212,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg: cfg,
registry: metric.NewRegistry(),
}
s.serveMode.set(modeInitializing)

// If the tracer has a Close function, call it after the server stops.
if tr, ok := cfg.AmbientCtx.Tracer.(stop.Closer); ok {
Expand Down Expand Up @@ -251,14 +250,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
}

s.grpc = rpc.NewServerWithInterceptor(s.rpcContext, s.Intercept())
s.grpc = newGRPCServer(s.rpcContext)

s.gossip = gossip.New(
s.cfg.AmbientCtx,
&s.rpcContext.ClusterID,
&s.nodeIDContainer,
s.rpcContext,
s.grpc,
s.grpc.Server,
s.stopper,
s.registry,
s.cfg.Locality,
Expand Down Expand Up @@ -340,7 +339,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)

s.raftTransport = storage.NewRaftTransport(
s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc, s.stopper,
s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc.Server, s.stopper,
)

// Set up internal memory metrics for use by internal SQL executors.
Expand Down Expand Up @@ -481,9 +480,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.node = NewNode(
storeCfg, s.recorder, s.registry, s.stopper,
txnMetrics, nil /* execCfg */, &s.rpcContext.ClusterID)
roachpb.RegisterInternalServer(s.grpc, s.node)
storage.RegisterPerReplicaServer(s.grpc, s.node.perReplicaServer)
s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc)
roachpb.RegisterInternalServer(s.grpc.Server, s.node)
storage.RegisterPerReplicaServer(s.grpc.Server, s.node.perReplicaServer)
s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc.Server)

s.sessionRegistry = sql.NewSessionRegistry()
s.jobRegistry = jobs.MakeRegistry(
Expand Down Expand Up @@ -535,7 +534,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}

s.distSQLServer = distsqlrun.NewServer(ctx, distSQLCfg)
distsqlpb.RegisterDistSQLServer(s.grpc, s.distSQLServer)
distsqlpb.RegisterDistSQLServer(s.grpc.Server, s.distSQLServer)

s.admin = newAdminServer(s)
s.status = newStatusServer(
Expand All @@ -555,13 +554,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)
s.authentication = newAuthenticationServer(s)
for _, gw := range []grpcGatewayServer{s.admin, s.status, s.authentication, &s.tsServer} {
gw.RegisterService(s.grpc)
gw.RegisterService(s.grpc.Server)
}

s.initServer = newInitServer(s)
s.initServer.semaphore.acquire()

serverpb.RegisterInitServer(s.grpc, s.initServer)
serverpb.RegisterInitServer(s.grpc.Server, s.initServer)

nodeInfo := sql.NodeInfo{
AdminURL: cfg.AdminURL,
Expand Down Expand Up @@ -1562,7 +1561,7 @@ func (s *Server) Start(ctx context.Context) error {
s.distSQLServer.Start()
s.pgServer.Start(ctx, s.stopper)

s.serveMode.set(modeOperational)
s.grpc.setMode(modeOperational)

log.Infof(ctx, "starting %s server at %s (use: %s)",
s.cfg.HTTPRequestScheme(), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr)
Expand Down Expand Up @@ -1757,15 +1756,15 @@ func (s *Server) doDrain(
switch mode {
case serverpb.DrainMode_CLIENT:
if setTo {
s.serveMode.set(modeDraining)
s.grpc.setMode(modeDraining)
// Wait for drainUnreadyWait. This will fail load balancer checks and
// delay draining so that client traffic can move off this node.
time.Sleep(drainWait.Get(&s.st.SV))
}
if err := func() error {
if !setTo {
// Execute this last.
defer func() { s.serveMode.set(modeOperational) }()
defer func() { s.grpc.setMode(modeOperational) }()
}
// Since enabling the lease manager's draining mode will prevent
// the acquisition of new leases, the switch must be made after
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (s *statusServer) Details(
return resp, nil
}

serveMode := s.admin.server.serveMode.get()
serveMode := s.admin.server.grpc.mode.get()
if serveMode != modeOperational {
return nil, grpcstatus.Error(codes.Unavailable, "node is not ready")
}
Expand Down

0 comments on commit 9287008

Please sign in to comment.