Skip to content

Commit

Permalink
Use utils instead of local functions
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Oct 23, 2019
1 parent 78a22a3 commit 7b38638
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 133 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ sigs.k8s.io/controller-runtime v0.2.0/go.mod h1:ZHqrRDZi3f6BzONcvlUxkqCKgwasGk5F
sigs.k8s.io/controller-tools v0.2.0 h1:AmQ/0JKBJAjyAiPAkrAf9QW06jkx2lc5hpxMjamsFpw=
sigs.k8s.io/controller-tools v0.2.0/go.mod h1:8t/X+FVWvk6TaBcsa+UKUBbn7GMtvyBKX30SGl4em6Y=
sigs.k8s.io/controller-tools v0.2.1 h1:HoCik83vXOpPi7KSJWdPRmiGntyOzK0v0BTV4U+pl8o=
sigs.k8s.io/controller-tools v0.2.2 h1:tOXKme2gR7KoM6+7Y+nzjwjbXDgqLfTuX5r7+4dvlig=
sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU=
sigs.k8s.io/testing_frameworks v0.1.1 h1:cP2l8fkA3O9vekpy5Ks8mmA0NW/F7yBdXf8brkWhVrs=
sigs.k8s.io/testing_frameworks v0.1.1/go.mod h1:VVBKrHmJ6Ekkfz284YKhQePcdycOzNH9qL6ht1zEr/U=
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/jaegertracing/v1/jaeger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (

// AnnotationProvisionedKafkaKey is a label to be added to Kafkas that have been provisioned by Jaeger
// +k8s:openapi-gen=true
AnnotationProvisionedKafkaKey string = "jaegertracing.io/provisioned"
AnnotationProvisionedKafkaKey string = "jaegertracing.io/kafka-provisioned"

// AnnotationProvisionedKafkaValue is a label to be added to Kafkas that have been provisioned by Jaeger
// +k8s:openapi-gen=true
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/jaegertracing/v1/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,12 @@ func (o *Options) ToArgs() []string {
func (o *Options) Map() map[string]string {
return o.opts
}

// GenericMap returns the map representing the option entries as interface{}, suitable for usage with NewOptions()
func (o *Options) GenericMap() map[string]interface{} {
out := make(map[string]interface{})
for k, v := range o.opts {
out[k] = v
}
return out
}
8 changes: 4 additions & 4 deletions pkg/autodetect/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestAutoDetectKafkaProvisionWithKafkaOperator(t *testing.T) {
assert.Equal(t, v1.FlagProvisionKafkaYes, viper.GetString("kafka-provision"))
}

func TestAutoDetectKafkaExplicitTrue(t *testing.T) {
func TestAutoDetectKafkaExplicitYes(t *testing.T) {
// prepare
viper.Set("kafka-provision", v1.FlagProvisionKafkaYes)
defer viper.Reset()
Expand All @@ -294,7 +294,7 @@ func TestAutoDetectKafkaExplicitTrue(t *testing.T) {
assert.Equal(t, v1.FlagProvisionKafkaYes, viper.GetString("kafka-provision"))
}

func TestAutoDetectKafkaExplicitFalse(t *testing.T) {
func TestAutoDetectKafkaExplicitNo(t *testing.T) {
// prepare
viper.Set("kafka-provision", v1.FlagProvisionKafkaNo)
defer viper.Reset()
Expand All @@ -307,7 +307,7 @@ func TestAutoDetectKafkaExplicitFalse(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.False(t, viper.GetBool("kafka-provision"))
assert.Equal(t, v1.FlagProvisionKafkaNo, viper.GetString("kafka-provision"))
}

func TestAutoDetectKafkaDefaultNoOperator(t *testing.T) {
Expand All @@ -323,7 +323,7 @@ func TestAutoDetectKafkaDefaultNoOperator(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.False(t, viper.GetBool("kafka-provision"))
assert.Equal(t, v1.FlagProvisionKafkaNo, viper.GetString("kafka-provision"))
}

func TestAutoDetectKafkaDefaultWithOperator(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/jaeger/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ func TestKafkaDelete(t *testing.T) {
defer viper.Reset()

nsn := types.NamespacedName{
Name: "TestKafkaDelete",
Name: "TestKafkaDelete",
Namespace: "tenant1",
}

orig := v1beta1.Kafka{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Persistent(jaeger *v1.Jaeger) v1beta1.Kafka {
"app": "jaeger",
"app.kubernetes.io/name": jaeger.Name,
"app.kubernetes.io/instance": jaeger.Name,
"app.kubernetes.io/component": "kafkauser",
"app.kubernetes.io/component": "kafka",
"app.kubernetes.io/part-of": "jaeger",

// workaround for https://github.com/strimzi/strimzi-kafka-operator/issues/2107
Expand Down
89 changes: 15 additions & 74 deletions pkg/strategy/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/jaegertracing/jaeger-operator/pkg/kafka"
"github.com/jaegertracing/jaeger-operator/pkg/route"
"github.com/jaegertracing/jaeger-operator/pkg/storage"
"github.com/jaegertracing/jaeger-operator/pkg/util"
)

func newStreamingStrategy(jaeger *v1.Jaeger) S {
Expand Down Expand Up @@ -47,8 +48,8 @@ func newStreamingStrategy(jaeger *v1.Jaeger) S {
manifest.configMaps = append(manifest.configMaps, *cm)
}

_, pfound := asOptionsMap(jaeger.Spec.Collector.Options.Map())["kafka.producer.brokers"]
_, cfound := asOptionsMap(jaeger.Spec.Ingester.Options.Map())["kafka.consumer.brokers"]
_, pfound := jaeger.Spec.Collector.Options.GenericMap()["kafka.producer.brokers"]
_, cfound := jaeger.Spec.Ingester.Options.GenericMap()["kafka.consumer.brokers"]
provisioned := jaeger.Annotations[v1.AnnotationProvisionedKafkaKey] == v1.AnnotationProvisionedKafkaValue

// we provision a Kafka when no brokers have been set, or, when we are not in the first run,
Expand Down Expand Up @@ -126,9 +127,12 @@ func autoProvisionKafka(jaeger *v1.Jaeger, manifest S) S {
clusterCAPath := fmt.Sprintf("/var/run/secrets/%s-cluster-ca", jaeger.Name)
clientCertPath := fmt.Sprintf("/var/run/secrets/%s", ku.Name)

// store the new volumes/volume mounts in a common spec, later to be merged with the instance's common spec
commonSpec := v1.JaegerCommonSpec{}

// this is the volume containing the client TLS details, like the cert and key
kuVolume := corev1.Volume{
Name: ku.Name,
Name: fmt.Sprintf("kafkauser-%s", ku.Name),
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: ku.Name,
Expand All @@ -137,30 +141,30 @@ func autoProvisionKafka(jaeger *v1.Jaeger, manifest S) S {
}
// this is the volume containing the CA cluster cert
kuCAVolume := corev1.Volume{
Name: fmt.Sprintf("%s-cluster-ca", jaeger.Name), // the cluster name is the jaeger name
Name: fmt.Sprintf("kafkauser-%s-cluster-ca", jaeger.Name), // the cluster name is the jaeger name
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: fmt.Sprintf("%s-cluster-ca-cert", jaeger.Name),
},
},
}
jaeger.Spec.Volumes = replaceVolume(jaeger.Spec.Volumes, kuVolume, kuCAVolume)
commonSpec.Volumes = append(commonSpec.Volumes, kuVolume, kuCAVolume)

// and finally, the mount paths to have the secrets in the container file system
kuVolumeMount := corev1.VolumeMount{
Name: ku.Name,
Name: fmt.Sprintf("kafkauser-%s", ku.Name),
MountPath: clientCertPath,
}
kuCAVolumeMount := corev1.VolumeMount{
Name: fmt.Sprintf("%s-cluster-ca", jaeger.Name), // the cluster name is the jaeger name
Name: fmt.Sprintf("kafkauser-%s-cluster-ca", jaeger.Name), // the cluster name is the jaeger name
MountPath: clusterCAPath,
}
jaeger.Spec.VolumeMounts = replaceVolumeMount(jaeger.Spec.VolumeMounts, kuVolumeMount, kuCAVolumeMount)
commonSpec.VolumeMounts = append(commonSpec.VolumeMounts, kuVolumeMount, kuCAVolumeMount)

brokers := fmt.Sprintf("%s-kafka-bootstrap.kafka.svc.cluster.local:9093", k.Name)

collectorOpts := asOptionsMap(jaeger.Spec.Collector.Options.Map())
ingesterOpts := asOptionsMap(jaeger.Spec.Ingester.Options.Map())
collectorOpts := jaeger.Spec.Collector.Options.GenericMap()
ingesterOpts := jaeger.Spec.Ingester.Options.GenericMap()

collectorOpts["kafka.producer.brokers"] = brokers
collectorOpts["kafka.producer.authentication"] = "tls"
Expand All @@ -182,70 +186,7 @@ func autoProvisionKafka(jaeger *v1.Jaeger, manifest S) S {

jaeger.Spec.Collector.Options = v1.NewOptions(collectorOpts)
jaeger.Spec.Ingester.Options = v1.NewOptions(ingesterOpts)
jaeger.Spec.JaegerCommonSpec = *util.Merge([]v1.JaegerCommonSpec{commonSpec, jaeger.Spec.JaegerCommonSpec})

return manifest
}

func asOptionsMap(in map[string]string) map[string]interface{} {
out := make(map[string]interface{})
for k, v := range in {
out[k] = v
}
return out
}

func replaceVolume(volumes []corev1.Volume, name ...corev1.Volume) []corev1.Volume {
out := []corev1.Volume{}

for _, v := range volumes {
add := true

for _, n := range name {
if n.Name == v.Name {
// we have an existing volume with the same name one of the new volumes
// skip adding the old one to the output
add = false
break
}
}

if add {
out = append(out, v)
}
}

// now, we add the new volumes
for _, n := range name {
out = append(out, n)
}

return out
}

func replaceVolumeMount(volumes []corev1.VolumeMount, name ...corev1.VolumeMount) []corev1.VolumeMount {
out := []corev1.VolumeMount{}

for _, v := range volumes {
add := true

for _, n := range name {
if n.Name == v.Name {
// we have an existing volume with the same name one of the new volumes
// skip adding the old one to the output
add = false
break
}
}

if add {
out = append(out, v)
}
}

// now, we add the new volumes
for _, n := range name {
out = append(out, n)
}

return out
}
72 changes: 21 additions & 51 deletions pkg/strategy/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,17 @@ func TestAutoProvisionedKafkaInjectsIntoInstance(t *testing.T) {

func TestReplaceVolume(t *testing.T) {
// prepare
existing := []corev1.Volume{
instance := v1.NewJaeger(types.NamespacedName{Name: "my-instance", Namespace: "tenant-1"})
instance.Spec.Volumes = []corev1.Volume{
{
Name: "volume-a",
Name: "kafkauser-my-instance",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "secret-name-a",
},
},
}, {
Name: "volume-b",
Name: "kafkauser-my-instance-cluster-ca",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "secret-name-b",
Expand All @@ -297,83 +298,52 @@ func TestReplaceVolume(t *testing.T) {
},
}

new := []corev1.Volume{
{
Name: "volume-a",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "new-secret-name-a",
},
},
}, {
Name: "volume-b",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "new-secret-name-b",
},
},
},
}

// test
res := replaceVolume(existing, new...)
autoProvisionKafka(instance, newStreamingStrategy(instance))

// verify
assert.Len(t, res, 3)
assert.Len(t, instance.Spec.Volumes, 3)

found := 0
for _, v := range res {
if v.Name == "volume-a" {
for _, v := range instance.Spec.Volumes {
if v.Name == "kafkauser-my-instance-cluster-ca" {
found = found + 1
assert.Equal(t, "new-secret-name-a", v.VolumeSource.Secret.SecretName)
assert.Equal(t, "my-instance-cluster-ca-cert", v.VolumeSource.Secret.SecretName)
}
if v.Name == "volume-b" {
if v.Name == "kafkauser-my-instance" {
found = found + 1
assert.Equal(t, "new-secret-name-b", v.VolumeSource.Secret.SecretName)
assert.Equal(t, "my-instance", v.VolumeSource.Secret.SecretName)
}
}
assert.Equal(t, 2, found)
}

func TestReplaceVolumeMount(t *testing.T) {
// prepare
existing := []corev1.VolumeMount{
instance := v1.NewJaeger(types.NamespacedName{Name: "my-instance", Namespace: "tenant-1"})
instance.Spec.VolumeMounts = []corev1.VolumeMount{
{
Name: "volume-a",
Name: "kafkauser-my-instance-cluster-ca",
MountPath: "/var/path",
}, {
Name: "volume-b",
MountPath: "/var/path-b",
Name: "kafkauser-my-instance",
MountPath: "/var/path",
}, {
Name: "volume-c",
MountPath: "/var/path-c",
},
}

new := []corev1.VolumeMount{
{
Name: "volume-a",
MountPath: "/user/path",
}, {
Name: "volume-b",
MountPath: "/user/path-b",
},
}

// test
res := replaceVolumeMount(existing, new...)
autoProvisionKafka(instance, newStreamingStrategy(instance))

// verify
assert.Len(t, res, 3)
assert.Len(t, instance.Spec.VolumeMounts, 3)
found := 0
for _, v := range res {
if v.Name == "volume-a" {
found = found + 1
assert.Equal(t, "/user/path", v.MountPath)
}
if v.Name == "volume-b" {
for _, v := range instance.Spec.VolumeMounts {
if v.Name == "kafkauser-my-instance-cluster-ca" || v.Name == "kafkauser-my-instance" {
found = found + 1
assert.Equal(t, "/user/path-b", v.MountPath)
assert.True(t, strings.HasPrefix(v.MountPath, "/var/run/secrets"))
}
}
assert.Equal(t, 2, found)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
v1 "github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
)

// removeDuplicatedVolumes returns a unique list of Volumes based on Volume names. Only the first item is kept.
Expand Down

0 comments on commit 7b38638

Please sign in to comment.