Skip to content

Commit

Permalink
MINOR: Refactor unit tests around RocksDBConfigSetter (#9358)
Browse files Browse the repository at this point in the history
* Extract the mock RocksDBConfigSetter into a separate class.
* De-dup unit tests covering RocksDBConfigSetter.

Reviewers: Boyang Chen <[email protected]>
  • Loading branch information
guozhangwang authored Oct 6, 2020
1 parent c77183d commit 53a35c1
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 41 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@
<subpackage name="test">
<allow pkg="org.apache.kafka" />
<allow pkg="org.bouncycastle" />
<allow pkg="org.rocksdb" />
</subpackage>

<subpackage name="raft">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
Expand Down Expand Up @@ -796,17 +796,9 @@ public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
PowerMock.verify(Executors.class, rocksDBMetricsRecordingTriggerThread);
}

public static class TestRocksDbConfigSetter implements RocksDBConfigSetter {
@Override
public void setConfig(final String storeName,
final org.rocksdb.Options options,
final Map<String, Object> configs) {
}
}

@Test
public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class.getName());

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;

Expand Down Expand Up @@ -193,7 +193,7 @@ private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

final LogContext logContext = new LogContext("KeyValueStoreTestDriver ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
import org.rocksdb.Options;

import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -49,20 +45,6 @@ protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext
return store;
}

public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called = false;

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
called = true;
}
}

@Test
public void shouldUseCustomRocksDbConfigSetter() {
assertTrue(TheRocksDbConfigSetter.called);
}

@Test
public void shouldPerformRangeQueriesWithCachingDisabled() {
context.setTime(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
Expand Down Expand Up @@ -298,9 +299,21 @@ public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles()
public void shouldCallRocksDbConfigSetter() {
MockRocksDbConfigSetter.called = false;

final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
final Object param = new Object();
props.put("abc.def", param);
final InternalMockProcessorContext context = new InternalMockProcessorContext(
dir,
Serdes.String(),
Serdes.String(),
new StreamsConfig(props)
);

rocksDBStore.init((StateStoreContext) context, rocksDBStore);

assertTrue(MockRocksDbConfigSetter.called);
assertThat(MockRocksDbConfigSetter.configMap.get("abc.def"), equalTo(param));
}

@Test
Expand Down Expand Up @@ -698,17 +711,6 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
}
}

public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called;

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
called = true;

options.setLevel0FileNumCompactionTrigger(10);
}
}

public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {

static boolean bloomFiltersSet;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

import java.util.HashMap;
import java.util.Map;

public class MockRocksDbConfigSetter implements RocksDBConfigSetter {
public static boolean called = false;
public static Map<String, Object> configMap = new HashMap<>();

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
called = true;

configMap.putAll(configs);
}
}

0 comments on commit 53a35c1

Please sign in to comment.