Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
embed: Avoid panic when shutting down gRPC Server
Browse files Browse the repository at this point in the history
Avoid panic when stopping gRPC Server if TLS configuration is present.
Provided solution (attempts to) implement suggestion from gRPC team: grpc/grpc-go#1384 (comment).

Fixes etcd-io#8916
  • Loading branch information
jamesdphillips authored and gyuho committed Dec 8, 2017
1 parent fc2eecf commit e39915f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 14 deletions.
16 changes: 8 additions & 8 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.grpcServerC
// errored before starting gRPC server for serveCtx
for _, sctx := range e.sctxs {
close(sctx.grpcServerC)
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
}
}
e.Close()
Expand Down Expand Up @@ -222,15 +223,14 @@ func (e *Etcd) Config() Config {
func (e *Etcd) Close() {
e.closeOnce.Do(func() { close(e.stopc) })

for _, sctx := range e.sctxs {
for gs := range sctx.grpcServerC {
e.stopGRPCServer(gs)
}
reqTimeout := 2 * time.Second
if e.Server != nil {
reqTimeout = e.Server.Cfg.ReqTimeout()
}

for _, sctx := range e.sctxs {
sctx.cancel()
teardownServeCtx(sctx, reqTimeout)
}

for i := range e.Clients {
if e.Clients[i] != nil {
e.Clients[i].Close()
Expand Down
73 changes: 67 additions & 6 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3client"
Expand Down Expand Up @@ -54,13 +55,20 @@ type serveCtx struct {

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
grpcServerC chan *grpc.Server

secureHTTPServer *http.Server
secureGrpcServerC chan *grpc.Server
insecureGrpcServerC chan *grpc.Server
}

func newServeCtx() *serveCtx {
ctx, cancel := context.WithCancel(context.Background())
return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
return &serveCtx{
ctx: ctx,
cancel: cancel,
userHandlers: make(map[string]http.Handler),
secureGrpcServerC: make(chan *grpc.Server, 1),
insecureGrpcServerC: make(chan *grpc.Server, 1),
}
}

Expand All @@ -84,7 +92,7 @@ func (sctx *serveCtx) serve(

if sctx.insecure {
gs := v3rpc.Server(s, nil, gopts...)
sctx.grpcServerC <- gs
sctx.insecureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -118,7 +126,7 @@ func (sctx *serveCtx) serve(
return tlsErr
}
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.grpcServerC <- gs
sctx.secureGrpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -149,11 +157,13 @@ func (sctx *serveCtx) serve(
ErrorLog: logger, // do not log user error
}
go func() { errHandler(srv.Serve(tlsl)) }()
sctx.secureHTTPServer = srv

plog.Infof("serving client requests on %s", sctx.l.Addr().String())
}

close(sctx.grpcServerC)
close(sctx.secureGrpcServerC)
close(sctx.insecureGrpcServerC)
return m.Serve()
}

Expand Down Expand Up @@ -269,3 +279,54 @@ func (sctx *serveCtx) registerTrace() {
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
}

// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms
func teardownServeCtx(sctx *serveCtx, timeout time.Duration) {
if sctx.secure && len(sctx.secureGrpcServerC) > 0 {
gs := <-sctx.secureGrpcServerC
stopSecureServer(gs, sctx.secureHTTPServer, timeout)
}

if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 {
gs := <-sctx.insecureGrpcServerC
stopInsecureServer(gs, timeout)
}

// Close any open gRPC connections
sctx.cancel()
}

// When using grpc's ServerHandlerTransport we are responsible for gracefully
// stopping connections and shutting down.
// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Stop accepting new connections await pending handlers
httpSrv.Shutdown(ctx)

// Teardown gRPC server
gs.Stop()
}

// Gracefully shutdown gRPC server when using HTTP2 transport.
func stopInsecureServer(gs *grpc.Server, timeout time.Duration) {
ch := make(chan struct{})
go func() {
defer close(ch)
// close listeners to stop accepting new connections,
// will block on any existing transports
gs.GracefulStop()
}()
// wait until all pending RPCs are finished
select {
case <-ch:
case <-time.After(timeout):
// took too long, manually close open transports
// e.g. watch streams
gs.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

0 comments on commit e39915f

Please sign in to comment.