diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java index bec77e6532fc..ba52990a635f 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java @@ -34,6 +34,7 @@ * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}. */ public class WordCountProcessorTest { + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void test() { final MockProcessorContext context = new MockProcessorContext(); diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java index 98d50123bb2a..6bb8c6c09b8a 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java @@ -33,6 +33,7 @@ * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}. */ public class WordCountTransformerTest { + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void test() { final MockProcessorContext context = new MockProcessorContext(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 4f47b12b8aee..570bb6273862 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -65,7 +65,11 @@ public interface StateStore { * * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead. + * Implementers may choose to implement this method for backward compatibility or to throw an + * informative exception instead. */ + @Deprecated void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root); /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index 74227af3fa20..4424cdbc2c76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; @@ -45,12 +46,19 @@ public void flush() { throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void init(final StateStoreContext context, + final StateStore root) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index 41026c4eb528..d07708919abd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.List; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; @@ -31,6 +31,8 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.WrappedStateStore; +import java.util.List; + abstract class AbstractReadWriteDecorator extends WrappedStateStore { static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; @@ -38,12 +40,19 @@ private AbstractReadWriteDecorator(final T inner) { super(inner); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void init(final StateStoreContext context, + final StateStore root) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index 78b1ff5ebac4..9c84aaba5963 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; /** @@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context) return (StreamsMetricsImpl) context.metrics(); } + /** + * Should be removed as part of KAFKA-10217 + */ + public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) { + return (StreamsMetricsImpl) context.metrics(); + } + public static String changelogFor(final ProcessorContext context, final String storeName) { return context instanceof InternalProcessorContext ? ((InternalProcessorContext) context).changelogFor(storeName) : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); } + + public static String changelogFor(final StateStoreContext context, final String storeName) { + return context instanceof InternalProcessorContext + ? ((InternalProcessorContext) context).changelogFor(storeName) + : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + } + + public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) { + if (context instanceof InternalProcessorContext) { + return (InternalProcessorContext) context; + } else { + throw new IllegalArgumentException( + "This component requires internal features of Kafka Streams and must be disabled for unit tests." + ); + } + } + + public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) { + if (context instanceof InternalProcessorContext) { + return (InternalProcessorContext) context; + } else { + throw new IllegalArgumentException( + "This component requires internal features of Kafka Streams and must be disabled for unit tests." + ); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 2e9882d0049e..1014bfadac2e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -225,6 +225,7 @@ public String name() { return name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index cae38e0d6807..50d7c2518dbb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.KeyValueIterator; @@ -34,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll; import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed; @@ -54,23 +56,34 @@ public class CachingKeyValueStore super(underlying); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - initInternal(context); + initInternal(asInternalProcessorContext(context)); super.init(context, root); // save the stream thread as we only ever want to trigger a flush // when the stream thread is the current thread. streamThread = Thread.currentThread(); } - private void initInternal(final ProcessorContext context) { - this.context = (InternalProcessorContext) context; + @Override + public void init(final StateStoreContext context, + final StateStore root) { + initInternal(asInternalProcessorContext(context)); + super.init(context, root); + // save the stream thread as we only ever want to trigger a flush + // when the stream thread is the current thread. + streamThread = Thread.currentThread(); + } + + private void initInternal(final InternalProcessorContext context) { + this.context = context; this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name()); this.context.registerCacheFlushListener(cacheName, entries -> { for (final ThreadCache.DirtyEntry entry : entries) { - putAndMaybeForward(entry, (InternalProcessorContext) context); + putAndMaybeForward(entry, context); } }); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 4ac43a216c3f..c92123dbe757 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordQueue; @@ -34,6 +35,7 @@ import java.util.NoSuchElementException; import java.util.Objects; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll; import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed; @@ -61,9 +63,16 @@ class CachingSessionStore this.maxObservedTimestamp = RecordQueue.UNKNOWN; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - initInternal((InternalProcessorContext) context); + initInternal(asInternalProcessorContext(context)); + super.init(context, root); + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + initInternal(asInternalProcessorContext(context)); super.init(context, root); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 20be3a3a33bd..c750f3a6052c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -37,6 +38,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll; import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed; @@ -67,14 +69,16 @@ class CachingWindowStore this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - if (!(context instanceof InternalProcessorContext)) { - throw new IllegalArgumentException( - "Caching requires internal features of KafkaStreams and must be disabled for unit tests." - ); - } - initInternal((InternalProcessorContext) context); + initInternal(asInternalProcessorContext(context)); + super.init(context, root); + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + initInternal(asInternalProcessorContext(context)); super.init(context, root); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 236f21877ef2..d6526a10053e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -20,12 +20,15 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import java.util.List; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; + public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore, byte[], byte[]> implements KeyValueStore { @@ -36,12 +39,24 @@ public class ChangeLoggingKeyValueBytesStore super(inner); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { super.init(context, root); - this.context = (InternalProcessorContext) context; + this.context = asInternalProcessorContext(context); + maybeSetEvictionListener(); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + super.init(context, root); + this.context = asInternalProcessorContext(context); + maybeSetEvictionListener(); + } + private void maybeSetEvictionListener() { // if the inner store is an LRU cache, add the eviction listener to log removed record if (wrapped() instanceof MemoryLRUCache) { ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index cc586d3ba1a9..0d2133d9f40e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -20,10 +20,13 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; + /** * Simple wrapper around a {@link SessionStore} to support writing * updates to a changelog @@ -38,10 +41,17 @@ class ChangeLoggingSessionBytesStore super(bytesStore); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { super.init(context, root); - this.context = (InternalProcessorContext) context; + this.context = asInternalProcessorContext(context); + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + super.init(context, root); + this.context = asInternalProcessorContext(context); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index 8da413cab24b..47f088e7c564 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -20,11 +20,14 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; + /** * Simple wrapper around a {@link WindowStore} to support writing * updates to a changelog @@ -43,15 +46,18 @@ class ChangeLoggingWindowBytesStore this.retainDuplicates = retainDuplicates; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - if (!(context instanceof InternalProcessorContext)) { - throw new IllegalArgumentException( - "Change logging requires internal features of KafkaStreams and must be disabled for unit tests." - ); - } - this.context = (InternalProcessorContext) context; + this.context = asInternalProcessorContext(context); + super.init(context, root); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.context = asInternalProcessorContext(context); super.init(context, root); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index b02459dc57a6..31041b91c3bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -50,6 +50,7 @@ public String name() { return name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index e4fda06682c8..2e45b48b7df1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -72,6 +72,7 @@ public String name() { return name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { this.context = (InternalProcessorContext) context; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index e3d3ba62635a..3099e9ca3a45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; @@ -193,11 +195,22 @@ public void setSerdesIfNull(final Serde keySerde, final Serde valueSerde) this.valueSerde = this.valueSerde == null ? FullChangeSerde.wrap(valueSerde) : this.valueSerde; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { + this.context = ProcessorContextUtils.asInternalProcessorContext(context); + init(root); + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + this.context = ProcessorContextUtils.asInternalProcessorContext(context); + init(root); + } + + private void init(final StateStore root) { taskId = context.taskId().toString(); - this.context = (InternalProcessorContext) context; - streamsMetrics = this.context.metrics(); + streamsMetrics = context.metrics(); threadId = Thread.currentThread().getName(); bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 49322b0973f1..cd50b1596bb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -85,6 +85,7 @@ public String name() { return name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { this.context = context; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java index 21fbc45327e7..b6d65049f4d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java @@ -17,10 +17,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Objects; class KeyValueSegment extends RocksDBStore implements Comparable, Segment { @@ -45,10 +46,9 @@ public int compareTo(final KeyValueSegment segment) { } @Override - public void openDB(final ProcessorContext context) { - super.openDB(context); + public void openDB(final Map configs, final File stateDir) { + super.openDB(configs, stateDir); // skip the registering step - internalProcessorContext = context; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java index c8b4b90b2524..a17666e2c8d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java @@ -48,7 +48,7 @@ public KeyValueSegment getOrCreateSegment(final long segmentId, throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access."); } - newSegment.openDB(context); + newSegment.openDB(context.appConfigs(), context.stateDir()); return newSegment; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index fa29974c9833..6bb0950953c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -81,12 +82,18 @@ public String name() { return store.name(); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { store.init(context, root); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + store.init(context, root); + } + @Override public void flush() { store.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index 32a91cd67129..236fedc8d6c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -73,6 +74,7 @@ public String name() { return this.name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { @@ -84,6 +86,16 @@ public void init(final ProcessorContext context, final StateStore root) { }); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + // register the store + context.register(root, (key, value) -> { + restoring = true; + put(Bytes.wrap(key), value); + restoring = false; + }); + } + @Override public boolean persistent() { return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 31e2eff5a4eb..2c453585961b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -65,7 +67,7 @@ public class MeteredKeyValueStore private Sensor rangeSensor; private Sensor flushSensor; private Sensor e2eLatencySensor; - private ProcessorContext context; + private InternalProcessorContext context; private StreamsMetricsImpl streamsMetrics; private final String threadId; private String taskId; @@ -83,14 +85,40 @@ public class MeteredKeyValueStore this.valueSerde = valueSerde; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; taskId = context.taskId().toString(); initStoreSerde(context); streamsMetrics = (StreamsMetricsImpl) context.metrics(); + registerMetrics(); + final Sensor restoreSensor = + StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + + // register and possibly restore the state from the logs + maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; + taskId = context.taskId().toString(); + initStoreSerde(context); + streamsMetrics = (StreamsMetricsImpl) context.metrics(); + + registerMetrics(); + final Sensor restoreSensor = + StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + + // register and possibly restore the state from the logs + maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); + } + + private void registerMetrics() { putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(threadId, taskId, metricsScope, name(), streamsMetrics); putAllSensor = StateStoreMetrics.putAllSensor(threadId, taskId, metricsScope, name(), streamsMetrics); @@ -100,15 +128,23 @@ public void init(final ProcessorContext context, flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics); e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics); - final Sensor restoreSensor = - StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - - // register and possibly restore the state from the logs - maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); } + @Deprecated @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); + serdes = new StateSerdes<>( + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + } + + @SuppressWarnings("unchecked") + void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( @@ -250,7 +286,9 @@ private List> innerEntries(final List> fr } private void maybeRecordE2ELatency() { - if (e2eLatencySensor.shouldRecord()) { + // Context is null if the provided context isn't an implementation of InternalProcessorContext. + // In that case, we _can't_ get the current timestamp, so we don't record anything. + if (e2eLatencySensor.shouldRecord() && context != null) { final long currentTime = time.milliseconds(); final long e2eLatency = currentTime - context.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index d8ce02a6d18b..8b9256dc5d91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -51,7 +53,7 @@ public class MeteredSessionStore private Sensor flushSensor; private Sensor removeSensor; private Sensor e2eLatencySensor; - private ProcessorContext context; + private InternalProcessorContext context; private final String threadId; private String taskId; @@ -68,19 +70,16 @@ public class MeteredSessionStore this.time = time; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; initStoreSerde(context); taskId = context.taskId().toString(); streamsMetrics = (StreamsMetricsImpl) context.metrics(); - putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics); + registerMetrics(); final Sensor restoreSensor = StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); @@ -88,6 +87,30 @@ public void init(final ProcessorContext context, maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); } + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; + initStoreSerde(context); + taskId = context.taskId().toString(); + streamsMetrics = (StreamsMetricsImpl) context.metrics(); + + registerMetrics(); + final Sensor restoreSensor = + StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + + // register and possibly restore the state from the logs + maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); + } + + private void registerMetrics() { + putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics); + } + @SuppressWarnings("unchecked") private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); @@ -101,6 +124,19 @@ private void initStoreSerde(final ProcessorContext context) { ); } + @SuppressWarnings("unchecked") + private void initStoreSerde(final StateStoreContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); + serdes = new StateSerdes<>( + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde + ); + } + @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener, V> listener, @@ -255,7 +291,9 @@ private Bytes keyBytes(final K key) { } private void maybeRecordE2ELatency() { - if (e2eLatencySensor.shouldRecord()) { + // Context is null if the provided context isn't an implementation of InternalProcessorContext. + // In that case, we _can't_ get the current timestamp, so we don't record anything. + if (e2eLatencySensor.shouldRecord() && context != null) { final long currentTime = time.milliseconds(); final long e2eLatency = currentTime - context.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 022c1f851402..bd9cb921a9a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueStore; @@ -50,6 +51,7 @@ public class MeteredTimestampedKeyValueStore super(inner, metricScope, time, keySerde, valueSerde); } + @Deprecated @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { final String storeName = name(); @@ -62,6 +64,18 @@ void initStoreSerde(final ProcessorContext context) { valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde); } + @SuppressWarnings("unchecked") + void initStoreSerde(final StateStoreContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); + serdes = new StateSerdes<>( + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde); + } + public RawAndDeserializedValue getWithBinary(final K key) { try { return maybeMeasureLatency(() -> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java index f61ebd47a95e..258db8931c44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.StateSerdes; @@ -49,6 +50,7 @@ class MeteredTimestampedWindowStore super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde); } + @Deprecated @SuppressWarnings("unchecked") @Override void initStoreSerde(final ProcessorContext context) { @@ -62,4 +64,18 @@ void initStoreSerde(final ProcessorContext context) { valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde ); } + + @SuppressWarnings("unchecked") + @Override + void initStoreSerde(final StateStoreContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); + serdes = new StateSerdes<>( + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde + ); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 85c3ec2763cf..d47233b3b225 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -50,7 +52,7 @@ public class MeteredWindowStore private Sensor fetchSensor; private Sensor flushSensor; private Sensor e2eLatencySensor; - private ProcessorContext context; + private InternalProcessorContext context; private final String threadId; private String taskId; @@ -69,18 +71,32 @@ public class MeteredWindowStore this.valueSerde = valueSerde; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; initStoreSerde(context); streamsMetrics = (StreamsMetricsImpl) context.metrics(); taskId = context.taskId().toString(); - putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); - e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics); + registerMetrics(); + final Sensor restoreSensor = + StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + + // register and possibly restore the state from the logs + maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; + initStoreSerde(context); + streamsMetrics = (StreamsMetricsImpl) context.metrics(); + taskId = context.taskId().toString(); + + registerMetrics(); final Sensor restoreSensor = StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); @@ -88,6 +104,14 @@ public void init(final ProcessorContext context, maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor); } + private void registerMetrics() { + putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); + e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics); + } + + @Deprecated @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { final String storeName = name(); @@ -100,6 +124,18 @@ void initStoreSerde(final ProcessorContext context) { valueSerde == null ? (Serde) context.valueSerde() : valueSerde); } + @SuppressWarnings("unchecked") + void initStoreSerde(final StateStoreContext context) { + final String storeName = name(); + final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); + serdes = new StateSerdes<>( + changelogTopic != null ? + changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + } + @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener, V> listener, @@ -122,7 +158,7 @@ public boolean setFlushListener(final CacheFlushListener, V> listene @Override public void put(final K key, final V value) { - put(key, value, context.timestamp()); + put(key, value, context != null ? context.timestamp() : 0L); } @Override @@ -264,7 +300,9 @@ private Bytes keyBytes(final K key) { } private void maybeRecordE2ELatency() { - if (e2eLatencySensor.shouldRecord()) { + // Context is null if the provided context isn't an implementation of InternalProcessorContext. + // In that case, we _can't_ get the current timestamp, so we don't record anything. + if (e2eLatencySensor.shouldRecord() && context != null) { final long currentTime = time.milliseconds(); final long e2eLatency = currentTime - context.timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3442d8353d50..3241d070b216 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -67,6 +67,7 @@ import java.util.Set; import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; +import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.getMetricsImpl; /** * A persistent key-value store based on RocksDB. @@ -102,7 +103,6 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS private final RocksDBMetricsRecorder metricsRecorder; - ProcessorContext internalProcessorContext; // visible for testing volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null; @@ -122,7 +122,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS } @SuppressWarnings("unchecked") - void openDB(final ProcessorContext context) { + void openDB(final Map configs, final File stateDir) { // initialize the default rocksdb options final DBOptions dbOptions = new DBOptions(); @@ -161,7 +161,6 @@ void openDB(final ProcessorContext context) { fOptions = new FlushOptions(); fOptions.setWaitForFlush(true); - final Map configs = context.appConfigs(); final Class configSetterClass = (Class) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG); @@ -170,7 +169,7 @@ void openDB(final ProcessorContext context) { configSetter.setConfig(name, userSpecifiedOptions, configs); } - dbDir = new File(new File(context.stateDir(), parentDir), name); + dbDir = new File(new File(stateDir, parentDir), name); try { Files.createDirectories(dbDir.getParentFile().toPath()); @@ -232,13 +231,26 @@ void openRocksDB(final DBOptions dbOptions, } } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { // open the DB dir - internalProcessorContext = context; - metricsRecorder.init((StreamsMetricsImpl) context.metrics(), context.taskId()); - openDB(context); + metricsRecorder.init(getMetricsImpl(context), context.taskId()); + openDB(context.appConfigs(), context.stateDir()); + batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this); + + // value getter should always read directly from rocksDB + // since it is only for values that are already flushed + context.register(root, batchingStateRestoreCallback); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + // open the DB dir + metricsRecorder.init(getMetricsImpl(context), context.taskId()); + openDB(context.appConfigs(), context.stateDir()); batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this); // value getter should always read directly from rocksDB diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 53f061511481..b3ba65243fc8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -20,6 +20,8 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -31,7 +33,7 @@ public class RocksDBWindowStore private final boolean retainDuplicates; private final long windowSize; - private ProcessorContext context; + private InternalProcessorContext context; private int seqnum = 0; RocksDBWindowStore(final SegmentedBytesStore bytesStore, @@ -42,16 +44,23 @@ public class RocksDBWindowStore this.windowSize = windowSize; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; + super.init(context, root); + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null; super.init(context, root); } @Deprecated @Override public void put(final Bytes key, final byte[] value) { - put(key, value, context.timestamp()); + put(key, value, context != null ? context.timestamp() : 0L); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index cea10887e18a..444d00510012 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -95,12 +96,18 @@ private InMemoryTimestampedKeyValueStoreMarker(final KeyValueStore, Segment { @@ -45,10 +46,9 @@ public int compareTo(final TimestampedSegment segment) { } @Override - public void openDB(final ProcessorContext context) { - super.openDB(context); + public void openDB(final Map configs, final File stateDir) { + super.openDB(configs, stateDir); // skip the registering step - internalProcessorContext = context; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java index 58f3bbcf9935..7318208f956a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java @@ -48,7 +48,7 @@ public TimestampedSegment getOrCreateSegment(final long segmentId, throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access."); } - newSegment.openDB(context); + newSegment.openDB(context.appConfigs(), context.stateDir()); return newSegment; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index 290b0ff17160..075545f9ea79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -109,12 +110,18 @@ private InMemoryTimestampedWindowStoreMarker(final WindowStore wr this.wrapped = wrapped; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { wrapped.init(context, root); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + wrapped.init(context, root); + } + @Deprecated @Override public void put(final Bytes key, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java index da3259974b9a..586e87dbcaaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -160,12 +161,18 @@ public String name() { return store.name(); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { store.init(context, root); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + store.init(context, root); + } + @Override public void flush() { store.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index 208bc060da30..e8244f7a83d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -19,6 +19,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.TimestampedBytesStore; /** @@ -42,12 +43,18 @@ public WrappedStateStore(final S wrapped) { this.wrapped = wrapped; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { wrapped.init(context, root); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + wrapped.init(context, root); + } + @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener listener, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index a83c92bf3895..eba6f0611bcc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -19,8 +19,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.KeyValueStore; @@ -147,7 +147,7 @@ public void shouldNotAllowToSchedulePunctuations() { public void shouldNotAllowInitForKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init((StateStoreContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -156,7 +156,7 @@ public void shouldNotAllowInitForKeyValueStore() { public void shouldNotAllowInitForTimestampedKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init((StateStoreContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -165,7 +165,7 @@ public void shouldNotAllowInitForTimestampedKeyValueStore() { public void shouldNotAllowInitForWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init((StateStoreContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -174,7 +174,7 @@ public void shouldNotAllowInitForWindowStore() { public void shouldNotAllowInitForTimestampedWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init((StateStoreContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -183,7 +183,7 @@ public void shouldNotAllowInitForTimestampedWindowStore() { public void shouldNotAllowInitForSessionStore() { final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init((StateStoreContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index ab88efae4c4d..50466e3a236f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.Task.TaskType; @@ -767,7 +768,7 @@ private void verifyStoreCannotBeInitializedOrClosed(final StateStore store) { assertTrue(store.persistent()); assertTrue(store.isOpen()); - checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()"); + checkThrowsUnsupportedOperation(() -> store.init((StateStoreContext) null, null), "init()"); checkThrowsUnsupportedOperation(store::close, "close()"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 7af3f3945acf..76e4c0fbc6bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -331,7 +332,7 @@ public void addEntryToRestoreLog(final K key, final V value) { * @return the processing context; never null * @see #addEntryToRestoreLog(Object, Object) */ - public ProcessorContext context() { + public StateStoreContext context() { return context; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index 39e5d03474a0..0de4890d0530 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -54,6 +54,7 @@ public String name() { return ""; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 4c9c0440eda1..3284b177f776 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -51,7 +51,7 @@ public abstract class AbstractKeyValueStoreTest { - protected abstract KeyValueStore createKeyValueStore(final ProcessorContext context); + protected abstract KeyValueStore createKeyValueStore(final StateStoreContext context); protected InternalMockProcessorContext context; protected KeyValueStore store; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 98f0ba654bf0..89e2b0e96399 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -88,7 +88,7 @@ public void after() { @SuppressWarnings("unchecked") @Override - protected KeyValueStore createKeyValueStore(final ProcessorContext context) { + protected KeyValueStore createKeyValueStore(final StateStoreContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("cache-store"), (Serde) context.keySerde(), @@ -100,6 +100,31 @@ protected KeyValueStore createKeyValueStore(final ProcessorContext return store; } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final KeyValueStore inner = EasyMock.mock(InMemoryKeyValueStore.class); + final CachingKeyValueStore outer = new CachingKeyValueStore(inner); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((ProcessorContext) context, outer); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + final KeyValueStore inner = EasyMock.mock(InMemoryKeyValueStore.class); + final CachingKeyValueStore outer = new CachingKeyValueStore(inner); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((StateStoreContext) context, outer); + EasyMock.verify(inner); + } + @Test public void shouldSetFlushListener() { assertTrue(store.setFlushListener(null, true)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index f36629d1aca5..05e97a2a4b1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -89,7 +90,7 @@ public class CachingSessionStoreTest { public void before() { cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -99,6 +100,31 @@ public void after() { cachingStore.close(); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final SessionStore inner = EasyMock.mock(InMemorySessionStore.class); + final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((ProcessorContext) context, outer); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + final SessionStore inner = EasyMock.mock(InMemorySessionStore.class); + final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((StateStoreContext) context, outer); + EasyMock.verify(inner); + } + @Test public void shouldPutFetchFromCache() { cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 42b750b932c3..2a04c482d162 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; @@ -42,7 +43,6 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; @@ -113,6 +113,31 @@ public void closeStore() { cachingStore.close(); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final WindowStore inner = EasyMock.mock(WindowStore.class); + final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((ProcessorContext) context, outer); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + final WindowStore inner = EasyMock.mock(WindowStore.class); + final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + EasyMock.expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((StateStoreContext) context, outer); + EasyMock.verify(inner); + } + @Test public void shouldNotReturnDuplicatesInRanges() { final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 910658051fc8..c3808b24887b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -21,11 +21,15 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,14 +53,19 @@ public class ChangeLoggingKeyValueBytesStoreTest { @Before public void before() { - final InternalMockProcessorContext context = new InternalMockProcessorContext( + final InternalMockProcessorContext context = mockContext(); + context.setTime(0); + store.init((StateStoreContext) context, store); + } + + private InternalMockProcessorContext mockContext() { + return new InternalMockProcessorContext( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), collector, - new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); - context.setTime(0); - store.init((StateStoreContext) context, store); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())) + ); } @After @@ -64,6 +73,31 @@ public void after() { store.close(); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final InternalMockProcessorContext context = mockContext(); + final KeyValueStore innerMock = EasyMock.mock(InMemoryKeyValueStore.class); + final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock); + innerMock.init((ProcessorContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(innerMock); + outer.init((ProcessorContext) context, outer); + EasyMock.verify(innerMock); + } + + @Test + public void shouldDelegateInit() { + final InternalMockProcessorContext context = mockContext(); + final KeyValueStore innerMock = EasyMock.mock(InMemoryKeyValueStore.class); + final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock); + innerMock.init((StateStoreContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(innerMock); + outer.init((StateStoreContext) context, outer); + EasyMock.verify(innerMock); + } + @Test public void shouldWriteKeyValueBytesToInnerStoreOnPut() { store.put(hi, there); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 426a33484fb5..c55c4e159c81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -57,13 +57,32 @@ public void setUp() { private void init() { EasyMock.expect(context.taskId()).andReturn(taskId); EasyMock.expect(context.recordCollector()).andReturn(collector); - inner.init((ProcessorContext) context, store); + inner.init((StateStoreContext) context, store); EasyMock.expectLastCall(); EasyMock.replay(inner, context); store.init((StateStoreContext) context, store); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + inner.init((ProcessorContext) context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + store.init((ProcessorContext) context, store); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + inner.init((StateStoreContext) context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + store.init((StateStoreContext) context, store); + EasyMock.verify(inner); + } + @Test public void shouldLogPuts() { inner.put(key1, value1); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java index e05b1714d89f..8295f7d2e4af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java @@ -21,12 +21,16 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -54,14 +58,19 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest { @Before public void before() { - final InternalMockProcessorContext context = new InternalMockProcessorContext( + final InternalMockProcessorContext context = mockContext(); + context.setTime(0); + store.init((StateStoreContext) context, store); + } + + private InternalMockProcessorContext mockContext() { + return new InternalMockProcessorContext( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), collector, - new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); - context.setTime(0); - store.init((StateStoreContext) context, store); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())) + ); } @After @@ -69,6 +78,31 @@ public void after() { store.close(); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final InternalMockProcessorContext context = mockContext(); + final KeyValueStore inner = EasyMock.mock(InMemoryKeyValueStore.class); + final StateStore outer = new ChangeLoggingTimestampedKeyValueBytesStore(inner); + inner.init((ProcessorContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((ProcessorContext) context, outer); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + final InternalMockProcessorContext context = mockContext(); + final KeyValueStore inner = EasyMock.mock(InMemoryKeyValueStore.class); + final StateStore outer = new ChangeLoggingTimestampedKeyValueBytesStore(inner); + inner.init((StateStoreContext) context, outer); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + outer.init((StateStoreContext) context, outer); + EasyMock.verify(inner); + } + @Test public void shouldWriteKeyValueBytesToInnerStoreOnPut() { store.put(hi, rawThere); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index 9de2207c77e9..6608739a8f41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -59,13 +59,32 @@ public void setUp() { private void init() { EasyMock.expect(context.taskId()).andReturn(taskId); EasyMock.expect(context.recordCollector()).andReturn(collector); - inner.init((ProcessorContext) context, store); + inner.init((StateStoreContext) context, store); EasyMock.expectLastCall(); EasyMock.replay(inner, context); store.init((StateStoreContext) context, store); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + inner.init((ProcessorContext) context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + store.init((ProcessorContext) context, store); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + inner.init((StateStoreContext) context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + store.init((StateStoreContext) context, store); + EasyMock.verify(inner); + } + @Test @SuppressWarnings("deprecation") public void shouldLogPuts() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index c877ac64f5ab..c2c31e072988 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -57,13 +57,32 @@ public void setUp() { private void init() { EasyMock.expect(context.taskId()).andReturn(taskId); EasyMock.expect(context.recordCollector()).andReturn(collector); - inner.init((ProcessorContext) context, store); + inner.init((StateStoreContext) context, store); EasyMock.expectLastCall(); EasyMock.replay(inner, context); store.init((StateStoreContext) context, store); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + inner.init((ProcessorContext) context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + store.init((ProcessorContext) context, store); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateInit() { + inner.init((StateStoreContext) context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner); + store.init((StateStoreContext) context, store); + EasyMock.verify(inner); + } + @Test @SuppressWarnings("deprecation") public void shouldLogPuts() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java index 7c0d16cb355e..f54fca113a31 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -28,7 +28,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") @Override - protected KeyValueStore createKeyValueStore(final ProcessorContext context) { + protected KeyValueStore createKeyValueStore(final StateStoreContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.inMemoryKeyValueStore("my-store"), (Serde) context.keySerde(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index 6dc90eac8d41..973093623c1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -31,7 +31,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") @Override - protected KeyValueStore createKeyValueStore(final ProcessorContext context) { + protected KeyValueStore createKeyValueStore(final StateStoreContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.inMemoryKeyValueStore("my-store"), (Serde) context.keySerde(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index 2a86cddac663..a044eda442bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -36,7 +36,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") @Override - protected KeyValueStore createKeyValueStore(final ProcessorContext context) { + protected KeyValueStore createKeyValueStore(final StateStoreContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.lruMap("my-store", 10), (Serde) context.keySerde(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java index c062e618b433..71841bdebd2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java @@ -67,7 +67,7 @@ public void shouldDeleteStateDirectoryOnDestroy() throws Exception { expect(mockContext.stateDir()).andReturn(directory); replay(mockContext); - segment.openDB(mockContext); + segment.openDB(mockContext.appConfigs(), mockContext.stateDir()); assertTrue(new File(directoryPath, "window").exists()); assertTrue(new File(directoryPath + File.separator + "window", "segment").exists()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 83fffef12925..68faa15007b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -158,6 +159,43 @@ private void init() { metered.init((StateStoreContext) context, metered); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final KeyValueStore inner = mock(KeyValueStore.class); + final MeteredKeyValueStore outer = new MeteredKeyValueStore<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + Serdes.String() + ); + expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + expectLastCall(); + replay(inner, context); + outer.init((ProcessorContext) context, outer); + verify(inner); + } + + @Test + public void shouldDelegateInit() { + final KeyValueStore inner = mock(KeyValueStore.class); + final MeteredKeyValueStore outer = new MeteredKeyValueStore<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + Serdes.String() + ); + expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + expectLastCall(); + replay(inner, context); + outer.init((StateStoreContext) context, outer); + verify(inner); + } + @Test public void shouldPassChangelogTopicNameToStateStoreSerde() { doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 0ff822ebb697..a77dd077d72a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -162,6 +163,43 @@ private void init() { store.init((StateStoreContext) context, store); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final SessionStore inner = mock(SessionStore.class); + final MeteredSessionStore outer = new MeteredSessionStore<>( + inner, + STORE_TYPE, + Serdes.String(), + Serdes.String(), + new MockTime() + ); + expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + expectLastCall(); + replay(inner, context); + outer.init((ProcessorContext) context, outer); + verify(inner); + } + + @Test + public void shouldDelegateInit() { + final SessionStore inner = mock(SessionStore.class); + final MeteredSessionStore outer = new MeteredSessionStore<>( + inner, + STORE_TYPE, + Serdes.String(), + Serdes.String(), + new MockTime() + ); + expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + expectLastCall(); + replay(inner, context); + outer.init((StateStoreContext) context, outer); + verify(inner); + } + @Test public void shouldPassChangelogTopicNameToStateStoreSerde() { doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 3d28266e20ad..405ab4fd6245 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -162,6 +163,43 @@ private void init() { metered.init((StateStoreContext) context, metered); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final KeyValueStore inner = mock(InMemoryKeyValueStore.class); + final MeteredTimestampedKeyValueStore outer = new MeteredTimestampedKeyValueStore<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueAndTimestampSerde<>(Serdes.String()) + ); + expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + expectLastCall(); + replay(inner, context); + outer.init((ProcessorContext) context, outer); + verify(inner); + } + + @Test + public void shouldDelegateInit() { + final KeyValueStore inner = mock(InMemoryKeyValueStore.class); + final MeteredTimestampedKeyValueStore outer = new MeteredTimestampedKeyValueStore<>( + inner, + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueAndTimestampSerde<>(Serdes.String()) + ); + expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + expectLastCall(); + replay(inner, context); + outer.init((StateStoreContext) context, outer); + verify(inner); + } + @Test public void shouldPassChangelogTopicNameToStateStoreSerde() { doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java index 9a9d763d144e..315a1aaa25a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -42,6 +43,7 @@ import org.junit.Test; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.niceMock; import static org.easymock.EasyMock.replay; @@ -95,6 +97,45 @@ public void setUp() { ); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final WindowStore inner = mock(WindowStore.class); + final MeteredTimestampedWindowStore outer = new MeteredTimestampedWindowStore<>( + inner, + WINDOW_SIZE_MS, // any size + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull()) + ); + expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + expectLastCall(); + replay(inner); + outer.init((ProcessorContext) context, outer); + verify(inner); + } + + @Test + public void shouldDelegateInit() { + final WindowStore inner = mock(WindowStore.class); + final MeteredTimestampedWindowStore outer = new MeteredTimestampedWindowStore<>( + inner, + WINDOW_SIZE_MS, // any size + STORE_TYPE, + new MockTime(), + Serdes.String(), + new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull()) + ); + expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + expectLastCall(); + replay(inner); + outer.init((StateStoreContext) context, outer); + verify(inner); + } + @Test public void shouldPassChangelogTopicNameToStateStoreSerde() { context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 730169476f34..18c0fa34f461 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -150,6 +151,45 @@ public void setUp() { ); } + @SuppressWarnings("deprecation") + @Test + public void shouldDelegateDeprecatedInit() { + final WindowStore inner = mock(WindowStore.class); + final MeteredWindowStore outer = new MeteredWindowStore<>( + inner, + WINDOW_SIZE_MS, // any size + STORE_TYPE, + new MockTime(), + Serdes.String(), + new SerdeThatDoesntHandleNull() + ); + expect(inner.name()).andStubReturn("store"); + inner.init((ProcessorContext) context, outer); + expectLastCall(); + replay(inner); + outer.init((ProcessorContext) context, outer); + verify(inner); + } + + @Test + public void shouldDelegateInit() { + final WindowStore inner = mock(WindowStore.class); + final MeteredWindowStore outer = new MeteredWindowStore<>( + inner, + WINDOW_SIZE_MS, // any size + STORE_TYPE, + new MockTime(), + Serdes.String(), + new SerdeThatDoesntHandleNull() + ); + expect(inner.name()).andStubReturn("store"); + inner.init((StateStoreContext) context, outer); + expectLastCall(); + replay(inner); + outer.init((StateStoreContext) context, outer); + verify(inner); + } + @Test public void shouldPassChangelogTopicNameToStateStoreSerde() { context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 55f91f113754..96c3ae05d9b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -382,6 +382,7 @@ public String name() { return null; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index aa6a4b82af89..0eefed50fc8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -34,7 +34,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") @Override - protected KeyValueStore createKeyValueStore(final ProcessorContext context) { + protected KeyValueStore createKeyValueStore(final StateStoreContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("my-store"), (Serde) context.keySerde(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 18432884ea24..4d87ee4de999 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -158,7 +158,7 @@ public void shouldAddValueProvidersWithoutStatisticsToInjectedMetricsRecorderWhe metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull()); replay(metricsRecorder); - rocksDBStore.openDB(context); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); verify(metricsRecorder); reset(metricsRecorder); @@ -172,7 +172,7 @@ public void shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRe metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull()); replay(metricsRecorder); - rocksDBStore.openDB(context); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); verify(metricsRecorder); reset(metricsRecorder); @@ -183,7 +183,7 @@ public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() { rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); try { context = getProcessorContext(RecordingLevel.DEBUG); - rocksDBStore.openDB(context); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); reset(metricsRecorder); metricsRecorder.removeValueProviders(DB_NAME); replay(metricsRecorder); @@ -213,7 +213,7 @@ public void shouldNotSetStatisticsInValueProvidersWhenUserProvidesStatistics() { metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull()); replay(metricsRecorder); - rocksDBStore.openDB(context); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); verify(metricsRecorder); reset(metricsRecorder); } @@ -243,7 +243,7 @@ public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() { "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " + "the RocksDB options.", ProcessorStateException.class, - () -> rocksDBStore.openDB(context) + () -> rocksDBStore.openDB(context.appConfigs(), context.stateDir()) ); } @@ -269,7 +269,7 @@ public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatCon metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), isNull(), notNull()); replay(metricsRecorder); - rocksDBStore.openDB(context); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); verify(metricsRecorder); reset(metricsRecorder); } @@ -323,7 +323,7 @@ public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() { assertTrue(tmpDir.setReadOnly()); - assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext)); + assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext.appConfigs(), tmpContext.stateDir())); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java index 0e04851f093b..332181020feb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java @@ -67,7 +67,7 @@ public void shouldDeleteStateDirectoryOnDestroy() throws Exception { expect(mockContext.stateDir()).andReturn(directory); replay(mockContext); - segment.openDB(mockContext); + segment.openDB(mockContext.appConfigs(), mockContext.stateDir()); assertTrue(new File(directoryPath, "window").exists()); assertTrue(new File(directoryPath + File.separator + "window", "segment").exists()); diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index 649dc5b37729..72e6c266fea8 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -58,6 +58,7 @@ public String name() { return this.name; } + @Deprecated @Override @SuppressWarnings("unchecked") /* This is a "dummy" store used for testing; diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java index b1b75a16324b..c77cbacb1db2 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java @@ -59,6 +59,7 @@ public String name() { return this.name; } + @Deprecated @Override @SuppressWarnings("unchecked") /* This is a "dummy" store used for testing; diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index 7cb376f370c1..d9e4acfc081b 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -54,6 +54,7 @@ public String name() { return name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index dd78c529cc1e..c6b5eeec4816 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -69,6 +69,7 @@ public String name() { return name; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { if (rocksdbStore) { diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 4f6d5debb0ad..a2924fc6568f 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -92,6 +92,7 @@ public String name() { return ""; } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java index 749c41f01182..94b5c8eaab95 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -35,12 +36,18 @@ public KeyValueStoreFacade(final TimestampedKeyValueStore inner) { super(inner); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { inner.init(context, root); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + inner.init(context, root); + } + @Override public void put(final K key, final V value) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java index 69450599c81a..342817ea7356 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -35,12 +36,18 @@ public WindowStoreFacade(final TimestampedWindowStore store) { super(store); } + @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { inner.init(context, root); } + @Override + public void init(final StateStoreContext context, final StateStore root) { + inner.init(context, root); + } + @Deprecated @Override public void put(final K key, diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index fe901ed981b2..6e2f4ed47b08 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -231,8 +231,7 @@ public void process(final String key, final Long value) { assertFalse(context.committed()); } - - @SuppressWarnings("unchecked") + @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437 @Test public void shouldStoreAndReturnStateStores() { final AbstractProcessor processor = new AbstractProcessor() { diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java index b7814c7b8708..4d7a2773ca19 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java @@ -53,6 +53,7 @@ public void setup() { keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore); } + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void shouldForwardInit() { final ProcessorContext context = mock(ProcessorContext.class); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java index 3347ddd775d6..6a2c6bd35f8d 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java @@ -47,6 +47,7 @@ public void setup() { windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore); } + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void shouldForwardInit() { final ProcessorContext context = mock(ProcessorContext.class); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java index 4995a77258f0..00a9f8fdcb86 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java @@ -40,6 +40,7 @@ import static org.junit.Assert.assertThrows; public class WindowedWordCountProcessorTest { + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void shouldWorkWithInMemoryStore() { final MockProcessorContext context = new MockProcessorContext(); @@ -86,6 +87,7 @@ public void shouldWorkWithInMemoryStore() { assertThat(capturedForwards.hasNext(), is(false)); } + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void shouldWorkWithPersistentStore() throws IOException { final Properties properties = new Properties(); @@ -146,6 +148,7 @@ public void shouldWorkWithPersistentStore() throws IOException { } } + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void shouldFailWithLogging() { final MockProcessorContext context = new MockProcessorContext(); @@ -164,6 +167,7 @@ public void shouldFailWithLogging() { assertThrows(IllegalArgumentException.class, () -> store.init(context, store)); } + @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437 @Test public void shouldFailWithCaching() { final MockProcessorContext context = new MockProcessorContext();