diff --git a/cmd/internal/serverutil/main.go b/cmd/internal/serverutil/main.go index 2f92952e5a..8cfd0793d2 100644 --- a/cmd/internal/serverutil/main.go +++ b/cmd/internal/serverutil/main.go @@ -154,6 +154,10 @@ func (m *Main) Run(ctx context.Context) error { } go util.AwaitSignal(ctx, srv.Stop) + // stop grpc server when context is closed + cancel := util.AwaitContext(ctx, srv.Stop) + defer cancel() + if m.TreeGCEnabled { go func() { glog.Info("Deleted tree GC started") @@ -207,10 +211,11 @@ func (m *Main) newGRPCServer() (*grpc.Server, error) { return s, nil } -// AnnounceSelf announces this binary's presence to etcd. Returns a function that +// AnnounceSelf announces this binary's presence to etcd. This calls the cancel +// function if the keepalive lease with etcd expires. Returns a function that // should be called on process exit. // AnnounceSelf does nothing if client is nil. -func AnnounceSelf(ctx context.Context, client *clientv3.Client, etcdService, endpoint string) func() { +func AnnounceSelf(ctx context.Context, client *clientv3.Client, etcdService, endpoint string, cancel func()) func() { if client == nil { return func() {} } @@ -220,7 +225,12 @@ func AnnounceSelf(ctx context.Context, client *clientv3.Client, etcdService, end if err != nil { glog.Exitf("Failed to get lease from etcd: %v", err) } - client.KeepAlive(ctx, leaseRsp.ID) + + keepAliveRspCh, err := client.KeepAlive(ctx, leaseRsp.ID) + if err != nil { + glog.Exitf("Failed to keep lease alive from etcd: %v", err) + } + listenKeepAliveRsp(ctx, keepAliveRspCh, cancel) em, err := endpoints.NewManager(client, etcdService) if err != nil { @@ -238,3 +248,23 @@ func AnnounceSelf(ctx context.Context, client *clientv3.Client, etcdService, end client.Revoke(ctx, leaseRsp.ID) } } + +// listenKeepAliveRsp listens to `keepAliveRspCh` channel, and calls the cancel function +// to notify the lease expired. +func listenKeepAliveRsp(ctx context.Context, keepAliveRspCh <-chan *clientv3.LeaseKeepAliveResponse, cancel func()) { + go func() { + for { + select { + case <-ctx.Done(): + glog.Infof("listenKeepAliveRsp canceled: %v", ctx.Err()) + return + case _, ok := <-keepAliveRspCh: + if !ok { + cancel() + glog.Errorf("listenKeepAliveRsp canceled: unexpected lease expired") + return + } + } + } + }() +} diff --git a/cmd/trillian_log_server/main.go b/cmd/trillian_log_server/main.go index ec75c70297..ace27964ab 100644 --- a/cmd/trillian_log_server/main.go +++ b/cmd/trillian_log_server/main.go @@ -91,7 +91,8 @@ func main() { } } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var options []grpc.ServerOption mf := prometheus.MetricFactory{} @@ -124,10 +125,11 @@ func main() { } // Announce our endpoints to etcd if so configured. - unannounce := serverutil.AnnounceSelf(ctx, client, *etcdService, *rpcEndpoint) + unannounce := serverutil.AnnounceSelf(ctx, client, *etcdService, *rpcEndpoint, cancel) defer unannounce() + if *httpEndpoint != "" { - unannounceHTTP := serverutil.AnnounceSelf(ctx, client, *etcdHTTPService, *httpEndpoint) + unannounceHTTP := serverutil.AnnounceSelf(ctx, client, *etcdHTTPService, *httpEndpoint, cancel) defer unannounceHTTP() } diff --git a/cmd/trillian_log_signer/main.go b/cmd/trillian_log_signer/main.go index 976752ae29..f2f8c54168 100644 --- a/cmd/trillian_log_signer/main.go +++ b/cmd/trillian_log_signer/main.go @@ -150,7 +150,7 @@ func main() { // Start HTTP server (optional) if *httpEndpoint != "" { // Announce our endpoint to etcd if so configured. - unannounceHTTP := serverutil.AnnounceSelf(ctx, client, *etcdHTTPService, *httpEndpoint) + unannounceHTTP := serverutil.AnnounceSelf(ctx, client, *etcdHTTPService, *httpEndpoint, cancel) defer unannounceHTTP() }