Skip to content

Commit

Permalink
KAFKA-8147: Add changelog topic configuration to KTable suppress (#8029)
Browse files Browse the repository at this point in the history
Implements: KIP-446

Reviewers: John Roesler <[email protected]>
  • Loading branch information
highluck authored Feb 25, 2020
1 parent 9639ff4 commit 9064026
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;

import java.time.Duration;
import java.util.Map;

public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {

Expand Down Expand Up @@ -118,6 +119,25 @@ static StrictBufferConfig unbounded() {
* duplicate results downstream, but does not promise to eliminate them.
*/
EagerBufferConfig emitEarlyWhenFull();

/**
* Disable the changelog for this suppression's internal buffer.
* This will turn off fault-tolerance for the suppression, and will result in data loss in the event of a rebalance.
* By default the changelog is enabled.
* @return this
*/
BC withLoggingDisabled();

/**
* Indicates that a changelog topic should be created containing the currently suppressed
* records. Due to the short-lived nature of records in this topic it is likely more
* compactable than changelog topics for KTables.
*
* @param config Configs that should be applied to the changelog. Note: Any unrecognized
* configs will be ignored.
* @return this
*/
BC withLoggingEnabled(final Map<String, String> config);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -529,10 +530,27 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
this
);

final StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> storeBuilder;

if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
final Map<String, String> topicConfig = suppressedInternal.bufferConfig().getLogConfig();
storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName,
keySerde,
valSerde)
.withLoggingEnabled(topicConfig);
} else {
storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName,
keySerde,
valSerde)
.withLoggingDisabled();
}

final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde)
storeBuilder
);

builder.addGraphNode(streamsGraphNode, node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import org.apache.kafka.streams.kstream.Suppressed;

import java.util.Map;

import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;

abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
public abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
public abstract long maxRecords();

public abstract long maxBytes();
Expand All @@ -46,4 +48,8 @@ public Suppressed.StrictBufferConfig shutDownWhenFull() {
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
return new EagerBufferConfigImpl(maxRecords(), maxBytes());
}

public abstract boolean isLoggingEnabled();

public abstract Map<String, String> getLogConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,38 @@

import org.apache.kafka.streams.kstream.Suppressed;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.EagerBufferConfig> implements Suppressed.EagerBufferConfig {

private final long maxRecords;
private final long maxBytes;
private final Map<String, String> logConfig;

public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = Collections.emptyMap();
}

private EagerBufferConfigImpl(final long maxRecords,
final long maxBytes,
final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = logConfig;
}

@Override
public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, maxBytes);
return new EagerBufferConfigImpl(recordLimit, maxBytes, logConfig);
}

@Override
public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(maxRecords, byteLimit);
return new EagerBufferConfigImpl(maxRecords, byteLimit, logConfig);
}

@Override
Expand All @@ -55,6 +67,26 @@ public BufferFullStrategy bufferFullStrategy() {
return BufferFullStrategy.EMIT;
}

@Override
public Suppressed.EagerBufferConfig withLoggingDisabled() {
return new EagerBufferConfigImpl(maxRecords, maxBytes, null);
}

@Override
public Suppressed.EagerBufferConfig withLoggingEnabled(final Map<String, String> config) {
return new EagerBufferConfigImpl(maxRecords, maxBytes, config);
}

@Override
public boolean isLoggingEnabled() {
return logConfig != null;
}

@Override
public Map<String, String> getLogConfig() {
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.streams.kstream.Suppressed;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
Expand All @@ -27,19 +29,32 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
private final long maxRecords;
private final long maxBytes;
private final BufferFullStrategy bufferFullStrategy;
private final Map<String, String> logConfig;

public StrictBufferConfigImpl(final long maxRecords,
final long maxBytes,
final BufferFullStrategy bufferFullStrategy,
final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.bufferFullStrategy = bufferFullStrategy;
this.logConfig = logConfig;
}

public StrictBufferConfigImpl(final long maxRecords,
final long maxBytes,
final BufferFullStrategy bufferFullStrategy) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.bufferFullStrategy = bufferFullStrategy;
this.logConfig = Collections.emptyMap();
}

public StrictBufferConfigImpl() {
this.maxRecords = Long.MAX_VALUE;
this.maxBytes = Long.MAX_VALUE;
this.bufferFullStrategy = SHUT_DOWN;
this.logConfig = Collections.emptyMap();
}

@Override
Expand Down Expand Up @@ -67,6 +82,26 @@ public BufferFullStrategy bufferFullStrategy() {
return bufferFullStrategy;
}

@Override
public Suppressed.StrictBufferConfig withLoggingDisabled() {
return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, null);
}

@Override
public Suppressed.StrictBufferConfig withLoggingEnabled(final Map<String, String> config) {
return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, config);
}

@Override
public boolean isLoggingEnabled() {
return logConfig != null;
}

@Override
public Map<String, String> getLogConfig() {
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public String name() {
return name;
}

BufferConfigInternal bufferConfig() {
@SuppressWarnings("unchecked")
public <BC extends Suppressed.BufferConfig<BC>> BufferConfigInternal<BC> bufferConfig() {
return bufferConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static class Builder<K, V> implements StoreBuilder<InMemoryTimeOrderedKey
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private boolean loggingEnabled = true;
private Map<String, String> logConfig = new HashMap<>();

public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
this.storeName = storeName;
Expand Down Expand Up @@ -127,7 +128,8 @@ public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingDisabled

@Override
public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingEnabled(final Map<String, String> config) {
throw new UnsupportedOperationException();
logConfig = config;
return this;
}

@Override
Expand All @@ -143,7 +145,7 @@ public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {

@Override
public Map<String, String> logConfig() {
return Collections.emptyMap();
return loggingEnabled() ? Collections.unmodifiableMap(logConfig) : Collections.emptyMap();
}

@Override
Expand Down
Loading

0 comments on commit 9064026

Please sign in to comment.