Skip to content

Commit

Permalink
quarkusio#2663 WIP Kafka Streams extension
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling committed Jun 1, 2019
1 parent bd1326f commit 8d3504c
Show file tree
Hide file tree
Showing 13 changed files with 567 additions and 0 deletions.
10 changes: 10 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@
<artifactId>quarkus-kafka-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
Expand Down Expand Up @@ -1048,6 +1053,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class FeatureBuildItem extends MultiBuildItem {
public static final String JDBC_MARIADB = "jdbc-mariadb";
public static final String JDBC_POSTGRESQL = "jdbc-postgresql";
public static final String JDBC_MSSQL = "jdbc-mssql";
public static final String KAFKA_STREAMS = "kafka-streams";
public static final String KEYCLOAK = "keycloak";
public static final String KOTLIN = "kotlin";
public static final String MAILER = "mailer";
Expand Down
84 changes: 84 additions & 0 deletions extensions/kafka-streams/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019 Red Hat, Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>quarkus-kafka-streams-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-kafka-streams-deployment</artifactId>
<name>Quarkus - Kafka Streams - Deployment</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-undertow-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonp-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2018 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.quarkus.kafka.streams.deployment;

import java.io.IOException;

import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
import io.quarkus.kafka.streams.runtime.SmallRyeHealthTemplate;

class KafkaStreamsProcessor {

// private static final DotName HEALTH = DotName.createSimple(Health.class.getName());
//
// /**
// * The configuration for health checking.
// */
// SmallRyeHealthConfig health;
//
// @ConfigRoot(name = "smallrye-health")
// static final class SmallRyeHealthConfig {
// /**
// * The path of the health-checking servlet.
// */
// @ConfigItem(defaultValue = "/health")
// String path;
// }

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void build(SmallRyeHealthTemplate template, RecorderContext recorder,
BuildProducer<FeatureBuildItem> feature,
// BuildProducer<ServletBuildItem> servlet,
// BuildProducer<AdditionalBeanBuildItem> additionalBean,
// BuildProducer<BeanDefiningAnnotationBuildItem> beanDefiningAnnotation,
BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) throws IOException {

feature.produce(new FeatureBuildItem(FeatureBuildItem.KAFKA_STREAMS));
reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class));
reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class));

//
// // Register the servlet
// ServletBuildItem servletBuildItem = ServletBuildItem.builder("health", SmallRyeHealthServlet.class.getName())
// .addMapping(health.path).build();
// servlet.produce(servletBuildItem);
//
// // Make ArC discover the beans marked with the @Health qualifier
// beanDefiningAnnotation.produce(new BeanDefiningAnnotationBuildItem(HEALTH));
//
// // Add additional beans
// additionalBean.produce(new AdditionalBeanBuildItem(SmallRyeHealthReporter.class, SmallRyeHealthServlet.class));
//
// // Discover and register the HealthCheckResponseProvider
// Set<String> providers = ServiceUtil.classNamesNamedIn(getClass().getClassLoader(),
// "META-INF/services/" + HealthCheckResponseProvider.class.getName());
//
// if (providers.isEmpty()) {
// throw new IllegalStateException("No HealthCheckResponseProvider implementation found.");
// } else if (providers.size() > 1) {
// throw new IllegalStateException(
// String.format("Multiple HealthCheckResponseProvider implementations found: %s", providers));
// }
//
// template.registerHealthCheckResponseProvider(
// (Class<? extends HealthCheckResponseProvider>) recorder.classProxy(providers.iterator().next()));
}
//
// @BuildStep
// public void kubernetes(BuildProducer<KubernetesHealthPathBuildItem> healthPathItemProducer) {
// healthPathItemProducer.produce(new KubernetesHealthPathBuildItem(health.path));
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.smallrye.health.test;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import javax.enterprise.context.Dependent;

import org.eclipse.microprofile.health.Health;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;

@Dependent
@Health
public class BasicHealthCheck implements HealthCheck {

@Override
public HealthCheckResponse call() {
return new HealthCheckResponse() {
@Override
public String getName() {
return "basic";
}

@Override
public State getState() {
return State.UP;
}

@Override
public Optional<Map<String, Object>> getData() {
return Optional.of(Collections.singletonMap("foo", "bar"));
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2018 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.quarkus.smallrye.health.test;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import javax.enterprise.context.Dependent;

import org.eclipse.microprofile.health.Health;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;

@Dependent
@Health
public class FailingHealthCheck implements HealthCheck {

@Override
public HealthCheckResponse call() {
return new HealthCheckResponse() {
@Override
public String getName() {
return "failing";
}

@Override
public State getState() {
return State.DOWN;
}

@Override
public Optional<Map<String, Object>> getData() {
return Optional.of(Collections.singletonMap("status", "all broken"));
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2018 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.quarkus.smallrye.health.test;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.List;

import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.eclipse.microprofile.health.Health;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class FailingUnitTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(FailingHealthCheck.class)
.addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"));
@Inject
@Health
Instance<HealthCheck> checks;

@Test
public void testHealthServlet() {
RestAssured.when().get("/health").then().statusCode(503);
}

@Test
public void testHealthBeans() {
List<HealthCheck> check = new ArrayList<>();
for (HealthCheck i : checks) {
check.add(i);
}
assertEquals(1, check.size());
assertEquals(HealthCheckResponse.State.DOWN, check.get(0).call().getState());
}
}
Loading

0 comments on commit 8d3504c

Please sign in to comment.