Skip to content

Commit

Permalink
flp-ingestor and flp-single now use the same service
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed May 13, 2022
1 parent 3edd5be commit 83e1672
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 30 deletions.
5 changes: 0 additions & 5 deletions controllers/flowcollector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,6 @@ func flowCollectorControllerSpecs() {
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))

By("Expecting service to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline" not found`))
})

It("Should remove kafka config successfully", func() {
Expand Down
5 changes: 4 additions & 1 deletion controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (b *builder) service(old *corev1.Service) *corev1.Service {
if old == nil {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName + b.confKindSuffix,
Name: constants.FLPName, //We don't add suffix here so we always use the same service
Namespace: b.namespace,
Labels: b.labels,
},
Expand All @@ -372,10 +372,13 @@ func (b *builder) service(old *corev1.Service) *corev1.Service {
}
// In case we're updating an existing service, we need to build from the old one to keep immutable fields such as clusterIP
newService := old.DeepCopy()
newService.Spec.Selector = b.selector
newService.Spec.SessionAffinity = corev1.ServiceAffinityClientIP
newService.Spec.Ports = []corev1.ServicePort{{
Port: b.desired.Port,
Protocol: b.portProtocol,
}}
newService.ObjectMeta.Labels = b.labels
return newService
}

Expand Down
38 changes: 20 additions & 18 deletions controllers/flowlogspipeline/flp_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type ownedObjects struct {

func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler {
owned := ownedObjects{
serviceAccount: &corev1.ServiceAccount{},
service: &corev1.Service{},
}
nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS)
nobjMngr.AddManagedObject(constants.FLPName, owned.serviceAccount)
nobjMngr.AddManagedObject(constants.FLPName, owned.service)

flpReconciler := FLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned}
flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfSingle))
Expand All @@ -75,7 +75,6 @@ func newSingleReconciler(cl reconcilers.ClientHelper, ns string, prevNS string,
nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS)
nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.deployment)
nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.daemonSet)
nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.service)
nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.hpa)
nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.serviceAccount)
nobjMngr.AddManagedObject(configMapName+FlpConfSuffix[confKind], owned.configMap)
Expand Down Expand Up @@ -240,14 +239,19 @@ func (r *singleDeploymentReconciler) reconcileAsDeployment(ctx context.Context,
}

func (r *singleDeploymentReconciler) reconcileAsService(ctx context.Context, desiredFLP *flpSpec, builder *builder) error {
if !r.nobjMngr.Exists(r.owned.service) {
newSVC := builder.service(nil)
if err := r.CreateOwned(ctx, newSVC); err != nil {
return err
actual := corev1.Service{}
if err := r.Client.Get(ctx,
types.NamespacedName{Name: constants.FLPName, Namespace: r.nobjMngr.Namespace},
&actual,
); err != nil {
if errors.IsNotFound(err) {
return r.CreateOwned(ctx, builder.service(nil))
}
} else if serviceNeedsUpdate(r.owned.service, desiredFLP) {
newSVC := builder.service(r.owned.service)
if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil {
return fmt.Errorf("can't reconcile %s Serviceg: %w", constants.FLPName, err)
}
newSVC := builder.service(&actual)
if serviceNeedsUpdate(&actual, newSVC) {
if err := r.UpdateOwned(ctx, &actual, newSVC); err != nil {
return err
}
}
Expand All @@ -257,8 +261,10 @@ func (r *singleDeploymentReconciler) reconcileAsService(ctx context.Context, des
func (r *singleDeploymentReconciler) reconcileAsDaemonSet(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error {
// Kind may have changed: try delete Deployment / Service / HPA and create DaemonSet
r.nobjMngr.TryDelete(ctx, r.owned.deployment)
r.nobjMngr.TryDelete(ctx, r.owned.service)
r.nobjMngr.TryDelete(ctx, r.owned.hpa)
if err := r.Client.Delete(ctx, builder.service(nil)); !errors.IsNotFound(err) {
return err
}
if !r.nobjMngr.Exists(r.owned.daemonSet) {
return r.CreateOwned(ctx, builder.daemonSet(configDigest))
} else if daemonSetNeedsUpdate(r.owned.daemonSet, desiredFLP, configDigest, constants.FLPName+FlpConfSuffix[r.confKind]) {
Expand Down Expand Up @@ -357,13 +363,9 @@ func configChanged(tmpl *corev1.PodTemplateSpec, configDigest string) bool {
return tmpl.Annotations == nil || tmpl.Annotations[PodConfigurationDigest] != configDigest
}

func serviceNeedsUpdate(svc *corev1.Service, desired *flpSpec) bool {
for _, port := range svc.Spec.Ports {
if port.Port == desired.Port && port.Protocol == corev1.ProtocolUDP {
return false
}
}
return true
func serviceNeedsUpdate(actual *corev1.Service, desired *corev1.Service) bool {
return !reflect.DeepEqual(actual.ObjectMeta, desired.ObjectMeta) ||
!reflect.DeepEqual(actual.Spec, desired.Spec)
}

func containerNeedsUpdate(podSpec *corev1.PodSpec, desired *flpSpec, expectHostPort bool, name string) bool {
Expand Down
11 changes: 5 additions & 6 deletions controllers/flowlogspipeline/flp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,9 @@ func TestServiceNoChange(t *testing.T) {
first := b.service(nil)

// Check no change
flp = getFLPConfig()
loki = getLokiConfig()
b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle)
newService := first.DeepCopy()

assert.False(serviceNeedsUpdate(first, &flp))
assert.False(serviceNeedsUpdate(first, newService))
}

func TestServiceChanged(t *testing.T) {
Expand All @@ -332,13 +330,14 @@ func TestServiceChanged(t *testing.T) {
b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle)
second := b.service(first)

assert.True(serviceNeedsUpdate(first, &flp))
assert.True(serviceNeedsUpdate(first, second))

// Make sure non-service settings doesn't trigger service update
flp.LogLevel = "error"
b = newBuilder(ns, corev1.ProtocolUDP, &flp, &loki, ConfSingle)
third := b.service(first)

assert.False(serviceNeedsUpdate(second, &flp))
assert.False(serviceNeedsUpdate(second, third))
}

func TestConfigMapShouldDeserializeAsYAML(t *testing.T) {
Expand Down

0 comments on commit 83e1672

Please sign in to comment.