Optimize cache warmup on startup
markusthoemmes committed Oct 30, 2020
1 parent 2da6dce commit 956d964
Showing 5 changed files with 74 additions and 221 deletions.
47 changes: 2 additions & 45 deletions pkg/generator/caches.go
@@ -44,43 +44,17 @@ type Caches struct {
 	listeners         []*v2.Listener
 	statusVirtualHost *route.VirtualHost
 	logger            *zap.SugaredLogger
-	ingressesToSync   map[string]struct{}
-	synced            chan struct{}
 }

-func NewCaches(ctx context.Context, logger *zap.SugaredLogger, kubernetesClient kubeclient.Interface, extAuthz bool, ingressesToSync []*v1alpha1.Ingress) (*Caches, error) {
+func NewCaches(ctx context.Context, logger *zap.SugaredLogger, kubernetesClient kubeclient.Interface, extAuthz bool) (*Caches, error) {
 	c := &Caches{
 		ingresses:           make(map[string]*v1alpha1.Ingress),
 		translatedIngresses: make(map[string]*translatedIngress),
 		clusters:            newClustersCache(logger.Named("cluster-cache")),
 		clustersToIngress:   make(map[string][]string),
 		logger:              logger,
-		synced:              make(chan struct{}),
 	}
-	err := c.initConfig(ctx, kubernetesClient, extAuthz)
-
-	if len(ingressesToSync) == 0 {
-		// If ingressesToSync is empty, we can just close the "synced" channel now as we don't need to warm anything.
-		close(c.synced)
-	} else {
-		// Create our list of IngressesToSync from the array of ingresses, using the mapKey func.
-		c.ingressesToSync = make(map[string]struct{}, len(ingressesToSync))
-		for _, ingress := range ingressesToSync {
-			logger.Infof("added ingress to cache warmup %s/%s", ingress.Namespace, ingress.Name)
-			c.ingressesToSync[mapKey(ingress.Name, ingress.Namespace)] = struct{}{}
-		}
-		logger.Infof("total of %d ingresses to warm", len(c.ingressesToSync))
-	}
-
-	return c, err
-}
-
-func (caches *Caches) WaitForSync() <-chan struct{} {
-	return caches.synced
-}
-
-func (caches *Caches) hasSynced() bool {
-	return len(caches.ingressesToSync) == 0
+	return c, c.initConfig(ctx, kubernetesClient, extAuthz)
 }

 func (caches *Caches) UpdateIngress(ctx context.Context, ingress *v1alpha1.Ingress, ingressTranslation *translatedIngress, kubeclient kubeclient.Interface) error {
@@ -150,9 +124,6 @@ func (caches *Caches) addTranslatedIngress(ingress *v1alpha1.Ingress, translated
 	caches.ingresses[key] = ingress
 	caches.translatedIngresses[key] = translatedIngress

-	// Remove the Ingress from the Sync list as it has been warmed.
-	caches.deleteFromSyncList(ingress.Name, ingress.Namespace)
-
 	for _, cluster := range translatedIngress.clusters {
 		caches.addClusterForIngress(cluster, ingress.Name, ingress.Namespace)
 	}
@@ -232,17 +203,6 @@ func (caches *Caches) ToEnvoySnapshot() (cache.Snapshot, error) {
 	), nil
 }

-func (caches *Caches) deleteFromSyncList(ingressName, ingressNamespace string) {
-	// If caches are not synced, we try to delete the ingress from the IngressesToSync list.
-	if !caches.hasSynced() {
-		delete(caches.ingressesToSync, mapKey(ingressName, ingressNamespace))
-		// Now let's see if after the delete we are in sync and we can close the channel.
-		if caches.hasSynced() {
-			close(caches.synced)
-		}
-	}
-}
-
 // Note: changes the snapshot version of the caches object
 // Notice that the clusters are not deleted. That's handled with the expiration
 // time set in the "ClustersCache" struct.
@@ -251,9 +211,6 @@ func (caches *Caches) DeleteIngressInfo(ctx context.Context, ingressName string,
 	caches.mu.Lock()
 	defer caches.mu.Unlock()

-	// Remove the Ingress from the Sync list as there's no point to wait for it to be synced.
-	caches.deleteFromSyncList(ingressName, ingressNamespace)
-
 	caches.deleteTranslatedIngress(ingressName, ingressNamespace)
 	return caches.setListeners(ctx, kubeclient)
 }
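With the warmup bookkeeping removed, constructing the caches becomes a single call: callers no longer pass a pre-listed set of ingresses or block on `WaitForSync`. A minimal sketch of the new call site, using a fake client and no-op logger purely for illustration (neither appears in this commit):

```go
package main

import (
	"context"
	"log"

	"go.uber.org/zap"
	"k8s.io/client-go/kubernetes/fake"
	"knative.dev/net-kourier/pkg/generator"
)

func main() {
	ctx := context.Background()
	logger := zap.NewNop().Sugar()
	kubeClient := fake.NewSimpleClientset()

	// The ingressesToSync parameter is gone; NewCaches now only wires the
	// internal maps and applies the initial config.
	caches, err := generator.NewCaches(ctx, logger, kubeClient, false /* extAuthz */)
	if err != nil {
		log.Fatal(err)
	}
	_ = caches
}
```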
80 changes: 3 additions & 77 deletions pkg/generator/caches_test.go
@@ -41,7 +41,7 @@ func TestDeleteIngressInfo(t *testing.T) {
 	kubeClient := fake.Clientset{}
 	ctx := context.Background()

-	caches, err := NewCaches(ctx, logger, &kubeClient, false, nil)
+	caches, err := NewCaches(ctx, logger, &kubeClient, false)
 	if err != nil {
 		t.Fail()
 	}
@@ -104,7 +104,7 @@ func TestDeleteIngressInfoWhenDoesNotExist(t *testing.T) {
 	kubeClient := fake.Clientset{}
 	ctx := context.Background()

-	caches, err := NewCaches(ctx, logger, &kubeClient, false, nil)
+	caches, err := NewCaches(ctx, logger, &kubeClient, false)
 	if err != nil {
 		t.Fail()
 	}
@@ -201,7 +201,7 @@ func TestValidateIngress(t *testing.T) {
 	kubeClient := fake.Clientset{}
 	ctx := context.Background()

-	caches, err := NewCaches(ctx, logger, &kubeClient, false, nil)
+	caches, err := NewCaches(ctx, logger, &kubeClient, false)
 	if err != nil {
 		t.Fail()
 	}
@@ -243,77 +243,3 @@ func getVHostsNames(routeConfigs []v2.RouteConfiguration) []string {

 	return res
 }
-
-func TestCacheWithWarmingWithoutIngressesToSync(t *testing.T) {
-	logger := zap.S()
-	kubeClient := fake.Clientset{}
-	ctx := context.Background()
-
-	var ingressesToSync []*v1alpha1.Ingress
-	caches, err := NewCaches(ctx, logger, &kubeClient, false, ingressesToSync)
-	if err != nil {
-		t.Fail()
-	}
-
-	// If caches are not synced, let's fail as this should return false.
-	if !caches.hasSynced() {
-		t.Fail()
-	}
-
-	// WaitForSync channel should be closed.
-	select {
-	case <-caches.WaitForSync():
-		return
-	default:
-		t.Fail()
-	}
-}
-
-func TestCacheWithWarmingWithIngressesToSync(t *testing.T) {
-	logger := zap.S()
-	kubeClient := fake.Clientset{}
-	ctx := context.Background()
-
-	ingressesToSync := []*v1alpha1.Ingress{
-		{
-			TypeMeta:   v1.TypeMeta{},
-			ObjectMeta: v1.ObjectMeta{Name: "test1", Namespace: "namespace1"},
-			Spec:       v1alpha1.IngressSpec{},
-			Status:     v1alpha1.IngressStatus{},
-		},
-	}
-	caches, err := NewCaches(ctx, logger, &kubeClient, false, ingressesToSync)
-	if err != nil {
-		t.Fail()
-	}
-
-	// If caches are synced, let's fail, as this should return false.
-	if caches.hasSynced() {
-		t.Fail()
-	}
-
-	// WaitForSync should still be open.
-	select {
-	case <-caches.WaitForSync():
-		t.Fail()
-	default:
-		// This means the channel has no data and it's still open, this is good, let's continue.
-		break
-	}
-
-	caches.deleteFromSyncList("test1", "namespace1")
-
-	// If caches are not synced, let's fail as this should return false.
-	if !caches.hasSynced() {
-		t.Fail()
-	}
-
-	// Let's check for the sync channel to be closed.
-	select {
-	case <-caches.WaitForSync():
-		return
-	default:
-		// Let's fail as the channel has not been closed.
-		t.Fail()
-	}
-}
17 changes: 8 additions & 9 deletions pkg/generator/ingress_translator.go
@@ -28,7 +28,6 @@ import (
 	kubev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	kubeclient "k8s.io/client-go/kubernetes"
-	corev1listers "k8s.io/client-go/listers/core/v1"
 	"knative.dev/net-kourier/pkg/envoy"
 	"knative.dev/net-kourier/pkg/knative"
 	"knative.dev/networking/pkg/apis/networking/v1alpha1"
@@ -37,8 +36,8 @@ import (

 type IngressTranslator struct {
 	kubeclient      kubeclient.Interface
-	endpointsLister corev1listers.EndpointsLister
-	serviceLister   corev1listers.ServiceLister
+	endpointsGetter func(ns, name string) (*kubev1.Endpoints, error)
+	serviceGetter   func(ns, name string) (*kubev1.Service, error)
 	tracker         tracker.Interface
 	logger          *zap.SugaredLogger
 }
@@ -55,14 +54,14 @@ type translatedIngress struct {

 func NewIngressTranslator(
 	kubeclient kubeclient.Interface,
-	endpointsLister corev1listers.EndpointsLister,
-	serviceLister corev1listers.ServiceLister,
+	endpointsGetter func(ns, name string) (*kubev1.Endpoints, error),
+	serviceGetter func(ns, name string) (*kubev1.Service, error),
 	tracker tracker.Interface,
 	logger *zap.SugaredLogger) IngressTranslator {
 	return IngressTranslator{
 		kubeclient:      kubeclient,
-		endpointsLister: endpointsLister,
-		serviceLister:   serviceLister,
+		endpointsGetter: endpointsGetter,
+		serviceGetter:   serviceGetter,
 		tracker:         tracker,
 		logger:          logger,
 	}
@@ -110,15 +109,15 @@ func (translator *IngressTranslator) translateIngress(ctx context.Context, ingre
 		return nil, err
 	}

-	endpoints, err := translator.endpointsLister.Endpoints(split.ServiceNamespace).Get(split.ServiceName)
+	endpoints, err := translator.endpointsGetter(split.ServiceNamespace, split.ServiceName)
 	if apierrors.IsNotFound(err) {
 		translator.logger.Warnf("Endpoints '%s/%s' not yet created", split.ServiceNamespace, split.ServiceName)
 		return nil, nil
 	} else if err != nil {
 		return nil, fmt.Errorf("failed to fetch endpoints '%s/%s': %w", split.ServiceNamespace, split.ServiceName, err)
 	}

-	service, err := translator.serviceLister.Services(split.ServiceNamespace).Get(split.ServiceName)
+	service, err := translator.serviceGetter(split.ServiceNamespace, split.ServiceName)
 	if apierrors.IsNotFound(err) {
 		translator.logger.Warnf("Service '%s/%s' not yet created", split.ServiceNamespace, split.ServiceName)
 		return nil, nil
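The switch from lister interfaces to plain function types decouples the translator from client-go's lister machinery. The call site is outside this diff, but adapters like these would plausibly bridge the two; the helper names below are hypothetical, not code from this commit:

```go
package adapters

import (
	kubev1 "k8s.io/api/core/v1"
	corev1listers "k8s.io/client-go/listers/core/v1"
)

// endpointsGetterFor wraps a client-go lister in the function type that
// IngressTranslator now accepts.
func endpointsGetterFor(l corev1listers.EndpointsLister) func(ns, name string) (*kubev1.Endpoints, error) {
	return func(ns, name string) (*kubev1.Endpoints, error) {
		return l.Endpoints(ns).Get(name)
	}
}

// serviceGetterFor does the same for Services.
func serviceGetterFor(l corev1listers.ServiceLister) func(ns, name string) (*kubev1.Service, error) {
	return func(ns, name string) (*kubev1.Service, error) {
		return l.Services(ns).Get(name)
	}
}
```

Because `apierrors.IsNotFound` inspects whatever error the getter returns, any adapter that propagates lister errors unchanged keeps the "not yet created" handling above intact.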
54 changes: 6 additions & 48 deletions pkg/generator/ingress_translator_test.go
@@ -28,11 +28,9 @@ import (
 	"gotest.tools/assert"
 	kubev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
-	corev1listers "k8s.io/client-go/listers/core/v1"
 	"knative.dev/net-kourier/pkg/envoy"
 	"knative.dev/networking/pkg/apis/networking/v1alpha1"
 	logtest "knative.dev/pkg/logging/testing"
@@ -120,7 +118,7 @@ func TestTrafficSplits(t *testing.T) {
 	}

 	ingressTranslator := NewIngressTranslator(
-		kubeClient, newMockedEndpointsLister(), newMockedServiceLister(), &pkgtest.FakeTracker{}, logtest.TestLogger(t))
+		kubeClient, mockedEndpointsGetter, mockedServiceGetter, &pkgtest.FakeTracker{}, logtest.TestLogger(t))

 	ingressTranslation, err := ingressTranslator.translateIngress(ctx, &ingress, false)
 	if err != nil {
@@ -199,7 +197,7 @@ func TestIngressVisibility(t *testing.T) {
 	}

 	ingressTranslator := NewIngressTranslator(
-		kubeClient, newMockedEndpointsLister(), newMockedServiceLister(), &pkgtest.FakeTracker{}, logtest.TestLogger(t))
+		kubeClient, mockedEndpointsGetter, mockedServiceGetter, &pkgtest.FakeTracker{}, logtest.TestLogger(t))

 	translatedIngress, err := ingressTranslator.translateIngress(ctx, ingress, false)
 	if err != nil {
@@ -253,7 +251,7 @@ func TestIngressWithTLS(t *testing.T) {
 	}

 	ingressTranslator := NewIngressTranslator(
-		kubeClient, newMockedEndpointsLister(), newMockedServiceLister(), &pkgtest.FakeTracker{}, logtest.TestLogger(t))
+		kubeClient, mockedEndpointsGetter, mockedServiceGetter, &pkgtest.FakeTracker{}, logtest.TestLogger(t))

 	translatedIngress, err := ingressTranslator.translateIngress(ctx, ingress, false)
 	if err != nil {
@@ -287,58 +285,18 @@ func TestReturnsErrorWhenTLSSecretDoesNotExist(t *testing.T) {
 	}

 	ingressTranslator := NewIngressTranslator(
-		kubeClient, newMockedEndpointsLister(), newMockedServiceLister(), &pkgtest.FakeTracker{}, logtest.TestLogger(t))
+		kubeClient, mockedEndpointsGetter, mockedServiceGetter, &pkgtest.FakeTracker{}, logtest.TestLogger(t))

 	_, err := ingressTranslator.translateIngress(ctx, ingress, false)

 	assert.Error(t, err, fmt.Sprintf("secrets \"%s\" not found", tlsSecretName))
 }

-func newMockedEndpointsLister() corev1listers.EndpointsLister {
-	return new(endpointsLister)
-}
-
-type endpointsLister struct{}
-
-func (endpointsLister *endpointsLister) List(selector labels.Selector) ([]*kubev1.Endpoints, error) {
-	return []*kubev1.Endpoints{{}}, nil
-}
-
-func (endpointsLister *endpointsLister) Endpoints(namespace string) corev1listers.EndpointsNamespaceLister {
-	return new(endpoints)
-}
-
-type endpoints struct{}
-
-func (endpoints *endpoints) List(selector labels.Selector) ([]*kubev1.Endpoints, error) {
-	return []*kubev1.Endpoints{{}}, nil
-}
-
-func (endpoints *endpoints) Get(name string) (*kubev1.Endpoints, error) {
+var mockedEndpointsGetter = func(ns, name string) (*kubev1.Endpoints, error) {
 	return &kubev1.Endpoints{}, nil
 }

-func newMockedServiceLister() corev1listers.ServiceLister {
-	return new(serviceLister)
-}
-
-type serviceLister struct{}
-
-func (endpointsLister *serviceLister) List(selector labels.Selector) ([]*kubev1.Service, error) {
-	return []*kubev1.Service{{}}, nil
-}
-
-func (endpointsLister *serviceLister) Services(namespace string) corev1listers.ServiceNamespaceLister {
-	return new(service)
-}
-
-type service struct{}
-
-func (endpoints *service) List(selector labels.Selector) ([]*kubev1.Service, error) {
-	return []*kubev1.Service{{}}, nil
-}
-
-func (endpoints *service) Get(name string) (*kubev1.Service, error) {
+var mockedServiceGetter = func(ns, name string) (*kubev1.Service, error) {
 	return &kubev1.Service{}, nil
 }
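The payoff shows up directly in this file: roughly forty lines of hand-rolled lister stubs collapse into two closures. Failure injection becomes equally cheap; the not-found stub below is a hypothetical illustration, not part of this commit:

```go
package generator

import (
	kubev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// notFoundEndpointsGetter simulates Endpoints that do not exist yet, to
// exercise the "not yet created" branch of translateIngress.
var notFoundEndpointsGetter = func(ns, name string) (*kubev1.Endpoints, error) {
	return nil, apierrors.NewNotFound(kubev1.Resource("endpoints"), name)
}
```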
[diff for the fifth changed file not rendered]
