Merge #33690 #34027
33690: server: refactor the grpc server r=andreimatei a=andreimatei

This patch creates the grpcServer object, to encapsulate the grpc.Server
object, the serveMode object, and the interceptor that our server.Server
uses to filter out RPCs.
The idea is to work towards decoupling the grpc.Server from our
server.Server object. I'd like the grpc server to be created before
Server.Start(): I'd like the cluster ID and node ID to be available by
the time Server.Start() is called, so that we can get rid of the
nodeIDContainer that everybody and their dog uses. But to get these IDs,
a grpc server needs to be running early enough to handle the "init"
RPC which bootstraps a cluster.
This patch doesn't accomplish much by itself - it doesn't do anything
about actually serving requests before a Server exists (i.e. creating
the needed listeners) - but I think it stands on its own as a good
refactoring.

Release note: None
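For reference, the gating pattern this refactoring consolidates can be sketched in a few lines of standalone Go: an atomically updated serve mode plus a per-RPC intercept function that rejects everything except an allowlist until the node is initialized. The names below (gatedServer, allowed) are illustrative stand-ins, not the actual code; the real types are grpcServer, serveMode, and rpcsAllowedWhileBootstrapping in pkg/server/grpc_server.go in the diff further down.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// serveMode is an atomically updated server state that the RPC
// interceptor consults before letting a request through.
type serveMode int32

const (
	modeInitializing serveMode = iota // only bootstrap-related RPCs allowed
	modeOperational                   // everything allowed
)

func (m *serveMode) set(v serveMode) { atomic.StoreInt32((*int32)(m), int32(v)) }
func (m *serveMode) get() serveMode  { return serveMode(atomic.LoadInt32((*int32)(m))) }

// gatedServer stands in for the grpcServer wrapper: it owns the mode and the
// intercept function that a real gRPC server would invoke for each RPC.
type gatedServer struct {
	mode    serveMode
	allowed map[string]struct{} // RPCs permitted while bootstrapping
}

func (s *gatedServer) intercept(fullMethod string) error {
	if s.mode.get() == modeOperational {
		return nil
	}
	if _, ok := s.allowed[fullMethod]; !ok {
		return errors.New("node waiting for init; " + fullMethod + " not available")
	}
	return nil
}

func main() {
	s := &gatedServer{allowed: map[string]struct{}{"/Init/Bootstrap": {}}}
	s.mode.set(modeInitializing)
	fmt.Println(s.intercept("/SQL/Execute"))    // rejected until init
	fmt.Println(s.intercept("/Init/Bootstrap")) // nil: allowed while bootstrapping
	s.mode.set(modeOperational)
	fmt.Println(s.intercept("/SQL/Execute")) // nil: everything allowed once operational
}
```

Once bootstrap completes, a single mode flip (setMode(modeOperational) in the real code) opens the gate for all RPCs.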

34027: distsqlrun: add metrics for queue size and wait duration r=ajwerner a=ajwerner

Long queuing can lead to errors like those described in #27746.
This change should give us more visibility into when queuing is occurring
and how problematic it is.

Release note: None
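The instrumentation itself is small: a gauge is incremented when a flow is queued and decremented when it is dequeued, and a latency histogram records how long each flow waited. Below is a minimal standalone sketch of that pattern; the gauge and histogram types here are simplified stand-ins for CockroachDB's metric package, and the real counters are FlowsQueued and QueueWaitHist in the flow_scheduler.go and metrics.go diffs below.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

// Stand-in gauge and latency histogram; the real change uses
// metric.NewGauge(metaFlowsQueued) and metric.NewLatency(metaQueueWaitHist, ...).
type gauge struct {
	mu sync.Mutex
	v  int64
}

func (g *gauge) Inc(d int64) { g.mu.Lock(); g.v += d; g.mu.Unlock() }
func (g *gauge) Dec(d int64) { g.Inc(-d) }

type histogram struct {
	mu      sync.Mutex
	samples []time.Duration
}

func (h *histogram) RecordValue(d time.Duration) {
	h.mu.Lock()
	h.samples = append(h.samples, d)
	h.mu.Unlock()
}

// queuedFlow mirrors flowWithCtx: remember when the flow was enqueued so the
// dequeue path can record how long it waited.
type queuedFlow struct {
	id          string
	enqueueTime time.Time
}

type scheduler struct {
	queue       *list.List
	flowsQueued gauge
	queueWait   histogram
}

func (s *scheduler) enqueue(id string) {
	s.flowsQueued.Inc(1) // queue size goes up when a flow is deferred
	s.queue.PushBack(&queuedFlow{id: id, enqueueTime: time.Now()})
}

func (s *scheduler) dequeue() *queuedFlow {
	front := s.queue.Front()
	if front == nil {
		return nil
	}
	s.queue.Remove(front)
	f := front.Value.(*queuedFlow)
	s.flowsQueued.Dec(1)                               // queue size goes down on dequeue
	s.queueWait.RecordValue(time.Since(f.enqueueTime)) // record time spent waiting
	return f
}

func main() {
	s := &scheduler{queue: list.New()}
	s.enqueue("flow-1")
	time.Sleep(10 * time.Millisecond)
	f := s.dequeue()
	fmt.Printf("dequeued %s after %v in queue\n", f.id, s.queueWait.samples[0])
}
```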

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Jan 15, 2019
3 parents 9cf4ebc + 015aeb8 + 418dbcd commit ba4eaaf
Showing 6 changed files with 89 additions and 47 deletions.
70 changes: 46 additions & 24 deletions pkg/server/servemode.go → pkg/server/grpc_server.go
@@ -18,10 +18,31 @@ import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// grpcServer is a wrapper on top of a grpc.Server that includes an interceptor
// and a mode of operation that can instruct the interceptor to refuse certain
// RPCs.
type grpcServer struct {
*grpc.Server
mode serveMode
}

func newGRPCServer(rpcCtx *rpc.Context) *grpcServer {
s := &grpcServer{}
s.mode.set(modeInitializing)
s.Server = rpc.NewServerWithInterceptor(rpcCtx, func(path string) error {
return s.intercept(path)
})
return s
}

type serveMode int32

// A list of the server states for bootstrap process.
const (
// modeInitializing is intended for server initialization process.
@@ -36,25 +57,31 @@ const (
modeDraining
)

type serveMode int32
func (s *grpcServer) setMode(mode serveMode) {
s.mode.set(mode)
}

// Intercept implements filtering rules for each server state.
func (s *Server) Intercept() func(string) error {
interceptors := map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Status/Details": {},
}
return func(fullName string) error {
if s.serveMode.operational() {
return nil
}
if _, allowed := interceptors[fullName]; !allowed {
return WaitingForInitError(fullName)
}
func (s *grpcServer) operational() bool {
sMode := s.mode.get()
return sMode == modeOperational || sMode == modeDraining
}

var rpcsAllowedWhileBootstrapping = map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Status/Details": {},
}

// intercept implements filtering rules for each server state.
func (s *grpcServer) intercept(fullName string) error {
if s.operational() {
return nil
}
if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
return s.waitingForInitError(fullName)
}
return nil
}

func (s *serveMode) set(mode serveMode) {
@@ -65,14 +92,9 @@ func (s *serveMode) get() serveMode {
return serveMode(atomic.LoadInt32((*int32)(s)))
}

func (s *serveMode) operational() bool {
sMode := s.get()
return sMode == modeOperational || sMode == modeDraining
}

// WaitingForInitError indicates that the server cannot run the specified
// method until the node has been initialized.
func WaitingForInitError(methodName string) error {
// waitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized.
func (s *grpcServer) waitingForInitError(methodName string) error {
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}

3 changes: 2 additions & 1 deletion pkg/server/servemode_test.go
@@ -25,7 +25,8 @@ import (

func TestWaitingForInitError(t *testing.T) {
defer leaktest.AfterTest(t)()
if err := WaitingForInitError("foo"); !IsWaitingForInit(err) {
s := &grpcServer{}
if err := s.waitingForInitError("foo"); !IsWaitingForInit(err) {
t.Errorf("WaitingForInitError() not recognized by IsWaitingForInit(): %v", err)
}
if err := grpcstatus.Errorf(codes.Unavailable, "foo"); IsWaitingForInit(err) {
39 changes: 19 additions & 20 deletions pkg/server/server.go
@@ -149,12 +149,13 @@ func (mux *safeServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type Server struct {
nodeIDContainer base.NodeIDContainer

cfg Config
st *cluster.Settings
mux safeServeMux
clock *hlc.Clock
rpcContext *rpc.Context
grpc *grpc.Server
cfg Config
st *cluster.Settings
mux safeServeMux
clock *hlc.Clock
rpcContext *rpc.Context
// The gRPC server on which the different RPC handlers will be registered.
grpc *grpcServer
gossip *gossip.Gossip
nodeDialer *nodedialer.Dialer
nodeLiveness *storage.NodeLiveness
@@ -189,7 +190,6 @@ type Server struct {
adminMemMetrics sql.MemoryMetrics
// sqlMemMetrics are used to track memory usage of sql sessions.
sqlMemMetrics sql.MemoryMetrics
serveMode
}

// NewServer creates a Server from a server.Config.
@@ -212,7 +212,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg: cfg,
registry: metric.NewRegistry(),
}
s.serveMode.set(modeInitializing)

// If the tracer has a Close function, call it after the server stops.
if tr, ok := cfg.AmbientCtx.Tracer.(stop.Closer); ok {
@@ -251,14 +250,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
}

s.grpc = rpc.NewServerWithInterceptor(s.rpcContext, s.Intercept())
s.grpc = newGRPCServer(s.rpcContext)

s.gossip = gossip.New(
s.cfg.AmbientCtx,
&s.rpcContext.ClusterID,
&s.nodeIDContainer,
s.rpcContext,
s.grpc,
s.grpc.Server,
s.stopper,
s.registry,
s.cfg.Locality,
@@ -340,7 +339,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)

s.raftTransport = storage.NewRaftTransport(
s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc, s.stopper,
s.cfg.AmbientCtx, st, s.nodeDialer, s.grpc.Server, s.stopper,
)

// Set up internal memory metrics for use by internal SQL executors.
@@ -481,9 +480,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.node = NewNode(
storeCfg, s.recorder, s.registry, s.stopper,
txnMetrics, nil /* execCfg */, &s.rpcContext.ClusterID)
roachpb.RegisterInternalServer(s.grpc, s.node)
storage.RegisterPerReplicaServer(s.grpc, s.node.perReplicaServer)
s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc)
roachpb.RegisterInternalServer(s.grpc.Server, s.node)
storage.RegisterPerReplicaServer(s.grpc.Server, s.node.perReplicaServer)
s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc.Server)

s.sessionRegistry = sql.NewSessionRegistry()
s.jobRegistry = jobs.MakeRegistry(
@@ -535,7 +534,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}

s.distSQLServer = distsqlrun.NewServer(ctx, distSQLCfg)
distsqlpb.RegisterDistSQLServer(s.grpc, s.distSQLServer)
distsqlpb.RegisterDistSQLServer(s.grpc.Server, s.distSQLServer)

s.admin = newAdminServer(s)
s.status = newStatusServer(
@@ -555,7 +554,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)
s.authentication = newAuthenticationServer(s)
for _, gw := range []grpcGatewayServer{s.admin, s.status, s.authentication, &s.tsServer} {
gw.RegisterService(s.grpc)
gw.RegisterService(s.grpc.Server)
}

// TODO(andrei): We're creating an initServer even through the inspection of
@@ -565,7 +564,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// figure out early if our engines are bootstrapped and, if they are, create a
// dummy implementation of the InitServer that rejects all RPCs.
s.initServer = newInitServer(s.gossip.Connected, s.stopper.ShouldStop())
serverpb.RegisterInitServer(s.grpc, s.initServer)
serverpb.RegisterInitServer(s.grpc.Server, s.initServer)

nodeInfo := sql.NodeInfo{
AdminURL: cfg.AdminURL,
@@ -1538,7 +1537,7 @@ func (s *Server) Start(ctx context.Context) error {
s.distSQLServer.Start()
s.pgServer.Start(ctx, s.stopper)

s.serveMode.set(modeOperational)
s.grpc.setMode(modeOperational)

log.Infof(ctx, "starting %s server at %s (use: %s)",
s.cfg.HTTPRequestScheme(), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr)
@@ -1758,15 +1757,15 @@ func (s *Server) doDrain(
switch mode {
case serverpb.DrainMode_CLIENT:
if setTo {
s.serveMode.set(modeDraining)
s.grpc.setMode(modeDraining)
// Wait for drainUnreadyWait. This will fail load balancer checks and
// delay draining so that client traffic can move off this node.
time.Sleep(drainWait.Get(&s.st.SV))
}
if err := func() error {
if !setTo {
// Execute this last.
defer func() { s.serveMode.set(modeOperational) }()
defer func() { s.grpc.setMode(modeOperational) }()
}
// Since enabling the lease manager's draining mode will prevent
// the acquisition of new leases, the switch must be made after
2 changes: 1 addition & 1 deletion pkg/server/status.go
@@ -590,7 +590,7 @@ func (s *statusServer) Details(
return resp, nil
}

serveMode := s.admin.server.serveMode.get()
serveMode := s.admin.server.grpc.mode.get()
if serveMode != modeOperational {
return nil, grpcstatus.Error(codes.Unavailable, "node is not ready")
}
6 changes: 5 additions & 1 deletion pkg/sql/distsqlrun/flow_scheduler.go
@@ -123,6 +123,7 @@ func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
return fs.runFlowNow(ctx, f)
}
log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id)
fs.metrics.FlowsQueued.Inc(1)
fs.mu.queue.PushBack(&flowWithCtx{
ctx: ctx,
flow: f,
@@ -156,9 +157,12 @@ func (fs *flowScheduler) Start() {
if frElem := fs.mu.queue.Front(); frElem != nil {
n := frElem.Value.(*flowWithCtx)
fs.mu.queue.Remove(frElem)
wait := timeutil.Since(n.enqueueTime)
log.VEventf(
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, timeutil.Since(n.enqueueTime),
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, wait,
)
fs.metrics.FlowsQueued.Dec(1)
fs.metrics.QueueWaitHist.RecordValue(int64(wait))
// Note: we use the flow's context instead of the worker
// context, to ensure that logging etc is relative to the
// specific flow.
16 changes: 16 additions & 0 deletions pkg/sql/distsqlrun/metrics.go
@@ -27,6 +27,8 @@ type DistSQLMetrics struct {
QueriesTotal *metric.Counter
FlowsActive *metric.Gauge
FlowsTotal *metric.Counter
FlowsQueued *metric.Gauge
QueueWaitHist *metric.Histogram
MaxBytesHist *metric.Histogram
CurBytesCount *metric.Gauge
}
@@ -61,6 +63,18 @@ var (
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
metaFlowsQueued = metric.Metadata{
Name: "sql.distsql.flows.queued",
Help: "Number of distributed SQL flows currently queued",
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
metaQueueWaitHist = metric.Metadata{
Name: "sql.distsql.flows.queue_wait",
Help: "Duration of time flows spend waiting in the queue",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaMemMaxBytes = metric.Metadata{
Name: "sql.mem.distsql.max",
Help: "Memory usage per sql statement for distsql",
@@ -86,6 +100,8 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics {
QueriesTotal: metric.NewCounter(metaQueriesTotal),
FlowsActive: metric.NewGauge(metaFlowsActive),
FlowsTotal: metric.NewCounter(metaFlowsTotal),
FlowsQueued: metric.NewGauge(metaFlowsQueued),
QueueWaitHist: metric.NewLatency(metaQueueWaitHist, histogramWindow),
MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, log10int64times1000, 3),
CurBytesCount: metric.NewGauge(metaMemCurBytes),
}
