From afb0d3b1e08315d21bc4b9f45cfb4ca53db6b646 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Tue, 17 Jan 2023 16:28:59 -0500 Subject: [PATCH 1/8] Replace hardcoded strings with const vars --- ...ontroller_externallistenerbindings_test.go | 3 +- pkg/resources/cruisecontrol/configmap.go | 24 +++--- pkg/resources/kafka/configmap.go | 82 +++++++++---------- pkg/util/kafka/common.go | 24 ++---- pkg/util/kafka/const.go | 58 +++++++++++++ 5 files changed, 117 insertions(+), 74 deletions(-) create mode 100644 pkg/util/kafka/const.go diff --git a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go index b9b59322f..61d0d4994 100644 --- a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go +++ b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "github.com/banzaicloud/koperator/api/v1beta1" + kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka" properties "github.com/banzaicloud/koperator/properties/pkg" ) @@ -54,7 +55,7 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1 listeners, found := brokerConfig.Get("listeners") Expect(found).To(BeTrue()) Expect(listeners.Value()).To(Equal("INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094")) - listenerSecMap, found := brokerConfig.Get("listener.security.protocol.map") + listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigSecurityProtocolMap) Expect(found).To(BeTrue()) Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT")) // check service diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go index 976cbc395..63e7eeac9 100644 --- a/pkg/resources/cruisecontrol/configmap.go +++ b/pkg/resources/cruisecontrol/configmap.go @@ -56,14 +56,14 @@ func (r *Reconciler) configMap(clientPass string, capacityConfig string, log log if err != nil { log.Error(err, "getting Kafka bootstrap servers for Cruise Control failed") } - if err = ccConfig.Set("bootstrap.servers", bootstrapServers); err != nil { - log.Error(err, "setting bootstrap.servers in Cruise Control configuration failed", "config", bootstrapServers) + if err = ccConfig.Set(kafkautils.KafkaConfigBoostrapServers, bootstrapServers); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in Cruise Control configuration failed", kafkautils.KafkaConfigBoostrapServers), "config", bootstrapServers) } // Add Zookeeper configuration zkConnect := zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath()) - if err = ccConfig.Set("zookeeper.connect", zkConnect); err != nil { - log.Error(err, "setting zookeeper.connect in Cruise Control configuration failed", "config", zkConnect) + if err = ccConfig.Set(kafkautils.KafkaConfigZooKeeperConnect, zkConnect); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in Cruise Control configuration failed", kafkautils.KafkaConfigZooKeeperConnect), "config", zkConnect) } // Add SSL configuration @@ -97,18 +97,18 @@ func generateSSLConfig(kafkaCluster v1beta1.KafkaClusterSpec, clientPass string, trustStoreLoc := keystoreVolumePath + "/" + v1alpha1.TLSJKSTrustStore sslConfig := map[string]string{ - "security.protocol": "SSL", - "ssl.truststore.type": "JKS", - "ssl.keystore.type": "JKS", - "ssl.truststore.location": trustStoreLoc, - "ssl.keystore.location": keyStoreLoc, - "ssl.keystore.password": clientPass, - "ssl.truststore.password": clientPass, + kafkautils.KafkaConfigSecurityProtocol: "SSL", + kafkautils.KafkaConfigSSLTrustStoreType: "JKS", + kafkautils.KafkaConfigSSLKeystoreType: "JKS", + kafkautils.KafkaConfigSSLTrustStoreLocation: trustStoreLoc, + kafkautils.KafkaConfigSSLKeyStoreLocation: keyStoreLoc, + kafkautils.KafkaConfigSSLKeyStorePassword: clientPass, + kafkautils.KafkaConfigSSLTrustStorePassword: clientPass, } for k, v := range sslConfig { if err := config.Set(k, v); err != nil { - log.Error(err, fmt.Sprintf("setting %s parameter in cruise control configuration resulted an error", k)) + log.Error(err, fmt.Sprintf("setting '%s' parameter in Cruise Control configuration resulted an error", k)) } } } diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 067a12588..d790ee7a1 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -38,8 +38,6 @@ import ( properties "github.com/banzaicloud/koperator/properties/pkg" ) -const brokerLogDirPropertyName = "log.dirs" - func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) *properties.Properties { @@ -52,22 +50,22 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 // Add listener configuration advertisedListenerConf := generateAdvertisedListenerConfig(id, r.KafkaCluster.Spec.ListenersConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses) if len(advertisedListenerConf) > 0 { - if err := config.Set("advertised.listeners", advertisedListenerConf); err != nil { - log.Error(err, "setting advertised.listeners in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigAdvertisedListeners, advertisedListenerConf); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigAdvertisedListeners)) } } // Add control plane listener cclConf := generateControlPlaneListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners) if cclConf != "" { - if err := config.Set("control.plane.listener.name", cclConf); err != nil { - log.Error(err, "setting control.plane.listener.name parameter in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigControlPlaneListener, cclConf); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigControlPlaneListener)) } } // Add Zookeeper configuration - if err := config.Set("zookeeper.connect", zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())); err != nil { - log.Error(err, "setting zookeeper.connect parameter in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigZooKeeperConnect)) } // Add Cruise Control Metrics Reporter SSL configuration @@ -79,11 +77,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 trustStoreLoc := clientKeystorePath + "/" + v1alpha1.TLSJKSTrustStore sslConfig := map[string]string{ - "security.protocol": "SSL", - "ssl.truststore.location": trustStoreLoc, - "ssl.keystore.location": keyStoreLoc, - "ssl.keystore.password": clientPass, - "ssl.truststore.password": clientPass, + kafkautils.KafkaConfigSecurityProtocol: "SSL", + kafkautils.KafkaConfigSSLTrustStoreLocation: trustStoreLoc, + kafkautils.KafkaConfigSSLKeyStoreLocation: keyStoreLoc, + kafkautils.KafkaConfigSSLKeyStorePassword: clientPass, + kafkautils.KafkaConfigSSLTrustStorePassword: clientPass, } for k, v := range sslConfig { @@ -94,23 +92,23 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 } // Add Cruise Control Metrics Reporter configuration - if err := config.Set("metric.reporters", "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil { - log.Error(err, "setting metric.reporters in broker configuration resulted an error") + if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters)) } bootstrapServers, err := kafkautils.GetBootstrapServersService(r.KafkaCluster) if err != nil { log.Error(err, "getting Kafka bootstrap servers for Cruise Control failed") } - if err := config.Set("cruise.control.metrics.reporter.bootstrap.servers", bootstrapServers); err != nil { - log.Error(err, "setting cruise.control.metrics.reporter.bootstrap.servers in broker configuration resulted an error") + if err := config.Set(kafkautils.CruiseControlConfigMetricsReportersBootstrapServers, bootstrapServers); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReportersBootstrapServers)) } - if err := config.Set("cruise.control.metrics.reporter.kubernetes.mode", true); err != nil { - log.Error(err, "setting cruise.control.metrics.reporter.kubernetes.mode in broker configuration resulted an error") + if err := config.Set(kafkautils.CruiseControlConfigMetricsReporterK8sMode, true); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporterK8sMode)) } // Kafka Broker configuration - if err := config.Set("broker.id", id); err != nil { - log.Error(err, "setting broker.id in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigBrokerId, id); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerId)) } // This logic prevents the removal of the mountPath from the broker configmap @@ -123,7 +121,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 mountPathsOld, err := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) if err != nil { - log.Error(err, "could not get mounthPaths from broker configmap", v1beta1.BrokerIdLabelKey, id) + log.Error(err, "could not get mountPaths from broker configmap", v1beta1.BrokerIdLabelKey, id) } mountPathsNew := generateStorageConfig(bConfig.StorageConfigs) mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) @@ -133,16 +131,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 } if len(mountPathsMerged) != 0 { - if err := config.Set(brokerLogDirPropertyName, strings.Join(mountPathsMerged, ",")); err != nil { - log.Error(err, "setting log.dirs in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerLogDirectory)) } } // Add superuser configuration su := strings.Join(generateSuperUsers(superUsers), ";") if su != "" { - if err := config.Set("super.users", su); err != nil { - log.Error(err, "setting super.users in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigSuperUsers, su); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigSuperUsers)) } } return config @@ -238,7 +236,7 @@ func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) ([]string, error) if err != nil { return nil, err } - brokerLogDirProperty, found := brokerConfigProperties.Get(brokerLogDirPropertyName) + brokerLogDirProperty, found := brokerConfigProperties.Get(kafkautils.KafkaConfigBrokerLogDirectory) if !found || brokerLogDirProperty.Value() == "" { return nil, nil } @@ -303,14 +301,14 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map generateListenerSSLConfig(config, eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name], log) } } - if err := config.Set("listener.security.protocol.map", securityProtocolMapConfig); err != nil { - log.Error(err, "setting listener.security.protocol.map parameter in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigSecurityProtocolMap, securityProtocolMapConfig); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigSecurityProtocolMap)) } - if err := config.Set("inter.broker.listener.name", interBrokerListenerName); err != nil { - log.Error(err, "setting inter.broker.listener.name parameter in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName)) } - if err := config.Set("listeners", listenerConfig); err != nil { - log.Error(err, "setting listeners parameter in broker configuration resulted an error") + if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners)) } return config } @@ -324,24 +322,24 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl trustStoreLoc := namedKeystorePath + "/" + v1alpha1.TLSJKSTrustStore listenerSSLConfig = map[string]string{ - fmt.Sprintf(`listener.name.%s.ssl.keystore.location`, name): keyStoreLoc, - fmt.Sprintf("listener.name.%s.ssl.truststore.location", name): trustStoreLoc, - fmt.Sprintf("listener.name.%s.ssl.keystore.type", name): keyStoreType, - fmt.Sprintf("listener.name.%s.ssl.truststore.type", name): trustStoreType, - fmt.Sprintf("listener.name.%s.ssl.truststore.password", name): password, - fmt.Sprintf("listener.name.%s.ssl.keystore.password", name): password, + fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLKeyStoreLocation): keyStoreLoc, + fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLTrustStoreLocation): trustStoreLoc, + fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLKeystoreType): keyStoreType, + fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLTrustStoreType): trustStoreType, + fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLTrustStorePassword): password, + fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLKeyStorePassword): password, } // enable 2-way SSL authentication if SSL is enabled but this field is not provided in the listener config if sslClientAuth == "" { - listenerSSLConfig[fmt.Sprintf("listener.name.%s.ssl.client.auth", name)] = string(v1beta1.SSLClientAuthRequired) + listenerSSLConfig[fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLClientAuth)] = string(v1beta1.SSLClientAuthRequired) } else { - listenerSSLConfig[fmt.Sprintf("listener.name.%s.ssl.client.auth", name)] = string(sslClientAuth) + listenerSSLConfig[fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLClientAuth)] = string(sslClientAuth) } for k, v := range listenerSSLConfig { if err := config.Set(k, v); err != nil { - log.Error(err, fmt.Sprintf("setting %s parameter in broker configuration resulted an error", k)) + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", k)) } } } diff --git a/pkg/util/kafka/common.go b/pkg/util/kafka/common.go index b3d93ee8a..77d3ec5ea 100644 --- a/pkg/util/kafka/common.go +++ b/pkg/util/kafka/common.go @@ -28,29 +28,15 @@ import ( "github.com/banzaicloud/koperator/pkg/util" ) -const ( - // AllBrokerServiceTemplate template for Kafka all broker service - AllBrokerServiceTemplate = "%s-all-broker" - // HeadlessServiceTemplate template for Kafka headless service - HeadlessServiceTemplate = "%s-headless" - // NodePortServiceTemplate template for Kafka nodeport service - NodePortServiceTemplate = "%s-%d-%s" - - //ConfigPropertyName name in the ConfigMap's Data field for the broker configuration - ConfigPropertyName = "broker-config" - securityProtocolMapConfigName = "listener.security.protocol.map" -) - // PerBrokerConfigs configurations will not trigger rolling upgrade when updated var PerBrokerConfigs = []string{ // currently hardcoded in configmap.go - "ssl.client.auth", + KafkaConfigSSLClientAuth, // listener related config change will trigger rolling upgrade anyways due to pod spec change - "listeners", - "advertised.listeners", - - securityProtocolMapConfigName, + KafkaConfigListeners, + KafkaConfigAdvertisedListeners, + KafkaConfigSecurityProtocolMap, } // commonACLString is the raw representation of an ACL allowing Describe on a Topic @@ -114,7 +100,7 @@ func ShouldRefreshOnlyPerBrokerConfigs(currentConfigs, desiredConfigs *propertie log.V(1).Info("configs have been changed", "configs", configDiff) - if diff, ok := configDiff[securityProtocolMapConfigName]; ok { + if diff, ok := configDiff[KafkaConfigSecurityProtocolMap]; ok { if listenersSecurityProtocolChanged(diff[0].Value(), diff[1].Value()) { return false } diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go new file mode 100644 index 000000000..f9dddda5a --- /dev/null +++ b/pkg/util/kafka/const.go @@ -0,0 +1,58 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +const ( + // AllBrokerServiceTemplate template for Kafka all broker service + AllBrokerServiceTemplate = "%s-all-broker" + // HeadlessServiceTemplate template for Kafka headless service + HeadlessServiceTemplate = "%s-headless" + // NodePortServiceTemplate template for Kafka nodeport service + NodePortServiceTemplate = "%s-%d-%s" + + //ConfigPropertyName name in the ConfigMap's Data field for the broker configuration + ConfigPropertyName = "broker-config" +) + +// used for Kafka configurations +const ( + KafkaConfigSecurityProtocolMap = "listener.security.protocol.map" + KafkaConfigInterBrokerListenerName = "inter.broker.listener.name" + KafkaConfigListeners = "listeners" + KafkaConfigAdvertisedListeners = "advertised.listeners" + KafkaConfigBoostrapServers = "bootstrap.servers" + KafkaConfigZooKeeperConnect = "zookeeper.connect" + KafkaConfigBrokerId = "broker.id" + KafkaConfigControlPlaneListener = "control.plane.listener.name" + KafkaConfigBrokerLogDirectory = "log.dirs" + KafkaConfigSuperUsers = "super.users" + + KafkaConfigSecurityProtocol = "security.protocol" + KafkaConfigSSLTrustStoreType = "ssl.truststore.type" + KafkaConfigSSLTrustStoreLocation = "ssl.truststore.location" + KafkaConfigSSLTrustStorePassword = "ssl.truststore.password" + KafkaConfigSSLKeystoreType = "ssl.keystore.type" + KafkaConfigSSLKeyStoreLocation = "ssl.keystore.location" + KafkaConfigSSLKeyStorePassword = "ssl.keystore.password" + + KafkaConfigSSLClientAuth = "ssl.client.auth" +) + +// used for Cruise Control configurations +const ( + CruiseControlConfigMetricsReporters = "metric.reporters" + CruiseControlConfigMetricsReportersBootstrapServers = "cruise.control.metrics.reporter.bootstrap.servers" + CruiseControlConfigMetricsReporterK8sMode = "cruise.control.metrics.reporter.kubernetes.mode" +) From b1aeea258fdd8f53174f6040760ea951d66d4422 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Tue, 17 Jan 2023 16:33:40 -0500 Subject: [PATCH 2/8] Reorder const vars based on configuration types --- pkg/util/kafka/const.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go index f9dddda5a..6d1b65e3a 100644 --- a/pkg/util/kafka/const.go +++ b/pkg/util/kafka/const.go @@ -28,26 +28,27 @@ const ( // used for Kafka configurations const ( + KafkaConfigSuperUsers = "super.users" + + KafkaConfigBoostrapServers = "bootstrap.servers" + KafkaConfigZooKeeperConnect = "zookeeper.connect" + KafkaConfigBrokerId = "broker.id" + KafkaConfigBrokerLogDirectory = "log.dirs" + + KafkaConfigListeners = "listeners" KafkaConfigSecurityProtocolMap = "listener.security.protocol.map" KafkaConfigInterBrokerListenerName = "inter.broker.listener.name" - KafkaConfigListeners = "listeners" KafkaConfigAdvertisedListeners = "advertised.listeners" - KafkaConfigBoostrapServers = "bootstrap.servers" - KafkaConfigZooKeeperConnect = "zookeeper.connect" - KafkaConfigBrokerId = "broker.id" KafkaConfigControlPlaneListener = "control.plane.listener.name" - KafkaConfigBrokerLogDirectory = "log.dirs" - KafkaConfigSuperUsers = "super.users" KafkaConfigSecurityProtocol = "security.protocol" + KafkaConfigSSLClientAuth = "ssl.client.auth" KafkaConfigSSLTrustStoreType = "ssl.truststore.type" KafkaConfigSSLTrustStoreLocation = "ssl.truststore.location" KafkaConfigSSLTrustStorePassword = "ssl.truststore.password" KafkaConfigSSLKeystoreType = "ssl.keystore.type" KafkaConfigSSLKeyStoreLocation = "ssl.keystore.location" KafkaConfigSSLKeyStorePassword = "ssl.keystore.password" - - KafkaConfigSSLClientAuth = "ssl.client.auth" ) // used for Cruise Control configurations From 55c12a213e0ad2f29ff50e06de25264d33ab07f9 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Wed, 18 Jan 2023 14:59:06 -0500 Subject: [PATCH 3/8] Update const var name --- ...cluster_controller_externallistenerbindings_test.go | 2 +- pkg/resources/kafka/configmap.go | 4 ++-- pkg/util/kafka/common.go | 4 ++-- pkg/util/kafka/const.go | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go index 61d0d4994..38d05acc1 100644 --- a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go +++ b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go @@ -55,7 +55,7 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1 listeners, found := brokerConfig.Get("listeners") Expect(found).To(BeTrue()) Expect(listeners.Value()).To(Equal("INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094")) - listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigSecurityProtocolMap) + listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap) Expect(found).To(BeTrue()) Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT")) // check service diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index d790ee7a1..67ec54fe9 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -301,8 +301,8 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map generateListenerSSLConfig(config, eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name], log) } } - if err := config.Set(kafkautils.KafkaConfigSecurityProtocolMap, securityProtocolMapConfig); err != nil { - log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigSecurityProtocolMap)) + if err := config.Set(kafkautils.KafkaConfigListenerSecurityProtocolMap, securityProtocolMapConfig); err != nil { + log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap)) } if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil { log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName)) diff --git a/pkg/util/kafka/common.go b/pkg/util/kafka/common.go index 77d3ec5ea..52ff1a97a 100644 --- a/pkg/util/kafka/common.go +++ b/pkg/util/kafka/common.go @@ -36,7 +36,7 @@ var PerBrokerConfigs = []string{ // listener related config change will trigger rolling upgrade anyways due to pod spec change KafkaConfigListeners, KafkaConfigAdvertisedListeners, - KafkaConfigSecurityProtocolMap, + KafkaConfigListenerSecurityProtocolMap, } // commonACLString is the raw representation of an ACL allowing Describe on a Topic @@ -100,7 +100,7 @@ func ShouldRefreshOnlyPerBrokerConfigs(currentConfigs, desiredConfigs *propertie log.V(1).Info("configs have been changed", "configs", configDiff) - if diff, ok := configDiff[KafkaConfigSecurityProtocolMap]; ok { + if diff, ok := configDiff[KafkaConfigListenerSecurityProtocolMap]; ok { if listenersSecurityProtocolChanged(diff[0].Value(), diff[1].Value()) { return false } diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go index 6d1b65e3a..e5eaf30bc 100644 --- a/pkg/util/kafka/const.go +++ b/pkg/util/kafka/const.go @@ -35,11 +35,11 @@ const ( KafkaConfigBrokerId = "broker.id" KafkaConfigBrokerLogDirectory = "log.dirs" - KafkaConfigListeners = "listeners" - KafkaConfigSecurityProtocolMap = "listener.security.protocol.map" - KafkaConfigInterBrokerListenerName = "inter.broker.listener.name" - KafkaConfigAdvertisedListeners = "advertised.listeners" - KafkaConfigControlPlaneListener = "control.plane.listener.name" + KafkaConfigListeners = "listeners" + KafkaConfigListenerSecurityProtocolMap = "listener.security.protocol.map" + KafkaConfigInterBrokerListenerName = "inter.broker.listener.name" + KafkaConfigAdvertisedListeners = "advertised.listeners" + KafkaConfigControlPlaneListener = "control.plane.listener.name" KafkaConfigSecurityProtocol = "security.protocol" KafkaConfigSSLClientAuth = "ssl.client.auth" From 41bd0e62c89e8cf05b64485639edef52d7924ca1 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Wed, 18 Jan 2023 15:49:56 -0500 Subject: [PATCH 4/8] Make 'listener.name' a const var --- pkg/resources/kafka/configmap.go | 16 ++++++++-------- pkg/util/kafka/const.go | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index 67ec54fe9..413ea1405 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -322,19 +322,19 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl trustStoreLoc := namedKeystorePath + "/" + v1alpha1.TLSJKSTrustStore listenerSSLConfig = map[string]string{ - fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLKeyStoreLocation): keyStoreLoc, - fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLTrustStoreLocation): trustStoreLoc, - fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLKeystoreType): keyStoreType, - fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLTrustStoreType): trustStoreType, - fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLTrustStorePassword): password, - fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLKeyStorePassword): password, + fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeyStoreLocation): keyStoreLoc, + fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStoreLocation): trustStoreLoc, + fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeystoreType): keyStoreType, + fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStoreType): trustStoreType, + fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStorePassword): password, + fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeyStorePassword): password, } // enable 2-way SSL authentication if SSL is enabled but this field is not provided in the listener config if sslClientAuth == "" { - listenerSSLConfig[fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLClientAuth)] = string(v1beta1.SSLClientAuthRequired) + listenerSSLConfig[fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLClientAuth)] = string(v1beta1.SSLClientAuthRequired) } else { - listenerSSLConfig[fmt.Sprintf("listener.name.%s.%s", name, kafkautils.KafkaConfigSSLClientAuth)] = string(sslClientAuth) + listenerSSLConfig[fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLClientAuth)] = string(sslClientAuth) } for k, v := range listenerSSLConfig { diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go index e5eaf30bc..684535429 100644 --- a/pkg/util/kafka/const.go +++ b/pkg/util/kafka/const.go @@ -36,6 +36,7 @@ const ( KafkaConfigBrokerLogDirectory = "log.dirs" KafkaConfigListeners = "listeners" + KafkaConfigListenerName = "listener.name" KafkaConfigListenerSecurityProtocolMap = "listener.security.protocol.map" KafkaConfigInterBrokerListenerName = "inter.broker.listener.name" KafkaConfigAdvertisedListeners = "advertised.listeners" From ccd5fbd59d6d4eff5ce6e6e18e6a5fa8c0e3116b Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 16 Feb 2023 15:10:40 -0500 Subject: [PATCH 5/8] Fix typo --- pkg/resources/envoy/envoy.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/resources/envoy/envoy.go b/pkg/resources/envoy/envoy.go index 12f98575b..71f2efe30 100644 --- a/pkg/resources/envoy/envoy.go +++ b/pkg/resources/envoy/envoy.go @@ -61,22 +61,22 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - var externalListernerResources []resources.ResourceWithLogAndExternalListenerSpecificInfos - externalListernerResources = append(externalListernerResources, + var externalListenerResources []resources.ResourceWithLogAndExternalListenerSpecificInfos + externalListenerResources = append(externalListenerResources, r.service, r.configMap, r.deployment, ) if r.KafkaCluster.Spec.EnvoyConfig.GetDistruptionBudget().DisruptionBudget.Create { - externalListernerResources = append(externalListernerResources, r.podDisruptionBudget) + externalListenerResources = append(externalListenerResources, r.podDisruptionBudget) } for name, ingressConfig := range ingressConfigs { if !util.IsIngressConfigInUse(name, defaultControllerName, r.KafkaCluster, log) { continue } - for _, res := range externalListernerResources { + for _, res := range externalListenerResources { o := res(log, eListener, ingressConfig, name, defaultControllerName) err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster) if err != nil { From 101f1d341421f226a526ec7788daadf2c7c2409b Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 16 Feb 2023 15:11:32 -0500 Subject: [PATCH 6/8] Replace super.users with const --- pkg/resources/kafka/configmap.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index f85cdf891..cd7a4b43a 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -347,11 +347,11 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl // mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string. // It returns empty string when there were no updates or any of the super.users property value was empty. func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string { - sourceVal, foundSource := source.Get("super.users") + sourceVal, foundSource := source.Get(kafkautils.KafkaConfigSuperUsers) if !foundSource || sourceVal.IsEmpty() { return "" } - targetVal, foundTarget := target.Get("super.users") + targetVal, foundTarget := target.Get(kafkautils.KafkaConfigSuperUsers) if !foundTarget || targetVal.IsEmpty() { return "" } @@ -397,7 +397,7 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC if suMerged := mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf); suMerged != "" { // Setting string value for a property is not going to run into error, also we don't return error in this function //nolint:errcheck - opGenConf.Set("super.users", suMerged) + opGenConf.Set(kafkautils.KafkaConfigSuperUsers, suMerged) } finalBrokerConfig.Merge(opGenConf) } From 473d82e5310913b38a2b666de5e74668ae46bee0 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 16 Feb 2023 15:15:58 -0500 Subject: [PATCH 7/8] Convert unnecessary exported variables into private variables --- pkg/resources/kafka/configmap.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go index cd7a4b43a..33b4fdffa 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -281,10 +281,10 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map log.Error(errors.New("inter broker listener name already set"), "config error") } } - UpperedListenerType := iListener.Type.ToUpperString() - UpperedListenerName := strings.ToUpper(iListener.Name) - securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType)) - listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, iListener.ContainerPort)) + upperedListenerType := iListener.Type.ToUpperString() + upperedListenerName := strings.ToUpper(iListener.Name) + securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType)) + listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, iListener.ContainerPort)) // Add internal listeners SSL configuration if iListener.Type == v1beta1.SecurityProtocolSSL { generateListenerSSLConfig(config, iListener.Name, iListener.SSLClientAuth, serverPasses[iListener.Name], log) @@ -292,10 +292,10 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map } for _, eListener := range l.ExternalListeners { - UpperedListenerType := eListener.Type.ToUpperString() - UpperedListenerName := strings.ToUpper(eListener.Name) - securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType)) - listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, eListener.ContainerPort)) + upperedListenerType := eListener.Type.ToUpperString() + upperedListenerName := strings.ToUpper(eListener.Name) + securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType)) + listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort)) // Add external listeners SSL configuration if eListener.Type == v1beta1.SecurityProtocolSSL { generateListenerSSLConfig(config, eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name], log) From ea97e7e26b8fc6b26d11d3a90f4f9b5e9472ec88 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Tue, 21 Feb 2023 10:47:54 -0500 Subject: [PATCH 8/8] Move consts for templates from const.go to template.go --- pkg/util/kafka/const.go | 13 ++----------- pkg/util/kafka/template.go | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 11 deletions(-) create mode 100644 pkg/util/kafka/template.go diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go index 684535429..e286db10e 100644 --- a/pkg/util/kafka/const.go +++ b/pkg/util/kafka/const.go @@ -14,17 +14,8 @@ package kafka -const ( - // AllBrokerServiceTemplate template for Kafka all broker service - AllBrokerServiceTemplate = "%s-all-broker" - // HeadlessServiceTemplate template for Kafka headless service - HeadlessServiceTemplate = "%s-headless" - // NodePortServiceTemplate template for Kafka nodeport service - NodePortServiceTemplate = "%s-%d-%s" - - //ConfigPropertyName name in the ConfigMap's Data field for the broker configuration - ConfigPropertyName = "broker-config" -) +// ConfigPropertyName name in the ConfigMap's Data field for the broker configuration +const ConfigPropertyName = "broker-config" // used for Kafka configurations const ( diff --git a/pkg/util/kafka/template.go b/pkg/util/kafka/template.go new file mode 100644 index 000000000..32e140c6a --- /dev/null +++ b/pkg/util/kafka/template.go @@ -0,0 +1,24 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +const ( + // AllBrokerServiceTemplate template for Kafka all broker service + AllBrokerServiceTemplate = "%s-all-broker" + // HeadlessServiceTemplate template for Kafka headless service + HeadlessServiceTemplate = "%s-headless" + // NodePortServiceTemplate template for Kafka nodeport service + NodePortServiceTemplate = "%s-%d-%s" +)