diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 5f10266aba..87ac4ed8fb 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -34,6 +34,10 @@ type Callbacks interface { OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse) } +type ExtendedCallbacks interface { + OnStreamDeltaResponseF(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse, func(typeURL string, resourceNames []string)) +} + var deltaErrorResponse = &cache.RawDeltaResponse{} type server struct { @@ -72,6 +76,20 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De } }() + // Sets up a watch in the cache + processWatch := func(newWatch watch, watchTypeURL string) { + newWatch.responses = make(chan cache.DeltaResponse, 1) + newWatch.cancel = s.cache.CreateDeltaWatch(newWatch.req, newWatch.state, newWatch.responses) + watches.deltaWatches[watchTypeURL] = newWatch + + go func() { + resp, more := <-newWatch.responses + if more { + watches.deltaMuxedResponses <- resp + } + }() + } + // Sends a response, returns the new stream nonce send := func(resp cache.DeltaResponse) (string, error) { if resp == nil { @@ -87,6 +105,26 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De response.Nonce = strconv.FormatInt(streamNonce, 10) if s.callbacks != nil { s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response) + + if extendedCallbacks, ok := s.callbacks.(ExtendedCallbacks); ok { + extendedCallbacks.OnStreamDeltaResponseF(streamID, resp.GetDeltaRequest(), response, func(typeURL string, resourceNames []string) { + // cancel existing watch to (re-)request a newer version + watch, ok := watches.deltaWatches[typeURL] + if !ok { + // There's no existing watch + return + } else { + watch.Cancel() + } + for _, resourceName := range resourceNames { + // This will force an update for those resources + // If not existing it will return them as removed + watch.state.GetResourceVersions()[resourceName] = "" + } + + processWatch(watch, typeURL) + }) + } } return response.Nonce, str.Send(response) @@ -158,6 +196,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // cancel existing watch to (re-)request a newer version watch, ok := watches.deltaWatches[typeURL] + watch.req = req if !ok { // Initialize the state of the stream. // Since there was no previous state, we know we're handling the first request of this type @@ -173,16 +212,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - watch.responses = make(chan cache.DeltaResponse, 1) - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) - watches.deltaWatches[typeURL] = watch - - go func() { - resp, more := <-watch.responses - if more { - watches.deltaMuxedResponses <- resp - } - }() + processWatch(watch, typeURL) } } } diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index c88548388a..82d591331c 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -35,6 +35,7 @@ type watch struct { responses chan cache.DeltaResponse cancel func() nonce string + req *cache.DeltaRequest state stream.StreamState } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 91681237c9..449a003faf 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -49,6 +49,10 @@ type Callbacks interface { OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse) } +type ExtendedCallbacks interface { + OnStreamResponseF(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse, func(typeURL string, resourceNames []string)) +} + // NewServer creates handlers from a config watcher and callbacks. func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { return &server{cache: config, callbacks: callbacks, ctx: ctx} @@ -123,6 +127,26 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) + if extendedCallbacks, ok := s.callbacks.(ExtendedCallbacks); ok { + extendedCallbacks.OnStreamResponseF(resp.GetContext(), streamID, resp.GetRequest(), out, func(typeURL string, resourceNames []string) { + if w, ok := watches.responders[typeURL]; !ok { + return + } else { + w.close() + + // This will resend all values, as we don't have a way to provide known versions currently + // This should be fixed for cases when partial response can be returned (e.g. EDS) + // Currently both simple and linear caches have very different behaviors related to streamState + w.req.VersionInfo = "" + responder := make(chan cache.Response, 1) + w.cancel = s.cache.CreateWatch(w.req, w.state, responder) + w.response = responder + + watches.addWatch(typeURL, w) + watches.recompute(s.ctx, reqCh) + } + }) + } } return out.Nonce, str.Send(out) } @@ -201,6 +225,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq watches.addWatch(typeURL, &watch{ cancel: s.cache.CreateWatch(req, streamState, responder), response: responder, + req: req, + state: streamState, }) } } else { @@ -209,6 +235,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq watches.addWatch(typeURL, &watch{ cancel: s.cache.CreateWatch(req, streamState, responder), response: responder, + req: req, + state: streamState, }) } diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..4436549b5d 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -7,6 +7,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // watches for all xDS resource types @@ -65,6 +66,9 @@ type watch struct { cancel func() nonce string response chan cache.Response + req *cache.Request + + state stream.StreamState } // close cancels an open watch diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 204599613c..3b3c25cd0f 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -255,6 +255,7 @@ func TestDeltaResponseHandlers(t *testing.T) { s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) resp := makeMockDeltaStream(t) + // This is a wildcard request since we don't specify a list of resource subscriptions resourceNames := []string{} for resourceName := range config.deltaResources[typ] { resourceNames = append(resourceNames, resourceName) @@ -612,5 +613,162 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { } validateResponse(t, resp.sent, []string{"endpoints2"}, []string{"endpoints4"}) }) +} + +type testExtendedCallbacks struct { + server.CallbackFuncs + triggerType string + triggerName string +} + +func (c testExtendedCallbacks) OnStreamDeltaResponseF(streamId int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse, updateTrigger func(typeURL string, resourceNames []string)) { + trigger := false + for _, res := range resp.GetResources() { + if res.Name == c.triggerName { + trigger = true + break + } + } + if trigger && req.TypeUrl == rsrc.ClusterType { + updateTrigger(c.triggerType, []string{"otherCluster"}) + } +} + +func TestDeltaCallbackTrigger(t *testing.T) { + config := makeMockConfigWatcher() + callback := testExtendedCallbacks{ + triggerType: rsrc.ClusterType, + triggerName: clusterName, + } + + validateResponse := func(t *testing.T, resp *mockDeltaStream, expectedType string, expectedResources []string) { + t.Helper() + var response *discovery.DeltaDiscoveryResponse + select { + case <-time.After(5 * time.Second): + assert.Fail(t, "no response after 5s") + return + case response = <-resp.sent: + } + assert.Equal(t, expectedType, response.TypeUrl) + if assert.Equal(t, len(expectedResources), len(response.Resources)) { + var names []string + for _, resource := range response.Resources { + names = append(names, resource.Name) + } + assert.ElementsMatch(t, names, expectedResources) + } + } + + config.deltaResources = map[string]map[string]types.Resource{ + rsrc.ClusterType: map[string]types.Resource{ + cluster.Name: cluster, + "otherCluster": resource.MakeCluster(resource.Ads, "otherCluster"), + "thirdCluster": resource.MakeCluster(resource.Ads, "thirdCluster"), + }, + rsrc.EndpointType: map[string]types.Resource{ + cluster.Name: endpoint, + "otherCluster": resource.MakeEndpoint("otherCluster", 1234), + }, + } + + t.Run("Same url type", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, &callback) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ClusterType, + ResourceNamesSubscribe: []string{"otherCluster"}, + } + validateResponse(t, resp, rsrc.ClusterType, []string{"otherCluster"}) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ClusterType, + ResourceNamesSubscribe: []string{"otherCluster"}, + } + + // This will not return as we already have it at the correct version + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ClusterType, + ResourceNamesSubscribe: []string{"thirdCluster"}, + } + // Only return thirdCluster as otherCluster version has not changed + validateResponse(t, resp, rsrc.ClusterType, []string{"thirdCluster"}) + + // Now request clusterName, which will trigger the specific push + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ClusterType, + ResourceNamesSubscribe: []string{clusterName}, + } + // The first response only includes the requested cluster + validateResponse(t, resp, rsrc.ClusterType, []string{clusterName}) + + // The second response also includes the triggered update + // It should ideally not return the initial resource, but sadly this doesn't properly work when using the same type + validateResponse(t, resp, rsrc.ClusterType, []string{clusterName, "otherCluster"}) + + assert.Equal(t, 0, config.deltaWatches) + }) + + callback.triggerType = rsrc.EndpointType + t.Run("Different url type, no existing watch", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, &callback) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ClusterType, + ResourceNamesSubscribe: []string{clusterName}, + } + validateResponse(t, resp, rsrc.ClusterType, []string{clusterName}) + assert.Equal(t, 0, config.deltaWatches) + }) + t.Run("Different url type, existing watch", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, &callback) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + // Setup a watch for endpoints + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{endpoint.ClusterName, "otherCluster"}, + } + validateResponse(t, resp, rsrc.EndpointType, []string{endpoint.ClusterName, "otherCluster"}) + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + TypeUrl: rsrc.EndpointType, + ResponseNonce: "2", + ResourceNamesSubscribe: nil, + } + // Watch is setup now with the current version known + + // Same call as above, but this time with an existing watch on eps for "otherCluster" + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ClusterType, + ResourceNamesSubscribe: []string{clusterName}, + } + validateResponse(t, resp, rsrc.ClusterType, []string{clusterName}) + validateResponse(t, resp, rsrc.EndpointType, []string{"otherCluster"}) + assert.Equal(t, 0, config.deltaWatches) + }) }