[WIP]✨ Add remote cluster cache manager #2835
@@ -0,0 +1,203 @@

Review thread on this file's placement:
- Would it make sense to move the bulk of the implementation here to a library so that it would be easier to consume outside of the core controller manager? I wouldn't expect any providers (especially out of tree) to import anything under
- This is actually a library. I've (privately) questioned why the
- I initially put it under there because it was code to be used within capi controllers, it expanded over time though

/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package remote

import (
    "context"
    "sync"

    "github.com/go-logr/logr"
    "github.com/pkg/errors"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/rest"
    clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ClusterCache extends cache.Cache and adds the ability to stop the cache.
type ClusterCache interface {
    cache.Cache

    // Stop stops the cache.
    Stop()
}

// clusterCache embeds cache.Cache and combines it with a stop channel.
type clusterCache struct {
    cache.Cache

    lock    sync.Mutex
    stopped bool
    stop    chan struct{}
}

var _ ClusterCache = &clusterCache{}

func (cc *clusterCache) Stop() {
    cc.lock.Lock()
    defer cc.lock.Unlock()

    if cc.stopped {
        return
    }

    cc.stopped = true
    close(cc.stop)
}
// ClusterCacheManager manages client caches for workload clusters.
type ClusterCacheManager struct {
    log                     logr.Logger
    managementClusterClient client.Client
    scheme                  *runtime.Scheme

    lock          sync.RWMutex
    clusterCaches map[client.ObjectKey]ClusterCache

    // For testing
    newRESTConfig func(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error)
}

Review thread on the newRESTConfig test hook (lines +72 to +73):
- non blocking: could we use the kubeconfig secret to create a client to envtest instead of mocking it?
- Do you mean build up a dummy secret in the unit test?
- This is what I did for the MHC tests and it seemed to work (cluster-api/controllers/machinehealthcheck_controller_test.go, lines 72 to 73 in 09f87a2). I think it's preferential to do this, but could definitely be a follow up PR.

Review thread on the ClusterCacheManager type name (a reviewer proposed a suggested change here):
- I'd prefer to call this a manager. I know it's watching Clusters and reacting to them, but that's only as a maintenance task to stop and remove stale caches. I wouldn't consider this struct to be doing the same sort of reconciliation that the others are. But if you feel strongly enough, I can make the change.
- I'm with @ncdc on this, I think this is more of a management task than a full on controller, it's not changing the state of the world anywhere, I'd personally stick with
- Manager is an overloaded word and we're potentially introducing a new naming convention here, how about […]? This isn't blocking, if you all want to use Manager is fine, can be changed later if we find a better naming solution.
- +1 to both avoiding […]. Also, +1 to […]
- I'm splitting the "tracker" part from the "reconciler" part (2 different structs). Will be in next push.
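
The last reply above says the "tracker" part will be split from the "reconciler" part in a later push; that split is not in this commit. A rough sketch of the shape it might take, with hypothetical type and field names (not taken from the PR):

// Hypothetical sketch of the tracker/reconciler split mentioned above.
// The tracker owns the per-cluster caches and the lock guarding them.
type clusterCacheTracker struct {
    log    logr.Logger
    client client.Client
    scheme *runtime.Scheme

    lock          sync.RWMutex
    clusterCaches map[client.ObjectKey]ClusterCache
}

// The reconciler only watches Clusters and asks the tracker to stop and drop
// caches for deleted Clusters; it does not create caches itself.
type clusterCacheReconciler struct {
    log     logr.Logger
    client  client.Client
    tracker *clusterCacheTracker
}

Splitting this way keeps the map and its lock behind a single type, so library consumers never need to touch the reconciler at all.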
// NewClusterCacheManagerInput is used as input when creating a ClusterCacheManager and contains all the fields
// necessary for its construction.
type NewClusterCacheManagerInput struct {
    Log                     logr.Logger
    Manager                 ctrl.Manager
    ManagementClusterClient client.Client
    Scheme                  *runtime.Scheme
    ControllerOptions       controller.Options
}

// NewClusterCacheManager creates a new ClusterCacheManager.
func NewClusterCacheManager(input NewClusterCacheManagerInput) (*ClusterCacheManager, error) {
    m := &ClusterCacheManager{
        log:                     input.Log,
        managementClusterClient: input.ManagementClusterClient,
        scheme:                  input.Scheme,
        clusterCaches:           make(map[client.ObjectKey]ClusterCache),
    }

    // Make sure we're using the default REST config function; tests may override it.
    m.newRESTConfig = m.defaultNewRESTConfig

    // Watch Clusters so we can stop and remove caches when Clusters are deleted.
    _, err := ctrl.NewControllerManagedBy(input.Manager).
        For(&clusterv1.Cluster{}).
        WithOptions(input.ControllerOptions).
        Build(m)
    if err != nil {
        return nil, errors.Wrap(err, "failed to create cluster cache manager controller")
    }

    return m, nil
}
// ClusterCache returns the ClusterCache for cluster, creating a new ClusterCache if needed.
func (m *ClusterCacheManager) ClusterCache(ctx context.Context, cluster client.ObjectKey) (ClusterCache, error) {
    cache := m.getClusterCache(cluster)
    if cache != nil {
        return cache, nil
    }

    return m.newClusterCache(ctx, cluster)
}
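
The thread at the top of this file asks whether providers could consume this as a library. Since ClusterCache embeds cache.Cache, which satisfies client.Reader, anything holding a *ClusterCacheManager can already read workload-cluster objects through the shared cache. A minimal sketch under that assumption; listWorkloadNodes is a hypothetical helper and corev1 ("k8s.io/api/core/v1") is an extra import not in this commit:

// Hypothetical consumer: list Nodes in a workload cluster through the shared,
// cached reader instead of building a fresh client per reconcile.
func listWorkloadNodes(ctx context.Context, m *ClusterCacheManager, cluster client.ObjectKey) (*corev1.NodeList, error) {
    c, err := m.ClusterCache(ctx, cluster)
    if err != nil {
        return nil, errors.Wrap(err, "error getting workload cluster cache")
    }

    // The cache is started asynchronously, so early reads may be empty until it has synced.
    nodes := &corev1.NodeList{}
    if err := c.List(ctx, nodes); err != nil {
        return nil, errors.Wrap(err, "error listing workload cluster nodes")
    }
    return nodes, nil
}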
// getClusterCache returns the ClusterCache for cluster, or nil if it does not exist.
func (m *ClusterCacheManager) getClusterCache(cluster client.ObjectKey) ClusterCache {
    m.lock.RLock()
    defer m.lock.RUnlock()

    return m.clusterCaches[cluster]
}

// newClusterCache creates and starts a new ClusterCache for cluster.
func (m *ClusterCacheManager) newClusterCache(ctx context.Context, cluster client.ObjectKey) (ClusterCache, error) {
    m.lock.Lock()
    defer m.lock.Unlock()

    // If another goroutine created the cache while this one was waiting to acquire the write lock, return that
    // instead of overwriting it.
    if c, exists := m.clusterCaches[cluster]; exists {
        return c, nil
    }

    config, err := m.newRESTConfig(ctx, cluster)
    if err != nil {
        return nil, errors.Wrap(err, "error creating REST client config for remote cluster")
    }

    remoteCache, err := cache.New(config, cache.Options{})
    if err != nil {
        return nil, errors.Wrap(err, "error creating cache for remote cluster")
    }
    stop := make(chan struct{})

    cc := &clusterCache{
        Cache: remoteCache,
        stop:  stop,
    }
    m.clusterCaches[cluster] = cc

    // Start the cache!!!
    go remoteCache.Start(cc.stop)

    return cc, nil
}

Review thread on cache.New(config, cache.Options{}):
- Would it make sense to expose at least a subset of
- Will do

Review thread on "// Start the cache!!!":
- 😄 I love the enthusiasm
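
On the question above about exposing a subset of the cache options, the PR only records "Will do", so the following is a guess at one possible shape. RemoteCacheOptions and m.cacheOptions are hypothetical names, and a "time" import would be needed:

// Hypothetical: let callers scope or tune the remote caches without exposing
// all of cache.Options.
type RemoteCacheOptions struct {
    // Resync is the re-list period applied to the remote cluster's informers.
    Resync *time.Duration
    // Namespace, if set, restricts the remote cache to a single namespace.
    Namespace string
}

// Inside newClusterCache, the stored options would then be forwarded:
//
//  remoteCache, err := cache.New(config, cache.Options{
//      Scheme:    m.scheme,
//      Resync:    m.cacheOptions.Resync,
//      Namespace: m.cacheOptions.Namespace,
//  })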
func (m *ClusterCacheManager) defaultNewRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
    config, err := RESTConfig(ctx, m.managementClusterClient, cluster)
    if err != nil {
        return nil, errors.Wrap(err, "error fetching remote cluster config")
    }
    return config, nil
}
// Reconcile reconciles Clusters and removes ClusterCaches for any Cluster that cannot be retrieved from the
// management cluster.
func (m *ClusterCacheManager) Reconcile(req reconcile.Request) (reconcile.Result, error) {
    ctx := context.Background()

    log := m.log.WithValues("namespace", req.Namespace, "name", req.Name)
    log.V(4).Info("Reconciling")

    var cluster clusterv1.Cluster

    err := m.managementClusterClient.Get(ctx, req.NamespacedName, &cluster)
    if err == nil {
        log.V(4).Info("Cluster still exists")
        return reconcile.Result{}, nil
    }

    log.V(4).Info("Error retrieving cluster", "error", err.Error())

    c := m.getClusterCache(req.NamespacedName)
    if c == nil {
        log.V(4).Info("No current cluster cache exists - nothing to do")
        return reconcile.Result{}, nil
    }

    log.V(4).Info("Stopping cluster cache")
    c.Stop()

    delete(m.clusterCaches, req.NamespacedName)

    return reconcile.Result{}, nil
}

Review thread on m.managementClusterClient.Get(ctx, req.NamespacedName, &cluster):
- Couldn't we have client.Client as part of the struct like elsewhere and use mgr.GetClient() to get it during init?
- Yes, I can make that change

Review thread on the err == nil check:
- Should this also attempt to guard against intermittent errors here as well and only continue on to the cache deletion if
- I'd have to do some (re)digging, but given that the client is doing cached reads, I think the only way you can get a non-404 error is if the client's cache hasn't been started yet.

Review thread on the cache removal (lines +305 to +308):
- Might it be worth checking if the cluster deletion timestamp is set and stop the cache earlier? My concern is that the manager may miss the
- If the manager is running, it will not miss the deletion event. One of two situations will happen: […] However, I can add code to check for deletion timestamps too, if you think it'll be beneficial.
- I was unaware that the informer would send a simulated delete event, TIL! In that case, I'm content with it as is, thanks for the explanation

Review thread on delete(m.clusterCaches, req.NamespacedName):
- Should there be a method on
- Yeah I realized this last night. I have it fixed locally and it'll be in the next push.
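
Taken together, two of the suggestions above (only tearing the cache down on NotFound, and removing the map entry behind the lock) might look like the sketch below. apierrors is assumed to be the usual k8s.io/apimachinery/pkg/api/errors alias, and reconcileWithGuard / deleteClusterCache are hypothetical names, not part of this commit:

// Hypothetical hardening of Reconcile: treat only NotFound as "cluster gone",
// requeue on any other error, and remove the cache entry under the lock.
func (m *ClusterCacheManager) reconcileWithGuard(req reconcile.Request) (reconcile.Result, error) {
    ctx := context.Background()

    var cluster clusterv1.Cluster
    err := m.managementClusterClient.Get(ctx, req.NamespacedName, &cluster)
    if err == nil {
        return reconcile.Result{}, nil
    }
    if !apierrors.IsNotFound(err) {
        // Transient error: let the controller retry instead of tearing down the cache.
        return reconcile.Result{}, err
    }

    m.deleteClusterCache(req.NamespacedName)
    return reconcile.Result{}, nil
}

// deleteClusterCache stops and removes the cache for cluster, if one exists.
func (m *ClusterCacheManager) deleteClusterCache(cluster client.ObjectKey) {
    m.lock.Lock()
    defer m.lock.Unlock()

    if c, exists := m.clusterCaches[cluster]; exists {
        c.Stop()
        delete(m.clusterCaches, cluster)
    }
}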
@@ -0,0 +1,128 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package remote

import (
    "context"
    "path/filepath"
    "testing"
    "time"

    . "github.com/onsi/gomega"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/rest"
    clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/envtest"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/manager"
)

func TestClusterCache(t *testing.T) {
    g := NewWithT(t)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    testEnv := &envtest.Environment{
        CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
    }

    cfg, err := testEnv.Start()
    g.Expect(err).ToNot(HaveOccurred())
    g.Expect(cfg).ToNot(BeNil())
    defer func() {
        g.Expect(testEnv.Stop()).To(Succeed())
    }()

    scheme := runtime.NewScheme()
    g.Expect(clusterv1.AddToScheme(scheme)).To(Succeed())

    mgr, err := manager.New(cfg, manager.Options{
        Scheme:             scheme,
        MetricsBindAddress: "0",
    })
    g.Expect(err).ToNot(HaveOccurred())
    go func() {
        err := mgr.Start(ctx.Done())
        g.Expect(err).NotTo(HaveOccurred())
    }()

    // Create the cluster
    cluster := &clusterv1.Cluster{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: "default",
            Name:      "test",
        },
    }
    err = mgr.GetClient().Create(ctx, cluster)
    g.Expect(err).ToNot(HaveOccurred())

    clusterKey := client.ObjectKey{Namespace: "default", Name: "test"}
    g.Eventually(func() error {
        return mgr.GetClient().Get(ctx, clusterKey, cluster)
    }).Should(Succeed())

    // Create our cache manager
    m, err := NewClusterCacheManager(NewClusterCacheManagerInput{
        Log:                     &log.NullLogger{},
        Manager:                 mgr,
        ManagementClusterClient: mgr.GetClient(),
        Scheme:                  scheme,
    })
    g.Expect(err).ToNot(HaveOccurred())

    m.newRESTConfig = func(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
        return mgr.GetConfig(), nil
    }

    g.Expect(len(m.clusterCaches)).To(Equal(0))

    // Get a cache for our cluster
    cc, err := m.ClusterCache(ctx, clusterKey)
    g.Expect(err).NotTo(HaveOccurred())
    g.Expect(cc).NotTo(BeNil())
    g.Expect(len(m.clusterCaches)).To(Equal(1))

    // Get it a few more times to make sure it doesn't create more than 1 cache
    for i := 0; i < 5; i++ {
        cc, err := m.ClusterCache(ctx, clusterKey)
        g.Expect(err).NotTo(HaveOccurred())
        g.Expect(cc).NotTo(BeNil())
        g.Expect(len(m.clusterCaches)).To(Equal(1))
    }

    // Delete the cluster
    err = mgr.GetClient().Delete(ctx, cluster)
    g.Expect(err).ToNot(HaveOccurred())

    // Make sure it's gone
    g.Eventually(func() bool {
        err := mgr.GetClient().Get(ctx, clusterKey, cluster)
        return apierrors.IsNotFound(err)
    }, 5*time.Second).Should(BeTrue())

    // Make sure the cache was removed
    g.Eventually(func() int {
        return len(m.clusterCaches)
    }, 5*time.Second).Should(Equal(0))

    // Make sure it was stopped
    typedCache := cc.(*clusterCache)
    g.Expect(typedCache.stopped).To(BeTrue())
}
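
As an alternative to overriding m.newRESTConfig, the review suggestion earlier in this PR was to create a kubeconfig Secret pointing at envtest so the default code path gets exercised. A rough sketch of that idea follows; the Secret name "<cluster-name>-kubeconfig" and the "value" data key are assumptions about what the remote package's RESTConfig helper expects, and the extra imports (corev1 "k8s.io/api/core/v1", "k8s.io/client-go/tools/clientcmd", api "k8s.io/client-go/tools/clientcmd/api") are not part of this commit:

// Hypothetical test helper: write a kubeconfig for the envtest API server into
// the Secret that the remote package is assumed to read for this cluster.
func createKubeconfigSecret(ctx context.Context, c client.Client, cfg *rest.Config, cluster client.ObjectKey) error {
    kubeConfig := api.Config{
        Clusters: map[string]*api.Cluster{
            cluster.Name: {
                Server:                   cfg.Host,
                CertificateAuthorityData: cfg.CAData,
            },
        },
        AuthInfos: map[string]*api.AuthInfo{
            "default": {
                ClientCertificateData: cfg.CertData,
                ClientKeyData:         cfg.KeyData,
            },
        },
        Contexts: map[string]*api.Context{
            "default": {Cluster: cluster.Name, AuthInfo: "default"},
        },
        CurrentContext: "default",
    }

    data, err := clientcmd.Write(kubeConfig)
    if err != nil {
        return err
    }

    secret := &corev1.Secret{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: cluster.Namespace,
            Name:      cluster.Name + "-kubeconfig",
        },
        Data: map[string][]byte{"value": data},
    }
    return c.Create(ctx, secret)
}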
Review thread on the test file name:
- Maybe we rename this to cluster_cache_controller?
- See my comment below about manager vs reconciler. I'd prefer to keep this as-is, or rename to cluster_cache_manager.go. But again, if you feel strongly, I can rename it.