Skip to content

Commit

Permalink
embed: Avoid panic when shutting down gRPC Server w/ TLS configuration
Browse files Browse the repository at this point in the history
Fixes: etcd-io#8916
Provided solution implements suggestion from gRPC team: grpc/grpc-go#1384 (comment)
  • Loading branch information
jamesdphillips committed Dec 7, 2017
1 parent a2256a6 commit 867c7ec
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 14 deletions.
16 changes: 8 additions & 8 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.grpcServerC
// errored before starting gRPC server for serveCtx
for _, sctx := range e.sctxs {
close(sctx.grpcServerC)
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
}
}
e.Close()
Expand Down Expand Up @@ -222,15 +223,14 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
e.stopGRPCServer(gs)
}
reqTimeout := 2 * time.Second
if e.Server != nil {
reqTimeout = e.Server.Cfg.ReqTimeout()
}

for _, sctx := range e.sctxs {
sctx.cancel()
teardownServeCtx(sctx, reqTimeout)
}

for i := range e.Clients {
if e.Clients[i] != nil {
e.Clients[i].Close()
Expand Down
73 changes: 67 additions & 6 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
Expand Down Expand Up @@ -54,13 +55,20 @@ type serveCtx struct {

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
grpcServerC chan *grpc.Server

secureHTTPServer *http.Server
secureGrpcServerC chan *grpc.Server
insecureGrpcServerC chan *grpc.Server
}

func newServeCtx() *serveCtx {
ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
return &serveCtx{
ctx: ctx,
cancel: cancel,
userHandlers: make(map[string]http.Handler),
secureGrpcServerC: make(chan *grpc.Server, 1),
insecureGrpcServerC: make(chan *grpc.Server, 1),
}
}

Expand All @@ -84,7 +92,7 @@ func (sctx *serveCtx) serve(

if sctx.insecure {
gs := v3rpc.Server(s, nil, gopts...)
sctx.grpcServerC <- gs
sctx.insecureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -118,7 +126,7 @@ func (sctx *serveCtx) serve(
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.grpcServerC <- gs
sctx.secureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -149,11 +157,13 @@ func (sctx *serveCtx) serve(
ErrorLog: logger, // do not log user error
}
go func() { errHandler(srv.Serve(tlsl)) }()
sctx.secureHTTPServer = srv

plog.Infof("serving client requests on %s", sctx.l.Addr().String())
}

close(sctx.grpcServerC)
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
return m.Serve()
}

Expand Down Expand Up @@ -269,3 +279,54 @@ func (sctx *serveCtx) registerTrace() {
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
}

// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms
func teardownServeCtx(sctx *serveCtx, timeout time.Duration) {
if sctx.secure && len(sctx.secureGrpcServerC) > 0 {
gs := <-sctx.secureGrpcServerC
stopSecureServer(gs, sctx.secureHTTPServer, timeout)
}

if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 {
gs := <-sctx.insecureGrpcServerC
stopInsecureServer(gs, timeout)
}

// Close any open gRPC connections
sctx.cancel()
}

// When using grpc's ServerHandlerTransport we are responsible for gracefully
// stopping connections and shutting down.
// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Stop accepting new connections await pending handlers
httpSrv.Shutdown(ctx)

// Teardown gRPC server
gs.Stop()
}

// Gracefully shutdown gRPC server when using HTTP2 transport.
func stopInsecureServer(gs *grpc.Server, timeout time.Duration) {
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

0 comments on commit 867c7ec

Please sign in to comment.