Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: refactor the grpc server #33690

Merged
merged 1 commit into from
Jan 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 46 additions & 24 deletions pkg/server/servemode.go → pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,31 @@ import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// grpcServer is a wrapper on top of a grpc.Server that includes an interceptor
// and a mode of operation that can instruct the interceptor to refuse certain
// RPCs.
type grpcServer struct {
// Server is the wrapped gRPC server. It is embedded, so its methods are
// promoted onto grpcServer; callers that need the raw *grpc.Server (e.g. for
// service registration) access it as the Server field.
*grpc.Server
// mode is the current serving mode consulted by the interceptor. It is
// written via serveMode.set and read via serveMode.get.
mode serveMode
}

// newGRPCServer creates a grpcServer that starts out in modeInitializing. The
// underlying grpc.Server is built by rpc.NewServerWithInterceptor, with the
// new server's own intercept method installed as the interceptor.
func newGRPCServer(rpcCtx *rpc.Context) *grpcServer {
	srv := &grpcServer{}
	// The mode is set before the interceptor is handed to the RPC layer.
	srv.mode.set(modeInitializing)
	// The method value is bound to srv, so the interceptor consults this
	// server's mode on every call.
	srv.Server = rpc.NewServerWithInterceptor(rpcCtx, srv.intercept)
	return srv
}

// serveMode identifies the server's current phase of operation (one of the
// mode* constants below). Its get method reads the value with an atomic load.
type serveMode int32

// A list of the server states for the bootstrap process.
const (
// modeInitializing is intended for the server initialization process.
Expand All @@ -36,25 +57,31 @@ const (
modeDraining
)

type serveMode int32
// setMode switches the gRPC server to the given mode. The mode determines
// whether intercept lets every RPC through or only the RPCs that are allowed
// while bootstrapping.
func (s *grpcServer) setMode(mode serveMode) {
s.mode.set(mode)
}

// Intercept implements filtering rules for each server state.
func (s *Server) Intercept() func(string) error {
interceptors := map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Status/Details": {},
}
return func(fullName string) error {
if s.serveMode.operational() {
return nil
}
if _, allowed := interceptors[fullName]; !allowed {
return WaitingForInitError(fullName)
}
// operational reports whether the server is in a mode in which all RPCs are
// served, i.e. modeOperational or modeDraining.
func (s *grpcServer) operational() bool {
	switch s.mode.get() {
	case modeOperational, modeDraining:
		return true
	default:
		return false
	}
}

// rpcsAllowedWhileBootstrapping is the set of fully-qualified gRPC method
// names that intercept lets through while the server is not yet operational.
// Only key membership matters; the values carry no information.
var rpcsAllowedWhileBootstrapping = map[string]struct{}{
	"/cockroach.gossip.Gossip/Gossip":           struct{}{},
	"/cockroach.rpc.Heartbeat/Ping":             struct{}{},
	"/cockroach.server.serverpb.Init/Bootstrap": struct{}{},
	"/cockroach.server.serverpb.Status/Details": struct{}{},
}

// intercept decides whether the RPC identified by fullName may be served in
// the server's current mode. It returns nil when the server is operational or
// when fullName is listed in rpcsAllowedWhileBootstrapping; otherwise it
// returns the error produced by waitingForInitError.
func (s *grpcServer) intercept(fullName string) error {
	if s.operational() {
		return nil
	}
	// Not operational yet: only the bootstrap allowlist gets through.
	if _, ok := rpcsAllowedWhileBootstrapping[fullName]; ok {
		return nil
	}
	return s.waitingForInitError(fullName)
}

func (s *serveMode) set(mode serveMode) {
Expand All @@ -65,14 +92,9 @@ func (s *serveMode) get() serveMode {
return serveMode(atomic.LoadInt32((*int32)(s)))
}

// operational reports whether the mode is one in which all RPCs are served,
// i.e. modeOperational or modeDraining.
func (s *serveMode) operational() bool {
	switch s.get() {
	case modeOperational, modeDraining:
		return true
	default:
		return false
	}
}

// WaitingForInitError indicates that the server cannot run the specified
// method until the node has been initialized.
func WaitingForInitError(methodName string) error {
// waitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized. The result is a
// gRPC status error with code Unavailable whose message names methodName. The
// receiver is not used.
func (s *grpcServer) waitingForInitError(methodName string) error {
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/server/servemode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (

func TestWaitingForInitError(t *testing.T) {
defer leaktest.AfterTest(t)()
if err := WaitingForInitError("foo"); !IsWaitingForInit(err) {
s := &grpcServer{}
if err := s.waitingForInitError("foo"); !IsWaitingForInit(err) {
t.Errorf("WaitingForInitError() not recognized by IsWaitingForInit(): %v", err)
}
if err := grpcstatus.Errorf(codes.Unavailable, "foo"); IsWaitingForInit(err) {
Expand Down
39 changes: 19 additions & 20 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@ func (mux *safeServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type Server struct {
nodeIDContainer base.NodeIDContainer

cfg Config
st *cluster.Settings
mux safeServeMux
clock *hlc.Clock
rpcContext *rpc.Context
grpc *grpc.Server
cfg Config
st *cluster.Settings
mux safeServeMux
clock *hlc.Clock
rpcContext *rpc.Context
// The gRPC server on which the different RPC handlers will be registered.
grpc *grpcServer
gossip *gossip.Gossip
nodeDialer *nodedialer.Dialer
nodeLiveness *storage.NodeLiveness
Expand Down Expand Up @@ -189,7 +190,6 @@ type Server struct {
adminMemMetrics sql.MemoryMetrics
// sqlMemMetrics are used to track memory usage of sql sessions.
sqlMemMetrics sql.MemoryMetrics
serveMode
}

// NewServer creates a Server from a server.Config.
Expand All @@ -212,7 +212,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg: cfg,
registry: metric.NewRegistry(),
}
s.serveMode.set(modeInitializing)

// If the tracer has a Close function, call it after the server stops.
if tr, ok := cfg.AmbientCtx.Tracer.(stop.Closer); ok {
Expand Down Expand Up @@ -251,14 +250,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
}

s.grpc = rpc.NewServerWithInterceptor(s.rpcContext, s.Intercept())
s.grpc = newGRPCServer(s.rpcContext)

s.gossip = gossip.New(
s.cfg.AmbientCtx,
&s.rpcContext.ClusterID,
&s.nodeIDContainer,
s.rpcContext,
s.grpc,
s.grpc.Server,
s.stopper,
s.registry,
s.cfg.Locality,
Expand Down Expand Up @@ -340,7 +339,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)

s.raftTransport = storage.NewRaftTransport(
s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc, s.stopper,
s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc.Server, s.stopper,
)

// Set up internal memory metrics for use by internal SQL executors.
Expand Down Expand Up @@ -481,9 +480,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.node = NewNode(
storeCfg, s.recorder, s.registry, s.stopper,
txnMetrics, nil /* execCfg */, &s.rpcContext.ClusterID)
roachpb.RegisterInternalServer(s.grpc, s.node)
storage.RegisterPerReplicaServer(s.grpc, s.node.perReplicaServer)
s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc)
roachpb.RegisterInternalServer(s.grpc.Server, s.node)
storage.RegisterPerReplicaServer(s.grpc.Server, s.node.perReplicaServer)
s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc.Server)

s.sessionRegistry = sql.NewSessionRegistry()
s.jobRegistry = jobs.MakeRegistry(
Expand Down Expand Up @@ -535,7 +534,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}

s.distSQLServer = distsqlrun.NewServer(ctx, distSQLCfg)
distsqlpb.RegisterDistSQLServer(s.grpc, s.distSQLServer)
distsqlpb.RegisterDistSQLServer(s.grpc.Server, s.distSQLServer)

s.admin = newAdminServer(s)
s.status = newStatusServer(
Expand All @@ -555,7 +554,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)
s.authentication = newAuthenticationServer(s)
for _, gw := range []grpcGatewayServer{s.admin, s.status, s.authentication, &s.tsServer} {
gw.RegisterService(s.grpc)
gw.RegisterService(s.grpc.Server)
}

// TODO(andrei): We're creating an initServer even though the inspection of
Expand All @@ -565,7 +564,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// figure out early if our engines are bootstrapped and, if they are, create a
// dummy implementation of the InitServer that rejects all RPCs.
s.initServer = newInitServer(s.gossip.Connected, s.stopper.ShouldStop())
serverpb.RegisterInitServer(s.grpc, s.initServer)
serverpb.RegisterInitServer(s.grpc.Server, s.initServer)

nodeInfo := sql.NodeInfo{
AdminURL: cfg.AdminURL,
Expand Down Expand Up @@ -1538,7 +1537,7 @@ func (s *Server) Start(ctx context.Context) error {
s.distSQLServer.Start()
s.pgServer.Start(ctx, s.stopper)

s.serveMode.set(modeOperational)
s.grpc.setMode(modeOperational)

log.Infof(ctx, "starting %s server at %s (use: %s)",
s.cfg.HTTPRequestScheme(), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr)
Expand Down Expand Up @@ -1758,15 +1757,15 @@ func (s *Server) doDrain(
switch mode {
case serverpb.DrainMode_CLIENT:
if setTo {
s.serveMode.set(modeDraining)
s.grpc.setMode(modeDraining)
// Wait for drainUnreadyWait. This will fail load balancer checks and
// delay draining so that client traffic can move off this node.
time.Sleep(drainWait.Get(&s.st.SV))
}
if err := func() error {
if !setTo {
// Execute this last.
defer func() { s.serveMode.set(modeOperational) }()
defer func() { s.grpc.setMode(modeOperational) }()
}
// Since enabling the lease manager's draining mode will prevent
// the acquisition of new leases, the switch must be made after
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (s *statusServer) Details(
return resp, nil
}

serveMode := s.admin.server.serveMode.get()
serveMode := s.admin.server.grpc.mode.get()
if serveMode != modeOperational {
return nil, grpcstatus.Error(codes.Unavailable, "node is not ready")
}
Expand Down