From 3c18d3941345339cc072248f0e145e1140ad0ad8 Mon Sep 17 00:00:00 2001 From: Sergey Beryozkin Date: Tue, 29 Nov 2022 14:12:33 +0000 Subject: [PATCH 1/4] Allow same origin requests --- .../CORSHandlerTestWildcardOriginCase.java | 54 +++++++++++++++++++ .../vertx/http/runtime/cors/CORSFilter.java | 17 +++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/cors/CORSHandlerTestWildcardOriginCase.java b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/cors/CORSHandlerTestWildcardOriginCase.java index bea0ee1578301..c1852e10dd84d 100644 --- a/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/cors/CORSHandlerTestWildcardOriginCase.java +++ b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/cors/CORSHandlerTestWildcardOriginCase.java @@ -49,6 +49,60 @@ void corsNotMatchingOrigin() { .header("Access-Control-Allow-Credentials", "false"); } + @Test + void corsSameOriginRequest() { + String origin = "http://localhost:8081"; + given().header("Origin", origin) + .get("/test").then() + .statusCode(200) + .header("Access-Control-Allow-Origin", origin); + } + + @Test + void corsInvalidSameOriginRequest1() { + String origin = "http"; + given().header("Origin", origin) + .get("/test").then() + .statusCode(403) + .header("Access-Control-Allow-Origin", nullValue()); + } + + @Test + void corsInvalidSameOriginRequest2() { + String origin = "http://local"; + given().header("Origin", origin) + .get("/test").then() + .statusCode(403) + .header("Access-Control-Allow-Origin", nullValue()); + } + + @Test + void corsInvalidSameOriginRequest3() { + String origin = "http://localhost"; + given().header("Origin", origin) + .get("/test").then() + .statusCode(403) + .header("Access-Control-Allow-Origin", nullValue()); + } + + @Test + void corsInvalidSameOriginRequest4() { + String origin = "http://localhost:9999"; + given().header("Origin", origin) + .get("/test").then() + .statusCode(403) + .header("Access-Control-Allow-Origin", nullValue()); + } + + @Test + void corsInvalidSameOriginRequest5() { + String origin = "https://localhost:8483"; + given().header("Origin", origin) + .get("/test").then() + .statusCode(403) + .header("Access-Control-Allow-Origin", nullValue()); + } + @Test @DisplayName("Returns false 'Access-Control-Allow-Credentials' header on matching origin '*'") void corsMatchingOriginWithWildcard() { diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java index 4a7fee1a14e4f..d9cba6755d00f 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java @@ -1,5 +1,6 @@ package io.quarkus.vertx.http.runtime.cors; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -176,7 +177,7 @@ public void handle(RoutingContext event) { } boolean allowsOrigin = isConfiguredWithWildcard(corsConfig.origins) || corsConfig.origins.get().contains(origin) - || isOriginAllowedByRegex(allowedOriginsRegex, origin); + || isOriginAllowedByRegex(allowedOriginsRegex, origin) || isSameOrigin(request, origin); if (allowsOrigin) { response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, origin); @@ -210,4 +211,18 @@ public void handle(RoutingContext event) { } } } + + private static boolean isSameOrigin(HttpServerRequest request, String origin) { + String absUriString = request.absoluteURI(); + if (absUriString.startsWith(origin)) { + // Make sure that Origin URI contains scheme, host, and port. + // If no port is set in Origin URI then the request URI must not have it set either + URI baseUri = URI.create(absUriString.substring(0, origin.length())); + if (baseUri.getScheme() != null && baseUri.getHost() != null + && (baseUri.getPort() > 0 || URI.create(absUriString).getPort() == -1)) { + return true; + } + } + return false; + } } From 9c5c1f80306d05bd861abb9caa0004de58f82b00 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 2 Dec 2022 10:41:13 +1100 Subject: [PATCH 2/4] Add fast path to same origin check --- .../vertx/http/runtime/cors/CORSFilter.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java index d9cba6755d00f..7bf623c08c7a7 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java @@ -212,7 +212,23 @@ public void handle(RoutingContext event) { } } - private static boolean isSameOrigin(HttpServerRequest request, String origin) { + static boolean isSameOrigin(HttpServerRequest request, String origin) { + //fast path check, when everything is the same + if (origin.startsWith(request.scheme())) { + if (!substringMatch(origin, request.scheme().length(), "://", false)) { + return false; + } + if (substringMatch(origin, request.scheme().length() + 3, request.host(), true)) { + //they are a simple match + return true; + } + return isSameOriginSlowPath(request, origin); + } else { + return false; + } + } + + static boolean isSameOriginSlowPath(HttpServerRequest request, String origin) { String absUriString = request.absoluteURI(); if (absUriString.startsWith(origin)) { // Make sure that Origin URI contains scheme, host, and port. @@ -225,4 +241,26 @@ private static boolean isSameOrigin(HttpServerRequest request, String origin) { } return false; } + + static boolean substringMatch(String str, int pos, String substring, boolean requireFull) { + int length = str.length(); + int subLength = substring.length(); + int strPos = pos; + int subPos = 0; + if (pos + subLength > length) { + //too long, avoid checking in the loop + return false; + } + for (;;) { + if (subPos == subLength) { + //if we are at the end return the correct value, depending on if we are also at the end of the origin + return !requireFull || strPos == length; + } + if (str.charAt(strPos) != substring.charAt(subPos)) { + return false; + } + strPos++; + subPos++; + } + } } From 2ce7349f0d7c4dc7297da2a31ed17f914409b102 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 2 Dec 2022 10:56:10 +1100 Subject: [PATCH 3/4] Add fast path to same host logic Also clean up the slow path --- .../vertx/http/runtime/cors/CORSFilter.java | 40 ++++++++++++---- .../http/runtime/cors/CORSFilterTest.java | 47 +++++++++++++++++++ 2 files changed, 79 insertions(+), 8 deletions(-) diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java index 7bf623c08c7a7..05869dfbfb877 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/cors/CORSFilter.java @@ -79,7 +79,7 @@ public static List parseAllowedOriginsRegex(Optional> allo * If any regular expression origins are configured, try to match on them. * Regular expressions must begin and end with '/' * - * @param allowedOrigins the configured regex origins. + * @param allowOriginsRegex the configured regex origins. * @param origin the specified origin * @return true if any configured regular expressions match the specified origin, false otherwise */ @@ -230,13 +230,37 @@ static boolean isSameOrigin(HttpServerRequest request, String origin) { static boolean isSameOriginSlowPath(HttpServerRequest request, String origin) { String absUriString = request.absoluteURI(); - if (absUriString.startsWith(origin)) { - // Make sure that Origin URI contains scheme, host, and port. - // If no port is set in Origin URI then the request URI must not have it set either - URI baseUri = URI.create(absUriString.substring(0, origin.length())); - if (baseUri.getScheme() != null && baseUri.getHost() != null - && (baseUri.getPort() > 0 || URI.create(absUriString).getPort() == -1)) { - return true; + //we already know the scheme is correct, as the fast path will reject that + URI baseUri = URI.create(absUriString); + URI originUri = URI.create(origin); + if (!originUri.getPath().isEmpty()) { + //origin should not contain a path component + //just reject it in this case + return false; + } + if (!baseUri.getHost().equals(originUri.getHost())) { + return false; + } + if (baseUri.getPort() == originUri.getPort()) { + return true; + } + if (baseUri.getPort() != -1 && originUri.getPort() != -1) { + //ports are explictly set + return false; + } + if (baseUri.getScheme().equals("http")) { + if (baseUri.getPort() == 80 || baseUri.getPort() == -1) { + if (originUri.getPort() == 80 || originUri.getPort() == -1) { + //port is either unset or 80 + return true; + } + } + } else if (baseUri.getScheme().equals("https")) { + if (baseUri.getPort() == 443 || baseUri.getPort() == -1) { + if (originUri.getPort() == 443 || originUri.getPort() == -1) { + //port is either unset or 443 + return true; + } } } return false; diff --git a/extensions/vertx-http/runtime/src/test/java/io/quarkus/vertx/http/runtime/cors/CORSFilterTest.java b/extensions/vertx-http/runtime/src/test/java/io/quarkus/vertx/http/runtime/cors/CORSFilterTest.java index 5686242b7eaef..e2348e9efcf7b 100644 --- a/extensions/vertx-http/runtime/src/test/java/io/quarkus/vertx/http/runtime/cors/CORSFilterTest.java +++ b/extensions/vertx-http/runtime/src/test/java/io/quarkus/vertx/http/runtime/cors/CORSFilterTest.java @@ -2,7 +2,9 @@ import static io.quarkus.vertx.http.runtime.cors.CORSFilter.isConfiguredWithWildcard; import static io.quarkus.vertx.http.runtime.cors.CORSFilter.isOriginAllowedByRegex; +import static io.quarkus.vertx.http.runtime.cors.CORSFilter.isSameOrigin; import static io.quarkus.vertx.http.runtime.cors.CORSFilter.parseAllowedOriginsRegex; +import static io.quarkus.vertx.http.runtime.cors.CORSFilter.substringMatch; import java.util.Arrays; import java.util.Collections; @@ -12,6 +14,9 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import io.vertx.core.http.HttpServerRequest; public class CORSFilterTest { @@ -37,4 +42,46 @@ public void isOriginAllowedByRegexTest() { Assertions.assertEquals(regexList.size(), 1); Assertions.assertTrue(isOriginAllowedByRegex(regexList, "https://abc-123.app.mydomain.com")); } + + @Test + public void sameOriginTest() { + var request = Mockito.mock(HttpServerRequest.class); + Mockito.when(request.scheme()).thenReturn("http"); + Mockito.when(request.host()).thenReturn("localhost"); + Mockito.when(request.absoluteURI()).thenReturn("http://localhost"); + Assertions.assertTrue(isSameOrigin(request, "http://localhost")); + Assertions.assertTrue(isSameOrigin(request, "http://localhost:80")); + Assertions.assertFalse(isSameOrigin(request, "http://localhost:8080")); + Assertions.assertFalse(isSameOrigin(request, "https://localhost")); + Mockito.when(request.host()).thenReturn("localhost:8080"); + Mockito.when(request.absoluteURI()).thenReturn("http://localhost:8080"); + Assertions.assertFalse(isSameOrigin(request, "http://localhost")); + Assertions.assertFalse(isSameOrigin(request, "http://localhost:80")); + Assertions.assertTrue(isSameOrigin(request, "http://localhost:8080")); + Assertions.assertFalse(isSameOrigin(request, "https://localhost:8080")); + Mockito.when(request.scheme()).thenReturn("https"); + Mockito.when(request.host()).thenReturn("localhost"); + Mockito.when(request.absoluteURI()).thenReturn("http://localhost"); + Assertions.assertFalse(isSameOrigin(request, "http://localhost")); + Assertions.assertFalse(isSameOrigin(request, "http://localhost:443")); + Assertions.assertFalse(isSameOrigin(request, "https://localhost:8080")); + Assertions.assertTrue(isSameOrigin(request, "https://localhost")); + Mockito.when(request.host()).thenReturn("localhost:8443"); + Mockito.when(request.absoluteURI()).thenReturn("https://localhost:8443"); + Assertions.assertFalse(isSameOrigin(request, "http://localhost")); + Assertions.assertFalse(isSameOrigin(request, "http://localhost:80")); + Assertions.assertFalse(isSameOrigin(request, "http://localhost:8443")); + Assertions.assertTrue(isSameOrigin(request, "https://localhost:8443")); + + } + + @Test + public void testSubstringMatches() { + Assertions.assertTrue(substringMatch("localhost", 0, "local", false)); + Assertions.assertFalse(substringMatch("localhost", 0, "local", true)); + Assertions.assertFalse(substringMatch("localhost", 1, "local", false)); + Assertions.assertTrue(substringMatch("localhost", 5, "host", false)); + Assertions.assertTrue(substringMatch("localhost", 5, "host", true)); + + } } From 0f3a9384a0da9ca149b40a4b88f8713fc0a80b46 Mon Sep 17 00:00:00 2001 From: ozangunalp Date: Tue, 13 Dec 2022 15:16:52 +0000 Subject: [PATCH 4/4] Kafka native container dev services Fixes #28194 --- .../src/main/asciidoc/kafka-dev-services.adoc | 24 ++++- .../deployment/DevServicesKafkaProcessor.java | 90 ++++++++++------- .../KafkaDevServicesBuildTimeConfig.java | 42 ++++++-- .../deployment/KafkaNativeContainer.java | 99 +++++++++++++++++++ .../src/main/resources/application.properties | 1 + 5 files changed, 207 insertions(+), 49 deletions(-) create mode 100644 extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java diff --git a/docs/src/main/asciidoc/kafka-dev-services.adoc b/docs/src/main/asciidoc/kafka-dev-services.adoc index 30588ba1e1381..e48283d851827 100644 --- a/docs/src/main/asciidoc/kafka-dev-services.adoc +++ b/docs/src/main/asciidoc/kafka-dev-services.adoc @@ -50,16 +50,32 @@ Note that the Kafka advertised address is automatically configured with the chos [[configuring-the-image]] == Configuring the image -Dev Services for Kafka supports https://redpanda.com[Redpanda] and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode). +Dev Services for Kafka supports https://redpanda.com[Redpanda], https://github/ozangunalp/kafka-native[kafka-native] +and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode) images. -Redpanda is a Kafka compatible event streaming platform. -Because it provides a faster startup time dev services defaults to `vectorized/redpanda` images. +**Redpanda** is a Kafka compatible event streaming platform. +Because it provides a fast startup times, dev services defaults to Redpanda images from `vectorized/redpanda`. You can select any version from https://hub.docker.com/r/vectorized/redpanda. -Strimzi provides container images and Operators for running Apache Kafka on Kubernetes. +**kafka-native** provides images of standard Apache Kafka distribution compiled to native binary using Quarkus and GraalVM. +While still being _experimental_, it provides very fast startup times with small footprint. + +Image type can be configured using + +[source, properties] +---- +quarkus.kafka.devservices.provider=kafka-native +---- + +**Strimzi** provides container images and Operators for running Apache Kafka on Kubernetes. While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments. Strimzi container images run "genuine" Kafka broker on JVM, which is slower to start. +[source, properties] +---- +quarkus.kafka.devservices.provider=strimzi +---- + For Strimzi, you can select any image with a Kafka version which has Kraft support (2.8.1 and higher) from https://quay.io/repository/strimzi-test-container/test-container?tab=tags [source, properties] diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java index 03b704e56fe5e..88e167dc08b5d 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java @@ -225,39 +225,53 @@ private RunningDevService startKafka(DockerStatusBuildItem dockerStatusBuildItem // Starting the broker final Supplier defaultKafkaBrokerSupplier = () -> { - if (config.imageName.contains("strimzi")) { - StrimziKafkaContainer container = new StrimziKafkaContainer(config.imageName) - .withBrokerId(1) - .withKraft() - .waitForRunning(); - ConfigureUtil.configureSharedNetwork(container, "kafka"); - if (config.serviceName != null) { - container.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName); - } - if (config.fixedExposedPort != 0) { - container.withPort(config.fixedExposedPort); - } - timeout.ifPresent(container::withStartupTimeout); - - container.start(); - return new RunningDevService(Feature.KAFKA_CLIENT.getName(), - container.getContainerId(), - container::close, - KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers()); - } else { - RedPandaKafkaContainer container = new RedPandaKafkaContainer( - DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"), - config.fixedExposedPort, - launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, - useSharedNetwork, config.redpanda); - timeout.ifPresent(container::withStartupTimeout); - container.start(); - - return new RunningDevService(Feature.KAFKA_CLIENT.getName(), - container.getContainerId(), - container::close, - KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers()); + switch (config.provider) { + case REDPANDA: + RedPandaKafkaContainer redpanda = new RedPandaKafkaContainer( + DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"), + config.fixedExposedPort, + launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, + useSharedNetwork, config.redpanda); + timeout.ifPresent(redpanda::withStartupTimeout); + redpanda.start(); + + return new RunningDevService(Feature.KAFKA_CLIENT.getName(), + redpanda.getContainerId(), + redpanda::close, + KAFKA_BOOTSTRAP_SERVERS, redpanda.getBootstrapServers()); + case STRIMZI: + StrimziKafkaContainer strimzi = new StrimziKafkaContainer(config.imageName) + .withBrokerId(1) + .withKraft() + .waitForRunning(); + ConfigureUtil.configureSharedNetwork(strimzi, "kafka"); + if (config.serviceName != null) { + strimzi.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName); + } + if (config.fixedExposedPort != 0) { + strimzi.withPort(config.fixedExposedPort); + } + timeout.ifPresent(strimzi::withStartupTimeout); + + strimzi.start(); + return new RunningDevService(Feature.KAFKA_CLIENT.getName(), + strimzi.getContainerId(), + strimzi::close, + KAFKA_BOOTSTRAP_SERVERS, strimzi.getBootstrapServers()); + case KAFKA_NATIVE: + KafkaNativeContainer kafkaNative = new KafkaNativeContainer(DockerImageName.parse(config.imageName), + config.fixedExposedPort, + launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, + useSharedNetwork); + timeout.ifPresent(kafkaNative::withStartupTimeout); + kafkaNative.start(); + + return new RunningDevService(Feature.KAFKA_CLIENT.getName(), + kafkaNative.getContainerId(), + kafkaNative::close, + KAFKA_BOOTSTRAP_SERVERS, kafkaNative.getBootstrapServers()); } + return null; }; return maybeContainerAddress @@ -300,11 +314,15 @@ private static final class KafkaDevServiceCfg { private final String serviceName; private final Map topicPartitions; private final Duration topicPartitionsTimeout; + + private final KafkaDevServicesBuildTimeConfig.Provider provider; + private final RedPandaBuildTimeConfig redpanda; public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) { this.devServicesEnabled = config.enabled.orElse(true); - this.imageName = config.imageName; + this.provider = config.provider; + this.imageName = config.imageName.orElseGet(provider::getDefaultImageName); this.fixedExposedPort = config.port.orElse(0); this.shared = config.shared; this.serviceName = config.serviceName; @@ -323,13 +341,15 @@ public boolean equals(Object o) { return false; } KafkaDevServiceCfg that = (KafkaDevServiceCfg) o; - return devServicesEnabled == that.devServicesEnabled && Objects.equals(imageName, that.imageName) + return devServicesEnabled == that.devServicesEnabled + && Objects.equals(provider, that.provider) + && Objects.equals(imageName, that.imageName) && Objects.equals(fixedExposedPort, that.fixedExposedPort); } @Override public int hashCode() { - return Objects.hash(devServicesEnabled, imageName, fixedExposedPort); + return Objects.hash(devServicesEnabled, provider, imageName, fixedExposedPort); } } diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java index 30b9a780a1bbf..36fc407e05d26 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java @@ -28,15 +28,9 @@ public class KafkaDevServicesBuildTimeConfig { public Optional port; /** - * The Kafka container image to use. - *

- * Only Redpanda and Strimzi images are supported. - * Default image is Redpanda. + * Kafka dev service container type. *

- * Note that Strimzi images are launched in Kraft mode. - * In order to use a Strimzi image you need to set a compatible image name such as - * {@code quay.io/strimzi-test-container/test-container:0.100.0-kafka-3.1.0} or - * {@code quay.io/strimzi/kafka:0.27.1-kafka-3.0.0} + * Redpanda, Strimzi and kafka-native container providers are supported. Default is redpanda. *

* For Redpanda: * See https://vectorized.io/docs/quick-start-docker/ and https://hub.docker.com/r/vectorized/redpanda @@ -44,9 +38,37 @@ public class KafkaDevServicesBuildTimeConfig { * For Strimzi: * See https://github.com/strimzi/test-container and https://quay.io/repository/strimzi-test-container/test-container *

+ * For Kafka Native: + * See https://github.com/ozangunalp/kafka-native and https://quay.io/repository/ogunalp/kafka-native + *

+ * Note that Strimzi and Kafka Native images are launched in Kraft mode. */ - @ConfigItem(defaultValue = "docker.io/vectorized/redpanda:v22.3.4") - public String imageName; + @ConfigItem(defaultValue = "redpanda") + public Provider provider = Provider.REDPANDA; + + public enum Provider { + REDPANDA("docker.io/vectorized/redpanda:v22.3.4"), + STRIMZI("quay.io/strimzi-test-container/test-container:latest-kafka-3.2.1"), + KAFKA_NATIVE("quay.io/ogunalp/kafka-native:latest"); + + private final String defaultImageName; + + Provider(String imageName) { + this.defaultImageName = imageName; + } + + public String getDefaultImageName() { + return defaultImageName; + } + } + + /** + * The Kafka container image to use. + *

+ * Dependent on the provider. + */ + @ConfigItem + public Optional imageName; /** * Indicates if the Kafka broker managed by Quarkus Dev Services is shared. diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java new file mode 100644 index 0000000000000..b6fff5fd04a9c --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java @@ -0,0 +1,99 @@ +package io.quarkus.kafka.client.deployment; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.command.InspectContainerResponse; + +import io.quarkus.devservices.common.ConfigureUtil; + +public class KafkaNativeContainer extends GenericContainer { + + private static final String STARTER_SCRIPT = "/work/run.sh"; + + private final Integer fixedExposedPort; + private final boolean useSharedNetwork; + + private String additionalArgs = null; + private int exposedPort = -1; + + private String hostName = null; + + public KafkaNativeContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName, + boolean useSharedNetwork) { + super(dockerImageName); + this.fixedExposedPort = fixedExposedPort; + this.useSharedNetwork = useSharedNetwork; + if (serviceName != null) { + withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, serviceName); + } + String cmd = String.format("while [ ! -f %s ]; do sleep 0.1; done; sleep 0.1; %s", STARTER_SCRIPT, STARTER_SCRIPT); + withCommand("sh", "-c", cmd); + waitingFor(Wait.forLogMessage(".*Kafka broker started.*", 1)); + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + // Set exposed port + this.exposedPort = getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT); + // follow output + // Start and configure the advertised address + String cmd = "#!/bin/bash\n"; + cmd += "/work/kafka"; + cmd += " -Dkafka.advertised.listeners=" + getBootstrapServers(); + if (useSharedNetwork) { + cmd += " -Dkafka.listeners=BROKER://:9093,PLAINTEXT://:9092,CONTROLLER://:9094"; + cmd += " -Dkafka.interbroker.listener.name=BROKER"; + cmd += " -Dkafka.controller.listener.names=CONTROLLER"; + cmd += " -Dkafka.listener.security.protocol.map=BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"; + cmd += " -Dkafka.early.start.listeners=BROKER,CONTROLLER,PLAINTEXT"; + } + if (additionalArgs != null) { + cmd += " " + additionalArgs; + } + + //noinspection OctalInteger + copyFileToContainer( + Transferable.of(cmd.getBytes(StandardCharsets.UTF_8), 0777), + STARTER_SCRIPT); + } + + private String getKafkaAdvertisedListeners() { + List addresses = new ArrayList<>(); + if (useSharedNetwork) { + addresses.add(String.format("BROKER://%s:9093", hostName)); + } + // See https://github.com/quarkusio/quarkus/issues/21819 + // Kafka is always exposed to the Docker host network + addresses.add(String.format("PLAINTEXT://%s:%d", getHost(), getExposedKafkaPort())); + return String.join(",", addresses); + } + + public int getExposedKafkaPort() { + return exposedPort; + } + + @Override + protected void configure() { + super.configure(); + + addExposedPort(DevServicesKafkaProcessor.KAFKA_PORT); + hostName = ConfigureUtil.configureSharedNetwork(this, "kafka"); + + if (fixedExposedPort != null) { + addFixedExposedPort(fixedExposedPort, DevServicesKafkaProcessor.KAFKA_PORT); + } + } + + public String getBootstrapServers() { + return getKafkaAdvertisedListeners(); + } + +} diff --git a/integration-tests/kafka-devservices/src/main/resources/application.properties b/integration-tests/kafka-devservices/src/main/resources/application.properties index 8e83b2b6d39c7..bdd341fbda057 100644 --- a/integration-tests/kafka-devservices/src/main/resources/application.properties +++ b/integration-tests/kafka-devservices/src/main/resources/application.properties @@ -5,6 +5,7 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true +quarkus.kafka.devservices.provider=kafka-native quarkus.kafka.devservices.topic-partitions.test=2 quarkus.kafka.devservices.topic-partitions.test-consumer=3 quarkus.kafka.devservices.topic-partitions-timeout=4S