diff --git a/internal/controllers/clusterclass/clusterclass_controller.go b/internal/controllers/clusterclass/clusterclass_controller.go index 274fc229d7a8..64d0d5884c29 100644 --- a/internal/controllers/clusterclass/clusterclass_controller.go +++ b/internal/controllers/clusterclass/clusterclass_controller.go @@ -46,7 +46,9 @@ import ( runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" "sigs.k8s.io/cluster-api/feature" runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" + runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry" "sigs.k8s.io/cluster-api/internal/topology/variables" + "sigs.k8s.io/cluster-api/internal/util/cache" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/conversion" "sigs.k8s.io/cluster-api/util/patch" @@ -67,6 +69,10 @@ type Reconciler struct { // RuntimeClient is a client for calling runtime extensions. RuntimeClient runtimeclient.Client + + // discoverVariablesCache is used to temporarily store the response of a DiscoveryVariables call for + // a specific runtime extension/settings combination. + discoverVariablesCache cache.Cache[runtimeclient.CallExtensionCacheEntry] } func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { @@ -91,6 +97,8 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } + + r.discoverVariablesCache = cache.New[runtimeclient.CallExtensionCacheEntry]() return nil } @@ -302,8 +310,13 @@ func (r *Reconciler) reconcileVariables(ctx context.Context, s *scope) (ctrl.Res req := &runtimehooksv1.DiscoverVariablesRequest{} req.Settings = patch.External.Settings + // We temporarily cache the response of a DiscoveryVariables call to improve performance in case there are + // many ClusterClasses using the same runtime extension/settings combination. + // This also mitigates spikes when ClusterClass re-syncs happen or when changes to the ExtensionConfig are applied. + // DiscoverVariables is expected to return a "static" response and usually there are few ExtensionConfigs in a mgmt cluster. resp := &runtimehooksv1.DiscoverVariablesResponse{} - err := r.RuntimeClient.CallExtension(ctx, runtimehooksv1.DiscoverVariables, clusterClass, *patch.External.DiscoverVariablesExtension, req, resp) + err := r.RuntimeClient.CallExtension(ctx, runtimehooksv1.DiscoverVariables, clusterClass, *patch.External.DiscoverVariablesExtension, req, resp, + runtimeclient.WithCaching{Cache: r.discoverVariablesCache, CacheKeyFunc: cacheKeyFunc}) if err != nil { errs = append(errs, errors.Wrapf(err, "failed to call DiscoverVariables for patch %s", patch.Name)) continue @@ -492,3 +505,12 @@ func matchNamespace(ctx context.Context, c client.Client, selector labels.Select } return selector.Matches(labels.Set(ns.GetLabels())) } + +func cacheKeyFunc(registration *runtimeregistry.ExtensionRegistration, request runtimehooksv1.RequestObject) string { + // Note: registration.Name is identical to the value of the patch.External.DiscoverVariablesExtension field in the ClusterClass. + s := fmt.Sprintf("%s-%s", registration.Name, registration.ExtensionConfigResourceVersion) + for k, v := range request.GetSettings() { + s += fmt.Sprintf(",%s=%s", k, v) + } + return s +} diff --git a/internal/controllers/clusterclass/clusterclass_controller_test.go b/internal/controllers/clusterclass/clusterclass_controller_test.go index 1df78cd48845..6cb63b0ca45d 100644 --- a/internal/controllers/clusterclass/clusterclass_controller_test.go +++ b/internal/controllers/clusterclass/clusterclass_controller_test.go @@ -43,7 +43,9 @@ import ( runtimecatalog "sigs.k8s.io/cluster-api/exp/runtime/catalog" runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" "sigs.k8s.io/cluster-api/feature" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake" + "sigs.k8s.io/cluster-api/internal/util/cache" "sigs.k8s.io/cluster-api/util/test/builder" ) @@ -1154,7 +1156,8 @@ func TestReconciler_reconcileVariables(t *testing.T) { Build() r := &Reconciler{ - RuntimeClient: fakeRuntimeClient, + RuntimeClient: fakeRuntimeClient, + discoverVariablesCache: cache.New[runtimeclient.CallExtensionCacheEntry](), } // Pin the compatibility version used in variable CEL validation to 1.29, so we don't have to continuously refactor diff --git a/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go b/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go index 7f2b259d01e9..56e8c94a631b 100644 --- a/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go +++ b/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go @@ -129,7 +129,7 @@ func (f *fakeRuntimeClient) CallAllExtensions(_ context.Context, _ runtimecatalo panic("implement me") } -func (f *fakeRuntimeClient) CallExtension(_ context.Context, _ runtimecatalog.Hook, _ metav1.Object, _ string, request runtimehooksv1.RequestObject, _ runtimehooksv1.ResponseObject) error { +func (f *fakeRuntimeClient) CallExtension(_ context.Context, _ runtimecatalog.Hook, _ metav1.Object, _ string, request runtimehooksv1.RequestObject, _ runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error { // Keep a copy of the request object. // We keep a copy because the request is modified after the call is made. So we keep a copy to perform assertions. f.callExtensionRequest = request.DeepCopyObject().(runtimehooksv1.RequestObject) diff --git a/internal/runtime/client/client.go b/internal/runtime/client/client.go index 005bc8605db9..b0cb562759ce 100644 --- a/internal/runtime/client/client.go +++ b/internal/runtime/client/client.go @@ -27,6 +27,7 @@ import ( "net/http" "net/url" "path" + "reflect" "strconv" "strings" "time" @@ -49,6 +50,7 @@ import ( runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" runtimemetrics "sigs.k8s.io/cluster-api/internal/runtime/metrics" runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry" + "sigs.k8s.io/cluster-api/internal/util/cache" "sigs.k8s.io/cluster-api/util" ) @@ -96,7 +98,7 @@ type Client interface { CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error // CallExtension calls the ExtensionHandler with the given name. - CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error + CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, opts ...CallExtensionOption) error } var _ Client = &client{} @@ -276,6 +278,44 @@ func aggregateSuccessfulResponses(aggregatedResponse runtimehooksv1.ResponseObje aggregatedResponse.SetMessage(strings.Join(messages, ", ")) } +// CallExtensionOption is the interface for configuration that modifies CallExtensionOptions for a CallExtension call. +type CallExtensionOption interface { + // ApplyToOptions applies this configuration to the given CallExtensionOptions. + ApplyToOptions(*CallExtensionOptions) +} + +// CallExtensionCacheEntry is a cache entry for the cache that can be used with the CallExtension call via +// the WithCaching option. +type CallExtensionCacheEntry struct { + CacheKey string + Response runtimehooksv1.ResponseObject +} + +// Key returns the cache key of a CallExtensionCacheEntry. +func (c CallExtensionCacheEntry) Key() string { + return c.CacheKey +} + +// WithCaching enables caching for the CallExtension call. +type WithCaching struct { + Cache cache.Cache[CallExtensionCacheEntry] + CacheKeyFunc func(*runtimeregistry.ExtensionRegistration, runtimehooksv1.RequestObject) string +} + +// ApplyToOptions applies WithCaching to the given CallExtensionOptions. +func (w WithCaching) ApplyToOptions(in *CallExtensionOptions) { + in.WithCaching = true + in.Cache = w.Cache + in.CacheKeyFunc = w.CacheKeyFunc +} + +// CallExtensionOptions contains the options for the CallExtension call. +type CallExtensionOptions struct { + WithCaching bool + Cache cache.Cache[CallExtensionCacheEntry] + CacheKeyFunc func(*runtimeregistry.ExtensionRegistration, runtimehooksv1.RequestObject) string +} + // CallExtension makes the call to the extension with the given name. // The response object passed will be updated with the response of the call. // An error is returned if the extension is not compatible with the hook. @@ -288,7 +328,13 @@ func aggregateSuccessfulResponses(aggregatedResponse runtimehooksv1.ResponseObje // Nb. FailurePolicy does not affect the following kinds of errors: // - Internal errors. Examples: hooks is incompatible with ExtensionHandler, ExtensionHandler information is missing. // - Error when ExtensionHandler returns a response with `Status` set to `Failure`. -func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error { +func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, opts ...CallExtensionOption) error { + // Calculate the options. + options := &CallExtensionOptions{} + for _, opt := range opts { + opt.ApplyToOptions(options) + } + log := ctrl.LoggerFrom(ctx).WithValues("extensionHandler", name, "hook", runtimecatalog.HookName(hook)) ctx = ctrl.LoggerInto(ctx, log) hookGVH, err := c.catalog.GroupVersionHook(hook) @@ -331,7 +377,23 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo // Prepare the request by merging the settings in the registration with the settings in the request. request = cloneAndAddSettings(request, registration.Settings) - opts := &httpCallOptions{ + var cacheKey string + if options.WithCaching { + // Return a cached response if response is cached. + cacheKey = options.CacheKeyFunc(registration, request) + if cacheEntry, ok := options.Cache.Has(cacheKey); ok { + // Set response to cacheEntry.Response. + outVal := reflect.ValueOf(response) + cacheVal := reflect.ValueOf(cacheEntry.Response) + if !cacheVal.Type().AssignableTo(outVal.Type()) { + return fmt.Errorf("failed to call extension handler %q: cached response of type %s instead of type %s", name, cacheVal.Type(), outVal.Type()) + } + reflect.Indirect(outVal).Set(reflect.Indirect(cacheVal)) + return nil + } + } + + httpOpts := &httpCallOptions{ catalog: c.catalog, config: registration.ClientConfig, registrationGVH: registration.GroupVersionHook, @@ -339,7 +401,7 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo name: strings.TrimSuffix(registration.Name, "."+registration.ExtensionConfigName), timeout: timeoutDuration, } - err = httpCall(ctx, request, response, opts) + err = httpCall(ctx, request, response, httpOpts) if err != nil { // If the error is errCallingExtensionHandler then apply failure policy to calculate // the effective result of the operation. @@ -368,6 +430,14 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo log.V(4).Info("Extension handler returned success response") } + if options.WithCaching { + // Add response to the cache. + options.Cache.Add(CallExtensionCacheEntry{ + CacheKey: cacheKey, + Response: response, + }) + } + // Received a successful response from the extension handler. The `response` object // has been populated with the result. Return no error. return nil diff --git a/internal/runtime/client/client_test.go b/internal/runtime/client/client_test.go index 73ec5aa5cef1..5d868e36acea 100644 --- a/internal/runtime/client/client_test.go +++ b/internal/runtime/client/client_test.go @@ -44,6 +44,7 @@ import ( runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry" fakev1alpha1 "sigs.k8s.io/cluster-api/internal/runtime/test/v1alpha1" fakev1alpha2 "sigs.k8s.io/cluster-api/internal/runtime/test/v1alpha2" + "sigs.k8s.io/cluster-api/internal/util/cache" ) func TestClient_httpCall(t *testing.T) { @@ -543,6 +544,9 @@ func TestClient_CallExtension(t *testing.T) { fpIgnore := runtimev1.FailurePolicyIgnore validExtensionHandlerWithFailPolicy := runtimev1.ExtensionConfig{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "15", + }, Spec: runtimev1.ExtensionConfigSpec{ ClientConfig: runtimev1.ClientConfig{ // Set a fake URL, in test cases where we start the test server the URL will be overridden. @@ -566,6 +570,9 @@ func TestClient_CallExtension(t *testing.T) { }, } validExtensionHandlerWithIgnorePolicy := runtimev1.ExtensionConfig{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "15", + }, Spec: runtimev1.ExtensionConfigSpec{ ClientConfig: runtimev1.ClientConfig{ // Set a fake URL, in test cases where we start the test server the URL will be overridden. @@ -599,6 +606,7 @@ func TestClient_CallExtension(t *testing.T) { args args testServer testServerConfig wantErr bool + wantResponseCached bool }{ { name: "should fail when hook and request/response are not compatible", @@ -612,7 +620,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.SecondFakeRequest{}, response: &fakev1alpha1.SecondFakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should fail when hook GVH does not match the registered ExtensionHandler", @@ -626,7 +635,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.SecondFakeRequest{}, response: &fakev1alpha1.SecondFakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should fail if ExtensionHandler is not registered", @@ -643,7 +653,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should succeed when calling ExtensionHandler with success response and FailurePolicyFail", @@ -660,7 +671,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: false, + wantErr: false, + wantResponseCached: true, }, { name: "should succeed when calling ExtensionHandler with success response and FailurePolicyIgnore", @@ -677,7 +689,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: false, + wantErr: false, + wantResponseCached: true, }, { name: "should fail when calling ExtensionHandler with failure response and FailurePolicyFail", @@ -694,7 +707,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should fail when calling ExtensionHandler with failure response and FailurePolicyIgnore", @@ -711,7 +725,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { @@ -726,7 +741,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: false, + wantErr: false, + wantResponseCached: false, // Note: We only want to cache entirely successful responses. }, { name: "should fail with unreachable extension and FailurePolicyFail", @@ -740,7 +756,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, } @@ -748,8 +765,11 @@ func TestClient_CallExtension(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) + var serverCallCount int if tt.testServer.start { - srv := createSecureTestServer(tt.testServer) + srv := createSecureTestServer(tt.testServer, func() { + serverCallCount++ + }) srv.StartTLS() defer srv.Close() @@ -778,17 +798,58 @@ func TestClient_CallExtension(t *testing.T) { Namespace: "foo", }, } + // Call once without caching. err := c.CallExtension(context.Background(), tt.args.hook, obj, tt.args.name, tt.args.request, tt.args.response) + if tt.wantErr { + g.Expect(err).To(HaveOccurred()) + } else { + g.Expect(err).ToNot(HaveOccurred()) + } + // Call again with caching. + serverCallCount = 0 + cache := cache.New[CallExtensionCacheEntry]() + err = c.CallExtension(context.Background(), tt.args.hook, obj, tt.args.name, tt.args.request, tt.args.response, + WithCaching{Cache: cache, CacheKeyFunc: cacheKeyFunc}) if tt.wantErr { g.Expect(err).To(HaveOccurred()) } else { g.Expect(err).ToNot(HaveOccurred()) } + + if tt.wantResponseCached { + // When we expect the response to be cached we expect 1 call to the server. + g.Expect(serverCallCount).To(Equal(1)) + cacheEntry, isCached := cache.Has("valid-extension-15") + g.Expect(isCached).To(BeTrue()) + g.Expect(cacheEntry).ToNot(BeNil()) + + err = c.CallExtension(context.Background(), tt.args.hook, obj, tt.args.name, tt.args.request, tt.args.response, + WithCaching{Cache: cache, CacheKeyFunc: cacheKeyFunc}) + // When we expect the response to be cached we always expect no errors. + g.Expect(err).ToNot(HaveOccurred()) + // As the response is cached we expect no further calls to the server. + g.Expect(serverCallCount).To(Equal(1)) + cacheEntry, isCached = cache.Has("valid-extension-15") + g.Expect(isCached).To(BeTrue()) + g.Expect(cacheEntry).ToNot(BeNil()) + } else { + _, isCached := cache.Has("valid-extension-15") + g.Expect(isCached).To(BeFalse()) + } }) } } +func cacheKeyFunc(registration *runtimeregistry.ExtensionRegistration, request runtimehooksv1.RequestObject) string { + // Note: registration.Name is identical to the value of the name parameter passed into CallExtension. + s := fmt.Sprintf("%s-%s", registration.Name, registration.ExtensionConfigResourceVersion) + for k, v := range request.GetSettings() { + s += fmt.Sprintf(",%s=%s", k, v) + } + return s +} + func TestPrepareRequest(t *testing.T) { t.Run("request should have the correct settings", func(t *testing.T) { tests := []struct { @@ -1255,9 +1316,13 @@ func response(status runtimehooksv1.ResponseStatus) testServerResponse { } } -func createSecureTestServer(server testServerConfig) *httptest.Server { +func createSecureTestServer(server testServerConfig, callbacks ...func()) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + for _, callback := range callbacks { + callback() + } + // Write the response for the first match in tt.testServer.responses. for pathRegex, resp := range server.responses { if !regexp.MustCompile(pathRegex).MatchString(r.URL.Path) { diff --git a/internal/runtime/client/fake/fake_client.go b/internal/runtime/client/fake/fake_client.go index d1722a2d40a1..c37efe186e2f 100644 --- a/internal/runtime/client/fake/fake_client.go +++ b/internal/runtime/client/fake/fake_client.go @@ -119,7 +119,7 @@ func (fc *RuntimeClient) CallAllExtensions(ctx context.Context, hook runtimecata } // CallExtension implements Client. -func (fc *RuntimeClient) CallExtension(ctx context.Context, _ runtimecatalog.Hook, _ metav1.Object, name string, _ runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error { +func (fc *RuntimeClient) CallExtension(ctx context.Context, _ runtimecatalog.Hook, _ metav1.Object, name string, _ runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error { expectedResponse, ok := fc.callResponses[name] if !ok { // This should actually panic because an error here would mean a mistake in the test setup. diff --git a/internal/runtime/registry/registry.go b/internal/runtime/registry/registry.go index 1ed2a7407f95..1de4c1eebcfa 100644 --- a/internal/runtime/registry/registry.go +++ b/internal/runtime/registry/registry.go @@ -64,6 +64,9 @@ type ExtensionRegistration struct { // ExtensionConfigName is the name of the corresponding ExtensionConfig. ExtensionConfigName string + // ExtensionConfigResourceVersion is the ResourceVersion of the corresponding ExtensionConfig. + ExtensionConfigResourceVersion string + // GroupVersionHook is the GroupVersionHook that the RuntimeExtension implements. GroupVersionHook runtimecatalog.GroupVersionHook @@ -247,8 +250,9 @@ func (r *extensionRegistry) add(extensionConfig *runtimev1.ExtensionConfig) erro // Registrations will only be added to the registry if no errors occur (all or nothing). registrations = append(registrations, &ExtensionRegistration{ - ExtensionConfigName: extensionConfig.Name, - Name: e.Name, + ExtensionConfigName: extensionConfig.Name, + ExtensionConfigResourceVersion: extensionConfig.ResourceVersion, + Name: e.Name, GroupVersionHook: runtimecatalog.GroupVersionHook{ Group: gv.Group, Version: gv.Version, diff --git a/test/extension/handlers/topologymutation/handler_integration_test.go b/test/extension/handlers/topologymutation/handler_integration_test.go index 74068c0da042..c1bd5435980f 100644 --- a/test/extension/handlers/topologymutation/handler_integration_test.go +++ b/test/extension/handlers/topologymutation/handler_integration_test.go @@ -54,6 +54,7 @@ import ( "sigs.k8s.io/cluster-api/exp/topology/desiredstate" "sigs.k8s.io/cluster-api/exp/topology/scope" "sigs.k8s.io/cluster-api/feature" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" v1beta2conditions "sigs.k8s.io/cluster-api/util/conditions/v1beta2" "sigs.k8s.io/cluster-api/util/contract" "sigs.k8s.io/cluster-api/webhooks" @@ -411,7 +412,7 @@ type injectRuntimeClient struct { runtimeExtension TopologyMutationHook } -func (i injectRuntimeClient) CallExtension(ctx context.Context, hook runtimecatalog.Hook, _ metav1.Object, _ string, req runtimehooksv1.RequestObject, resp runtimehooksv1.ResponseObject) error { +func (i injectRuntimeClient) CallExtension(ctx context.Context, hook runtimecatalog.Hook, _ metav1.Object, _ string, req runtimehooksv1.RequestObject, resp runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error { // Note: We have to copy the requests. Otherwise we could get side effect by Runtime Extensions // modifying the request instead of properly returning a response. Also after Unmarshal, // only the Raw fields in runtime.RawExtension fields should be filled out and Object should be nil.