Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor hard-coded configuration strings into const variables to improve code readability and extensibility #917

Merged
merged 11 commits into from
Feb 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/banzaicloud/koperator/api/v1beta1"
kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
properties "github.com/banzaicloud/koperator/properties/pkg"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1
listeners, found := brokerConfig.Get("listeners")
Expect(found).To(BeTrue())
Expect(listeners.Value()).To(Equal("INTERNAL://:29092,CONTROLLER://:29093,TEST://:9094"))
listenerSecMap, found := brokerConfig.Get("listener.security.protocol.map")
listenerSecMap, found := brokerConfig.Get(kafkautils.KafkaConfigListenerSecurityProtocolMap)
Expect(found).To(BeTrue())
Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT"))
// check service
Expand Down
24 changes: 12 additions & 12 deletions pkg/resources/cruisecontrol/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func (r *Reconciler) configMap(clientPass string, capacityConfig string, log log
if err != nil {
log.Error(err, "getting Kafka bootstrap servers for Cruise Control failed")
}
if err = ccConfig.Set("bootstrap.servers", bootstrapServers); err != nil {
log.Error(err, "setting bootstrap.servers in Cruise Control configuration failed", "config", bootstrapServers)
if err = ccConfig.Set(kafkautils.KafkaConfigBoostrapServers, bootstrapServers); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in Cruise Control configuration failed", kafkautils.KafkaConfigBoostrapServers), "config", bootstrapServers)
}

// Add Zookeeper configuration
zkConnect := zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())
if err = ccConfig.Set("zookeeper.connect", zkConnect); err != nil {
log.Error(err, "setting zookeeper.connect in Cruise Control configuration failed", "config", zkConnect)
if err = ccConfig.Set(kafkautils.KafkaConfigZooKeeperConnect, zkConnect); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in Cruise Control configuration failed", kafkautils.KafkaConfigZooKeeperConnect), "config", zkConnect)
}

// Add SSL configuration
Expand Down Expand Up @@ -97,18 +97,18 @@ func generateSSLConfig(kafkaCluster v1beta1.KafkaClusterSpec, clientPass string,
trustStoreLoc := keystoreVolumePath + "/" + v1alpha1.TLSJKSTrustStore

sslConfig := map[string]string{
"security.protocol": "SSL",
"ssl.truststore.type": "JKS",
"ssl.keystore.type": "JKS",
"ssl.truststore.location": trustStoreLoc,
"ssl.keystore.location": keyStoreLoc,
"ssl.keystore.password": clientPass,
"ssl.truststore.password": clientPass,
kafkautils.KafkaConfigSecurityProtocol: "SSL",
kafkautils.KafkaConfigSSLTrustStoreType: "JKS",
kafkautils.KafkaConfigSSLKeystoreType: "JKS",
kafkautils.KafkaConfigSSLTrustStoreLocation: trustStoreLoc,
kafkautils.KafkaConfigSSLKeyStoreLocation: keyStoreLoc,
kafkautils.KafkaConfigSSLKeyStorePassword: clientPass,
kafkautils.KafkaConfigSSLTrustStorePassword: clientPass,
}

for k, v := range sslConfig {
if err := config.Set(k, v); err != nil {
log.Error(err, fmt.Sprintf("setting %s parameter in cruise control configuration resulted an error", k))
log.Error(err, fmt.Sprintf("setting '%s' parameter in Cruise Control configuration resulted an error", k))
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/resources/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,22 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return err
}
var externalListernerResources []resources.ResourceWithLogAndExternalListenerSpecificInfos
externalListernerResources = append(externalListernerResources,
var externalListenerResources []resources.ResourceWithLogAndExternalListenerSpecificInfos
externalListenerResources = append(externalListenerResources,
r.service,
r.configMap,
r.deployment,
)

if r.KafkaCluster.Spec.EnvoyConfig.GetDistruptionBudget().DisruptionBudget.Create {
externalListernerResources = append(externalListernerResources, r.podDisruptionBudget)
externalListenerResources = append(externalListenerResources, r.podDisruptionBudget)
}
for name, ingressConfig := range ingressConfigs {
if !util.IsIngressConfigInUse(name, defaultControllerName, r.KafkaCluster, log) {
continue
}

for _, res := range externalListernerResources {
for _, res := range externalListenerResources {
o := res(log, eListener, ingressConfig, name, defaultControllerName)
err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
if err != nil {
Expand Down
104 changes: 51 additions & 53 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
properties "github.com/banzaicloud/koperator/properties/pkg"
)

const brokerLogDirPropertyName = "log.dirs"

func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32,
extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList,
serverPasses map[string]string, clientPass string, superUsers []string, log logr.Logger) *properties.Properties {
Expand All @@ -52,22 +50,22 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
// Add listener configuration
advertisedListenerConf := generateAdvertisedListenerConfig(id, r.KafkaCluster.Spec.ListenersConfig, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses)
if len(advertisedListenerConf) > 0 {
if err := config.Set("advertised.listeners", advertisedListenerConf); err != nil {
log.Error(err, "setting advertised.listeners in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigAdvertisedListeners, advertisedListenerConf); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigAdvertisedListeners))
}
}

// Add control plane listener
cclConf := generateControlPlaneListener(r.KafkaCluster.Spec.ListenersConfig.InternalListeners)
if cclConf != "" {
if err := config.Set("control.plane.listener.name", cclConf); err != nil {
log.Error(err, "setting control.plane.listener.name parameter in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigControlPlaneListener, cclConf); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigControlPlaneListener))
}
}

// Add Zookeeper configuration
if err := config.Set("zookeeper.connect", zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())); err != nil {
log.Error(err, "setting zookeeper.connect parameter in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress(r.KafkaCluster.Spec.ZKAddresses, r.KafkaCluster.Spec.GetZkPath())); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigZooKeeperConnect))
}

// Add Cruise Control Metrics Reporter SSL configuration
Expand All @@ -79,11 +77,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
trustStoreLoc := clientKeystorePath + "/" + v1alpha1.TLSJKSTrustStore

sslConfig := map[string]string{
"security.protocol": "SSL",
"ssl.truststore.location": trustStoreLoc,
"ssl.keystore.location": keyStoreLoc,
"ssl.keystore.password": clientPass,
"ssl.truststore.password": clientPass,
kafkautils.KafkaConfigSecurityProtocol: "SSL",
kafkautils.KafkaConfigSSLTrustStoreLocation: trustStoreLoc,
kafkautils.KafkaConfigSSLKeyStoreLocation: keyStoreLoc,
kafkautils.KafkaConfigSSLKeyStorePassword: clientPass,
kafkautils.KafkaConfigSSLTrustStorePassword: clientPass,
}

for k, v := range sslConfig {
Expand All @@ -94,23 +92,23 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
}

// Add Cruise Control Metrics Reporter configuration
if err := config.Set("metric.reporters", "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
log.Error(err, "setting metric.reporters in broker configuration resulted an error")
if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters))
}
bootstrapServers, err := kafkautils.GetBootstrapServersService(r.KafkaCluster)
if err != nil {
log.Error(err, "getting Kafka bootstrap servers for Cruise Control failed")
}
if err := config.Set("cruise.control.metrics.reporter.bootstrap.servers", bootstrapServers); err != nil {
log.Error(err, "setting cruise.control.metrics.reporter.bootstrap.servers in broker configuration resulted an error")
if err := config.Set(kafkautils.CruiseControlConfigMetricsReportersBootstrapServers, bootstrapServers); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReportersBootstrapServers))
}
if err := config.Set("cruise.control.metrics.reporter.kubernetes.mode", true); err != nil {
log.Error(err, "setting cruise.control.metrics.reporter.kubernetes.mode in broker configuration resulted an error")
if err := config.Set(kafkautils.CruiseControlConfigMetricsReporterK8sMode, true); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporterK8sMode))
}

// Kafka Broker configuration
if err := config.Set("broker.id", id); err != nil {
log.Error(err, "setting broker.id in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigBrokerId, id); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerId))
}

// This logic prevents the removal of the mountPath from the broker configmap
Expand All @@ -123,7 +121,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32

mountPathsOld, err := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld)
if err != nil {
log.Error(err, "could not get mounthPaths from broker configmap", v1beta1.BrokerIdLabelKey, id)
log.Error(err, "could not get mountPaths from broker configmap", v1beta1.BrokerIdLabelKey, id)
}
mountPathsNew := generateStorageConfig(bConfig.StorageConfigs)
mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew)
Expand All @@ -133,16 +131,16 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
}

if len(mountPathsMerged) != 0 {
if err := config.Set(brokerLogDirPropertyName, strings.Join(mountPathsMerged, ",")); err != nil {
log.Error(err, "setting log.dirs in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerLogDirectory))
}
}

// Add superuser configuration
su := strings.Join(generateSuperUsers(superUsers), ";")
if su != "" {
if err := config.Set("super.users", su); err != nil {
log.Error(err, "setting super.users in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigSuperUsers, su); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigSuperUsers))
}
}
return config
Expand Down Expand Up @@ -238,7 +236,7 @@ func getMountPathsFromBrokerConfigMap(configMap *v1.ConfigMap) ([]string, error)
if err != nil {
return nil, err
}
brokerLogDirProperty, found := brokerConfigProperties.Get(brokerLogDirPropertyName)
brokerLogDirProperty, found := brokerConfigProperties.Get(kafkautils.KafkaConfigBrokerLogDirectory)
if !found || brokerLogDirProperty.Value() == "" {
return nil, nil
}
Expand Down Expand Up @@ -283,34 +281,34 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map
log.Error(errors.New("inter broker listener name already set"), "config error")
}
}
UpperedListenerType := iListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(iListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, iListener.ContainerPort))
upperedListenerType := iListener.Type.ToUpperString()
upperedListenerName := strings.ToUpper(iListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, iListener.ContainerPort))
// Add internal listeners SSL configuration
if iListener.Type == v1beta1.SecurityProtocolSSL {
generateListenerSSLConfig(config, iListener.Name, iListener.SSLClientAuth, serverPasses[iListener.Name], log)
}
}

for _, eListener := range l.ExternalListeners {
UpperedListenerType := eListener.Type.ToUpperString()
UpperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", UpperedListenerName, UpperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", UpperedListenerName, eListener.ContainerPort))
upperedListenerType := eListener.Type.ToUpperString()
upperedListenerName := strings.ToUpper(eListener.Name)
securityProtocolMapConfig = append(securityProtocolMapConfig, fmt.Sprintf("%s:%s", upperedListenerName, upperedListenerType))
listenerConfig = append(listenerConfig, fmt.Sprintf("%s://:%d", upperedListenerName, eListener.ContainerPort))
// Add external listeners SSL configuration
if eListener.Type == v1beta1.SecurityProtocolSSL {
generateListenerSSLConfig(config, eListener.Name, eListener.SSLClientAuth, serverPasses[eListener.Name], log)
}
}
if err := config.Set("listener.security.protocol.map", securityProtocolMapConfig); err != nil {
log.Error(err, "setting listener.security.protocol.map parameter in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigListenerSecurityProtocolMap, securityProtocolMapConfig); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap))
}
if err := config.Set("inter.broker.listener.name", interBrokerListenerName); err != nil {
log.Error(err, "setting inter.broker.listener.name parameter in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
}
if err := config.Set("listeners", listenerConfig); err != nil {
log.Error(err, "setting listeners parameter in broker configuration resulted an error")
if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
}
return config
}
Expand All @@ -324,36 +322,36 @@ func generateListenerSSLConfig(config *properties.Properties, name string, sslCl
trustStoreLoc := namedKeystorePath + "/" + v1alpha1.TLSJKSTrustStore

listenerSSLConfig = map[string]string{
fmt.Sprintf(`listener.name.%s.ssl.keystore.location`, name): keyStoreLoc,
fmt.Sprintf("listener.name.%s.ssl.truststore.location", name): trustStoreLoc,
fmt.Sprintf("listener.name.%s.ssl.keystore.type", name): keyStoreType,
fmt.Sprintf("listener.name.%s.ssl.truststore.type", name): trustStoreType,
fmt.Sprintf("listener.name.%s.ssl.truststore.password", name): password,
fmt.Sprintf("listener.name.%s.ssl.keystore.password", name): password,
fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeyStoreLocation): keyStoreLoc,
fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStoreLocation): trustStoreLoc,
fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeystoreType): keyStoreType,
fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStoreType): trustStoreType,
fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLTrustStorePassword): password,
fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLKeyStorePassword): password,
}

// enable 2-way SSL authentication if SSL is enabled but this field is not provided in the listener config
if sslClientAuth == "" {
listenerSSLConfig[fmt.Sprintf("listener.name.%s.ssl.client.auth", name)] = string(v1beta1.SSLClientAuthRequired)
listenerSSLConfig[fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLClientAuth)] = string(v1beta1.SSLClientAuthRequired)
} else {
listenerSSLConfig[fmt.Sprintf("listener.name.%s.ssl.client.auth", name)] = string(sslClientAuth)
listenerSSLConfig[fmt.Sprintf("%s.%s.%s", kafkautils.KafkaConfigListenerName, name, kafkautils.KafkaConfigSSLClientAuth)] = string(sslClientAuth)
}

for k, v := range listenerSSLConfig {
if err := config.Set(k, v); err != nil {
log.Error(err, fmt.Sprintf("setting %s parameter in broker configuration resulted an error", k))
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", k))
}
}
}

// mergeSuperUsersPropertyValue merges the target and source super.users property value, and returns it as string.
// It returns empty string when there were no updates or any of the super.users property value was empty.
func mergeSuperUsersPropertyValue(source *properties.Properties, target *properties.Properties) string {
sourceVal, foundSource := source.Get("super.users")
sourceVal, foundSource := source.Get(kafkautils.KafkaConfigSuperUsers)
if !foundSource || sourceVal.IsEmpty() {
return ""
}
targetVal, foundTarget := target.Get("super.users")
targetVal, foundTarget := target.Get(kafkautils.KafkaConfigSuperUsers)
if !foundTarget || targetVal.IsEmpty() {
return ""
}
Expand Down Expand Up @@ -399,7 +397,7 @@ func (r Reconciler) generateBrokerConfig(id int32, brokerConfig *v1beta1.BrokerC
if suMerged := mergeSuperUsersPropertyValue(finalBrokerConfig, opGenConf); suMerged != "" {
// Setting string value for a property is not going to run into error, also we don't return error in this function
//nolint:errcheck
opGenConf.Set("super.users", suMerged)
opGenConf.Set(kafkautils.KafkaConfigSuperUsers, suMerged)
}
finalBrokerConfig.Merge(opGenConf)
}
Expand Down
Loading