Skip to content

Commit

Permalink
feat: spin up a connect worker embedded inside the KSQL JVM (#3241)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Aug 22, 2019
1 parent 1552f92 commit 4d7ef2a
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 5 deletions.
60 changes: 60 additions & 0 deletions config/connect.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates the the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=ksql-connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses

# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5

# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false

# fill this configuration in to use custom connectors
# plugin.path=
5 changes: 5 additions & 0 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ ksql.logging.processing.stream.auto.create=true

# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=localhost:9092

ksql.connect.polling.enable=true

# uncomment the below to start an embedded Connect worker
# ksql.connect.worker.config=config/connect.properties
# ksql.connect.configs.topic=ksql-connect-configs

# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
#ksql.schema.registry.url=?
3 changes: 2 additions & 1 deletion config/log4j-rolling.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout
log4j.appender.kafka_appender.BrokerList=localhost:9092
log4j.appender.kafka_appender.Topic=default_ksql_processing_log
log4j.logger.processing=ERROR, kafka_appender

# loggers
log4j.logger.org.apache.kafka.streams=INFO, streams
Expand All @@ -64,3 +63,5 @@ log4j.additivity.org.I0Itec.zkclient=false

log4j.logger.processing=ERROR, kafka_appender
log4j.additivity.processing=false

log4j.logger.org.reflections=ERROR, main
1 change: 1 addition & 0 deletions config/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ log4j.additivity.org.apache.kafka.streams=false
log4j.logger.org.apache.zookeeper=ERROR, stdout
log4j.logger.org.apache.kafka=ERROR, stdout
log4j.logger.org.I0Itec.zkclient=ERROR, stdout
log4j.logger.org.reflections=ERROR, stdout
11 changes: 11 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class KsqlConfig extends AbstractConfig {

public static final String CONNECT_CONFIGS_TOPIC_PROPERTY = "ksql.connect.configs.topic";

public static final String CONNECT_WORKER_CONFIG_FILE_PROPERTY = "ksql.connect.worker.config";

public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled";

public static final String KSQL_EXT_DIR = "ksql.extension.dir";
Expand Down Expand Up @@ -458,6 +460,15 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
DEFAULT_CONNECT_CONFIGS_TOPIC,
Importance.LOW,
"The name for the connect configuration topic, defaults to 'connect-configs'"
).define(
CONNECT_WORKER_CONFIG_FILE_PROPERTY,
ConfigDef.Type.STRING,
"",
Importance.LOW,
"The path to a connect worker configuration file. An empty value for this configuration"
+ "will prevent connect from starting up embedded within KSQL. For more information"
+ " on configuring connect, see "
+ "https://docs.confluent.io/current/connect/userguide.html#configuring-workers."
).define(
KSQL_ENABLE_UDFS,
ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import java.io.IOException;
import java.net.BindException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Connect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConnectExecutable implements Executable {

private static final Logger LOG = LoggerFactory.getLogger(ConnectExecutable.class);

private final ConnectDistributed connectDistributed;
private final Map<String, String> workerProps;
private Connect connect;

public static ConnectExecutable of(final String configFile) throws IOException {
final Map<String, String> workerProps = !configFile.isEmpty()
? Utils.propsToStringMap(Utils.loadProps(configFile))
: Collections.emptyMap();

return new ConnectExecutable(workerProps);
}

private ConnectExecutable(final Map<String, String> workerProps) {
this.workerProps = Objects.requireNonNull(workerProps, "workerProps");
connectDistributed = new ConnectDistributed();
}

@Override
public void start() {
try {
connect = connectDistributed.startConnect(workerProps);
} catch (final ConnectException e) {
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof BindException) {
LOG.warn("Cannot start a local connect instance because connect is running locally!", e);
} else {
throw e;
}
}
}

@Override
public void stop() {
if (connect != null) {
connect.stop();
}
}

@Override
public void join() {
if (connect != null) {
connect.awaitStop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -56,7 +57,8 @@ public static void main(final String[] args) {
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString();
enforceStreamStateDirAvailability(new File(streamsStateDirPath));
final Optional<String> queriesFile = serverOptions.getQueriesFile(properties);
final Executable executable = createExecutable(properties, queriesFile, installDir);
final Executable executable = createExecutable(
properties, queriesFile, installDir, ksqlConfig);
new KsqlServerMain(executable).tryStartApp();
} catch (final Exception e) {
log.error("Failed to start KSQL", e);
Expand Down Expand Up @@ -84,18 +86,27 @@ void tryStartApp() throws Exception {
private static Executable createExecutable(
final Map<String, String> properties,
final Optional<String> queriesFile,
final String installDir
) {
final String installDir,
final KsqlConfig ksqlConfig
) throws IOException {
if (queriesFile.isPresent()) {
return StandaloneExecutorFactory.create(properties, queriesFile.get(), installDir);
}

final KsqlRestConfig restConfig = new KsqlRestConfig(ensureValidProps(properties));
return KsqlRestApplication.buildApplication(
final Executable restApp = KsqlRestApplication.buildApplication(
restConfig,
KsqlVersionCheckerAgent::new,
Integer.MAX_VALUE
);
final String connectConfigFile =
ksqlConfig.getString(KsqlConfig.CONNECT_WORKER_CONFIG_FILE_PROPERTY);
if (connectConfigFile.isEmpty()) {
return restApp;
}

final Executable connect = ConnectExecutable.of(connectConfigFile);
return MultiExecutable.of(connect, restApp);
}

private static Map<?, ?> ensureValidProps(final Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import java.util.Objects;

/**
* {@code MultiExecutable} wraps multiple {@code Executable}s and ensures that when
* an action is called all internal executables will perform the action, regardless
* of whether or not previous executables succeeded.
*
* <p>The executables will be started, stopped and joined in the order that they
* are supplied in {@link #of(Executable...)}.</p>
*/
public final class MultiExecutable implements Executable {

private final Executable[] executables;

public static Executable of(final Executable... executables) {
return new MultiExecutable(executables);
}

private MultiExecutable(final Executable... executables) {
this.executables = Objects.requireNonNull(executables, "executables");
}

@Override
public void start() throws Exception {
doAction(Executable::start);
}

@Override
public void stop() throws Exception {
doAction(Executable::stop);
}

@Override
public void join() throws InterruptedException {
doAction(Executable::join);
}

@SuppressWarnings("unchecked")
private <T extends Exception> void doAction(
final ExceptionalConsumer<Executable, T> action
) throws T {

T exception = null;
for (final Executable executable : executables) {
try {
action.accept(executable);
} catch (final Exception e) {
if (exception == null) {
exception = (T) e;
} else {
exception.addSuppressed(e);
}
}
}

if (exception != null) {
throw exception;
}
}

@FunctionalInterface
private interface ExceptionalConsumer<I, T extends Exception> {
void accept(I value) throws T;
}
}
Loading

0 comments on commit 4d7ef2a

Please sign in to comment.