diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 1bf5324863..05c6933acf 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -15,6 +15,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.2.0 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 + github.com/imdario/mergo v0.3.11 // indirect github.com/magiconair/properties v1.8.4 github.com/mitchellh/mapstructure v1.4.1 github.com/pkg/errors v0.9.1 diff --git a/flytepropeller/manager/manager.go b/flytepropeller/manager/manager.go index fd3cc6e9b8..5b7538fe2e 100644 --- a/flytepropeller/manager/manager.go +++ b/flytepropeller/manager/manager.go @@ -15,6 +15,8 @@ import ( "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" + "github.com/imdario/mergo" + "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" @@ -163,15 +165,24 @@ func (m *Manager) createPods(ctx context.Context) error { errs := stderrors.ErrorCollection{} for i, podName := range podNames { if exists := podExists[podName]; !exists { + baseObjectMeta := podTemplate.Template.ObjectMeta.DeepCopy() + objectMeta := metav1.ObjectMeta{ + Annotations: podAnnotations, + Name: podName, + Namespace: m.podNamespace, + Labels: podLabels, + OwnerReferences: m.ownerReferences, + } + + err = mergo.Merge(baseObjectMeta, objectMeta, mergo.WithOverride, mergo.WithAppendSlice) + if err != nil { + errs.Append(fmt.Errorf("failed to initialize pod ObjectMeta for '%s' [%v]", podName, err)) + continue + } + pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: podAnnotations, - Name: podName, - Namespace: m.podNamespace, - Labels: podLabels, - OwnerReferences: m.ownerReferences, - }, - Spec: *podTemplate.Template.Spec.DeepCopy(), + ObjectMeta: *baseObjectMeta, + Spec: *podTemplate.Template.Spec.DeepCopy(), } err := m.shardStrategy.UpdatePodSpec(&pod.Spec, m.podTemplateContainerName, i) diff --git a/flytepropeller/manager/manager_test.go b/flytepropeller/manager/manager_test.go index 9eb831d8b6..3886970063 100644 --- a/flytepropeller/manager/manager_test.go +++ b/flytepropeller/manager/manager_test.go @@ -25,6 +25,15 @@ var ( ResourceVersion: "0", }, Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "foo": "bar", + }, + Labels: map[string]string{ + "app": "foo", + "bar": "baz", + }, + }, Spec: v1.PodSpec{ Containers: []v1.Container{ v1.Container{ @@ -84,6 +93,12 @@ func TestCreatePods(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tt.shardStrategy.GetPodCount(), len(pods.Items)) + for _, pod := range pods.Items { + assert.Equal(t, pod.Annotations["foo"], "bar") + assert.Equal(t, pod.Labels["app"], "flytepropeller") + assert.Equal(t, pod.Labels["bar"], "baz") + } + // execute again to ensure no new pods are created err = manager.createPods(ctx) assert.NoError(t, err)