From 15548cefa617779866bd15d4ec0ad77ea25c39a8 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Mon, 29 Jul 2019 14:00:35 -0700 Subject: [PATCH] feat(ksql-connect): introduce ConnectClient for REST requests (#3137) --- .../io/confluent/ksql/util/KsqlConfig.java | 14 ++- ksql-engine/pom.xml | 21 ++++ .../ksql/services/ConnectClient.java | 72 +++++++++++ .../ksql/services/DefaultConnectClient.java | 113 ++++++++++++++++++ .../ksql/services/DefaultServiceContext.java | 13 +- .../ksql/services/LazyServiceContext.java | 5 + .../ksql/services/SandboxConnectClient.java | 37 ++++++ .../services/SandboxedServiceContext.java | 14 ++- .../ksql/services/ServiceContext.java | 9 ++ .../confluent/ksql/KsqlContextTestUtil.java | 4 +- .../services/DefaultConnectClientTest.java | 98 +++++++++++++++ .../services/SandboxConnectClientTest.java | 48 ++++++++ .../services/SandboxedServiceContextTest.java | 11 ++ .../ksql/services/TestServiceContext.java | 11 +- .../execution/ListSourceExecutorTest.java | 3 +- .../execution/ListTopicsExecutorTest.java | 3 +- pom.xml | 15 +++ 17 files changed, 477 insertions(+), 14 deletions(-) create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 1b59a91c3d71..4f6c74b03417 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -63,6 +63,8 @@ public class KsqlConfig extends AbstractConfig { public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url"; + public static final String CONNECT_URL_PROPERTY = "ksql.connect.registry.url"; + public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled"; public static final String KSQL_EXT_DIR = "ksql.extension.dir"; @@ -154,8 +156,8 @@ public class KsqlConfig extends AbstractConfig { "Extension for supplying custom metrics to be emitted along with " + "the engine's default JMX metrics"; - public static final String - defaultSchemaRegistryUrl = "http://localhost:8081"; + public static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081"; + public static final String DEFAULT_CONNECT_URL = "http://localhost:8083"; public static final String KSQL_STREAMS_PREFIX = "ksql.streams."; @@ -425,9 +427,15 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { ).define( SCHEMA_REGISTRY_URL_PROPERTY, ConfigDef.Type.STRING, - defaultSchemaRegistryUrl, + DEFAULT_SCHEMA_REGISTRY_URL, ConfigDef.Importance.MEDIUM, "The URL for the schema registry, defaults to http://localhost:8081" + ).define( + CONNECT_URL_PROPERTY, + ConfigDef.Type.STRING, + DEFAULT_CONNECT_URL, + Importance.MEDIUM, + "The URL for the connect deployment, defaults to http://localhost:8083" ).define( KSQL_ENABLE_UDFS, ConfigDef.Type.BOOLEAN, diff --git a/ksql-engine/pom.xml b/ksql-engine/pom.xml index 8da56c206782..ea9c78667d37 100644 --- a/ksql-engine/pom.xml +++ b/ksql-engine/pom.xml @@ -32,22 +32,27 @@ io.confluent.ksql ksql-common + io.confluent.support support-metrics-common + io.confluent.ksql ksql-serde + io.confluent.ksql ksql-parser + io.confluent.ksql ksql-metastore + io.confluent.ksql ksql-udf @@ -78,11 +83,21 @@ commons-csv + + org.apache.httpcomponents + fluent-hc + + org.apache.kafka connect-api + + org.apache.kafka + connect-runtime + + org.codehaus.janino janino @@ -132,6 +147,12 @@ test + + com.github.tomakehurst + wiremock-jre8 + test + + diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java new file mode 100644 index 000000000000..5e99970b5f55 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java @@ -0,0 +1,72 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.services; + +import io.confluent.ksql.util.KsqlPreconditions; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; + +/** + * An interface defining the common operations to communicate with + * a Kafka Connect cluster. + */ +public interface ConnectClient { + + /** + * Creates a connector with {@code connector} as the name under the + * specified configuration. + * + * @param connector the name of the connector + * @param config the connector configuration + */ + ConnectResponse create(String connector, Map config); + + /** + * An optionally successful response. Either contains a value of type + * {@code } or an error, which is the string representation of the + * response entity. + */ + class ConnectResponse { + private final Optional datum; + private final Optional error; + + public static ConnectResponse of(final T datum) { + return new ConnectResponse<>(datum, null); + } + + public static ConnectResponse of(final String error) { + return new ConnectResponse<>(null, error); + } + + private ConnectResponse(final T datum, final String error) { + KsqlPreconditions.checkArgument( + datum != null ^ error != null, + "expected exactly one of datum or error to be null"); + this.datum = Optional.ofNullable(datum); + this.error = Optional.ofNullable(error); + } + + public Optional datum() { + return datum; + } + + public Optional error() { + return error; + } + } + +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java new file mode 100644 index 000000000000..9f1f2ded1491 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java @@ -0,0 +1,113 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.services; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.json.JsonMapper; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlServerException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.Objects; +import org.apache.http.HttpStatus; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.fluent.Request; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The default implementation of {@code ConnectClient}. This implementation is + * thread safe, and the methods are all blocking and are configured with + * default timeouts of {@value #DEFAULT_TIMEOUT_MS}ms. + */ +public class DefaultConnectClient implements ConnectClient { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectClient.class); + private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper; + + private static final String CONNECTORS = "/connectors"; + private static final int DEFAULT_TIMEOUT_MS = 5_000; + + private final URI connectURI; + + public DefaultConnectClient(final String connectURI) { + Objects.requireNonNull(connectURI, "connectURI"); + + try { + this.connectURI = new URI(connectURI); + } catch (URISyntaxException e) { + throw new KsqlException( + "Could not initialize connect client due to invalid URI: " + connectURI, e); + } + } + + @Override + public ConnectResponse create( + final String connector, + final Map config + ) { + try { + LOG.debug("Issuing request to Kafka Connect at URI {} with name {} and config {}", + connectURI, + connector, + config); + + final ConnectResponse connectResponse = Request + .Post(connectURI.resolve(CONNECTORS)) + .socketTimeout(DEFAULT_TIMEOUT_MS) + .connectTimeout(DEFAULT_TIMEOUT_MS) + .bodyString( + MAPPER.writeValueAsString( + ImmutableMap.of( + "name", connector, + "config", config)), + ContentType.APPLICATION_JSON + ) + .execute() + .handleResponse(createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class)); + + connectResponse.error() + .ifPresent(error -> LOG.warn("Did not CREATE connector {}: {}", connector, error)); + + return connectResponse; + } catch (final Exception e) { + throw new KsqlServerException(e); + } + } + + private static ResponseHandler> createHandler( + final int expectedStatus, + final Class entityClass + ) { + return httpResponse -> { + if (httpResponse.getStatusLine().getStatusCode() != expectedStatus) { + final String entity = EntityUtils.toString(httpResponse.getEntity()); + return ConnectResponse.of(entity); + } + + final T info = MAPPER.readValue( + httpResponse.getEntity().getContent(), + entityClass); + + return ConnectResponse.of(info); + }; + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java index 8057b2516a3b..d5fa3e2cf14f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultServiceContext.java @@ -35,6 +35,7 @@ public class DefaultServiceContext implements ServiceContext { private final KafkaTopicClient topicClient; private final Supplier srClientFactory; private final SchemaRegistryClient srClient; + private final ConnectClient connectClient; public static DefaultServiceContext create(final KsqlConfig ksqlConfig) { return create( @@ -57,7 +58,8 @@ public static DefaultServiceContext create( kafkaClientSupplier, adminClient, new KafkaTopicClientImpl(adminClient), - srClientFactory + srClientFactory, + new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)) ); } @@ -65,13 +67,15 @@ public static DefaultServiceContext create( final KafkaClientSupplier kafkaClientSupplier, final AdminClient adminClient, final KafkaTopicClient topicClient, - final Supplier srClientFactory + final Supplier srClientFactory, + final ConnectClient connectClient ) { this.kafkaClientSupplier = Objects.requireNonNull(kafkaClientSupplier, "kafkaClientSupplier"); this.adminClient = Objects.requireNonNull(adminClient, "adminClient"); this.topicClient = Objects.requireNonNull(topicClient, "topicClient"); this.srClientFactory = Objects.requireNonNull(srClientFactory, "srClientFactory"); this.srClient = Objects.requireNonNull(srClientFactory.get(), "srClient"); + this.connectClient = Objects.requireNonNull(connectClient, "connectClient"); } @Override @@ -99,6 +103,11 @@ public Supplier getSchemaRegistryClientFactory() { return srClientFactory; } + @Override + public ConnectClient getConnectClient() { + return connectClient; + } + @Override public void close() { adminClient.close(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/LazyServiceContext.java b/ksql-engine/src/main/java/io/confluent/ksql/services/LazyServiceContext.java index d981a3488cf9..c2795f3714c7 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/LazyServiceContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/LazyServiceContext.java @@ -53,6 +53,11 @@ public Supplier getSchemaRegistryClientFactory() { return serviceContextSupplier.get().getSchemaRegistryClientFactory(); } + @Override + public ConnectClient getConnectClient() { + return serviceContextSupplier.get().getConnectClient(); + } + @Override public void close() { serviceContextSupplier.get().close(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java new file mode 100644 index 000000000000..4e6d99763640 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.services; + +import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams; + +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import io.confluent.ksql.util.LimitedProxyBuilder; +import java.util.Map; + +/** + * Supplies {@link ConnectClient}s to use that do not make any + * state changes to the external connect clusters. + */ +final class SandboxConnectClient { + + private SandboxConnectClient() { } + + public static ConnectClient createProxy() { + return LimitedProxyBuilder.forClass(ConnectClient.class) + .swallow("create", methodParams(String.class, Map.class), ConnectResponse.of("sandbox")) + .build(); + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java index 166784ff91c3..5c163bfe84cc 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java @@ -33,6 +33,7 @@ public final class SandboxedServiceContext implements ServiceContext { private final KafkaTopicClient topicClient; private final SchemaRegistryClient srClient; private final KafkaClientSupplier kafkaClientSupplier; + private final ConnectClient connectClient; public static SandboxedServiceContext create(final ServiceContext serviceContext) { if (serviceContext instanceof SandboxedServiceContext) { @@ -44,21 +45,25 @@ public static SandboxedServiceContext create(final ServiceContext serviceContext .createProxy(serviceContext.getTopicClient()); final SchemaRegistryClient schemaRegistryClient = SandboxedSchemaRegistryClient.createProxy(serviceContext.getSchemaRegistryClient()); + final ConnectClient connectClient = SandboxConnectClient.createProxy(); return new SandboxedServiceContext( kafkaClientSupplier, kafkaTopicClient, - schemaRegistryClient); + schemaRegistryClient, + connectClient); } private SandboxedServiceContext( final KafkaClientSupplier kafkaClientSupplier, final KafkaTopicClient topicClient, - final SchemaRegistryClient srClient + final SchemaRegistryClient srClient, + final ConnectClient connectClient ) { this.kafkaClientSupplier = Objects.requireNonNull(kafkaClientSupplier, "kafkaClientSupplier"); this.topicClient = Objects.requireNonNull(topicClient, "topicClient"); this.srClient = Objects.requireNonNull(srClient, "srClient"); + this.connectClient = Objects.requireNonNull(connectClient, "connectClient"); } @Override @@ -86,6 +91,11 @@ public Supplier getSchemaRegistryClientFactory() { return () -> srClient; } + @Override + public ConnectClient getConnectClient() { + return connectClient; + } + @Override public void close() { // No op. diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContext.java b/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContext.java index be0d84ead093..8ff5384cb4ce 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/ServiceContext.java @@ -84,6 +84,15 @@ public interface ServiceContext extends AutoCloseable { */ Supplier getSchemaRegistryClientFactory(); + /** + * Get the shared {@link ConnectClient} instance. + * + *

The default implementation is thread-safe and can be shared across threads. + * + * @return a shared {@link ConnectClient} + */ + ConnectClient getConnectClient(); + @Override void close(); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java index ce7305d0058d..263786c30dfa 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java @@ -19,6 +19,7 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.services.DefaultConnectClient; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.KafkaTopicClientImpl; import io.confluent.ksql.services.ServiceContext; @@ -50,7 +51,8 @@ public static KsqlContext create( clientSupplier, adminClient, kafkaTopicClient, - () -> schemaRegistryClient + () -> schemaRegistryClient, + new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)) ); final KsqlEngine engine = new KsqlEngine( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java new file mode 100644 index 000000000000..b6aba696cf34 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.services; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.json.JsonMapper; +import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import org.apache.http.HttpStatus; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class DefaultConnectClientTest { + + private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper; + private static final ConnectorInfo SAMPLE_INFO = new ConnectorInfo( + "foo", + ImmutableMap.of("key", "value"), + ImmutableList.of(new ConnectorTaskId("foo", 1)), + ConnectorType.SOURCE + ); + + @Rule + public WireMockRule wireMockRule = new WireMockRule( + WireMockConfiguration.wireMockConfig().dynamicPort()); + + private ConnectClient client; + + @Before + public void setup() { + client = new DefaultConnectClient("http://localhost:" + wireMockRule.port()); + } + + @Test + public void testCreate() throws JsonProcessingException { + // Given: + WireMock.stubFor( + WireMock.post(WireMock.urlEqualTo("/connectors")) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_CREATED) + .withBody(MAPPER.writeValueAsString(SAMPLE_INFO))) + ); + + // When: + final ConnectResponse response = + client.create("foo", ImmutableMap.of()); + + // Then: + assertThat(response.datum(), OptionalMatchers.of(is(SAMPLE_INFO))); + assertThat("Expected no error!", !response.error().isPresent()); + } + + @Test + public void testCreateWithError() throws JsonProcessingException { + // Given: + WireMock.stubFor( + WireMock.post(WireMock.urlEqualTo("/connectors")) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_BAD_REQUEST) + .withBody("Oh no!")) + ); + + // When: + final ConnectResponse response = + client.create("foo", ImmutableMap.of()); + + // Then: + assertThat("Expected no datum!", !response.datum().isPresent()); + assertThat(response.error(), OptionalMatchers.of(is("Oh no!"))); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java new file mode 100644 index 000000000000..f43416d776c4 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.services; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; +import io.confluent.ksql.services.ConnectClient.ConnectResponse; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +public class SandboxConnectClientTest { + + private ConnectClient sandboxClient; + + @Before + public void setUp() { + sandboxClient = SandboxConnectClient.createProxy(); + } + + @Test + public void shouldReturnErrorOnCreate() { + // When: + final ConnectResponse foo = sandboxClient.create("foo", ImmutableMap.of()); + + // Then: + assertThat(foo.error(), OptionalMatchers.of(is("sandbox"))); + assertThat("expected no datum", !foo.datum().isPresent()); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java index cf398c105a7a..d6a3edae35b6 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -55,6 +56,7 @@ public static Collection> getMethodsToTest() { .ignore("getKafkaClientSupplier") .ignore("getSchemaRegistryClient") .ignore("getSchemaRegistryClientFactory") + .ignore("getConnectClient") .ignore("close") .build(); } @@ -162,6 +164,15 @@ public void shouldGetSandboxedSchemaRegistryFactory() { assertThat(factory.get(), is(sameInstance(sandboxedServiceContext.getSchemaRegistryClient()))); } + @Test + public void shouldGetSandboxedConnectClient() { + // When: + final ConnectClient client = sandboxedServiceContext.getConnectClient(); + + // Then: + assertThat("Expected proxy class", Proxy.isProxyClass(client.getClass())); + } + @Test public void shouldNoNothingOnClose() { sandboxedServiceContext.close(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java b/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java index 991c5437ded5..fb23445631b7 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java @@ -62,7 +62,8 @@ public static ServiceContext create( new FakeKafkaClientSupplier(), new FakeKafkaClientSupplier().getAdminClient(Collections.emptyMap()), topicClient, - srClientFactory + srClientFactory, + new DefaultConnectClient("http://localhost:8083") ); } @@ -78,7 +79,8 @@ public static ServiceContext create( kafkaClientSupplier, adminClient, new KafkaTopicClientImpl(adminClient), - srClientFactory + srClientFactory, + new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY)) ); } @@ -86,8 +88,9 @@ public static ServiceContext create( final KafkaClientSupplier kafkaClientSupplier, final AdminClient adminClient, final KafkaTopicClient topicClient, - final Supplier srClientFactory + final Supplier srClientFactory, + final ConnectClient connectClient ) { - return new DefaultServiceContext(kafkaClientSupplier, adminClient, topicClient, srClientFactory); + return new DefaultServiceContext(kafkaClientSupplier, adminClient, topicClient, srClientFactory, connectClient); } } \ No newline at end of file diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java index d9031d26bdac..7f4e232327e3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java @@ -247,7 +247,8 @@ public void shouldNotCallTopicClientForExtendedDescription() { engine.getServiceContext().getKafkaClientSupplier(), engine.getServiceContext().getAdminClient(), spyTopicClient, - engine.getServiceContext().getSchemaRegistryClientFactory() + engine.getServiceContext().getSchemaRegistryClientFactory(), + engine.getServiceContext().getConnectClient() ); // When: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java index 2420d861f73f..318abeb5ea80 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java @@ -59,7 +59,8 @@ public void shouldListKafkaTopics() { engine.getServiceContext().getKafkaClientSupplier(), mockAdminClient, engine.getServiceContext().getTopicClient(), - engine.getServiceContext().getSchemaRegistryClientFactory() + engine.getServiceContext().getSchemaRegistryClientFactory(), + engine.getServiceContext().getConnectClient() ); // When: diff --git a/pom.xml b/pom.xml index 15ffc55a1144..e2b3ad259500 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ 2.0.0-M22 1.0.0-M33 + 4.5.9 http://packages.confluent.io/maven/ 1.2.1 6.0.2.RELEASE @@ -107,6 +108,7 @@ 1.0.2 0.2.2 2.9.0 + 2.24.0