Skip to content

Commit

Permalink
feat(ksql-connect): introduce ConnectClient for REST requests (#3137)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Jul 29, 2019
1 parent ba03d6f commit 15548ce
Show file tree
Hide file tree
Showing 17 changed files with 477 additions and 14 deletions.
14 changes: 11 additions & 3 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class KsqlConfig extends AbstractConfig {

public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url";

public static final String CONNECT_URL_PROPERTY = "ksql.connect.registry.url";

public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled";

public static final String KSQL_EXT_DIR = "ksql.extension.dir";
Expand Down Expand Up @@ -154,8 +156,8 @@ public class KsqlConfig extends AbstractConfig {
"Extension for supplying custom metrics to be emitted along with "
+ "the engine's default JMX metrics";

public static final String
defaultSchemaRegistryUrl = "http://localhost:8081";
public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081";
public static final String DEFAULT_CONNECT_URL = "http://localhost:8083";

public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

Expand Down Expand Up @@ -425,9 +427,15 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
).define(
SCHEMA_REGISTRY_URL_PROPERTY,
ConfigDef.Type.STRING,
defaultSchemaRegistryUrl,
DEFAULT_SCHEMA_REGISTRY_URL,
ConfigDef.Importance.MEDIUM,
"The URL for the schema registry, defaults to http://localhost:8081"
).define(
CONNECT_URL_PROPERTY,
ConfigDef.Type.STRING,
DEFAULT_CONNECT_URL,
Importance.MEDIUM,
"The URL for the connect deployment, defaults to http://localhost:8083"
).define(
KSQL_ENABLE_UDFS,
ConfigDef.Type.BOOLEAN,
Expand Down
21 changes: 21 additions & 0 deletions ksql-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,27 @@
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-common</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.support</groupId>
<artifactId>support-metrics-common</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-serde</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-parser</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-metastore</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-udf</artifactId>
Expand Down Expand Up @@ -78,11 +83,21 @@
<artifactId>commons-csv</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
Expand Down Expand Up @@ -132,6 +147,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.services;

import io.confluent.ksql.util.KsqlPreconditions;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;

/**
* An interface defining the common operations to communicate with
* a Kafka Connect cluster.
*/
public interface ConnectClient {

/**
* Creates a connector with {@code connector} as the name under the
* specified configuration.
*
* @param connector the name of the connector
* @param config the connector configuration
*/
ConnectResponse<ConnectorInfo> create(String connector, Map<String, String> config);

/**
* An optionally successful response. Either contains a value of type
* {@code <T>} or an error, which is the string representation of the
* response entity.
*/
class ConnectResponse<T> {
private final Optional<T> datum;
private final Optional<String> error;

public static <T> ConnectResponse<T> of(final T datum) {
return new ConnectResponse<>(datum, null);
}

public static <T> ConnectResponse<T> of(final String error) {
return new ConnectResponse<>(null, error);
}

private ConnectResponse(final T datum, final String error) {
KsqlPreconditions.checkArgument(
datum != null ^ error != null,
"expected exactly one of datum or error to be null");
this.datum = Optional.ofNullable(datum);
this.error = Optional.ofNullable(error);
}

public Optional<T> datum() {
return datum;
}

public Optional<String> error() {
return error;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.services;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Objects;
import org.apache.http.HttpStatus;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The default implementation of {@code ConnectClient}. This implementation is
* thread safe, and the methods are all <i>blocking</i> and are configured with
* default timeouts of {@value #DEFAULT_TIMEOUT_MS}ms.
*/
public class DefaultConnectClient implements ConnectClient {

private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectClient.class);
private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper;

private static final String CONNECTORS = "/connectors";
private static final int DEFAULT_TIMEOUT_MS = 5_000;

private final URI connectURI;

public DefaultConnectClient(final String connectURI) {
Objects.requireNonNull(connectURI, "connectURI");

try {
this.connectURI = new URI(connectURI);
} catch (URISyntaxException e) {
throw new KsqlException(
"Could not initialize connect client due to invalid URI: " + connectURI, e);
}
}

@Override
public ConnectResponse<ConnectorInfo> create(
final String connector,
final Map<String, String> config
) {
try {
LOG.debug("Issuing request to Kafka Connect at URI {} with name {} and config {}",
connectURI,
connector,
config);

final ConnectResponse<ConnectorInfo> connectResponse = Request
.Post(connectURI.resolve(CONNECTORS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.bodyString(
MAPPER.writeValueAsString(
ImmutableMap.of(
"name", connector,
"config", config)),
ContentType.APPLICATION_JSON
)
.execute()
.handleResponse(createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class));

connectResponse.error()
.ifPresent(error -> LOG.warn("Did not CREATE connector {}: {}", connector, error));

return connectResponse;
} catch (final Exception e) {
throw new KsqlServerException(e);
}
}

private static <T> ResponseHandler<ConnectResponse<T>> createHandler(
final int expectedStatus,
final Class<T> entityClass
) {
return httpResponse -> {
if (httpResponse.getStatusLine().getStatusCode() != expectedStatus) {
final String entity = EntityUtils.toString(httpResponse.getEntity());
return ConnectResponse.of(entity);
}

final T info = MAPPER.readValue(
httpResponse.getEntity().getContent(),
entityClass);

return ConnectResponse.of(info);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DefaultServiceContext implements ServiceContext {
private final KafkaTopicClient topicClient;
private final Supplier<SchemaRegistryClient> srClientFactory;
private final SchemaRegistryClient srClient;
private final ConnectClient connectClient;

public static DefaultServiceContext create(final KsqlConfig ksqlConfig) {
return create(
Expand All @@ -57,21 +58,24 @@ public static DefaultServiceContext create(
kafkaClientSupplier,
adminClient,
new KafkaTopicClientImpl(adminClient),
srClientFactory
srClientFactory,
new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY))
);
}

DefaultServiceContext(
final KafkaClientSupplier kafkaClientSupplier,
final AdminClient adminClient,
final KafkaTopicClient topicClient,
final Supplier<SchemaRegistryClient> srClientFactory
final Supplier<SchemaRegistryClient> srClientFactory,
final ConnectClient connectClient
) {
this.kafkaClientSupplier = Objects.requireNonNull(kafkaClientSupplier, "kafkaClientSupplier");
this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
this.topicClient = Objects.requireNonNull(topicClient, "topicClient");
this.srClientFactory = Objects.requireNonNull(srClientFactory, "srClientFactory");
this.srClient = Objects.requireNonNull(srClientFactory.get(), "srClient");
this.connectClient = Objects.requireNonNull(connectClient, "connectClient");
}

@Override
Expand Down Expand Up @@ -99,6 +103,11 @@ public Supplier<SchemaRegistryClient> getSchemaRegistryClientFactory() {
return srClientFactory;
}

@Override
public ConnectClient getConnectClient() {
return connectClient;
}

@Override
public void close() {
adminClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Supplier<SchemaRegistryClient> getSchemaRegistryClientFactory() {
return serviceContextSupplier.get().getSchemaRegistryClientFactory();
}

@Override
public ConnectClient getConnectClient() {
return serviceContextSupplier.get().getConnectClient();
}

@Override
public void close() {
serviceContextSupplier.get().close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.services;

import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams;

import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.util.LimitedProxyBuilder;
import java.util.Map;

/**
* Supplies {@link ConnectClient}s to use that do not make any
* state changes to the external connect clusters.
*/
final class SandboxConnectClient {

private SandboxConnectClient() { }

public static ConnectClient createProxy() {
return LimitedProxyBuilder.forClass(ConnectClient.class)
.swallow("create", methodParams(String.class, Map.class), ConnectResponse.of("sandbox"))
.build();
}
}
Loading

0 comments on commit 15548ce

Please sign in to comment.