Skip to content

Commit

Permalink
feat: add KsqlRocksDBConfigSetter to bound memory and set num threads (
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Sep 5, 2019
1 parent 2c61e34 commit cdcaa2d
Show file tree
Hide file tree
Showing 10 changed files with 770 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.services.DefaultServiceContext;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -97,6 +98,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -137,11 +139,13 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple
private final ServerState serverState;
private final ProcessingLogContext processingLogContext;
private final List<KsqlServerPrecondition> preconditions;
private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;

public static String getCommandsStreamName() {
return COMMANDS_STREAM_NAME;
}

@VisibleForTesting
// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
KsqlRestApplication(
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
Expand All @@ -160,7 +164,8 @@ public static String getCommandsStreamName() {
final KsqlSecurityExtension securityExtension,
final ServerState serverState,
final ProcessingLogContext processingLogContext,
final List<KsqlServerPrecondition> preconditions
final List<KsqlServerPrecondition> preconditions,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler
) {
super(config);
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
Expand All @@ -183,8 +188,9 @@ public static String getCommandsStreamName() {
this.serviceContextBinderFactory = Objects.requireNonNull(
serviceContextBinderFactory, "serviceContextBinderFactory");
this.securityExtension = Objects.requireNonNull(
securityExtension, "securityExtension"
);
securityExtension, "securityExtension");
this.rocksDBConfigSetterHandler = Objects.requireNonNull(
rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler");
}

@Override
Expand Down Expand Up @@ -250,6 +256,8 @@ private void waitForPreconditions() {
}

private void initialize() {
rocksDBConfigSetterHandler.accept(ksqlConfig);

final String commandTopic = commandStore.getCommandTopicName();
KsqlInternalTopicUtils.ensureTopic(
commandTopic,
Expand Down Expand Up @@ -543,6 +551,9 @@ static KsqlRestApplication buildApplication(
KsqlServerPrecondition.class
);

final Consumer<KsqlConfig> rocksDBConfigSetterHandler =
RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter;

return new KsqlRestApplication(
serviceContext,
ksqlEngine,
Expand All @@ -559,7 +570,8 @@ static KsqlRestApplication buildApplication(
securityExtension,
serverState,
processingLogContext,
preconditions
preconditions,
rocksDBConfigSetterHandler
);
}

Expand Down Expand Up @@ -596,7 +608,7 @@ private void displayWelcomeMessage() {
writer.flush();
}

static void maybeCreateProcessingLogStream(
private static void maybeCreateProcessingLogStream(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

public final class RocksDBConfigSetterHandler {

private RocksDBConfigSetterHandler() {
}

public static void maybeConfigureRocksDBConfigSetter(final KsqlConfig ksqlConfig) {
final Map<String, Object> streamsProps = ksqlConfig.getKsqlStreamConfigProps();
final Class<?> clazz =
(Class) streamsProps.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);

if (clazz != null && org.apache.kafka.common.Configurable.class.isAssignableFrom(clazz)) {
try {
((org.apache.kafka.common.Configurable) Utils.newInstance(clazz))
.configure(ksqlConfig.originals());
} catch (Exception e) {
throw new ConfigException(
"Failed to configure Configurable RocksDBConfigSetter. "
+ StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG + ": " + clazz.getName(),
e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.rest.RestConfig;
import java.util.Collections;
import java.util.function.Consumer;
import javax.ws.rs.core.Configurable;
import java.util.LinkedList;
import java.util.Optional;
Expand Down Expand Up @@ -119,6 +120,8 @@ public class KsqlRestApplicationTest {
private KsqlServerPrecondition precondition1;
@Mock
private KsqlServerPrecondition precondition2;
@Mock
private Consumer<KsqlConfig> rocksDBConfigSetterHandler;
private PreparedStatement<CreateSource> logCreateStatement;
private KsqlRestApplication app;

Expand Down Expand Up @@ -164,7 +167,8 @@ public void setUp() {
securityExtension,
serverState,
processingLogContext,
ImmutableList.of(precondition1, precondition2)
ImmutableList.of(precondition1, precondition2),
rocksDBConfigSetterHandler
);
}

Expand Down Expand Up @@ -351,4 +355,13 @@ public void shouldNotInitializeUntilPreconditionsChecked() {
inOrder.verify(precondition1).checkPrecondition(restConfig, serviceContext);
inOrder.verify(precondition2).checkPrecondition(restConfig, serviceContext);
}

@Test
public void shouldConfigureRocksDBConfigSetter() {
// When:
app.startKsql();

// Then:
verify(rocksDBConfigSetterHandler).accept(ksqlConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import static org.hamcrest.Matchers.containsString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.rocksdb.Options;

@RunWith(MockitoJUnitRunner.class)
public class RocksDBConfigSetterHandlerTest {

@Mock
private KsqlConfig ksqlConfig;

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldConfigure() throws Exception {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
ImmutableMap.of(
StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Class.forName("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$ConfigurableTestRocksDBConfigSetter"))
);
final Runnable mockRunnable = mock(Runnable.class);
when(ksqlConfig.originals()).thenReturn(
ImmutableMap.of(ConfigurableTestRocksDBConfigSetter.TEST_CONFIG, mockRunnable));

// When:
RocksDBConfigSetterHandler.maybeConfigureRocksDBConfigSetter(ksqlConfig);

// Then:
verify(mockRunnable).run();
}

@Test
public void shouldStartWithNonConfigurableRocksDBConfigSetter() throws Exception {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
ImmutableMap.of(
StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Class.forName("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$NonConfigurableTestRocksDBConfigSetter"))
);

// No error when:
RocksDBConfigSetterHandler.maybeConfigureRocksDBConfigSetter(ksqlConfig);
}

@Test
public void shouldThrowIfFailToRocksDBConfigSetter() throws Exception {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(
ImmutableMap.of(
StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Class.forName("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$ConfigurableTestRocksDBConfigSetterWithoutPublicConstructor"))
);

// Expect:
expectedException.expect(ConfigException.class);
expectedException.expectMessage(containsString("Failed to configure Configurable RocksDBConfigSetter."));
expectedException.expectMessage(containsString(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG));
expectedException.expectMessage(containsString("io.confluent.ksql.rest.util.RocksDBConfigSetterHandlerTest$ConfigurableTestRocksDBConfigSetterWithoutPublicConstructor"));

// When:
RocksDBConfigSetterHandler.maybeConfigureRocksDBConfigSetter(ksqlConfig);
}

public static class ConfigurableTestRocksDBConfigSetter
extends NonConfigurableTestRocksDBConfigSetter
implements org.apache.kafka.common.Configurable {

static final String TEST_CONFIG = "test.runnable";

@Override
public void configure(final Map<String, ?> config) {
final Runnable supplier = (Runnable) config.get(TEST_CONFIG);
supplier.run();
}
}

static class ConfigurableTestRocksDBConfigSetterWithoutPublicConstructor
extends NonConfigurableTestRocksDBConfigSetter
implements org.apache.kafka.common.Configurable {

@Override
public void configure(final Map<String, ?> config) {
}
}

private static class NonConfigurableTestRocksDBConfigSetter implements RocksDBConfigSetter {

@Override
public void setConfig(
final String storeName,
final Options options,
final Map<String, Object> configs) {
// do nothing
}

@Override
public void close(final String storeName, final Options options) {
}
}
}
76 changes: 76 additions & 0 deletions ksql-rocksdb-config-setter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019 Confluent Inc.
~
~ Licensed under the Confluent Community License (the "License"); you may not use
~ this file except in compliance with the License. You may obtain a copy of the
~ License at
~
~ http://www.confluent.io/confluent-community-license
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OF ANY KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-parent</artifactId>
<version>5.3.1-SNAPSHOT</version>
</parent>

<artifactId>ksql-rocksdb-config-setter</artifactId>
<name>KSQL RocksDB Config Setter</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>



<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit cdcaa2d

Please sign in to comment.