diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a7dc27fcd69f4..4b1e2b8583adb 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -307,6 +307,7 @@ + diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 38baeb6260a9e..a47998e6928d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -50,13 +50,13 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.TestUtils; import org.easymock.Capture; import org.easymock.EasyMock; @@ -796,17 +796,9 @@ public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() { PowerMock.verify(Executors.class, rocksDBMetricsRecordingTriggerThread); } - public static class TestRocksDbConfigSetter implements RocksDBConfigSetter { - @Override - public void setConfig(final String storeName, - final org.rocksdb.Options options, - final Map configs) { - } - } - @Test public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() { - props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName()); + props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class.getName()); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 7a39121812331..7af3f3945acf9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -35,10 +35,10 @@ import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StreamsProducer; import org.apache.kafka.streams.state.internals.MeteredKeyValueStore; -import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; @@ -193,7 +193,7 @@ private KeyValueStoreTestDriver(final StateSerdes serdes) { props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); - props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); final LogContext logContext = new LogContext("KeyValueStoreTestDriver "); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 5937af03121d0..aa6a4b82af89c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -21,13 +21,9 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.junit.Test; -import org.rocksdb.Options; - -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -49,20 +45,6 @@ protected KeyValueStore createKeyValueStore(final ProcessorContext return store; } - public static class TheRocksDbConfigSetter implements RocksDBConfigSetter { - static boolean called = false; - - @Override - public void setConfig(final String storeName, final Options options, final Map configs) { - called = true; - } - } - - @Test - public void shouldUseCustomRocksDbConfigSetter() { - assertTrue(TheRocksDbConfigSetter.called); - } - @Test public void shouldPerformRangeQueriesWithCachingDisabled() { context.setTime(1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index dead979d0ddf8..18432884ea245 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; @@ -298,9 +299,21 @@ public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() public void shouldCallRocksDbConfigSetter() { MockRocksDbConfigSetter.called = false; + final Properties props = StreamsTestUtils.getStreamsConfig(); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); + final Object param = new Object(); + props.put("abc.def", param); + final InternalMockProcessorContext context = new InternalMockProcessorContext( + dir, + Serdes.String(), + Serdes.String(), + new StreamsConfig(props) + ); + rocksDBStore.init((StateStoreContext) context, rocksDBStore); assertTrue(MockRocksDbConfigSetter.called); + assertThat(MockRocksDbConfigSetter.configMap.get("abc.def"), equalTo(param)); } @Test @@ -698,17 +711,6 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() { } } - public static class MockRocksDbConfigSetter implements RocksDBConfigSetter { - static boolean called; - - @Override - public void setConfig(final String storeName, final Options options, final Map configs) { - called = true; - - options.setLevel0FileNumCompactionTrigger(10); - } - } - public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter { static boolean bloomFiltersSet; diff --git a/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java new file mode 100644 index 0000000000000..c2e63af3742a6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.state.RocksDBConfigSetter; +import org.rocksdb.Options; + +import java.util.HashMap; +import java.util.Map; + +public class MockRocksDbConfigSetter implements RocksDBConfigSetter { + public static boolean called = false; + public static Map configMap = new HashMap<>(); + + @Override + public void setConfig(final String storeName, final Options options, final Map configs) { + called = true; + + configMap.putAll(configs); + } +}