Skip to content

Commit

Permalink
Refactoring to prepare kafka integration
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Apr 7, 2022
1 parent 2321cba commit 34660db
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 53 deletions.
62 changes: 38 additions & 24 deletions controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,52 @@ const (
startupPeriodSeconds = 10
)

const (
ConfSingle = "allInOne"
ConfKafkaIngestor = "kafkaIngestor"
ConfKafkaTransformer = "kafkaTransformer"
)

var flpConfSuffix = map[string]string{
ConfSingle: "",
ConfKafkaIngestor: "-kingestor",
ConfKafkaTransformer: "-ktransformer",
}

// PodConfigurationDigest is an annotation name to facilitate pod restart after
// any external configuration change
const PodConfigurationDigest = "flows.netobserv.io/" + configMapName

type builder struct {
namespace string
labels map[string]string
selector map[string]string
desired *flowsv1alpha1.FlowCollectorFLP
desiredLoki *flowsv1alpha1.FlowCollectorLoki
namespace string
labels map[string]string
selector map[string]string
desired *flowsv1alpha1.FlowCollectorFLP
desiredLoki *flowsv1alpha1.FlowCollectorLoki
confKindSuffix string
}

func newBuilder(ns string, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki) builder {
func newBuilder(ns string, desired *flowsv1alpha1.FlowCollectorFLP, desiredLoki *flowsv1alpha1.FlowCollectorLoki, confKind string) builder {
version := helper.ExtractVersion(desired.Image)
return builder{
namespace: ns,
labels: map[string]string{
"app": constants.FLPName,
"app": constants.FLPName + flpConfSuffix[confKind],
"version": version,
},
selector: map[string]string{
"app": constants.FLPName,
"app": constants.FLPName + flpConfSuffix[confKind],
},
desired: desired,
desiredLoki: desiredLoki,
desired: desired,
desiredLoki: desiredLoki,
confKindSuffix: flpConfSuffix[confKind],
}
}

func (b *builder) deployment(configDigest string) *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Name: constants.FLPName + b.confKindSuffix,
Namespace: b.namespace,
Labels: b.labels,
},
Expand All @@ -79,7 +93,7 @@ func (b *builder) deployment(configDigest string) *appsv1.Deployment {
func (b *builder) daemonSet(configDigest string) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Name: constants.FLPName + b.confKindSuffix,
Namespace: b.namespace,
Labels: b.labels,
},
Expand All @@ -97,7 +111,7 @@ func (b *builder) podTemplate(configDigest string) corev1.PodTemplateSpec {
var tolerations []corev1.Toleration
if b.desired.Kind == constants.DaemonSetKind {
ports = []corev1.ContainerPort{{
Name: constants.FLPPortName,
Name: constants.FLPPortName + b.confKindSuffix,
HostPort: b.desired.Port,
ContainerPort: b.desired.Port,
Protocol: corev1.ProtocolUDP,
Expand All @@ -113,7 +127,7 @@ func (b *builder) podTemplate(configDigest string) corev1.PodTemplateSpec {
})

container := corev1.Container{
Name: constants.FLPName,
Name: constants.FLPName + b.confKindSuffix,
Image: b.desired.Image,
ImagePullPolicy: corev1.PullPolicy(b.desired.ImagePullPolicy),
Args: []string{fmt.Sprintf(`--config=%s/%s`, configPath, configFile)},
Expand Down Expand Up @@ -162,7 +176,7 @@ func (b *builder) podTemplate(configDigest string) corev1.PodTemplateSpec {
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: configMapName,
Name: configMapName + b.confKindSuffix,
},
},
},
Expand Down Expand Up @@ -267,7 +281,7 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {

configMap := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Name: configMapName + b.confKindSuffix,
Namespace: b.namespace,
Labels: b.labels,
},
Expand All @@ -285,7 +299,7 @@ func (b *builder) service(old *corev1.Service) *corev1.Service {
if old == nil {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Name: constants.FLPName + b.confKindSuffix,
Namespace: b.namespace,
Labels: b.labels,
},
Expand All @@ -311,14 +325,14 @@ func (b *builder) service(old *corev1.Service) *corev1.Service {
func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler {
return &ascv2.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Name: constants.FLPName + b.confKindSuffix,
Namespace: b.namespace,
Labels: b.labels,
},
Spec: ascv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: ascv2.CrossVersionObjectReference{
Kind: constants.DeploymentKind,
Name: constants.FLPName,
Name: constants.FLPName + b.confKindSuffix,
APIVersion: "apps/v1",
},
MinReplicas: b.desired.HPA.MinReplicas,
Expand All @@ -333,17 +347,17 @@ func (b *builder) autoScaler() *ascv2.HorizontalPodAutoscaler {
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=create;delete;patch;update;get;watch;list
//+kubebuilder:rbac:groups=core,resources=pods;services;nodes,verbs=get;list;watch

func buildAppLabel() map[string]string {
func buildAppLabel(confKind string) map[string]string {
return map[string]string{
"app": constants.FLPName,
"app": constants.FLPName + flpConfSuffix[confKind],
}
}

func buildClusterRole() *rbacv1.ClusterRole {
return &rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Labels: buildAppLabel(),
Labels: buildAppLabel(""),
},
Rules: []rbacv1.PolicyRule{{
APIGroups: []string{""},
Expand Down Expand Up @@ -371,7 +385,7 @@ func buildServiceAccount(ns string) *corev1.ServiceAccount {
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Namespace: ns,
Labels: buildAppLabel(),
Labels: buildAppLabel(""),
},
}
}
Expand All @@ -380,7 +394,7 @@ func buildClusterRoleBinding(ns string) *rbacv1.ClusterRoleBinding {
return &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: constants.FLPName,
Labels: buildAppLabel(),
Labels: buildAppLabel(""),
},
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Expand Down
43 changes: 38 additions & 5 deletions controllers/flowlogspipeline/flp_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,17 @@ type lokiSpec = flowsv1alpha1.FlowCollectorLoki

// FLPReconciler reconciles the current flowlogs-pipeline state with the desired configuration
type FLPReconciler struct {
reconcilers.ClientHelper
nobjMngr *reconcilers.NamespacedObjectManager
owned ownedObjects
singleReconcilers []singleFLPReconciler
}

type singleFLPReconciler struct {
reconcilers.ClientHelper
nobjMngr *reconcilers.NamespacedObjectManager
owned ownedObjects
confKind string
}

type ownedObjects struct {
Expand All @@ -35,6 +43,18 @@ type ownedObjects struct {
}

func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler {
owned := ownedObjects{
serviceAccount: &corev1.ServiceAccount{},
}
nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS)
nobjMngr.AddManagedObject(constants.FLPName, owned.serviceAccount)

flpReconciler := FLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned}
flpReconciler.singleReconcilers = append(flpReconciler.singleReconcilers, newSingleReconciler(cl, ns, prevNS, ConfSingle))
return flpReconciler
}

func newSingleReconciler(cl reconcilers.ClientHelper, ns string, prevNS string, confKind string) singleFLPReconciler {
owned := ownedObjects{
deployment: &appsv1.Deployment{},
daemonSet: &appsv1.DaemonSet{},
Expand All @@ -51,7 +71,7 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler
nobjMngr.AddManagedObject(constants.FLPName, owned.serviceAccount)
nobjMngr.AddManagedObject(configMapName, owned.configMap)

return FLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned}
return singleFLPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned, confKind: confKind}
}

// InitStaticResources inits some "static" / one-shot resources, usually not subject to reconciliation
Expand All @@ -62,6 +82,9 @@ func (r *FLPReconciler) InitStaticResources(ctx context.Context) error {
// PrepareNamespaceChange cleans up old namespace and restore the relevant "static" resources
func (r *FLPReconciler) PrepareNamespaceChange(ctx context.Context) error {
// Switching namespace => delete everything in the previous namespace
for _, singleFlp := range r.singleReconcilers {
singleFlp.nobjMngr.CleanupNamespace(ctx)
}
r.nobjMngr.CleanupNamespace(ctx)
return r.createPermissions(ctx, false)
}
Expand All @@ -76,14 +99,24 @@ func validateDesired(desired *flpSpec) error {
return nil
}

// Reconcile is the reconciler entry point to reconcile the current flowlogs-pipeline state with the desired configuration
func (r *FLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desiredLoki *lokiSpec) error {
for _, singleFlp := range r.singleReconcilers {
err := singleFlp.Reconcile(ctx, desiredFLP, desiredLoki)
if err != nil {
return err
}
}
return nil
}

// Reconcile is the reconciler entry point to reconcile the current flowlogs-pipeline state with the desired configuration
func (r *singleFLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desiredLoki *lokiSpec) error {
err := validateDesired(desiredFLP)
if err != nil {
return err
}

builder := newBuilder(r.nobjMngr.Namespace, desiredFLP, desiredLoki)
builder := newBuilder(r.nobjMngr.Namespace, desiredFLP, desiredLoki, r.confKind)
// Retrieve current owned objects
err = r.nobjMngr.FetchAll(ctx)
if err != nil {
Expand All @@ -110,7 +143,7 @@ func (r *FLPReconciler) Reconcile(ctx context.Context, desiredFLP *flpSpec, desi
}
}

func (r *FLPReconciler) reconcileAsDeployment(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error {
func (r *singleFLPReconciler) reconcileAsDeployment(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error {
// Kind may have changed: try delete DaemonSet and create Deployment+Service
ns := r.nobjMngr.Namespace
r.nobjMngr.TryDelete(ctx, r.owned.daemonSet)
Expand Down Expand Up @@ -154,7 +187,7 @@ func (r *FLPReconciler) reconcileAsDeployment(ctx context.Context, desiredFLP *f
return nil
}

func (r *FLPReconciler) reconcileAsDaemonSet(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error {
func (r *singleFLPReconciler) reconcileAsDaemonSet(ctx context.Context, desiredFLP *flpSpec, builder *builder, configDigest string) error {
// Kind may have changed: try delete Deployment / Service / HPA and create DaemonSet
r.nobjMngr.TryDelete(ctx, r.owned.deployment)
r.nobjMngr.TryDelete(ctx, r.owned.service)
Expand Down
Loading

0 comments on commit 34660db

Please sign in to comment.