Skip to content

Commit

Permalink
KAFKA-10021: Changed Kafka backing stores to use shared admin client …
Browse files Browse the repository at this point in the history
…to get end offsets and create topics

The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in apache#1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
  • Loading branch information
rhauch committed Feb 8, 2021
1 parent 8bd5ceb commit ec727a5
Show file tree
Hide file tree
Showing 14 changed files with 815 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;

import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -233,20 +234,28 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
// Create the admin client to be shared by all backing stores for this herder
Map<String, Object> adminProps = new HashMap<>(config.originals());
ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
offsetBackingStore.configure(distributedConfig);
Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
statusBackingStore.configure(distributedConfig);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
distributedConfig,
configTransformer);
configTransformer,
sharedAdmin);
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
// tracking the various shared admin objects in this class.
Herder herder = new DistributedHerder(distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
herders.put(sourceAndTarget, herder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -101,7 +103,12 @@ public Connect startConnect(Map<String, String> workerProps) {
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
// Create the admin client to be shared by all backing stores.
Map<String, Object> adminProps = new HashMap<>(config.originals());
ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);

KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
offsetBackingStore.configure(config);

ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
Expand All @@ -112,17 +119,20 @@ public Connect startConnect(Map<String, String> workerProps) {
WorkerConfigTransformer configTransformer = worker.configTransformer();

Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
statusBackingStore.configure(config);

ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
configTransformer);
configTransformer,
sharedAdmin);

// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. This is easier than having to track and own the lifecycle ourselves.
DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString(), connectorClientConfigOverridePolicy);
advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);

final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
Expand Down Expand Up @@ -66,6 +67,7 @@
import javax.crypto.SecretKey;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -138,6 +140,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {

private final Time time;
private final HerderMetrics herderMetrics;
private final List<AutoCloseable> uponShutdown;

private final String workerGroupId;
private final int workerSyncTimeoutMs;
Expand Down Expand Up @@ -185,16 +188,33 @@ public class DistributedHerder extends AbstractHerder implements Runnable {

private final DistributedConfig config;

/**
* Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
* that have the same group ID.
*
* @param config the configuration for the worker; may not be null
* @param time the clock to use; may not be null
* @param worker the {@link Worker} instance to use; may not be null
* @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null
* @param statusBackingStore the backing store for statuses; may not be null
* @param configBackingStore the backing store for connector configurations; may not be null
* @param restUrl the URL of this herder's REST API; may not be null
* @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
* in connector configurations; may not be null
* @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
* after all services and resources owned by this herder are stopped
*/
public DistributedHerder(DistributedConfig config,
Time time,
Worker worker,
String kafkaClusterId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
AutoCloseable... uponShutdown) {
this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
time, connectorClientConfigOverridePolicy);
time, connectorClientConfigOverridePolicy, uponShutdown);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}

Expand All @@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
String restUrl,
ConnectMetrics metrics,
Time time,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
AutoCloseable... uponShutdown) {
super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);

this.time = time;
Expand All @@ -223,6 +244,7 @@ public DistributedHerder(DistributedConfig config,
this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
this.keyGenerator = config.getInternalRequestKeyGenerator();
this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.uponShutdown = Arrays.asList(uponShutdown);

String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
Expand Down Expand Up @@ -676,6 +698,16 @@ public void halt() {
}
}

@Override
protected void stopServices() {
try {
super.stopServices();
} finally {
this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
}

}

@Override
public void stop() {
log.info("Herder stopping");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
* <p>
Expand Down Expand Up @@ -224,6 +225,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
// Connector and task configs: name or id -> config map
private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;

// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
Expand All @@ -241,11 +243,17 @@ public static String COMMIT_TASKS_KEY(String connectorName) {

private final WorkerConfigTransformer configTransformer;

@Deprecated
public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
this(converter, config, configTransformer, null);
}

public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
this.lock = new Object();
this.started = false;
this.converter = converter;
this.offset = -1;
this.topicAdminSupplier = adminSupplier;

this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
if (this.topic == null || this.topic.trim().length() == 0)
Expand Down Expand Up @@ -471,6 +479,7 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo

Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).configStorageTopicSettings()
: Collections.emptyMap();
Expand All @@ -481,27 +490,25 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
.replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();

return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
}

private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
final NewTopic topicDescription, final Map<String, Object> adminProps) {
Runnable createTopics = () -> {
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
log.debug("Creating admin client to manage Connect internal config topic");
try (TopicAdmin admin = new TopicAdmin(adminProps)) {
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
}
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
* <p>
Expand All @@ -62,6 +64,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {

private KafkaBasedLog<byte[], byte[]> offsetLog;
private HashMap<ByteBuffer, ByteBuffer> data;
private final Supplier<TopicAdmin> topicAdminSupplier;

@Deprecated
public KafkaOffsetBackingStore() {
this.topicAdminSupplier = null;
}

public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
}

@Override
public void configure(final WorkerConfig config) {
Expand All @@ -86,6 +98,7 @@ public void configure(final WorkerConfig config) {

Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).offsetStorageTopicSettings()
: Collections.emptyMap();
Expand All @@ -96,27 +109,25 @@ public void configure(final WorkerConfig config) {
.replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();

offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
}

private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
final NewTopic topicDescription, final Map<String, Object> adminProps) {
Runnable createTopics = () -> {
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
log.debug("Creating admin client to manage Connect internal offset topic");
try (TopicAdmin admin = new TopicAdmin(adminProps)) {
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
}
// Create the topic if it doesn't exist
Set<String> newTopics = admin.createTopics(topicDescription);
if (!newTopics.contains(topic)) {
// It already existed, so check that the topic cleanup policy is compact only and not delete
log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
admin.verifyTopicCleanupPolicyOnlyCompact(topic,
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
}
};
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
}

@Override
Expand Down
Loading

0 comments on commit ec727a5

Please sign in to comment.