Skip to content

Commit

Permalink
MGDCTRS-1780: fix race condition with Camel Connector ConfigMap
Browse files Browse the repository at this point in the history
  • Loading branch information
rinaldodev committed Dec 14, 2022
1 parent 11297fb commit f449a96
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void a_connector(Map<String, String> options) {
.withDeploymentResourceVersion(drv)
.withNewSchemaRegistry(SCHEMA_REGISTRY_ID, SCHEMA_REGISTRY_URL)
.withKafka(
new KafkaSpecBuilder().withUrl(entry.getOrDefault(COS_KAFKA_BOOTSTRAP, KAFKA_URL)).build())
new KafkaSpecBuilder().withUrl(entry.getOrDefault(COS_KAFKA_BOOTSTRAP, KAFKA_URL))
.build())
.withDesiredState(entry.get(ConnectorContext.DESIRED_STATE))
.withSecret(Connectors.generateConnectorId(deploymentId) + "-" + drv)
.build())
Expand Down Expand Up @@ -279,7 +280,18 @@ public void deploy_connector() {

@When("^set configmap to:")
public void change_configmap(Map<String, String> contents) {
getConfigMapFilter().accept(configMap -> configMap.setData(contents));
var sb = new StringBuilder();
contents.forEach((k, v) -> {
sb.append("\t");
sb.append(k);
sb.append("=");
sb.append(v);
sb.append("\n");
});

getConfigMapFilter().accept(configMap -> {
configMap.setData(Map.of("override.properties", sb.toString()));
});
}

@Then("the connector exists")
Expand Down
13 changes: 8 additions & 5 deletions cos-fleetshard-operator-camel/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ To change the default values, use the following system properties:

One can then use that ConfigMap to override, for example, log levels for the connector, by applying the following configuration to the ConfigMap:

[source,properties]
[source,yaml]
----
quarkus.log.level=INFO
quarkus.log.min-level=ALL
quarkus.log.category."org.apache".level=ALL
quarkus.log.category."org.apache".min-level=ALL
apiVersion: v1
data:
override.properties: |-
quarkus.log.level=INFO
quarkus.log.min-level=ALL
quarkus.log.category."org.apache".level=ALL
quarkus.log.category."org.apache".min-level=ALL
----

Changes made to this ConfigMap triggers a redeployment of the connector with the additional supplied properties.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.bf2.cos.fleetshard.operator.camel;

import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.*;

Expand Down Expand Up @@ -334,20 +335,46 @@ public static Map<String, String> createSecretsData(
cfg.exchangePooling().exchangeFactoryStatisticsEnabled());
}

addOverrideProperties(connector, connectorConfiguration, props);

return props;
}

private static void addOverrideProperties(ManagedConnector connector,
ConnectorConfiguration<ObjectNode, ObjectNode> connectorConfiguration,
Map<String, String> props) {
// configure the empty config map created for logging
final ConfigMap configMap = connectorConfiguration.getConfigMap();
if (configMap != null) {
final Map<String, String> data = configMap.getData();
if (data != null && !data.isEmpty()) {
LOGGER.info("ConfigMap for connector ({}/{}) contains data: {}",
connector.getMetadata().getNamespace(),
connector.getMetadata().getName(),
configMap.getData());
props.putAll(data);
}
if (configMap == null) {
return;
}

return props;
final Map<String, String> data = configMap.getData();
if (data == null || data.isEmpty()) {
return;
}

String propertiesAsStr = data.get("override.properties");
if (propertiesAsStr == null) {
LOGGER.error("Connector ConfigMap can only have properties in a override.properties embedded file."
+ "Current content will be ignored: {}", data);
return;
}
propertiesAsStr = propertiesAsStr.replace("|-", "");

Properties contents = new Properties();
try {
contents.load(new StringReader(propertiesAsStr));
LOGGER.info("ConfigMap for connector ({}/{}) contains data: {}",
connector.getMetadata().getNamespace(),
connector.getMetadata().getName(),
StringUtils.normalizeSpace(contents.toString()));
contents.forEach((k, v) -> props.put((String) k, (String) v));
} catch (Exception e) {
LOGGER.error(
"Unable to read properties from override.properties embedded in ConfigMap. Properties will get ignored.",
e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import javax.inject.Inject;

import org.bf2.cos.fleetshard.api.ManagedConnectorOperator;
import org.bf2.cos.fleetshard.operator.connector.ConnectorConfigMapWatcher;
import org.bf2.cos.fleetshard.support.client.EventClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,6 +24,10 @@ public class FleetShardOperator {
Operator operator;
@Inject
FleetShardOperatorConfig config;
@Inject
EventClient eventClient;

private ConnectorConfigMapWatcher configMapWatcher;

public void start() {
LOGGER.info("Starting operator (id: {}, type: {}, version: {})",
Expand All @@ -33,6 +39,9 @@ public void start() {
.inNamespace(config.namespace())
.createOrReplace(managedConnectorOperator);

this.configMapWatcher = new ConnectorConfigMapWatcher(client, managedConnectorOperator, eventClient);
configMapWatcher.start();

operator.start();
}

Expand All @@ -42,6 +51,10 @@ public void stop() {
managedConnectorOperator.getSpec().getType(),
managedConnectorOperator.getSpec().getVersion());

if (configMapWatcher != null) {
configMapWatcher.close();
}

operator.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import org.bf2.cos.fleetshard.api.ManagedConnector;
import org.bf2.cos.fleetshard.api.ManagedConnectorOperator;
import org.bf2.cos.fleetshard.operator.support.InstrumentedWatcherEventSource;
import org.bf2.cos.fleetshard.support.client.EventClient;
import org.bf2.cos.fleetshard.support.metrics.MetricsRecorder;
import org.bf2.cos.fleetshard.support.resources.ConfigMaps;
import org.bf2.cos.fleetshard.support.resources.Resources;
import org.bf2.cos.fleetshard.support.watch.AbstractWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -15,27 +14,28 @@
import io.fabric8.kubernetes.client.Watch;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class ConnectorConfigmapEventSource extends InstrumentedWatcherEventSource<ConfigMap> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorConfigmapEventSource.class);
public class ConnectorConfigMapWatcher extends AbstractWatcher<ConfigMap> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorConfigMapWatcher.class);

private final ManagedConnectorOperator operator;
private final EventClient eventClient;
private final KubernetesClient kubernetesClient;

public ConnectorConfigmapEventSource(
public ConnectorConfigMapWatcher(
KubernetesClient kubernetesClient,
ManagedConnectorOperator operator,
MetricsRecorder recorder,
EventClient eventClient) {

super(kubernetesClient, recorder);

this.kubernetesClient = kubernetesClient;
this.operator = operator;
this.eventClient = eventClient;
}

@Override
protected Watch doWatch() {
return getClient()
LOGGER.info("Creating Watcher for Connector ConfigMaps.");

return kubernetesClient
.configMaps()
.inAnyNamespace()
.withLabel(Resources.LABEL_OPERATOR_TYPE, operator.getSpec().getType())
Expand All @@ -49,43 +49,35 @@ protected void onEventReceived(Action action, ConfigMap resource) {
resource.getMetadata().getNamespace(),
resource.getMetadata().getName());

final String configMapChecksum;
if (resource.getData() == null) {
configMapChecksum = null;
} else {
switch (action) {
case DELETED:
configMapChecksum = null;
break;
default:
configMapChecksum = ConfigMaps.computeChecksum(resource);
break;
}
if (Action.MODIFIED.equals(action)) {
String checksum = resource.getData() == null ? null : ConfigMaps.computeChecksum(resource);
updateManagedConnectorResource(resource, checksum);
}
}

private void updateManagedConnectorResource(ConfigMap resource, String checksum) {
ResourceID.fromFirstOwnerReference(resource)
.filter(rid -> rid.getName() != null && rid.getNamespace().isPresent())
.ifPresent(resourceID -> {
// do a broadcast to provide feedback since changing this configmap is expected to be manual operation
eventClient.broadcastNormal(
"ConnectorConfigMap",
"Updating ManagedConnector (%s/%s) after event %s from configmap.",
"Updating ManagedConnector (%s/%s) after it's ConfigMap has been modified.",
resource,
resourceID.getNamespace().get(),
resourceID.getName(),
action.name());
resourceID.getName());

getClient().resources(ManagedConnector.class)
kubernetesClient.resources(ManagedConnector.class)
.inNamespace(resourceID.getNamespace().get())
.withName(resourceID.getName())
.accept(
mctr -> {
LOGGER.info("Updating ManagedConnector ({}/{}) configMapChecksum to {}",
mctr.getMetadata().getNamespace(),
mctr.getMetadata().getName(),
configMapChecksum);
checksum);
mctr.getSpec().getDeployment()
.setConfigMapChecksum(configMapChecksum);
.setConfigMapChecksum(checksum);
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,6 @@ public HashMap<String, EventSource> prepareEventSources(EventSourceContext conte
managedConnectorOperator,
MetricsRecorder.of(registry, config.metrics().baseName() + ".controller.event.secrets", tags)));

eventSources.put(
"_configmaps",
new ConnectorConfigmapEventSource(
kubernetesClient,
managedConnectorOperator,
MetricsRecorder.of(registry, config.metrics().baseName() + ".controller.event.configmaps", tags),
eventClient));

eventSources.put(
"_operators",
new ConnectorOperatorEventSource(
Expand Down Expand Up @@ -190,11 +182,14 @@ public UpdateControl<ManagedConnector> reconcile(

if (!selected && !assigned) {
// not selected, nor assigned: this connector is managed by another operator
LOGGER.debug("Connector {}/{} is not managed by this operator (assigned={}, operating={}).",
LOGGER.debug("Connector {}/{} is not managed by this operator (assigned={}, operating={}). "
+ "This operator={}. Connector requires: {}.",
connector.getMetadata().getNamespace(),
connector.getMetadata().getName(),
connector.getSpec().getOperatorSelector().getId(),
connector.getStatus().getConnectorStatus().getAssignedOperator().getId());
connector.getStatus().getConnectorStatus().getAssignedOperator().getId(),
managedConnectorOperator.getMetadata().getName(),
connector.getSpec().getOperatorSelector().getId());

answer = UpdateControl.noUpdate();
} else if (!selected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class AddonReaper implements Housekeeper.Task, Service {
private final FleetShardSyncConfig config;
private final FleetShardSync fleetShardSync;
private final FleetShardObservabilityClient observabilityClient;
private final ConfigMapWatcher watcher;
private final AddonConfigMapWatcher watcher;
private final AtomicLong retries;
private final AtomicBoolean running;
private final AtomicBoolean taskRunning;
Expand All @@ -49,7 +49,7 @@ public AddonReaper(KubernetesClient kubernetesClient, FleetShardSyncConfig confi
this.fleetShardSync = fleetShardSync;
this.observabilityClient = observabilityClient;
this.eventClient = eventClient;
this.watcher = new ConfigMapWatcher();
this.watcher = new AddonConfigMapWatcher();
this.retries = new AtomicLong(0);
this.running = new AtomicBoolean();
this.taskRunning = new AtomicBoolean();
Expand Down Expand Up @@ -143,7 +143,7 @@ private FilterWatchListDeletable<Namespace, NamespaceList> getNamespaceFilter()
return kubernetesClient.namespaces().withLabel(Resources.LABEL_CLUSTER_ID, config.cluster().id());
}

private class ConfigMapWatcher extends AbstractWatcher<ConfigMap> {
private class AddonConfigMapWatcher extends AbstractWatcher<ConfigMap> {
private static final String LABEL_PREFIX = "api.openshift.com/";
private static final String DELETE_LABEL_SUFFIX = "-delete";

Expand Down

0 comments on commit f449a96

Please sign in to comment.