Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add a hash suffix based on the spec for pipeline #244

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1
github.com/prometheus/client_golang v1.18.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/kubernetes v1.29.6
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

Expand Down Expand Up @@ -138,7 +139,6 @@ require (
k8s.io/kube-aggregator v0.29.6 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/kubectl v0.29.6 // indirect
k8s.io/kubernetes v1.29.6 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 // indirect
sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 // indirect
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/isbservicerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ var _ = Describe("ISBServiceRollout Controller", Ordered, func() {

It("Should auto heal the InterStepBufferService with the ISBServiceRollout pipeline spec when the InterStepBufferService spec is changed", func() {
By("updating the InterStepBufferService and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, numaflowv1.ISBGroupVersionKind, defaultNamespace, defaultISBSvcRolloutName, "spec.jetstream.version", "1.2.3.4.5")
lookupKey := types.NamespacedName{Name: defaultISBSvcRolloutName, Namespace: defaultNamespace}
verifyAutoHealing(ctx, numaflowv1.ISBGroupVersionKind, lookupKey, "spec.jetstream.version", "1.2.3.4.5")
})

It("Should delete the ISBServiceRollout and InterStepBufferService", func() {
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/monovertexrollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ var _ = Describe("MonoVertexRollout Controller", Ordered, func() {

It("Should auto heal the MonoVertex when the spec is directly changed", func() {
By("Updating the MonoVertex and verifying the changed field is the same")
verifyAutoHealing(ctx, numaflowv1.MonoVertexGroupVersionKind, namespace, monoVertexRolloutName, "spec.source.udsource.container.image", "wrong-image")
lookupKey := types.NamespacedName{Name: monoVertexRolloutName, Namespace: namespace}
verifyAutoHealing(ctx, numaflowv1.MonoVertexGroupVersionKind, lookupKey, "spec.source.udsource.container.image", "wrong-image")
})

It("Should delete the MonoVertexRollout and MonoVertex", func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ var _ = Describe("NumaflowControllerRollout Controller", Ordered, func() {

It("Should auto heal the Numaflow Controller Deployment with the spec based on the NumaflowControllerRollout version field value when the Deployment spec is changed", func() {
By("updating the Numaflow Controller Deployment and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, appsv1.SchemeGroupVersion.WithKind("Deployment"), namespace, "numaflow-controller", "spec.template.spec.serviceAccountName", "someothersaname")
lookupKey := types.NamespacedName{Name: "numaflow-controller", Namespace: namespace}
verifyAutoHealing(ctx, appsv1.SchemeGroupVersion.WithKind("Deployment"), lookupKey, "spec.template.spec.serviceAccountName", "someothersaname")
})

It("Should auto heal the numaflow-cmd-params-config ConfigMap with the spec based on the NumaflowControllerRollout version field value when the ConfigMap spec is changed", func() {
By("updating the numaflow-cmd-params-config ConfigMap and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, corev1.SchemeGroupVersion.WithKind("ConfigMap"), namespace, "numaflow-cmd-params-config", "data.namespaced", "false")
lookupKey := types.NamespacedName{Name: "numaflow-cmd-params-config", Namespace: namespace}
verifyAutoHealing(ctx, corev1.SchemeGroupVersion.WithKind("ConfigMap"), lookupKey, "data.namespaced", "false")
})

AfterAll(func() {
Expand Down
50 changes: 36 additions & 14 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"maps"
"strings"
"sync"
Expand All @@ -32,9 +33,11 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
hashutil "k8s.io/kubernetes/pkg/util/hash"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimecontroller "sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -370,8 +373,8 @@ func (r *PipelineRolloutReconciler) reconcile(
// Object already exists
// if Pipeline is not owned by Rollout, fail and return
if !checkOwnerRef(existingPipelineDef.OwnerReferences, pipelineRollout.UID) {
pipelineRollout.Status.MarkFailed(fmt.Sprintf("Pipeline %s already exists in namespace", pipelineRollout.Name))
return false, nil
numaLogger.Debugf("PipelineRollout %s failed because Pipeline %s already exists in namespace", pipelineRollout.Name, existingPipelineDef.Name)
return false, fmt.Errorf("pipeline %s already exists in namespace", existingPipelineDef.Name)
}
newPipelineDef = mergePipeline(existingPipelineDef, newPipelineDef)
err = r.processExistingPipeline(ctx, pipelineRollout, existingPipelineDef, newPipelineDef, syncStartTime)
Expand Down Expand Up @@ -399,9 +402,14 @@ func mergePipeline(existingPipeline *kubernetes.GenericObject, newPipeline *kube
resultPipeline.Labels = maps.Clone(newPipeline.Labels)
return resultPipeline
}
func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context, pipelineRollout *apiv1.PipelineRollout,
existingPipelineDef *kubernetes.GenericObject, newPipelineDef *kubernetes.GenericObject, syncStartTime time.Time) error {

func (r *PipelineRolloutReconciler) processExistingPipeline(
ctx context.Context,
pipelineRollout *apiv1.PipelineRollout,
existingPipelineDef *kubernetes.GenericObject,
newPipelineDef *kubernetes.GenericObject,
syncStartTime time.Time,
) error {
// Get the fields we need from both the Pipeline spec we have and the one we want
// TODO: consider having one struct which include our GenericObject plus our PipelineSpec so we can avoid multiple repeat conversions

Expand Down Expand Up @@ -836,20 +844,17 @@ func updatePipelineSpec(ctx context.Context, restConfig *rest.Config, obj *kuber
return kubernetes.UpdateCR(ctx, restConfig, obj, "pipelines")
}

func pipelineLabels(pipelineRollout *apiv1.PipelineRollout) (map[string]string, error) {
var pipelineSpec PipelineSpec
labelMapping := map[string]string{
common.LabelKeyISBServiceNameForPipeline: "default",
}
if err := json.Unmarshal(pipelineRollout.Spec.Pipeline.Spec.Raw, &pipelineSpec); err != nil {
return nil, fmt.Errorf("failed to unmarshal pipeline spec: %v", err)
}
func pipelineLabels(pipelineSpec *numaflowv1.PipelineSpec) (map[string]string, error) {
labelMapping := make(map[string]string)
if pipelineSpec.InterStepBufferServiceName != "" {
labelMapping[common.LabelKeyISBServiceNameForPipeline] = pipelineSpec.InterStepBufferServiceName
} else {
labelMapping[common.LabelKeyISBServiceNameForPipeline] = "default"
Copy link
Collaborator

@juliev0 juliev0 Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe doesn't need to be done in this PR, but are we going to add a label for PipelineRollout name? Then we can easily find all Pipelines using that PipelineRollout. I know the Pipeline name will be prefixed with the PipelineRollout name, but I think this would enable easy selection both through kubectl and potentially in the code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless we can do a query on OwnerReference I guess...but just to note that Numaflow includes labels like this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, I have just created this issue which depends on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that will be the follow up PR for this.

}

return labelMapping, nil
}

func (r *PipelineRolloutReconciler) updatePipelineRolloutStatus(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) error {
rawSpec := runtime.RawExtension{}
err := util.StructToStruct(&pipelineRollout.Spec, &rawSpec)
Expand Down Expand Up @@ -879,7 +884,13 @@ func (r *PipelineRolloutReconciler) updatePipelineRolloutStatusToFailed(ctx cont
}

func makePipelineDefinition(pipelineRollout *apiv1.PipelineRollout) (*kubernetes.GenericObject, error) {
labels, err := pipelineLabels(pipelineRollout)
var pipelineSpec numaflowv1.PipelineSpec
err := json.Unmarshal(pipelineRollout.Spec.Pipeline.Spec.Raw, &pipelineSpec)
if err != nil {
return nil, fmt.Errorf("failed to convert Pipeline spec %q into PipelineSpec type, err=%v", string(pipelineRollout.Spec.Pipeline.Spec.Raw), err)
}

labels, err := pipelineLabels(&pipelineSpec)
if err != nil {
return nil, err
}
Expand All @@ -890,7 +901,7 @@ func makePipelineDefinition(pipelineRollout *apiv1.PipelineRollout) (*kubernetes
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: pipelineRollout.Name,
Name: GetPipelineName(pipelineRollout.Name, &pipelineSpec),
Namespace: pipelineRollout.Namespace,
Labels: labels,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pipelineRollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)},
Expand Down Expand Up @@ -923,3 +934,14 @@ func (r *PipelineRolloutReconciler) ErrorHandler(pipelineRollout *apiv1.Pipeline
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(pipelineRollout, corev1.EventTypeWarning, reason, msg+" %v", err.Error())
}

// GetPipelineName returns the pipeline name determined by the name of the
// pipeline rollout, and suffix with a hash value calculated from pipeline
// spec. The hash will be safe encoded to avoid bad words.
func GetPipelineName(pipelineRolloutName string, pipelineSpec *numaflowv1.PipelineSpec) string {
pipelineSpecHasher := fnv.New32a()
hashutil.DeepHashObject(pipelineSpecHasher, pipelineSpec)
hash := rand.SafeEncodeString(fmt.Sprint(pipelineSpecHasher.Sum32()))

return pipelineRolloutName + "-" + hash
}
Loading
Loading