From f449a961d803a59c0e196b4c986670aa1086a4fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rinaldo=20Pitzer=20J=C3=BAnior?= <16694899+rinaldodev@users.noreply.github.com> Date: Tue, 13 Dec 2022 10:51:06 -0300 Subject: [PATCH] MGDCTRS-1780: fix race condition with Camel Connector ConfigMap --- .../it/cucumber/ConnectorSteps.java | 16 ++++++- cos-fleetshard-operator-camel/README.adoc | 13 +++-- .../operator/camel/CamelOperandSupport.java | 47 +++++++++++++++---- .../operator/FleetShardOperator.java | 13 +++++ ...ce.java => ConnectorConfigMapWatcher.java} | 46 ++++++++---------- .../connector/ConnectorController.java | 15 ++---- .../housekeeping/reapers/AddonReaper.java | 6 +-- 7 files changed, 99 insertions(+), 57 deletions(-) rename cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/{ConnectorConfigmapEventSource.java => ConnectorConfigMapWatcher.java} (67%) diff --git a/cos-fleetshard-it/cucumber/src/main/java/org/bf2/cos/fleetshard/it/cucumber/ConnectorSteps.java b/cos-fleetshard-it/cucumber/src/main/java/org/bf2/cos/fleetshard/it/cucumber/ConnectorSteps.java index 311bd27d..2c0106ef 100644 --- a/cos-fleetshard-it/cucumber/src/main/java/org/bf2/cos/fleetshard/it/cucumber/ConnectorSteps.java +++ b/cos-fleetshard-it/cucumber/src/main/java/org/bf2/cos/fleetshard/it/cucumber/ConnectorSteps.java @@ -184,7 +184,8 @@ public void a_connector(Map options) { .withDeploymentResourceVersion(drv) .withNewSchemaRegistry(SCHEMA_REGISTRY_ID, SCHEMA_REGISTRY_URL) .withKafka( - new KafkaSpecBuilder().withUrl(entry.getOrDefault(COS_KAFKA_BOOTSTRAP, KAFKA_URL)).build()) + new KafkaSpecBuilder().withUrl(entry.getOrDefault(COS_KAFKA_BOOTSTRAP, KAFKA_URL)) + .build()) .withDesiredState(entry.get(ConnectorContext.DESIRED_STATE)) .withSecret(Connectors.generateConnectorId(deploymentId) + "-" + drv) .build()) @@ -279,7 +280,18 @@ public void deploy_connector() { @When("^set configmap to:") public void change_configmap(Map contents) { - getConfigMapFilter().accept(configMap -> configMap.setData(contents)); + var sb = new StringBuilder(); + contents.forEach((k, v) -> { + sb.append("\t"); + sb.append(k); + sb.append("="); + sb.append(v); + sb.append("\n"); + }); + + getConfigMapFilter().accept(configMap -> { + configMap.setData(Map.of("override.properties", sb.toString())); + }); } @Then("the connector exists") diff --git a/cos-fleetshard-operator-camel/README.adoc b/cos-fleetshard-operator-camel/README.adoc index cb7b32d0..b650ab67 100644 --- a/cos-fleetshard-operator-camel/README.adoc +++ b/cos-fleetshard-operator-camel/README.adoc @@ -88,12 +88,15 @@ To change the default values, use the following system properties: One can then use that ConfigMap to override, for example, log levels for the connector, by applying the following configuration to the ConfigMap: -[source,properties] +[source,yaml] ---- -quarkus.log.level=INFO -quarkus.log.min-level=ALL -quarkus.log.category."org.apache".level=ALL -quarkus.log.category."org.apache".min-level=ALL +apiVersion: v1 +data: + override.properties: |- + quarkus.log.level=INFO + quarkus.log.min-level=ALL + quarkus.log.category."org.apache".level=ALL + quarkus.log.category."org.apache".min-level=ALL ---- Changes made to this ConfigMap triggers a redeployment of the connector with the additional supplied properties. \ No newline at end of file diff --git a/cos-fleetshard-operator-camel/src/main/java/org/bf2/cos/fleetshard/operator/camel/CamelOperandSupport.java b/cos-fleetshard-operator-camel/src/main/java/org/bf2/cos/fleetshard/operator/camel/CamelOperandSupport.java index efb58304..f568c21c 100644 --- a/cos-fleetshard-operator-camel/src/main/java/org/bf2/cos/fleetshard/operator/camel/CamelOperandSupport.java +++ b/cos-fleetshard-operator-camel/src/main/java/org/bf2/cos/fleetshard/operator/camel/CamelOperandSupport.java @@ -1,5 +1,6 @@ package org.bf2.cos.fleetshard.operator.camel; +import java.io.StringReader; import java.nio.charset.StandardCharsets; import java.util.*; @@ -334,20 +335,46 @@ public static Map createSecretsData( cfg.exchangePooling().exchangeFactoryStatisticsEnabled()); } + addOverrideProperties(connector, connectorConfiguration, props); + + return props; + } + + private static void addOverrideProperties(ManagedConnector connector, + ConnectorConfiguration connectorConfiguration, + Map props) { // configure the empty config map created for logging final ConfigMap configMap = connectorConfiguration.getConfigMap(); - if (configMap != null) { - final Map data = configMap.getData(); - if (data != null && !data.isEmpty()) { - LOGGER.info("ConfigMap for connector ({}/{}) contains data: {}", - connector.getMetadata().getNamespace(), - connector.getMetadata().getName(), - configMap.getData()); - props.putAll(data); - } + if (configMap == null) { + return; } - return props; + final Map data = configMap.getData(); + if (data == null || data.isEmpty()) { + return; + } + + String propertiesAsStr = data.get("override.properties"); + if (propertiesAsStr == null) { + LOGGER.error("Connector ConfigMap can only have properties in a override.properties embedded file." + + "Current content will be ignored: {}", data); + return; + } + propertiesAsStr = propertiesAsStr.replace("|-", ""); + + Properties contents = new Properties(); + try { + contents.load(new StringReader(propertiesAsStr)); + LOGGER.info("ConfigMap for connector ({}/{}) contains data: {}", + connector.getMetadata().getNamespace(), + connector.getMetadata().getName(), + StringUtils.normalizeSpace(contents.toString())); + contents.forEach((k, v) -> props.put((String) k, (String) v)); + } catch (Exception e) { + LOGGER.error( + "Unable to read properties from override.properties embedded in ConfigMap. Properties will get ignored.", + e); + } } /** diff --git a/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/FleetShardOperator.java b/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/FleetShardOperator.java index cf144e16..1ab7dfa8 100644 --- a/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/FleetShardOperator.java +++ b/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/FleetShardOperator.java @@ -4,6 +4,8 @@ import javax.inject.Inject; import org.bf2.cos.fleetshard.api.ManagedConnectorOperator; +import org.bf2.cos.fleetshard.operator.connector.ConnectorConfigMapWatcher; +import org.bf2.cos.fleetshard.support.client.EventClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +24,10 @@ public class FleetShardOperator { Operator operator; @Inject FleetShardOperatorConfig config; + @Inject + EventClient eventClient; + + private ConnectorConfigMapWatcher configMapWatcher; public void start() { LOGGER.info("Starting operator (id: {}, type: {}, version: {})", @@ -33,6 +39,9 @@ public void start() { .inNamespace(config.namespace()) .createOrReplace(managedConnectorOperator); + this.configMapWatcher = new ConnectorConfigMapWatcher(client, managedConnectorOperator, eventClient); + configMapWatcher.start(); + operator.start(); } @@ -42,6 +51,10 @@ public void stop() { managedConnectorOperator.getSpec().getType(), managedConnectorOperator.getSpec().getVersion()); + if (configMapWatcher != null) { + configMapWatcher.close(); + } + operator.stop(); } } diff --git a/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorConfigmapEventSource.java b/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorConfigMapWatcher.java similarity index 67% rename from cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorConfigmapEventSource.java rename to cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorConfigMapWatcher.java index 9bb9b2dc..d653dc61 100644 --- a/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorConfigmapEventSource.java +++ b/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorConfigMapWatcher.java @@ -2,11 +2,10 @@ import org.bf2.cos.fleetshard.api.ManagedConnector; import org.bf2.cos.fleetshard.api.ManagedConnectorOperator; -import org.bf2.cos.fleetshard.operator.support.InstrumentedWatcherEventSource; import org.bf2.cos.fleetshard.support.client.EventClient; -import org.bf2.cos.fleetshard.support.metrics.MetricsRecorder; import org.bf2.cos.fleetshard.support.resources.ConfigMaps; import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.watch.AbstractWatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,27 +14,28 @@ import io.fabric8.kubernetes.client.Watch; import io.javaoperatorsdk.operator.processing.event.ResourceID; -public class ConnectorConfigmapEventSource extends InstrumentedWatcherEventSource { - private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorConfigmapEventSource.class); +public class ConnectorConfigMapWatcher extends AbstractWatcher { + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorConfigMapWatcher.class); private final ManagedConnectorOperator operator; private final EventClient eventClient; + private final KubernetesClient kubernetesClient; - public ConnectorConfigmapEventSource( + public ConnectorConfigMapWatcher( KubernetesClient kubernetesClient, ManagedConnectorOperator operator, - MetricsRecorder recorder, EventClient eventClient) { - super(kubernetesClient, recorder); - + this.kubernetesClient = kubernetesClient; this.operator = operator; this.eventClient = eventClient; } @Override protected Watch doWatch() { - return getClient() + LOGGER.info("Creating Watcher for Connector ConfigMaps."); + + return kubernetesClient .configMaps() .inAnyNamespace() .withLabel(Resources.LABEL_OPERATOR_TYPE, operator.getSpec().getType()) @@ -49,33 +49,25 @@ protected void onEventReceived(Action action, ConfigMap resource) { resource.getMetadata().getNamespace(), resource.getMetadata().getName()); - final String configMapChecksum; - if (resource.getData() == null) { - configMapChecksum = null; - } else { - switch (action) { - case DELETED: - configMapChecksum = null; - break; - default: - configMapChecksum = ConfigMaps.computeChecksum(resource); - break; - } + if (Action.MODIFIED.equals(action)) { + String checksum = resource.getData() == null ? null : ConfigMaps.computeChecksum(resource); + updateManagedConnectorResource(resource, checksum); } + } + private void updateManagedConnectorResource(ConfigMap resource, String checksum) { ResourceID.fromFirstOwnerReference(resource) .filter(rid -> rid.getName() != null && rid.getNamespace().isPresent()) .ifPresent(resourceID -> { // do a broadcast to provide feedback since changing this configmap is expected to be manual operation eventClient.broadcastNormal( "ConnectorConfigMap", - "Updating ManagedConnector (%s/%s) after event %s from configmap.", + "Updating ManagedConnector (%s/%s) after it's ConfigMap has been modified.", resource, resourceID.getNamespace().get(), - resourceID.getName(), - action.name()); + resourceID.getName()); - getClient().resources(ManagedConnector.class) + kubernetesClient.resources(ManagedConnector.class) .inNamespace(resourceID.getNamespace().get()) .withName(resourceID.getName()) .accept( @@ -83,9 +75,9 @@ protected void onEventReceived(Action action, ConfigMap resource) { LOGGER.info("Updating ManagedConnector ({}/{}) configMapChecksum to {}", mctr.getMetadata().getNamespace(), mctr.getMetadata().getName(), - configMapChecksum); + checksum); mctr.getSpec().getDeployment() - .setConfigMapChecksum(configMapChecksum); + .setConfigMapChecksum(checksum); }); }); } diff --git a/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorController.java b/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorController.java index 7209c3fe..957828f7 100644 --- a/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorController.java +++ b/cos-fleetshard-operator/src/main/java/org/bf2/cos/fleetshard/operator/connector/ConnectorController.java @@ -140,14 +140,6 @@ public HashMap prepareEventSources(EventSourceContext conte managedConnectorOperator, MetricsRecorder.of(registry, config.metrics().baseName() + ".controller.event.secrets", tags))); - eventSources.put( - "_configmaps", - new ConnectorConfigmapEventSource( - kubernetesClient, - managedConnectorOperator, - MetricsRecorder.of(registry, config.metrics().baseName() + ".controller.event.configmaps", tags), - eventClient)); - eventSources.put( "_operators", new ConnectorOperatorEventSource( @@ -190,11 +182,14 @@ public UpdateControl reconcile( if (!selected && !assigned) { // not selected, nor assigned: this connector is managed by another operator - LOGGER.debug("Connector {}/{} is not managed by this operator (assigned={}, operating={}).", + LOGGER.debug("Connector {}/{} is not managed by this operator (assigned={}, operating={}). " + + "This operator={}. Connector requires: {}.", connector.getMetadata().getNamespace(), connector.getMetadata().getName(), connector.getSpec().getOperatorSelector().getId(), - connector.getStatus().getConnectorStatus().getAssignedOperator().getId()); + connector.getStatus().getConnectorStatus().getAssignedOperator().getId(), + managedConnectorOperator.getMetadata().getName(), + connector.getSpec().getOperatorSelector().getId()); answer = UpdateControl.noUpdate(); } else if (!selected) { diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/AddonReaper.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/AddonReaper.java index fd2033cb..5202effe 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/AddonReaper.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/AddonReaper.java @@ -36,7 +36,7 @@ public class AddonReaper implements Housekeeper.Task, Service { private final FleetShardSyncConfig config; private final FleetShardSync fleetShardSync; private final FleetShardObservabilityClient observabilityClient; - private final ConfigMapWatcher watcher; + private final AddonConfigMapWatcher watcher; private final AtomicLong retries; private final AtomicBoolean running; private final AtomicBoolean taskRunning; @@ -49,7 +49,7 @@ public AddonReaper(KubernetesClient kubernetesClient, FleetShardSyncConfig confi this.fleetShardSync = fleetShardSync; this.observabilityClient = observabilityClient; this.eventClient = eventClient; - this.watcher = new ConfigMapWatcher(); + this.watcher = new AddonConfigMapWatcher(); this.retries = new AtomicLong(0); this.running = new AtomicBoolean(); this.taskRunning = new AtomicBoolean(); @@ -143,7 +143,7 @@ private FilterWatchListDeletable getNamespaceFilter() return kubernetesClient.namespaces().withLabel(Resources.LABEL_CLUSTER_ID, config.cluster().id()); } - private class ConfigMapWatcher extends AbstractWatcher { + private class AddonConfigMapWatcher extends AbstractWatcher { private static final String LABEL_PREFIX = "api.openshift.com/"; private static final String DELETE_LABEL_SUFFIX = "-delete";