From b2b366965f9d5698b82768edf3516ee007326e54 Mon Sep 17 00:00:00 2001 From: Dave Cheney Date: Wed, 19 Jun 2019 20:15:21 +1000 Subject: [PATCH] internal/grpc: handle one response per request Updates #499 Updates #273 Updates #1176 The XDS spec says that Envoy will always initiate a stream with a discovery request, and expects the management server to respond with only one discovery response. After that, Envoy will initiate another discovery request containing an ACK or a NACK from the previous response. Currently Contour ignores the ACK/NACK, this is #1176, however after inspection of the current code it is evident that we're also not waiting for Envoy to send the next discovery request. This PR removes the inner `for {}` loop that would continue to reuse the initial discovery request until the client disconnected. The previous code was written in a time when we'd just implemented filtering and it was possible for the filter to return no results, hence the inner loop was--incorrectly--trying to loop until there was a result to return. Huge thanks to @lrouquette who pointed this out. Signed-off-by: Dave Cheney --- internal/grpc/xds.go | 82 +++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 42 deletions(-) diff --git a/internal/grpc/xds.go b/internal/grpc/xds.go index 5075117817e..f0612a07527 100644 --- a/internal/grpc/xds.go +++ b/internal/grpc/xds.go @@ -86,6 +86,8 @@ func (xh *xdsHandler) stream(st grpcStream) (err error) { return err } + // TODO(dfc) issue 1176: handle xDS ACK/NACK + // from the request we derive the resource to stream which have // been registered according to the typeURL. r, ok := xh.resources[req.TypeUrl] @@ -97,49 +99,45 @@ func (xh *xdsHandler) stream(st grpcStream) (err error) { // so the next time around the loop all is forgotten. log := log.WithField("version_info", req.VersionInfo).WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl).WithField("response_nonce", req.ResponseNonce).WithField("error_detail", req.ErrorDetail) - for { - log.Info("stream_wait") - - // now we wait for a notification, if this is the first time through the loop - // then last will be less than zero and that will trigger a notification immediately. - r.Register(ch, last) - select { - case last = <-ch: - // boom, something in the cache has changed. - // TODO(dfc) the thing that has changed may not be in the scope of the filter - // so we're going to be sending an update that is a no-op. See #426 - - var resources []proto.Message - switch len(req.ResourceNames) { - case 0: - // no resource hints supplied, return the full - // contents of the resource - resources = r.Contents() - default: - // resource hints supplied, return exactly those - resources = r.Query(req.ResourceNames) - } - - any, err := toAny(r.TypeURL(), resources) - if err != nil { - return err - } - - resp := &v2.DiscoveryResponse{ - VersionInfo: strconv.Itoa(last), - Resources: any, - TypeUrl: r.TypeURL(), - Nonce: strconv.Itoa(last), - } - if err := st.Send(resp); err != nil { - return err - } - log.WithField("count", len(resources)).Info("response") - - // ok, the client hung up, return any error stored in the context and we're done. - case <-ctx.Done(): - return ctx.Err() + log.Info("stream_wait") + + // now we wait for a notification, if this is the first request received on this + // connection last will be less than zero and that will trigger a response immediately. + r.Register(ch, last) + select { + case last = <-ch: + // boom, something in the cache has changed. + // TODO(dfc) the thing that has changed may not be in the scope of the filter + // so we're going to be sending an update that is a no-op. See #426 + + var resources []proto.Message + switch len(req.ResourceNames) { + case 0: + // no resource hints supplied, return the full + // contents of the resource + resources = r.Contents() + default: + // resource hints supplied, return exactly those + resources = r.Query(req.ResourceNames) + } + + any, err := toAny(r.TypeURL(), resources) + if err != nil { + return err + } + + resp := &v2.DiscoveryResponse{ + VersionInfo: strconv.Itoa(last), + Resources: any, + TypeUrl: r.TypeURL(), + Nonce: strconv.Itoa(last), + } + if err := st.Send(resp); err != nil { + return err } + log.WithField("count", len(resources)).Info("response") + case <-ctx.Done(): + return ctx.Err() } } }