diff --git a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go
index b9b59322f..38d05acc1 100644
--- a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go
+++ b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 
 	"github.com/banzaicloud/koperator/api/v1beta1"
+	kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
 	properties "github.com/banzaicloud/koperator/properties/pkg"
 )
 
@@ -54,7 +55,7 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1
 	listeners, found := brokerConfig.Get("listeners")
 	Expect(found).To(BeTrue())
 	Expect(listeners.Value()).To(Equal("INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"))
-	listenerSecMap, found := brokerConfig.Get("listener.security.protocol.map")
+	listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap)
 	Expect(found).To(BeTrue())
 	Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT"))
 	// check service
diff --git a/pkg/resources/cruisecontrol/configmap.go b/pkg/resources/cruisecontrol/configmap.go
index 976cbc395..63e7eeac9 100644
--- a/pkg/resources/cruisecontrol/configmap.go
+++ b/pkg/resources/cruisecontrol/configmap.go
@@ -56,14 +56,14 @@ func (r *Reconciler) configMap(clientPass string, capacityConfig string, log log
 	if err != nil {
 		log.Error(err, "getting Kafka bootstrap servers for Cruise Control failed")
 	}
-	if err = ccConfig.Set("bootstrap.servers", bootstrapServers); err != nil {
-		log.Error(err, "setting bootstrap.servers in Cruise Control configuration failed", "config", bootstrapServers)
+	if err = ccConfig.Set(kafkautils.KafkaConfigBoostrapServers, bootstrapServers); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' in Cruise Control configuration failed", kafkautils.KafkaConfigBoostrapServers), "config", bootstrapServers)
 	}
 
 	// Add Zookeeper configuration
 	zkConnect := zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())
-	if err = ccConfig.Set("zookeeper.connect", zkConnect); err != nil {
-		log.Error(err, "setting zookeeper.connect in Cruise Control configuration failed", "config", zkConnect)
+	if err = ccConfig.Set(kafkautils.KafkaConfigZooKeeperConnect, zkConnect); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' in Cruise Control configuration failed", kafkautils.KafkaConfigZooKeeperConnect), "config", zkConnect)
 	}
 
 	// Add SSL configuration
@@ -97,18 +97,18 @@ func generateSSLConfig(kafkaCluster v1beta1.KafkaClusterSpec, clientPass string,
 	trustStoreLoc := keystoreVolumePath + "/" + v1alpha1.TLSJKSTrustStore
 
 	sslConfig := map[string]string{
-		"security.protocol":       "SSL",
-		"ssl.truststore.type":     "JKS",
-		"ssl.keystore.type":       "JKS",
-		"ssl.truststore.location": trustStoreLoc,
-		"ssl.keystore.location":   keyStoreLoc,
-		"ssl.keystore.password":   clientPass,
-		"ssl.truststore.password": clientPass,
+		kafkautils.KafkaConfigSecurityProtocol:      "SSL",
+		kafkautils.KafkaConfigSSLTrustStoreType:     "JKS",
+		kafkautils.KafkaConfigSSLKeystoreType:       "JKS",
+		kafkautils.KafkaConfigSSLTrustStoreLocation: trustStoreLoc,
+		kafkautils.KafkaConfigSSLKeyStoreLocation:   keyStoreLoc,
+		kafkautils.KafkaConfigSSLKeyStorePassword:   clientPass,
+		kafkautils.KafkaConfigSSLTrustStorePassword: clientPass,
 	}
 
 	for k, v := range sslConfig {
 		if err := config.Set(k, v); err != nil {
-			log.Error(err, fmt.Sprintf("setting %s parameter in cruise control configuration resulted an error", k))
+			log.Error(err, fmt.Sprintf("setting '%s' parameter in Cruise Control configuration resulted an error", k))
 		}
 	}
 }
diff --git a/pkg/resources/envoy/envoy.go b/pkg/resources/envoy/envoy.go
index 12f98575b..71f2efe30 100644
--- a/pkg/resources/envoy/envoy.go
+++ b/pkg/resources/envoy/envoy.go
@@ -61,22 +61,22 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
 		if err != nil {
 			return err
 		}
-		var externalListernerResources []resources.ResourceWithLogAndExternalListenerSpecificInfos
-		externalListernerResources = append(externalListernerResources,
+		var externalListenerResources []resources.ResourceWithLogAndExternalListenerSpecificInfos
+		externalListenerResources = append(externalListenerResources,
 			r.service,
 			r.configMap,
 			r.deployment,
 		)
 
 		if r.KafkaCluster.Spec.EnvoyConfig.GetDistruptionBudget().DisruptionBudget.Create {
-			externalListernerResources = append(externalListernerResources, r.podDisruptionBudget)
+			externalListenerResources = append(externalListenerResources, r.podDisruptionBudget)
 		}
 
 		for name, ingressConfig := range ingressConfigs {
 			if !util.IsIngressConfigInUse(name, defaultControllerName, r.KafkaCluster, log) {
 				continue
 			}
-			for _, res := range externalListernerResources {
+			for _, res := range externalListenerResources {
 				o := res(log, eListener, ingressConfig, name, defaultControllerName)
 				err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
 				if err != nil {
diff --git a/pkg/resources/kafka/configmap.go b/pkg/resources/kafka/configmap.go
index cbd260c5f..33b4fdffa 100644
--- a/pkg/resources/kafka/configmap.go
+++ b/pkg/resources/kafka/configmap.go
@@ -38,8 +38,6 @@ import (
 	properties "github.com/banzaicloud/koperator/properties/pkg"
 )
 
-const brokerLogDirPropertyName = "log.dirs"
-
 func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32,
 	extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList,
 	serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) *properties.Properties {
@@ -52,22 +50,22 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 	// Add listener configuration
 	advertisedListenerConf := generateAdvertisedListenerConfig(id, r.KafkaCluster.Spec.ListenersConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses)
 	if len(advertisedListenerConf) > 0 {
-		if err := config.Set("advertised.listeners", advertisedListenerConf); err != nil {
-			log.Error(err, "setting advertised.listeners in broker configuration resulted an error")
+		if err := config.Set(kafkautils.KafkaConfigAdvertisedListeners, advertisedListenerConf); err != nil {
+			log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigAdvertisedListeners))
 		}
 	}
 
 	// Add control plane listener
 	cclConf := generateControlPlaneListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners)
 	if cclConf != "" {
-		if err := config.Set("control.plane.listener.name", cclConf); err != nil {
-			log.Error(err, "setting control.plane.listener.name parameter in broker configuration resulted an error")
+		if err := config.Set(kafkautils.KafkaConfigControlPlaneListener, cclConf); err != nil {
+			log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigControlPlaneListener))
 		}
 	}
 
 	// Add Zookeeper configuration
-	if err := config.Set("zookeeper.connect", zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())); err != nil {
-		log.Error(err, "setting zookeeper.connect parameter in broker configuration resulted an error")
+	if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigZooKeeperConnect))
 	}
 
 	// Add Cruise Control Metrics Reporter SSL configuration
@@ -79,11 +77,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 		trustStoreLoc := clientKeystorePath + "/" + v1alpha1.TLSJKSTrustStore
 
 		sslConfig := map[string]string{
-			"security.protocol":       "SSL",
-			"ssl.truststore.location": trustStoreLoc,
-			"ssl.keystore.location":   keyStoreLoc,
-			"ssl.keystore.password":   clientPass,
-			"ssl.truststore.password": clientPass,
+			kafkautils.KafkaConfigSecurityProtocol:      "SSL",
+			kafkautils.KafkaConfigSSLTrustStoreLocation: trustStoreLoc,
+			kafkautils.KafkaConfigSSLKeyStoreLocation:   keyStoreLoc,
+			kafkautils.KafkaConfigSSLKeyStorePassword:   clientPass,
+			kafkautils.KafkaConfigSSLTrustStorePassword: clientPass,
 		}
 
 		for k, v := range sslConfig {
@@ -94,23 +92,23 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 		}
 	}
 
 	// Add Cruise Control Metrics Reporter configuration
-	if err := config.Set("metric.reporters", "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
-		log.Error(err, "setting metric.reporters in broker configuration resulted an error")
+	if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters))
 	}
 	bootstrapServers, err := kafkautils.GetBootstrapServersService(r.KafkaCluster)
 	if err != nil {
 		log.Error(err, "getting Kafka bootstrap servers for Cruise Control failed")
 	}
-	if err := config.Set("cruise.control.metrics.reporter.bootstrap.servers", bootstrapServers); err != nil {
-		log.Error(err, "setting cruise.control.metrics.reporter.bootstrap.servers in broker configuration resulted an error")
+	if err := config.Set(kafkautils.CruiseControlConfigMetricsReportersBootstrapServers, bootstrapServers); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReportersBootstrapServers))
 	}
-	if err := config.Set("cruise.control.metrics.reporter.kubernetes.mode", true); err != nil {
-		log.Error(err, "setting cruise.control.metrics.reporter.kubernetes.mode in broker configuration resulted an error")
+	if err := config.Set(kafkautils.CruiseControlConfigMetricsReporterK8sMode, true); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporterK8sMode))
 	}
 
 	// Kafka Broker configuration
-	if err := config.Set("broker.id", id); err != nil {
-		log.Error(err, "setting broker.id in broker configuration resulted an error")
+	if err := config.Set(kafkautils.KafkaConfigBrokerId, id); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerId))
 	}
 
 	// This logic prevents the removal of the mountPath from the broker configmap
@@ -123,7 +121,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 		mountPathsOld, err := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld)
 		if err != nil {
-			log.Error(err, "could not get mounthPaths from broker configmap", v1beta1.BrokerIdLabelKey, id)
+			log.Error(err, "could not get mountPaths from broker configmap", v1beta1.BrokerIdLabelKey, id)
 		}
 		mountPathsNew := generateStorageConfig(bConfig.StorageConfigs)
 		mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)
@@ -133,16 +131,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
 	}
 
 	if len(mountPathsMerged) != 0 {
-		if err := config.Set(brokerLogDirPropertyName, strings.Join(mountPathsMerged, ",")); err != nil {
-			log.Error(err, "setting log.dirs in broker configuration resulted an error")
+		if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil {
+			log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerLogDirectory))
 		}
 	}
 
 	// Add superuser configuration
 	su := strings.Join(generateSuperUsers(superUsers), ";")
 	if su != "" {
-		if err := config.Set("super.users", su); err != nil {
-			log.Error(err, "setting super.users in broker configuration resulted an error")
+		if err := config.Set(kafkautils.KafkaConfigSuperUsers, su); err != nil {
+			log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigSuperUsers))
 		}
 	}
 	return config
@@ -238,7 +236,7 @@ func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) ([]string, error)
 	if err != nil {
 		return nil, err
 	}
-	brokerLogDirProperty, found := brokerConfigProperties.Get(brokerLogDirPropertyName)
+	brokerLogDirProperty, found := brokerConfigProperties.Get(kafkautils.KafkaConfigBrokerLogDirectory)
 	if !found || brokerLogDirProperty.Value() == "" {
 		return nil, nil
 	}
@@ -283,10 +281,10 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map
 				log.Error(errors.New("inter broker listener name already set"), "config error")
 			}
 		}
-		UpperedListenerType := iListener.Type.ToUpperString()
-		UpperedListenerName := strings.ToUpper(iListener.Name)
-		securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
-		listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, iListener.ContainerPort))
+		upperedListenerType := iListener.Type.ToUpperString()
+		upperedListenerName := strings.ToUpper(iListener.Name)
+		securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
+		listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, iListener.ContainerPort))
 		// Add internal listeners SSL configuration
 		if iListener.Type == v1beta1.SecurityProtocolSSL {
 			generateListenerSSLConfig(config, iListener.Name, iListener.SSLClientAuth, serverPasses[iListener.Name], log)
@@ -294,23 +292,23 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map
 	}
 
 	for _, eListener := range l.ExternalListeners {
-		UpperedListenerType := eListener.Type.ToUpperString()
-		UpperedListenerName := strings.ToUpper(eListener.Name)
-		securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
-		listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, eListener.ContainerPort))
+		upperedListenerType := eListener.Type.ToUpperString()
+		upperedListenerName := strings.ToUpper(eListener.Name)
+		securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
+		listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
 		// Add external listeners SSL configuration
 		if eListener.Type == v1beta1.SecurityProtocolSSL {
 			generateListenerSSLConfig(config, eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name], log)
 		}
 	}
-	if err := config.Set("listener.security.protocol.map", securityProtocolMapConfig); err != nil {
-		log.Error(err, "setting listener.security.protocol.map parameter in broker configuration resulted an error")
+	if err := config.Set(kafkautils.KafkaConfigListenerSecurityProtocolMap, securityProtocolMapConfig); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap))
 	}
-	if err := config.Set("inter.broker.listener.name", interBrokerListenerName); err != nil {
-		log.Error(err, "setting inter.broker.listener.name parameter in broker configuration resulted an error")
+	if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
 	}
-	if err := config.Set("listeners", listenerConfig); err != nil {
-		log.Error(err, "setting listeners parameter in broker configuration resulted an error")
+	if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
+		log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
 	}
 	return config
 }
@@ -324,24 +322,24 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl
 	trustStoreLoc := namedKeystorePath + "/" + v1alpha1.TLSJKSTrustStore
 
 	listenerSSLConfig = map[string]string{
-		fmt.Sprintf(`listener.name.%s.ssl.keystore.location`, name):   keyStoreLoc,
-		fmt.Sprintf("listener.name.%s.ssl.truststore.location", name): trustStoreLoc,
-		fmt.Sprintf("listener.name.%s.ssl.keystore.type", name):       keyStoreType,
-		fmt.Sprintf("listener.name.%s.ssl.truststore.type", name):     trustStoreType,
-		fmt.Sprintf("listener.name.%s.ssl.truststore.password", name): password,
-		fmt.Sprintf("listener.name.%s.ssl.keystore.password", name):   password,
+		fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeyStoreLocation):   keyStoreLoc,
+		fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStoreLocation): trustStoreLoc,
+		fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeystoreType):       keyStoreType,
+		fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStoreType):     trustStoreType,
+		fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStorePassword): password,
+		fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeyStorePassword):   password,
 	}
 
 	// enable 2-way SSL authentication if SSL is enabled but this field is not provided in the listener config
 	if sslClientAuth == "" {
-		listenerSSLConfig[fmt.Sprintf("listener.name.%s.ssl.client.auth", name)] = string(v1beta1.SSLClientAuthRequired)
+		listenerSSLConfig[fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLClientAuth)] = string(v1beta1.SSLClientAuthRequired)
 	} else {
-		listenerSSLConfig[fmt.Sprintf("listener.name.%s.ssl.client.auth", name)] = string(sslClientAuth)
+		listenerSSLConfig[fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLClientAuth)] = string(sslClientAuth)
 	}
 
 	for k, v := range listenerSSLConfig {
 		if err := config.Set(k, v); err != nil {
-			log.Error(err, fmt.Sprintf("setting %s parameter in broker configuration resulted an error", k))
+			log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", k))
 		}
 	}
 }
@@ -349,11 +347,11 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl
 // mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string.
 // It returns empty string when there were no updates or any of the super.users property value was empty.
 func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string {
-	sourceVal, foundSource := source.Get("super.users")
+	sourceVal, foundSource := source.Get(kafkautils.KafkaConfigSuperUsers)
 	if !foundSource || sourceVal.IsEmpty() {
 		return ""
 	}
-	targetVal, foundTarget := target.Get("super.users")
+	targetVal, foundTarget := target.Get(kafkautils.KafkaConfigSuperUsers)
 	if !foundTarget || targetVal.IsEmpty() {
 		return ""
 	}
@@ -399,7 +397,7 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC
 		if suMerged := mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf); suMerged != "" {
 			// Setting string value for a property is not going to run into error, also we don't return error in this function
 			//nolint:errcheck
-			opGenConf.Set("super.users", suMerged)
+			opGenConf.Set(kafkautils.KafkaConfigSuperUsers, suMerged)
 		}
 		finalBrokerConfig.Merge(opGenConf)
 	}
diff --git a/pkg/util/kafka/common.go b/pkg/util/kafka/common.go
index b3d93ee8a..52ff1a97a 100644
--- a/pkg/util/kafka/common.go
+++ b/pkg/util/kafka/common.go
@@ -28,29 +28,15 @@ import (
 	"github.com/banzaicloud/koperator/pkg/util"
 )
 
-const (
-	// AllBrokerServiceTemplate template for Kafka all broker service
-	AllBrokerServiceTemplate = "%s-all-broker"
-	// HeadlessServiceTemplate template for Kafka headless service
-	HeadlessServiceTemplate = "%s-headless"
-	// NodePortServiceTemplate template for Kafka nodeport service
-	NodePortServiceTemplate = "%s-%d-%s"
-
-	//ConfigPropertyName name in the ConfigMap's Data field for the broker configuration
-	ConfigPropertyName = "broker-config"
-	securityProtocolMapConfigName = "listener.security.protocol.map"
-)
-
 // PerBrokerConfigs configurations will not trigger rolling upgrade when updated
 var PerBrokerConfigs = []string{
 	// currently hardcoded in configmap.go
-	"ssl.client.auth",
+	KafkaConfigSSLClientAuth,
 	// listener related config change will trigger rolling upgrade anyways due to pod spec change
-	"listeners",
-	"advertised.listeners",
-
-	securityProtocolMapConfigName,
+	KafkaConfigListeners,
+	KafkaConfigAdvertisedListeners,
+	KafkaConfigListenerSecurityProtocolMap,
 }
 
 // commonACLString is the raw representation of an ACL allowing Describe on a Topic
@@ -114,7 +100,7 @@ func ShouldRefreshOnlyPerBrokerConfigs(currentConfigs, desiredConfigs *propertie
 
 	log.V(1).Info("configs have been changed", "configs", configDiff)
 
-	if diff, ok := configDiff[securityProtocolMapConfigName]; ok {
+	if diff, ok := configDiff[KafkaConfigListenerSecurityProtocolMap]; ok {
 		if listenersSecurityProtocolChanged(diff[0].Value(), diff[1].Value()) {
 			return false
 		}
diff --git a/pkg/util/kafka/const.go b/pkg/util/kafka/const.go
new file mode 100644
index 000000000..e286db10e
--- /dev/null
+++ b/pkg/util/kafka/const.go
@@ -0,0 +1,51 @@
+// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+// ConfigPropertyName name in the ConfigMap's Data field for the broker configuration
+const ConfigPropertyName = "broker-config"
+
+// used for Kafka configurations
+const (
+	KafkaConfigSuperUsers = "super.users"
+
+	KafkaConfigBoostrapServers    = "bootstrap.servers"
+	KafkaConfigZooKeeperConnect   = "zookeeper.connect"
+	KafkaConfigBrokerId           = "broker.id"
+	KafkaConfigBrokerLogDirectory = "log.dirs"
+
+	KafkaConfigListeners                   = "listeners"
+	KafkaConfigListenerName                = "listener.name"
+	KafkaConfigListenerSecurityProtocolMap = "listener.security.protocol.map"
+	KafkaConfigInterBrokerListenerName     = "inter.broker.listener.name"
+	KafkaConfigAdvertisedListeners         = "advertised.listeners"
+	KafkaConfigControlPlaneListener        = "control.plane.listener.name"
+
+	KafkaConfigSecurityProtocol      = "security.protocol"
+	KafkaConfigSSLClientAuth         = "ssl.client.auth"
+	KafkaConfigSSLTrustStoreType     = "ssl.truststore.type"
+	KafkaConfigSSLTrustStoreLocation = "ssl.truststore.location"
+	KafkaConfigSSLTrustStorePassword = "ssl.truststore.password"
+	KafkaConfigSSLKeystoreType       = "ssl.keystore.type"
+	KafkaConfigSSLKeyStoreLocation   = "ssl.keystore.location"
+	KafkaConfigSSLKeyStorePassword   = "ssl.keystore.password"
+)
+
+// used for Cruise Control configurations
+const (
+	CruiseControlConfigMetricsReporters                  = "metric.reporters"
+	CruiseControlConfigMetricsReportersBootstrapServers  = "cruise.control.metrics.reporter.bootstrap.servers"
+	CruiseControlConfigMetricsReporterK8sMode            = "cruise.control.metrics.reporter.kubernetes.mode"
+)
diff --git a/pkg/util/kafka/template.go b/pkg/util/kafka/template.go
new file mode 100644
index 000000000..32e140c6a
--- /dev/null
+++ b/pkg/util/kafka/template.go
@@ -0,0 +1,24 @@
+// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+const (
+	// AllBrokerServiceTemplate template for Kafka all broker service
+	AllBrokerServiceTemplate = "%s-all-broker"
+	// HeadlessServiceTemplate template for Kafka headless service
+	HeadlessServiceTemplate = "%s-headless"
+	// NodePortServiceTemplate template for Kafka nodeport service
+	NodePortServiceTemplate = "%s-%d-%s"
+)
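
Note for reviewers (not part of the patch): a minimal, self-contained sketch of how the key-composition and service-name patterns introduced above behave. The constant values are copied locally from the new const.go and template.go so the snippet compiles on its own; the lower-case identifiers and the "external" listener name are local stand-ins, not the exported kafkautils names.

package main

import "fmt"

const (
	// Local copies of values defined in pkg/util/kafka/const.go and template.go.
	kafkaConfigListenerName        = "listener.name"
	kafkaConfigSSLKeyStoreLocation = "ssl.keystore.location"
	allBrokerServiceTemplate       = "%s-all-broker"
	nodePortServiceTemplate        = "%s-%d-%s"
)

func main() {
	// Listener-scoped SSL key, mirroring generateListenerSSLConfig in pkg/resources/kafka/configmap.go.
	key := fmt.Sprintf("%s.%s.%s", kafkaConfigListenerName, "external", kafkaConfigSSLKeyStoreLocation)
	fmt.Println(key) // listener.name.external.ssl.keystore.location

	// Service names built from the templates that moved into pkg/util/kafka/template.go.
	fmt.Println(fmt.Sprintf(allBrokerServiceTemplate, "kafka"))               // kafka-all-broker
	fmt.Println(fmt.Sprintf(nodePortServiceTemplate, "kafka", 0, "external")) // kafka-0-external
}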