Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: trivial cleanups, javadoc errors, omitted StateStore tests, etc. #8130

Merged
merged 18 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
7991605
Add omitted [WindowStoreBuilderTest, TimestampedWindowStoreBuilderTes…
dongjinleekr Jun 19, 2020
7cfe008
Improve Stores Javadoc
dongjinleekr Jun 19, 2020
3f9f171
Remove unused method + duplicated parameters
dongjinleekr Jun 19, 2020
9bdc180
Remove unthrown Exceptions: TaskManager, ProduceBenchWorker
dongjinleekr Jun 19, 2020
bede35c
Remove unnecessary public from [Sink,Source]TaskContext
dongjinleekr Jun 19, 2020
1f89531
Fix incorrect MeteredTimestampedWindowStore javadoc
dongjinleekr Jun 19, 2020
b9525c5
Remove duplicated entity in checkstyle/import-control.xml
dongjinleekr Jun 19, 2020
ada16b4
Add omitted WindowStore, SessionStore test cases: GlobalStateStorePro…
dongjinleekr Jun 19, 2020
d581d15
Update [KeyValue,TimestampedKeyValue,WindowStore,TimestampedWindowSto…
dongjinleekr Jun 20, 2020
fc7194d
Give BytesStoreSupplier mock with name to [KeyValue,TimestampedKeyVal…
dongjinleekr Jun 20, 2020
df01c35
Add metricsScope nullity check to [KeyValue,TimestampedKeyValue,Windo…
dongjinleekr Jun 20, 2020
29d766a
Add nullity check in 'WindowStoreBuilder' constructor. (For parity wi…
dongjinleekr Jun 20, 2020
ac6ac58
Remove unthrown Exception in SessionStoreBuilderTest#setUp
dongjinleekr Jun 20, 2020
958cb5b
Fix typo in `XXXStoreBuilder` validation methods
dongjinleekr Jun 20, 2020
dff84f2
Remove unused variables: RocksDBStore, SuppressionIntegrationTest
dongjinleekr Jun 20, 2020
e92dda5
Remove redundant type parameters from AbstractJoinIntegrationTest, Su…
dongjinleekr Jun 20, 2020
6972309
Revert WindowedSerdes#timeWindowedSerdeFrom(Class, long)
dongjinleekr Oct 7, 2020
9f7a73f
Add supplier mock object in TimestampedKeyValueStoreMaterializerTest#…
dongjinleekr Oct 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.utils" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="org.apache.log4j" />
<subpackage name="testutil">
<allow pkg="org.apache.log4j" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface SinkTaskContext {
* and the configuration is using variable references such as those compatible with
* {@link org.apache.kafka.common.config.ConfigTransformer}.
*/
public Map<String, String> configs();
Map<String, String> configs();

/**
* Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface SourceTaskContext {
* and the configuration is using variable references such as those compatible with
* {@link org.apache.kafka.common.config.ConfigTransformer}.
*/
public Map<String, String> configs();
Map<String, String> configs();

/**
* Get the OffsetStorageReader for this SourceTask.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2447,7 +2447,7 @@ class Log(@volatile private var _dir: File,

// replace old segment with new ones
info(s"Replacing overflowed segment $segment with split segments $newSegments")
replaceSegments(newSegments.toList, List(segment), isRecoveredSwapFile = false)
replaceSegments(newSegments.toList, List(segment))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Is this ok? I am not familiar with this part of the code. If false the default anyway?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, since the method definition also has isRecoveredSwapFile: Boolean = false by default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @guozhangwang for verifying.

newSegments.toList
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public byte[] serialize() {
final byte[][] headerKeysBytes;
final byte[][] headerValuesBytes;


int size = 0;
size += Long.BYTES; // value.context.timestamp
size += Long.BYTES; // value.context.offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public String metricsScope() {
* @param maxCacheSize maximum number of items in the LRU (cannot be negative)
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
* an LRU Map based store
* @throws IllegalArgumentException if {@code maxCacheSize} is negative
*/
public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) {
Objects.requireNonNull(name, "name cannot be null");
Expand Down Expand Up @@ -231,6 +232,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
* caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final Duration retentionPeriod,
Expand All @@ -257,6 +259,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
* caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
final Duration retentionPeriod,
Expand Down Expand Up @@ -328,6 +331,7 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name,
* caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
final Duration retentionPeriod,
Expand Down Expand Up @@ -481,7 +485,7 @@ public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final Wi
* <p>
* The provided supplier should <strong>not</strong> be a supplier for
* {@link WindowStore WindowStores}. For this case, passed in timestamps will be dropped and not stored in the
* windows-store. On read, no valid timestamp but a dummy timestamp will be returned.
* window-store. On read, no valid timestamp but a dummy timestamp will be returned.
*
* @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
final Serde<V> valueSerde,
final Time time) {
super(storeSupplier.name(), keySerde, valueSerde, time);
Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null");
this.storeSupplier = storeSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
import org.apache.kafka.streams.state.WindowStore;

/**
* A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
* A Metered {@link TimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
* inner WindowStore implementation do not need to provide its own metrics collecting functionality.
* The inner {@link WindowStore} of this class is of type &lt;Bytes,byte[]&gt;, hence we use {@link Serde}s
* to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;
*
* @param <K>
* @param <V>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
super(Objects.requireNonNull(storeSupplier, "storeSupplier cannot be null").name(), keySerde, valueSerde, time);
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null");
this.storeSupplier = storeSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public TimestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSup
keySerde,
valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde),
time);
Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null");
this.storeSupplier = storeSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplie
final Serde<V> valueSerde,
final Time time) {
super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time);
Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null");
this.storeSupplier = storeSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);

Expand All @@ -34,6 +36,8 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
final Serde<V> valueSerde,
final Time time) {
super(storeSupplier.name(), keySerde, valueSerde, time);
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null");
this.storeSupplier = storeSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void runTestWithDriver(final TestRecord<Long, String> expectedFinalResult, final
}

final TestRecord<Long, String> updatedExpectedFinalResult =
new TestRecord<Long, String>(
new TestRecord<>(
expectedFinalResult.key(),
expectedFinalResult.value(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ public void shouldAllowDisablingChangelog() {
final String input = "input" + testId;
final String outputSuppressed = "output-suppressed" + testId;
final String outputRaw = "output-raw" + testId;
final String changeLog = "suppressionintegrationtest-shouldAllowDisablingChangelog-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog";

cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ private static <K, V> void verify(final List<TestRecord<K, V>> results,
for (final TestRecord<K, V> result : results) {
final KeyValueTimestamp<K, V> expected = expectedIterator.next();
try {
assertThat(result, equalTo(new TestRecord<K, V>(expected.key(), expected.value(), null, expected.timestamp())));
assertThat(result, equalTo(new TestRecord<>(expected.key(), expected.value(), null, expected.timestamp())));
} catch (final AssertionError e) {
throw new AssertionError(printRecords(results) + " != " + expectedResults, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name");
EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
EasyMock.expect(supplier.get()).andReturn(store);
EasyMock.expect(supplier.metricsScope()).andReturn("metricScope");
EasyMock.replay(supplier);

final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
Expand Down Expand Up @@ -90,6 +91,14 @@ public void before() {
false),
Serdes.String(),
Serdes.String()).build());
stores.put(
"s-store",
Stores.sessionStoreBuilder(
Stores.inMemorySessionStore(
"s-store",
Duration.ofMillis(10L)),
Serdes.String(),
Serdes.String()).build());

final ProcessorContextImpl mockContext = niceMock(ProcessorContextImpl.class);
expect(mockContext.applicationId()).andStubReturn("appId");
Expand Down Expand Up @@ -175,6 +184,26 @@ public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() {
}
}

@Test
public void shouldReturnWindowStore() {
final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
final List<ReadOnlyWindowStore<String, String>> stores =
provider.stores("w-store", QueryableStoreTypes.windowStore());
assertEquals(1, stores.size());
for (final ReadOnlyWindowStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
}
}

@Test
public void shouldNotReturnWindowStoreAsTimestampedStore() {
final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> stores =
provider.stores("w-store", QueryableStoreTypes.timestampedWindowStore());
assertEquals(0, stores.size());
}

@Test
public void shouldReturnTimestampedWindowStoreAsWindowStore() {
final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
Expand All @@ -186,4 +215,15 @@ public void shouldReturnTimestampedWindowStoreAsWindowStore() {
assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
}
}

@Test
public void shouldReturnSessionStore() {
final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
final List<ReadOnlySessionStore<String, String>> stores =
provider.stores("s-store", QueryableStoreTypes.sessionStore());
assertEquals(1, stores.size());
for (final ReadOnlySessionStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlySessionStore.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@

import java.util.Collections;

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThrows;

@RunWith(EasyMockRunner.class)
public class KeyValueStoreBuilderTest {
Expand All @@ -50,6 +55,7 @@ public class KeyValueStoreBuilderTest {
public void setUp() {
EasyMock.expect(supplier.get()).andReturn(inner);
EasyMock.expect(supplier.name()).andReturn("name");
expect(supplier.metricsScope()).andReturn("metricScope");
EasyMock.replay(supplier);
builder = new KeyValueStoreBuilder<>(
supplier,
Expand Down Expand Up @@ -134,9 +140,16 @@ public void shouldThrowNullPointerIfTimeIsNull() {
new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
}

@Test(expected = NullPointerException.class)
@Test
public void shouldThrowNullPointerIfMetricsScopeIsNull() {
new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBStore("name", null));
expect(supplier.name()).andReturn("name");
replay(supplier);

final Exception e = assertThrows(NullPointerException.class,
() -> new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));
assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ public class SessionStoreBuilderTest {
private SessionStoreBuilder<String, String> builder;

@Before
public void setUp() throws Exception {

public void setUp() {
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
expect(supplier.metricsScope()).andReturn("metricScope");
replay(supplier);

builder = new SessionStoreBuilder<>(
Expand Down Expand Up @@ -120,7 +120,7 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
@Test
public void shouldThrowNullPointerIfInnerIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()));
assertThat(e.getMessage(), equalTo("supplier cannot be null"));
assertThat(e.getMessage(), equalTo("storeSupplier cannot be null"));
}

@Test
Expand All @@ -146,8 +146,21 @@ public void shouldThrowNullPointerIfTimeIsNull() {

@Test
public void shouldThrowNullPointerIfMetricsScopeIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));
assertThat(e.getMessage(), equalTo("name cannot be null"));
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBSessionStore(
new RocksDBSegmentedBytesStore(
"name",
null,
10L,
5L,
new SessionKeySchema())
));
expect(supplier.name()).andReturn("name");
replay(supplier);

final Exception e = assertThrows(NullPointerException.class,
() -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));
assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
Expand Down Expand Up @@ -125,6 +126,14 @@ public void before() {
Serdes.String(),
Serdes.String()),
"the-processor");
topology.addStateStore(
Stores.sessionStoreBuilder(
Stores.inMemorySessionStore(
"session-store",
Duration.ofMillis(10L)),
Serdes.String(),
Serdes.String()),
"the-processor");

final Properties properties = new Properties();
final String applicationId = "applicationId";
Expand Down Expand Up @@ -258,6 +267,17 @@ public void shouldFindTimestampedWindowStoresAsWindowStore() {
}
}

@Test
public void shouldFindSessionStores() {
mockThread(true);
final List<ReadOnlySessionStore<String, String>> sessionStores =
provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore()));
assertEquals(2, sessionStores.size());
for (final ReadOnlySessionStore<String, String> store: sessionStores) {
assertThat(store, instanceOf(ReadOnlySessionStore.class));
}
}

@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
mockThread(true);
Expand Down Expand Up @@ -286,6 +306,13 @@ public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()));
}

@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() {
mockThread(true);
taskOne.getStore("session-store").close();
provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore()));
}

@Test
public void shouldReturnEmptyListIfNoStoresFoundWithName() {
mockThread(true);
Expand Down
Loading