diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a7dc27fcd69f..4b1e2b8583ad 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -307,6 +307,7 @@
+
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 38baeb6260a9..a47998e6928d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -50,13 +50,13 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -796,17 +796,9 @@ public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
PowerMock.verify(Executors.class, rocksDBMetricsRecordingTriggerThread);
}
- public static class TestRocksDbConfigSetter implements RocksDBConfigSetter {
- @Override
- public void setConfig(final String storeName,
- final org.rocksdb.Options options,
- final Map configs) {
- }
- }
-
@Test
public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
- props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());
+ props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class.getName());
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 7a3912181233..7af3f3945acf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -35,10 +35,10 @@
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
@@ -193,7 +193,7 @@ private KeyValueStoreTestDriver(final StateSerdes serdes) {
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
- props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
+ props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
final LogContext logContext = new LogContext("KeyValueStoreTestDriver ");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5937af03121d..aa6a4b82af89 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -21,13 +21,9 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
-import org.rocksdb.Options;
-
-import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -49,20 +45,6 @@ protected KeyValueStore createKeyValueStore(final ProcessorContext
return store;
}
- public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
- static boolean called = false;
-
- @Override
- public void setConfig(final String storeName, final Options options, final Map configs) {
- called = true;
- }
- }
-
- @Test
- public void shouldUseCustomRocksDbConfigSetter() {
- assertTrue(TheRocksDbConfigSetter.called);
- }
-
@Test
public void shouldPerformRangeQueriesWithCachingDisabled() {
context.setTime(1L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index dead979d0ddf..18432884ea24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -40,6 +40,7 @@
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
@@ -298,9 +299,21 @@ public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles()
public void shouldCallRocksDbConfigSetter() {
MockRocksDbConfigSetter.called = false;
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
+ final Object param = new Object();
+ props.put("abc.def", param);
+ final InternalMockProcessorContext context = new InternalMockProcessorContext(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
assertTrue(MockRocksDbConfigSetter.called);
+ assertThat(MockRocksDbConfigSetter.configMap.get("abc.def"), equalTo(param));
}
@Test
@@ -698,17 +711,6 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
}
}
- public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
- static boolean called;
-
- @Override
- public void setConfig(final String storeName, final Options options, final Map configs) {
- called = true;
-
- options.setLevel0FileNumCompactionTrigger(10);
- }
- }
-
public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {
static boolean bloomFiltersSet;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java
new file mode 100644
index 000000000000..c2e63af3742a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockRocksDbConfigSetter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.rocksdb.Options;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockRocksDbConfigSetter implements RocksDBConfigSetter {
+ public static boolean called = false;
+ public static Map configMap = new HashMap<>();
+
+ @Override
+ public void setConfig(final String storeName, final Options options, final Map configs) {
+ called = true;
+
+ configMap.putAll(configs);
+ }
+}