Skip to content

Commit

Permalink
MINOR: Refactor StreamsProducer (#8380)
Browse files Browse the repository at this point in the history
Reviewers: Boyang Chen <[email protected]>, Guozhang Wang <[email protected]>, Andrew Choi <[email protected]>
  • Loading branch information
mjsax authored Apr 4, 2020
1 parent d4eb406 commit ab5e4f5
Show file tree
Hide file tree
Showing 15 changed files with 719 additions and 485 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ private Map<String, Object> getCommonConsumerConfigs() {
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
if (StreamThread.eosBetaEnabled(this)) {
if (StreamThread.processingMode(this) == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA) {
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
}
consumerProps.putAll(getClientCustomProps());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -45,7 +44,6 @@

import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.AT_LEAST_ONCE;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;

Expand All @@ -61,15 +59,13 @@ class ActiveTaskCreator {
private final String threadId;
private final Logger log;
private final Sensor createTaskSensor;
private final String applicationId;
private final StreamsProducer threadProducer;
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;
private final String transactionalId;

ActiveTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamThread.ProcessingMode processingMode,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
Expand All @@ -81,7 +77,6 @@ class ActiveTaskCreator {
final Logger log) {
this.builder = builder;
this.config = config;
this.processingMode = processingMode;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
Expand All @@ -92,8 +87,9 @@ class ActiveTaskCreator {
this.log = log;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
transactionalId = applicationId + "-" + processId + "-StreamThread-" + threadId.split("-StreamThread-")[1];
processingMode = StreamThread.processingMode(config);

if (processingMode == EXACTLY_ONCE_ALPHA) {
threadProducer = null;
Expand All @@ -104,31 +100,19 @@ class ActiveTaskCreator {
final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
final LogContext logContext = new LogContext(threadIdPrefix);

final String threadProducerClientId = getThreadProducerClientId(threadId);
final Map<String, Object> producerConfigs = config.getProducerConfigs(threadProducerClientId);

if (processingMode == EXACTLY_ONCE_BETA) {
producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
threadProducer = new StreamsProducer(clientSupplier.getProducer(producerConfigs), EXACTLY_ONCE_BETA, logContext);
} else {
threadProducer = new StreamsProducer(clientSupplier.getProducer(producerConfigs), AT_LEAST_ONCE, logContext);
}
threadProducer = new StreamsProducer(
config,
threadId,
clientSupplier,
null,
processId,
logContext);
taskProducers = Collections.emptyMap();
}
}

/**
 * Closes and re-creates the shared thread-level producer.
 *
 * <p>Only valid for exactly-once beta, where a single transactional producer is shared by
 * all tasks of this thread; the actual close-and-recreate is delegated to
 * {@code StreamsProducer#resetProducer()}.
 *
 * @throws IllegalStateException if exactly-once beta is not enabled
 */
public void reInitializeThreadProducer() {
    if (processingMode != EXACTLY_ONCE_BETA) {
        throw new IllegalStateException("Exactly-once beta is not enabled.");
    }

    threadProducer.resetProducer();
}

StreamsProducer streamsProducerForTask(final TaskId taskId) {
Expand Down Expand Up @@ -175,14 +159,14 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
);

final StreamsProducer streamsProducer;
if (processingMode == EXACTLY_ONCE_ALPHA) {
final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
if (processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
log.info("Creating producer client for task {}", taskId);
streamsProducer = new StreamsProducer(
clientSupplier.getProducer(producerConfigs),
EXACTLY_ONCE_ALPHA,
config,
threadId,
clientSupplier,
taskId,
null,
logContext);
taskProducers.put(taskId, streamsProducer);
} else {
Expand Down Expand Up @@ -221,7 +205,7 @@ Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
void closeThreadProducerIfNeeded() {
if (threadProducer != null) {
try {
threadProducer.kafkaProducer().close();
threadProducer.close();
} catch (final RuntimeException e) {
throw new StreamsException("Thread producer encounter error trying to close.", e);
}
Expand All @@ -232,7 +216,7 @@ void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
final StreamsProducer taskProducer = taskProducers.remove(id);
if (taskProducer != null) {
try {
taskProducer.kafkaProducer().close();
taskProducer.close();
} catch (final RuntimeException e) {
throw new StreamsException("[" + id + "] task producer encounter error trying to close.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
Expand All @@ -36,6 +27,16 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ClientUtils {

// currently admin client is shared among all threads
Expand Down Expand Up @@ -77,7 +78,7 @@ public static Map<MetricName, Metric> adminClientMetrics(final Admin adminClient
public static Map<MetricName, Metric> producerMetrics(final Collection<StreamsProducer> producers) {
final Map<MetricName, Metric> result = new LinkedHashMap<>();
for (final StreamsProducer producer : producers) {
final Map<MetricName, ? extends Metric> producerMetrics = producer.kafkaProducer().metrics();
final Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
if (producerMetrics != null) {
result.putAll(producerMetrics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,19 +317,9 @@ public static StreamThread create(final InternalTopologyBuilder builder,

final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);

final ProcessingMode processingMode;
if (StreamThread.eosAlphaEnabled(config)) {
processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
} else if (StreamThread.eosBetaEnabled(config)) {
processingMode = StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
} else {
processingMode = StreamThread.ProcessingMode.AT_LEAST_ONCE;
}

final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
builder,
config,
processingMode,
streamsMetrics,
stateDirectory,
changelogReader,
Expand Down Expand Up @@ -359,7 +349,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
builder,
adminClient,
stateDirectory,
processingMode
StreamThread.processingMode(config)
);

log.info("Creating consumer client");
Expand Down Expand Up @@ -413,16 +403,20 @@ public enum ProcessingMode {
}
}

public static boolean eosAlphaEnabled(final StreamsConfig config) {
return EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
}

public static boolean eosBetaEnabled(final StreamsConfig config) {
return EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
public static ProcessingMode processingMode(final StreamsConfig config) {
if (EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
return StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
} else if (EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
return StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
} else {
return StreamThread.ProcessingMode.AT_LEAST_ONCE;
}
}

public static boolean eosEnabled(final StreamsConfig config) {
return eosAlphaEnabled(config) || eosBetaEnabled(config);
final ProcessingMode processingMode = processingMode(config);
return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
processingMode == ProcessingMode.EXACTLY_ONCE_BETA;
}

public StreamThread(final Time time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,29 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Future;

import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;

/**
Expand All @@ -55,28 +62,77 @@ public class StreamsProducer {
private final Logger log;
private final String logPrefix;

private Producer<byte[], byte[]> producer;
private final Map<String, Object> eosBetaProducerConfigs;
private final KafkaClientSupplier clientSupplier;
private final StreamThread.ProcessingMode processingMode;

private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;

/**
 * Creates the internal producer according to the processing mode derived from {@code config}.
 *
 * <p>For {@code AT_LEAST_ONCE} a plain thread-level producer is created. For
 * {@code EXACTLY_ONCE_ALPHA} a transactional task-level producer is created, using
 * {@code <applicationId>-<taskId>} as transactional id. For {@code EXACTLY_ONCE_BETA} a
 * transactional thread-level producer is created, using {@code <applicationId>-<processId>}
 * as transactional id; its configs are retained so the producer can later be re-created via
 * {@code resetProducer()}.
 *
 * @param config         the Streams configuration (must not be null)
 * @param threadId       the id of the owning stream thread (must not be null)
 * @param clientSupplier supplier used to create the underlying Kafka producer (must not be null)
 * @param taskId         the task id; must be non-null only for exactly-once alpha
 * @param processId      the process id; must be non-null only for exactly-once beta
 * @param logContext     context used for logging (must not be null)
 * @throws IllegalArgumentException if the processing mode is unknown
 */
public StreamsProducer(final StreamsConfig config,
                       final String threadId,
                       final KafkaClientSupplier clientSupplier,
                       final TaskId taskId,
                       final UUID processId,
                       final LogContext logContext) {
    Objects.requireNonNull(config, "config cannot be null");
    Objects.requireNonNull(threadId, "threadId cannot be null");
    this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier cannot be null");
    log = Objects.requireNonNull(logContext, "logContext cannot be null").logger(getClass());
    logPrefix = logContext.logPrefix().trim();

    processingMode = StreamThread.processingMode(config);

    final Map<String, Object> producerConfigs;
    switch (processingMode) {
        case AT_LEAST_ONCE: {
            producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId));
            eosBetaProducerConfigs = null;

            break;
        }
        case EXACTLY_ONCE_ALPHA: {
            producerConfigs = config.getProducerConfigs(
                getTaskProducerClientId(
                    threadId,
                    Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha")
                )
            );

            final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);

            eosBetaProducerConfigs = null;

            break;
        }
        case EXACTLY_ONCE_BETA: {
            producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId));

            final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
            producerConfigs.put(
                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                applicationId + "-" + Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta"));

            // retain the configs so resetProducer() can create a replacement transactional producer
            eosBetaProducerConfigs = producerConfigs;

            break;
        }
        default:
            throw new IllegalArgumentException("Unknown processing mode: " + processingMode);
    }

    producer = clientSupplier.getProducer(producerConfigs);
}

/**
 * Appends the log prefix to an exception message so it identifies this producer instance.
 */
private String formatException(final String message) {
    return String.format("%s [%s]", message, logPrefix);
}

/**
 * @return {@code true} if this producer runs with either exactly-once guarantee (alpha or beta)
 */
boolean eosEnabled() {
    return processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA ||
        processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
}

/**
Expand Down Expand Up @@ -113,12 +169,14 @@ void initTransaction() {
}
}

public void resetProducer(final Producer<byte[], byte[]> producer) {
public void resetProducer() {
if (processingMode != EXACTLY_ONCE_BETA) {
throw new IllegalStateException(formatException("Exactly-once beta is not enabled"));
}

this.producer = Objects.requireNonNull(producer, "producer cannot be null");
producer.close();

producer = clientSupplier.getProducer(eosBetaProducerConfigs);
transactionInitialized = false;
}

Expand Down Expand Up @@ -233,10 +291,19 @@ List<PartitionInfo> partitionsFor(final String topic) throws TimeoutException {
return producer.partitionsFor(topic);
}

/**
 * Exposes the metrics of the wrapped producer client.
 *
 * @return the underlying producer's metrics, keyed by metric name
 */
Map<MetricName, ? extends Metric> metrics() {
    return producer.metrics();
}

/**
 * Flushes the wrapped producer, sending all buffered records.
 */
void flush() {
    producer.flush();
}

/**
 * Closes the wrapped producer client.
 */
void close() {
    producer.close();
}

/**
 * Exposes the wrapped producer client directly.
 *
 * <p>Visible for testing only — production code should go through the wrapper methods.
 */
// for testing only
Producer<byte[], byte[]> kafkaProducer() {
    return producer;
}
Expand Down
Loading

0 comments on commit ab5e4f5

Please sign in to comment.