KAFKA-9422: Track the set of topics a connector is using (KIP-558) (#8017)

This feature corresponds to KIP-558 and extends how the internal status topic (set via the `status.storage.topic` distributed worker config) is used, so that Kafka Connect can keep track of which topics a connector is using.

The set of topics a connector is actively using is exposed via a new endpoint added to the REST API of Connect workers:
* A `GET /connectors/{name}/topics` request returns the set of topics that have been recorded as active since the connector started or since the set was last reset for this connector.

A second endpoint allows users to reset this set (an example for both endpoints is sketched below):
* A `PUT /connectors/{name}/topics/reset` request clears the set of active topics for the connector.
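
For illustration, here is a minimal Java 11 `HttpClient` sketch that exercises both endpoints. The worker address (`localhost:8083`) and connector name (`my-connector`) are placeholder values, and the response shape shown in the comment is approximate:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ActiveTopicsExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Fetch the active topics; the JSON body looks roughly like:
        // {"my-connector": {"topics": ["orders", "shipments"]}}
        HttpRequest get = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/topics"))
                .GET()
                .build();
        System.out.println(client.send(get, HttpResponse.BodyHandlers.ofString()).body());

        // Reset the recorded set; honored only when topic.tracking.allow.reset=true.
        HttpRequest reset = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/topics/reset"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        System.out.println(client.send(reset, HttpResponse.BodyHandlers.ofString()).statusCode());
    }
}
```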

The `topic.tracking.enable` worker config property (true by default) allows an operator to enable or disable the entire feature. When the feature is enabled, the `topic.tracking.allow.reset` worker config property (also true by default) controls whether reset requests submitted to the Connect REST API are honored.
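
A hypothetical distributed worker configuration excerpt that keeps tracking enabled but rejects reset requests:

```properties
# Both properties default to true; the values below are for illustration.
topic.tracking.enable=true
topic.tracking.allow.reset=false
```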

Author: Konstantine Karantasis <[email protected]>
Reviewer: Randall Hauch <[email protected]>
kkonstantine authored Jan 30, 2020
1 parent 8494fdb commit 7746301
Showing 20 changed files with 778 additions and 40 deletions.
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -30,6 +30,7 @@
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -62,6 +63,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
@@ -260,6 +262,25 @@ public ConnectorStateInfo connectorStatus(String connName) {
conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
}

@Override
public ActiveTopicsInfo connectorActiveTopics(String connName) {
Collection<String> topics = statusBackingStore.getAllTopics(connName).stream()
.map(TopicStatus::topic)
.collect(Collectors.toList());
return new ActiveTopicsInfo(connName, topics);
}

@Override
public void resetConnectorActiveTopics(String connName) {
statusBackingStore.getAllTopics(connName).stream()
.forEach(status -> statusBackingStore.deleteTopic(status.connector(), status.topic()));
}

@Override
public StatusBackingStore statusBackingStore() {
return statusBackingStore;
}

@Override
public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
TaskStatus status = statusBackingStore.get(id);
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -18,10 +18,12 @@

import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;

@@ -144,6 +146,28 @@ public interface Herder {
*/
ConnectorStateInfo connectorStatus(String connName);

/**
* Look up the set of topics currently used by a connector.
*
* @param connName name of the connector
* @return the set of active topics
*/
ActiveTopicsInfo connectorActiveTopics(String connName);

/**
* Request to asynchronously reset the active topics for the named connector.
*
* @param connName name of the connector
*/
void resetConnectorActiveTopics(String connName);

/**
* Return a reference to the status backing store used by this herder.
*
* @return the status backing store used by this herder
*/
StatusBackingStore statusBackingStore();

/**
* Look up the status of a task.
* @param id id of the task
@@ -200,7 +224,6 @@
*/
Plugins plugins();


/**
* Get the cluster ID of the Kafka cluster backing this Connect cluster.
* @return the cluster ID of the Kafka cluster backing this connect cluster
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicStatus.java
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.Objects;

/**
* Represents the metadata that is stored as the value of the record persisted via
* {@link org.apache.kafka.connect.storage.StatusBackingStore#put(TopicStatus)}.
*/
public class TopicStatus {
private final String topic;
private final String connector;
private final int task;
private final long discoverTimestamp;

public TopicStatus(String topic, ConnectorTaskId task, long discoverTimestamp) {
this(topic, task.connector(), task.task(), discoverTimestamp);
}

public TopicStatus(String topic, String connector, int task, long discoverTimestamp) {
this.topic = Objects.requireNonNull(topic);
this.connector = Objects.requireNonNull(connector);
this.task = task;
this.discoverTimestamp = discoverTimestamp;
}

/**
* Get the name of the topic.
*
* @return the topic name; never null
*/
public String topic() {
return topic;
}

/**
* Get the name of the connector.
*
* @return the connector name; never null
*/
public String connector() {
return connector;
}

/**
* Get the ID of the task that stored the topic status.
*
* @return the task ID
*/
public int task() {
return task;
}

/**
* Get a timestamp that represents when this topic was discovered as being actively used by
* this connector.
*
* @return the discovery timestamp
*/
public long discoverTimestamp() {
return discoverTimestamp;
}

@Override
public String toString() {
return "TopicStatus{" +
"topic='" + topic + '\'' +
", connector='" + connector + '\'' +
", task=" + task +
", discoverTimestamp=" + discoverTimestamp +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TopicStatus)) {
return false;
}
TopicStatus that = (TopicStatus) o;
return task == that.task &&
discoverTimestamp == that.discoverTimestamp &&
topic.equals(that.topic) &&
connector.equals(that.connector);
}

@Override
public int hashCode() {
return Objects.hash(topic, connector, task, discoverTimestamp);
}
}
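
A small usage sketch of the new class (connector and topic names are placeholders):

```java
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;

public class TopicStatusExample {
    public static void main(String[] args) {
        ConnectorTaskId taskId = new ConnectorTaskId("my-connector", 0);
        TopicStatus status = new TopicStatus("orders", taskId, System.currentTimeMillis());

        System.out.println(status.topic());      // "orders"
        System.out.println(status.connector());  // "my-connector"
        System.out.println(status.task());       // 0

        // Equality is value-based across all four fields.
        System.out.println(status.equals(
                new TopicStatus("orders", "my-connector", 0, status.discoverTimestamp())));  // true
    }
}
```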
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -523,7 +523,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,
// Note we pass the configState as it performs dynamic transformations under the covers
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
time, retryWithToleranceOperator);
time, retryWithToleranceOperator, herder.statusBackingStore());
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
@@ -535,7 +535,7 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState,

return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
valueConverter, headerConverter, transformationChain, consumer, loader, time,
retryWithToleranceOperator);
retryWithToleranceOperator, herder.statusBackingStore());
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -234,6 +234,16 @@ public class WorkerConfig extends AbstractConfig {
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable";
protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active "
+ "topics per connector during runtime.";
protected static final boolean TOPIC_TRACKING_ENABLE_DEFAULT = true;

public static final String TOPIC_TRACKING_ALLOW_RESET_CONFIG = "topic.tracking.allow.reset";
protected static final String TOPIC_TRACKING_ALLOW_RESET_DOC = "If set to true, it allows "
+ "user requests to reset the set of active topics per connector.";
protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;

/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
@@ -310,7 +320,11 @@ protected static ConfigDef baseConfigDef() {
.define(ADMIN_LISTENERS_CONFIG, Type.LIST, null,
new AdminListenersValidator(), Importance.LOW, ADMIN_LISTENERS_DOC)
.define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC);
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
.define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT,
Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
.define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC);
}

private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
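A minimal sketch of how worker-side code might consult the new flags, assuming an already-constructed `WorkerConfig` (for example a `DistributedConfig` built from the worker properties); the helper class itself is hypothetical:

```java
import org.apache.kafka.connect.runtime.WorkerConfig;

public final class TopicTrackingFlags {
    private TopicTrackingFlags() {
    }

    // A reset request only makes sense when tracking itself is enabled.
    public static boolean resetAllowed(WorkerConfig config) {
        boolean trackingEnabled = config.getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG);
        boolean resetAllowed = config.getBoolean(WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG);
        return trackingEnabled && resetAllowed;
    }
}
```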
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -45,6 +45,7 @@
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -71,7 +72,6 @@ class WorkerSinkTask extends WorkerTask {
private final SinkTask task;
private final ClusterConfigState configState;
private Map<String, String> taskConfig;
private final Time time;
private final Converter keyConverter;
private final Converter valueConverter;
private final HeaderConverter headerConverter;
@@ -105,8 +105,10 @@ public WorkerSinkTask(ConnectorTaskId id,
KafkaConsumer<byte[], byte[]> consumer,
ClassLoader loader,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator) {
super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator);
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics,
retryWithToleranceOperator, time, statusBackingStore);

this.workerConfig = workerConfig;
this.task = task;
@@ -115,7 +117,6 @@ public WorkerSinkTask(ConnectorTaskId id,
this.valueConverter = valueConverter;
this.headerConverter = headerConverter;
this.transformationChain = transformationChain;
this.time = time;
this.messageBatch = new ArrayList<>();
this.currentOffsets = new HashMap<>();
this.origOffsets = new HashMap<>();
@@ -504,6 +505,7 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
headers);
log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
recordActiveTopic(origRecord.topic());
return transformationChain.apply(origRecord);
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -43,6 +43,7 @@
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -77,7 +78,6 @@ class WorkerSourceTask extends WorkerTask {
private KafkaProducer<byte[], byte[]> producer;
private final CloseableOffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final Time time;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
private final AtomicReference<Exception> producerSendException;

@@ -112,9 +112,11 @@ public WorkerSourceTask(ConnectorTaskId id,
ConnectMetrics connectMetrics,
ClassLoader loader,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator) {
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore) {

super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator);
super(id, statusListener, initialState, loader, connectMetrics,
retryWithToleranceOperator, time, statusBackingStore);

this.workerConfig = workerConfig;
this.task = task;
@@ -126,7 +128,6 @@ public WorkerSourceTask(ConnectorTaskId id,
this.producer = producer;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
this.time = time;

this.toSend = null;
this.lastSendFailed = false;
@@ -355,6 +356,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitTaskRecord(preTransformRecord, recordMetadata);
recordActiveTopic(producerRecord.topic());
}
}
});
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -30,6 +30,7 @@
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
@@ -57,6 +58,8 @@ abstract class WorkerTask implements Runnable {
protected final ConnectorTaskId id;
private final TaskStatus.Listener statusListener;
protected final ClassLoader loader;
protected final StatusBackingStore statusBackingStore;
protected final Time time;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final TaskMetricsGroup taskMetricsGroup;
private volatile TargetState targetState;
@@ -70,7 +73,9 @@ public WorkerTask(ConnectorTaskId id,
TargetState initialState,
ClassLoader loader,
ConnectMetrics connectMetrics,
RetryWithToleranceOperator retryWithToleranceOperator) {
RetryWithToleranceOperator retryWithToleranceOperator,
Time time,
StatusBackingStore statusBackingStore) {
this.id = id;
this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
this.statusListener = taskMetricsGroup;
@@ -80,6 +85,8 @@
this.cancelled = false;
this.taskMetricsGroup.recordState(this.targetState);
this.retryWithToleranceOperator = retryWithToleranceOperator;
this.time = time;
this.statusBackingStore = statusBackingStore;
}

public ConnectorTaskId id() {
@@ -279,6 +286,20 @@ public void transitionTo(TargetState state) {
}
}

/**
* Add this topic to the set of active topics for the connector that this worker task
* is running. This information is persisted in the status backing store used by this worker.
*
* @param topic the topic to mark as active for this connector
*/
protected void recordActiveTopic(String topic) {
if (statusBackingStore.getTopic(id.connector(), topic) != null) {
// The topic is already recorded as active. No further action is required.
return;
}
statusBackingStore.put(new TopicStatus(topic, id, time.milliseconds()));
}

/**
* Record that offsets have been committed.
*
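The check-then-put in `recordActiveTopic` above means a topic is written to the status backing store only the first time a task sees it; on later records the method is a cheap read. Sink tasks call it per consumed record in `convertAndTransformRecord`, while source tasks call it from the producer callback after a successful send. A Mockito-based sketch of that contract (the framework and helper method are illustrative, not taken from the project's tests):

```java
import static org.mockito.Mockito.*;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;

public class RecordActiveTopicSketch {
    public static void main(String[] args) {
        StatusBackingStore store = mock(StatusBackingStore.class);
        ConnectorTaskId id = new ConnectorTaskId("my-connector", 0);

        // First sighting: nothing recorded yet, so a TopicStatus is written.
        when(store.getTopic("my-connector", "orders")).thenReturn(null);
        recordActiveTopic(store, id, "orders");

        // Second sighting: the store already has the topic, so no second write.
        when(store.getTopic("my-connector", "orders"))
                .thenReturn(new TopicStatus("orders", id, 0L));
        recordActiveTopic(store, id, "orders");

        verify(store, times(1)).put(any(TopicStatus.class));
    }

    // Mirrors WorkerTask#recordActiveTopic from the diff above.
    static void recordActiveTopic(StatusBackingStore store, ConnectorTaskId id, String topic) {
        if (store.getTopic(id.connector(), topic) != null) {
            return; // already recorded as active
        }
        store.put(new TopicStatus(topic, id, Time.SYSTEM.milliseconds()));
    }
}
```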