Skip to content

Commit

Permalink
Add trigger in callbacks to request resource updates
Browse files Browse the repository at this point in the history
  • Loading branch information
valerian-roche committed Jul 22, 2022
1 parent 1c57a23 commit 3cd8044
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 10 deletions.
50 changes: 40 additions & 10 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Callbacks interface {
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
}

type ExtendedCallbacks interface {
OnStreamDeltaResponseF(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse, func(typeURL string, resourceNames []string))
}

var deltaErrorResponse = &cache.RawDeltaResponse{}

type server struct {
Expand Down Expand Up @@ -72,6 +76,20 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}()

// Sets up a watch in the cache
processWatch := func(newWatch watch, watchTypeURL string) {
newWatch.responses = make(chan cache.DeltaResponse, 1)
newWatch.cancel = s.cache.CreateDeltaWatch(newWatch.req, newWatch.state, newWatch.responses)
watches.deltaWatches[watchTypeURL] = newWatch

go func() {
resp, more := <-newWatch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}

// Sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
Expand All @@ -87,6 +105,26 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
response.Nonce = strconv.FormatInt(streamNonce, 10)
if s.callbacks != nil {
s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response)

if extendedCallbacks, ok := s.callbacks.(ExtendedCallbacks); ok {
extendedCallbacks.OnStreamDeltaResponseF(streamID, resp.GetDeltaRequest(), response, func(typeURL string, resourceNames []string) {
// cancel existing watch to (re-)request a newer version
watch, ok := watches.deltaWatches[typeURL]
if !ok {
// There's no existing watch
return
} else {
watch.Cancel()
}
for _, resourceName := range resourceNames {
// This will force an update for those resources
// If not existing it will return them as removed
watch.state.GetResourceVersions()[resourceName] = ""
}

processWatch(watch, typeURL)
})
}
}

return response.Nonce, str.Send(response)
Expand Down Expand Up @@ -158,6 +196,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De

// cancel existing watch to (re-)request a newer version
watch, ok := watches.deltaWatches[typeURL]
watch.req = req
if !ok {
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
Expand All @@ -173,16 +212,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch

go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
processWatch(watch, typeURL)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
req *cache.DeltaRequest

state stream.StreamState
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Callbacks interface {
OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
}

type ExtendedCallbacks interface {
OnStreamResponseF(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse, func(typeURL string, resourceNames []string))
}

// NewServer creates handlers from a config watcher and callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
return &server{cache: config, callbacks: callbacks, ctx: ctx}
Expand Down Expand Up @@ -123,6 +127,26 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq

if s.callbacks != nil {
s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
if extendedCallbacks, ok := s.callbacks.(ExtendedCallbacks); ok {
extendedCallbacks.OnStreamResponseF(resp.GetContext(), streamID, resp.GetRequest(), out, func(typeURL string, resourceNames []string) {
if w, ok := watches.responders[typeURL]; !ok {
return
} else {
w.close()

// This will resend all values, as we don't have a way to provide known versions currently
// This should be fixed for cases when partial response can be returned (e.g. EDS)
// Currently both simple and linear caches have very different behaviors related to streamState
w.req.VersionInfo = ""
responder := make(chan cache.Response, 1)
w.cancel = s.cache.CreateWatch(w.req, w.state, responder)
w.response = responder

watches.addWatch(typeURL, w)
watches.recompute(s.ctx, reqCh)
}
})
}
}
return out.Nonce, str.Send(out)
}
Expand Down Expand Up @@ -201,6 +225,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
req: req,
state: streamState,
})
}
} else {
Expand All @@ -209,6 +235,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
req: req,
state: streamState,
})
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/server/sotw/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// watches for all xDS resource types
Expand Down Expand Up @@ -65,6 +66,9 @@ type watch struct {
cancel func()
nonce string
response chan cache.Response
req *cache.Request

state stream.StreamState
}

// close cancels an open watch
Expand Down
158 changes: 158 additions & 0 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func TestDeltaResponseHandlers(t *testing.T) {
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})

resp := makeMockDeltaStream(t)
// This is a wildcard request since we don't specify a list of resource subscriptions
resourceNames := []string{}
for resourceName := range config.deltaResources[typ] {
resourceNames = append(resourceNames, resourceName)
Expand Down Expand Up @@ -612,5 +613,162 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
}
validateResponse(t, resp.sent, []string{"endpoints2"}, []string{"endpoints4"})
})
}

type testExtendedCallbacks struct {
server.CallbackFuncs
triggerType string
triggerName string
}

func (c testExtendedCallbacks) OnStreamDeltaResponseF(streamId int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse, updateTrigger func(typeURL string, resourceNames []string)) {
trigger := false
for _, res := range resp.GetResources() {
if res.Name == c.triggerName {
trigger = true
break
}
}
if trigger && req.TypeUrl == rsrc.ClusterType {
updateTrigger(c.triggerType, []string{"otherCluster"})
}
}

func TestDeltaCallbackTrigger(t *testing.T) {
config := makeMockConfigWatcher()
callback := testExtendedCallbacks{
triggerType: rsrc.ClusterType,
triggerName: clusterName,
}

validateResponse := func(t *testing.T, resp *mockDeltaStream, expectedType string, expectedResources []string) {
t.Helper()
var response *discovery.DeltaDiscoveryResponse
select {
case <-time.After(5 * time.Second):
assert.Fail(t, "no response after 5s")
return
case response = <-resp.sent:
}
assert.Equal(t, expectedType, response.TypeUrl)
if assert.Equal(t, len(expectedResources), len(response.Resources)) {
var names []string
for _, resource := range response.Resources {
names = append(names, resource.Name)
}
assert.ElementsMatch(t, names, expectedResources)
}
}

config.deltaResources = map[string]map[string]types.Resource{
rsrc.ClusterType: map[string]types.Resource{
cluster.Name: cluster,
"otherCluster": resource.MakeCluster(resource.Ads, "otherCluster"),
"thirdCluster": resource.MakeCluster(resource.Ads, "thirdCluster"),
},
rsrc.EndpointType: map[string]types.Resource{
cluster.Name: endpoint,
"otherCluster": resource.MakeEndpoint("otherCluster", 1234),
},
}

t.Run("Same url type", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer close(resp.recv)
s := server.NewServer(context.Background(), config, &callback)
go func() {
err := s.DeltaAggregatedResources(resp)
assert.NoError(t, err)
}()
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.ClusterType,
ResourceNamesSubscribe: []string{"otherCluster"},
}
validateResponse(t, resp, rsrc.ClusterType, []string{"otherCluster"})

resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.ClusterType,
ResourceNamesSubscribe: []string{"otherCluster"},
}

// This will not return as we already have it at the correct version
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.ClusterType,
ResourceNamesSubscribe: []string{"thirdCluster"},
}
// Only return thirdCluster as otherCluster version has not changed
validateResponse(t, resp, rsrc.ClusterType, []string{"thirdCluster"})

// Now request clusterName, which will trigger the specific push
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.ClusterType,
ResourceNamesSubscribe: []string{clusterName},
}
// The first response only includes the requested cluster
validateResponse(t, resp, rsrc.ClusterType, []string{clusterName})

// The second response also includes the triggered update
// It should ideally not return the initial resource, but sadly this doesn't properly work when using the same type
validateResponse(t, resp, rsrc.ClusterType, []string{clusterName, "otherCluster"})

assert.Equal(t, 0, config.deltaWatches)
})

callback.triggerType = rsrc.EndpointType
t.Run("Different url type, no existing watch", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer close(resp.recv)
s := server.NewServer(context.Background(), config, &callback)
go func() {
err := s.DeltaAggregatedResources(resp)
assert.NoError(t, err)
}()

resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.ClusterType,
ResourceNamesSubscribe: []string{clusterName},
}
validateResponse(t, resp, rsrc.ClusterType, []string{clusterName})
assert.Equal(t, 0, config.deltaWatches)
})

t.Run("Different url type, existing watch", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer close(resp.recv)
s := server.NewServer(context.Background(), config, &callback)
go func() {
err := s.DeltaAggregatedResources(resp)
assert.NoError(t, err)
}()

// Setup a watch for endpoints
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{endpoint.ClusterName, "otherCluster"},
}
validateResponse(t, resp, rsrc.EndpointType, []string{endpoint.ClusterName, "otherCluster"})

resp.recv <- &discovery.DeltaDiscoveryRequest{
TypeUrl: rsrc.EndpointType,
ResponseNonce: "2",
ResourceNamesSubscribe: nil,
}
// Watch is setup now with the current version known

// Same call as above, but this time with an existing watch on eps for "otherCluster"
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: rsrc.ClusterType,
ResourceNamesSubscribe: []string{clusterName},
}
validateResponse(t, resp, rsrc.ClusterType, []string{clusterName})
validateResponse(t, resp, rsrc.EndpointType, []string{"otherCluster"})
assert.Equal(t, 0, config.deltaWatches)
})
}

0 comments on commit 3cd8044

Please sign in to comment.