Skip to content

Commit

Permalink
KAFKA-10562: Properly invoke new StateStoreContext init (apache#9388)
Browse files Browse the repository at this point in the history
* all wrapping stores should pass StateStoreContext init through to the same
  method on the wrapped store and not translate it to ProcessorContext init
* base-level stores should handle StateStoreContext init so that callers passing
  a non-InternalProcessorContext implementation will be able to initialize the store
* extra tests are added to verify the desired behavior

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
vvcephei authored and javierfreire committed Oct 8, 2020
1 parent 413830c commit 52a9ebe
Show file tree
Hide file tree
Showing 71 changed files with 877 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
*/
public class WordCountProcessorTest {
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void test() {
final MockProcessorContext context = new MockProcessorContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}.
*/
public class WordCountTransformerTest {
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void test() {
final MockProcessorContext context = new MockProcessorContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public interface StateStore {
*
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
* @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead.
* Implementers may choose to implement this method for backward compatibility or to throw an
* informative exception instead.
*/
@Deprecated
void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
Expand All @@ -45,12 +46,19 @@ public void flush() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
Expand All @@ -31,19 +31,28 @@
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.List;

abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";

private AbstractReadWriteDecorator(final T inner) {
super(inner);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

/**
Expand Down Expand Up @@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context)
return (StreamsMetricsImpl) context.metrics();
}

/**
* Should be removed as part of KAFKA-10217
*/
public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
return (StreamsMetricsImpl) context.metrics();
}

public static String changelogFor(final ProcessorContext context, final String storeName) {
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}

public static String changelogFor(final StateStoreContext context, final String storeName) {
return context instanceof InternalProcessorContext
? ((InternalProcessorContext) context).changelogFor(storeName)
: ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
}

public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."
);
}
}

public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public String name() {
return name;
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;

Expand All @@ -54,23 +56,34 @@ public class CachingKeyValueStore
super(underlying);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
initInternal(context);
initInternal(asInternalProcessorContext(context));
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
}

private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
@Override
public void init(final StateStoreContext context,
final StateStore root) {
initInternal(asInternalProcessorContext(context));
super.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
}

private void initInternal(final InternalProcessorContext context) {
this.context = context;

this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
this.context.registerCacheFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, (InternalProcessorContext) context);
putAndMaybeForward(entry, context);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordQueue;
Expand All @@ -34,6 +35,7 @@
import java.util.NoSuchElementException;
import java.util.Objects;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;

Expand Down Expand Up @@ -61,9 +63,16 @@ class CachingSessionStore
this.maxObservedTimestamp = RecordQueue.UNKNOWN;
}

@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
initInternal((InternalProcessorContext) context);
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

@Override
public void init(final StateStoreContext context, final StateStore root) {
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
Expand All @@ -37,6 +38,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
import static org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;

Expand Down Expand Up @@ -67,14 +69,16 @@ class CachingWindowStore
this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
}

@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
if (!(context instanceof InternalProcessorContext)) {
throw new IllegalArgumentException(
"Caching requires internal features of KafkaStreams and must be disabled for unit tests."
);
}
initInternal((InternalProcessorContext) context);
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

@Override
public void init(final StateStoreContext context, final StateStore root) {
initInternal(asInternalProcessorContext(context));
super.init(context, root);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.List;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

public class ChangeLoggingKeyValueBytesStore
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
implements KeyValueStore<Bytes, byte[]> {
Expand All @@ -36,12 +39,24 @@ public class ChangeLoggingKeyValueBytesStore
super(inner);
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
super.init(context, root);
this.context = (InternalProcessorContext) context;
this.context = asInternalProcessorContext(context);
maybeSetEvictionListener();
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
super.init(context, root);
this.context = asInternalProcessorContext(context);
maybeSetEvictionListener();
}

private void maybeSetEvictionListener() {
// if the inner store is an LRU cache, add the eviction listener to log removed record
if (wrapped() instanceof MemoryLRUCache) {
((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

/**
* Simple wrapper around a {@link SessionStore} to support writing
* updates to a changelog
Expand All @@ -38,10 +41,17 @@ class ChangeLoggingSessionBytesStore
super(bytesStore);
}

@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
super.init(context, root);
this.context = (InternalProcessorContext) context;
this.context = asInternalProcessorContext(context);
}

@Override
public void init(final StateStoreContext context, final StateStore root) {
super.init(context, root);
this.context = asInternalProcessorContext(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

/**
* Simple wrapper around a {@link WindowStore} to support writing
* updates to a changelog
Expand All @@ -43,15 +46,18 @@ class ChangeLoggingWindowBytesStore
this.retainDuplicates = retainDuplicates;
}

@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
if (!(context instanceof InternalProcessorContext)) {
throw new IllegalArgumentException(
"Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
);
}
this.context = (InternalProcessorContext) context;
this.context = asInternalProcessorContext(context);
super.init(context, root);
}

@Override
public void init(final StateStoreContext context,
final StateStore root) {
this.context = asInternalProcessorContext(context);
super.init(context, root);
}

Expand Down
Loading

0 comments on commit 52a9ebe

Please sign in to comment.