Commit
Initial support for Snappy in the Kafka client extension.
cescoffier authored and gsmet committed Feb 19, 2021
1 parent 4868b64 commit 05e5e8b
Showing 26 changed files with 1,178 additions and 47 deletions.
4 changes: 2 additions & 2 deletions .github/native-tests.json
@@ -64,8 +64,8 @@
},
{
"category": "Messaging",
"timeout": 85,
"test-modules": "artemis-core artemis-jms kafka kafka-streams reactive-messaging-amqp reactive-messaging-kafka reactive-messaging-http"
"timeout": 120,
"test-modules": "artemis-core artemis-jms kafka kafka-avro kafka-snappy kafka-streams reactive-messaging-amqp reactive-messaging-kafka reactive-messaging-http"
},
{
"category": "Security1",
17 changes: 17 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
@@ -774,6 +774,23 @@ Update the `oauth.client.id`, `oauth.client.secret` and `oauth.token.endpoint.ur

OAuth authentication works for both JVM and native modes.

== Using Snappy

On _outgoing_ channels, you can enable Snappy compression by setting the `compression.type` attribute to `snappy`:

[source, properties]
----
mp.messaging.outgoing.fruit-out.compression.type=snappy
----

In JVM mode, Snappy compression works out of the box.
However, to compile your application to a native executable, you need to:

1. Use GraalVM 21+
2. Add `quarkus.kafka.snappy.enabled=true` to your `application.properties`

In native mode, Snappy is disabled by default because it requires embedding a native library in the executable and unpacking it when the application starts.
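
For illustration, here is a minimal producer sketch that feeds the `fruit-out` channel used above. It is a sketch only: it assumes the channel is bound to the `smallrye-kafka` connector, and the package, bean, and method names (`org.acme.kafka`, `FruitProducer`, `generate`) are made up for this example.

[source, java]
----
package org.acme.kafka; // hypothetical package, for illustration only

import java.time.Duration;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class FruitProducer {

    // Every record sent on this channel is compressed with Snappy because the
    // channel configuration sets compression.type=snappy (see above).
    @Outgoing("fruit-out")
    public Multi<String> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(tick -> "fruit-" + tick);
    }
}
----

In native mode, this only works if the executable was built with `quarkus.kafka.snappy.enabled=true`, as described above.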

== Configuration

More details about the SmallRye Reactive Messaging configuration can be found in the https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.8/kafka/kafka.html[SmallRye Reactive Messaging - Kafka Connector Documentation].
KafkaBuildTimeConfig.java
@@ -13,4 +13,13 @@ public class KafkaBuildTimeConfig
*/
@ConfigItem(name = "health.enabled", defaultValue = "false")
public boolean healthEnabled;

/**
* Whether or not to enable Snappy in native mode.
* <p>
* Note that Snappy requires GraalVM 21+ and embeds a native library in the native executable.
* This library is unpacked and loaded when the application starts.
*/
@ConfigItem(name = "snappy.enabled", defaultValue = "false")
public boolean snappyEnabled;
}
KafkaProcessor.java
@@ -53,12 +53,17 @@
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.AdditionalIndexedClassesBuildItem;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveHierarchyBuildItem;
import io.quarkus.deployment.pkg.NativeConfig;
import io.quarkus.kafka.client.runtime.KafkaRecorder;
import io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
import io.quarkus.kafka.client.serialization.JsonbSerializer;
@@ -68,7 +73,7 @@

public class KafkaProcessor {

static final Class[] BUILT_INS = {
static final Class<?>[] BUILT_INS = {
//serializers
ShortSerializer.class,
DoubleSerializer.class,
@@ -109,9 +114,12 @@ void contributeClassesToIndex(BuildProducer<AdditionalIndexedClassesBuildItem> a
}

@BuildStep
public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
public void build(
KafkaBuildTimeConfig config,
CombinedIndexBuildItem indexBuildItem, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<NativeImageProxyDefinitionBuildItem> proxies,
Capabilities capabilities, BuildProducer<UnremovableBeanBuildItem> beans) {
Capabilities capabilities, BuildProducer<UnremovableBeanBuildItem> beans,
BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig) {
final Set<DotName> toRegister = new HashSet<>();

collectImplementors(toRegister, indexBuildItem, Serializer.class);
@@ -134,13 +142,15 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
collectSubclasses(toRegister, indexBuildItem, i);
}
if (capabilities.isPresent(Capability.JSONB)) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JsonbSerializer.class, JsonbDeserializer.class));
reflectiveClass.produce(
new ReflectiveClassBuildItem(false, false, JsonbSerializer.class, JsonbDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, JsonbSerializer.class);
collectSubclasses(toRegister, indexBuildItem, JsonbDeserializer.class);
}
if (capabilities.isPresent(Capability.JACKSON)) {
reflectiveClass.produce(
new ReflectiveClassBuildItem(false, false, ObjectMapperSerializer.class, ObjectMapperDeserializer.class));
new ReflectiveClassBuildItem(false, false, ObjectMapperSerializer.class,
ObjectMapperDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, ObjectMapperSerializer.class);
collectSubclasses(toRegister, indexBuildItem, ObjectMapperDeserializer.class);

@@ -162,6 +172,88 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "java.nio.DirectByteBuffer"));
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, "sun.misc.Cleaner"));

handleAvro(reflectiveClass, proxies);
handleOpenTracing(reflectiveClass, capabilities);
handleStrimziOAuth(reflectiveClass);
if (config.snappyEnabled) {
handleSnappy(reflectiveClass, nativeLibs, nativeConfig);
}

}

private void handleSnappy(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<NativeImageResourceBuildItem> nativeLibs, NativeConfig nativeConfig) {
reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"org.xerial.snappy.SnappyInputStream",
"org.xerial.snappy.SnappyOutputStream"));

String root = "org/xerial/snappy/native/";
// add linux64 native lib when targeting containers
if (nativeConfig.containerRuntime.isPresent() || nativeConfig.containerBuild) {
String dir = "Linux/x86_64";
String snappyNativeLibraryName = "libsnappyjava.so";
String path = root + dir + "/" + snappyNativeLibraryName;
nativeLibs.produce(new NativeImageResourceBuildItem(path));
} else { // otherwise the native lib of the platform this build runs on
String dir = getOs() + "/" + getArch();
String snappyNativeLibraryName = System.mapLibraryName("snappyjava");
if (snappyNativeLibraryName.toLowerCase().endsWith(".dylib")) {
snappyNativeLibraryName = snappyNativeLibraryName.replace(".dylib", ".jnilib");
}
String path = root + dir + "/" + snappyNativeLibraryName;
nativeLibs.produce(new NativeImageResourceBuildItem(path));
}
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void loadSnappyIfEnabled(KafkaRecorder recorder, KafkaBuildTimeConfig config) {
if (config.snappyEnabled) {
recorder.loadSnappy();
}
}

private void handleOpenTracing(BuildProducer<ReflectiveClassBuildItem> reflectiveClass, Capabilities capabilities) {
//opentracing contrib kafka interceptors: https://github.com/opentracing-contrib/java-kafka-client
if (capabilities.isPresent(Capability.OPENTRACING)) {
try {
Class.forName("io.opentracing.contrib.kafka.TracingProducerInterceptor", false,
Thread.currentThread().getContextClassLoader());
reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.opentracing.contrib.kafka.TracingProducerInterceptor",
"io.opentracing.contrib.kafka.TracingConsumerInterceptor"));
} catch (ClassNotFoundException e) {
//ignore, opentracing contrib kafka is not in the classpath
}
}
}

private void handleStrimziOAuth(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
try {
Class.forName("io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler", false,
Thread.currentThread().getContextClassLoader());

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"org.keycloak.jose.jws.JWSHeader",
"org.keycloak.representations.AccessToken",
"org.keycloak.representations.AccessToken$Access",
"org.keycloak.representations.AccessTokenResponse",
"org.keycloak.representations.IDToken",
"org.keycloak.representations.JsonWebToken",
"org.keycloak.jose.jwk.JSONWebKeySet",
"org.keycloak.jose.jwk.JWK",
"org.keycloak.json.StringOrArrayDeserializer",
"org.keycloak.json.StringListMapDeserializer"));
} catch (ClassNotFoundException e) {
//ignore, Strimzi OAuth Client is not on the classpath
}
}

private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<NativeImageProxyDefinitionBuildItem> proxies) {
// Avro - for both Confluent and Apicurio
try {
Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer", false,
@@ -227,42 +319,6 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
} catch (ClassNotFoundException e) {
//ignore, Apicurio Avro is not in the classpath
}

//opentracing contrib kafka interceptors: https://github.com/opentracing-contrib/java-kafka-client
if (capabilities.isPresent(Capability.OPENTRACING)) {
try {
Class.forName("io.opentracing.contrib.kafka.TracingProducerInterceptor", false,
Thread.currentThread().getContextClassLoader());
reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.opentracing.contrib.kafka.TracingProducerInterceptor",
"io.opentracing.contrib.kafka.TracingConsumerInterceptor"));
} catch (ClassNotFoundException e) {
//ignore, opentracing contrib kafka is not in the classpath
}
}

try {
Class.forName("io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler", false,
Thread.currentThread().getContextClassLoader());

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"org.keycloak.jose.jws.JWSHeader",
"org.keycloak.representations.AccessToken",
"org.keycloak.representations.AccessToken$Access",
"org.keycloak.representations.AccessTokenResponse",
"org.keycloak.representations.IDToken",
"org.keycloak.representations.JsonWebToken",
"org.keycloak.jose.jwk.JSONWebKeySet",
"org.keycloak.jose.jwk.JWK",
"org.keycloak.json.StringOrArrayDeserializer",
"org.keycloak.json.StringListMapDeserializer"));
} catch (ClassNotFoundException e) {
//ignore, Strimzi OAuth Client is not on the classpath
}

}

@BuildStep
Expand All @@ -277,7 +333,8 @@ public AdditionalBeanBuildItem runtimeConfig() {
public void withSasl(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ReflectiveHierarchyBuildItem> reflectiveHierarchy) {

reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, AbstractLogin.DefaultLoginCallbackHandler.class));
reflectiveClass
.produce(new ReflectiveClassBuildItem(false, false, AbstractLogin.DefaultLoginCallbackHandler.class));
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, SaslClientCallbackHandler.class));
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, DefaultLogin.class));

@@ -321,4 +378,23 @@ UnremovableBeanBuildItem ensureJsonParserAvailable()
"io.quarkus.jsonb.JsonbProducer",
"javax.json.bind.Jsonb");
}

public static String getArch() {
String osArch = System.getProperty("os.arch");
return osArch.replaceAll("\\W", "");
}

static String getOs() {
String osName = System.getProperty("os.name");

if (osName.contains("Windows")) {
return "Windows";
} else if (osName.contains("Mac")) {
return "Mac";
} else if (osName.contains("Linux")) {
return "Linux";
} else {
return osName.replaceAll("\\W", "");
}
}
}
KafkaRecorder.java (new file)
@@ -0,0 +1,74 @@
package io.quarkus.kafka.client.runtime;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;

import org.xerial.snappy.OSInfo;
import org.xerial.snappy.SnappyError;
import org.xerial.snappy.SnappyErrorCode;
import org.xerial.snappy.SnappyLoader;

import io.quarkus.runtime.annotations.Recorder;

@Recorder
public class KafkaRecorder {

public void loadSnappy() {
// Resolve the library file name with a suffix (e.g., dll, .so, etc.)
String snappyNativeLibraryName = System.mapLibraryName("snappyjava");
String snappyNativeLibraryPath = "/org/xerial/snappy/native/" + OSInfo.getNativeLibFolderPathForCurrentOS();
boolean hasNativeLib = hasResource(snappyNativeLibraryPath + "/" + snappyNativeLibraryName);
if (!hasNativeLib) {
if (OSInfo.getOSName().equals("Mac")) {
// Fix for openjdk7 for Mac
String altName = "libsnappyjava.jnilib";
if (hasResource(snappyNativeLibraryPath + "/" + altName)) {
snappyNativeLibraryName = altName;
hasNativeLib = true;
}
}
}

if (!hasNativeLib) {
String errorMessage = String.format("no native library is found for os.name=%s and os.arch=%s", OSInfo.getOSName(),
OSInfo.getArchName());
throw new SnappyError(SnappyErrorCode.FAILED_TO_LOAD_NATIVE_LIBRARY, errorMessage);
}

File out = extractLibraryFile(
SnappyLoader.class.getResource(snappyNativeLibraryPath + "/" + snappyNativeLibraryName),
snappyNativeLibraryName);

System.load(out.getAbsolutePath());
}

private static boolean hasResource(String path) {
return SnappyLoader.class.getResource(path) != null;
}

private static File extractLibraryFile(URL library, String name) {
String tmp = System.getProperty("java.io.tmpdir");
File extractedLibFile = new File(tmp, name);

try (BufferedInputStream inputStream = new BufferedInputStream(library.openStream());
FileOutputStream fileOS = new FileOutputStream(extractedLibFile)) {
byte[] data = new byte[8192];
int byteContent;
while ((byteContent = inputStream.read(data, 0, 8192)) != -1) {
fileOS.write(data, 0, byteContent);
}
} catch (IOException e) {
throw new UncheckedIOException(
"Unable to extract native library " + name + " to " + extractedLibFile.getAbsolutePath(), e);
}

extractedLibFile.deleteOnExit();

return extractedLibFile;
}

}
(GraalVM substitution classes for the Kafka client)
Expand Up @@ -5,12 +5,14 @@

import java.lang.invoke.MethodHandle;
import java.util.List;
import java.util.function.BooleanSupplier;

import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.AppInfoParser;
import org.graalvm.home.Version;

import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.RecomputeFieldValue;
@@ -19,11 +21,11 @@

/**
* Here is where surgery happens
* * Remove Snappy
* * Remove Snappy if not available (require GraalVM 21+).
* * Remove JMX
*/

@TargetClass(value = CompressionType.class, innerClass = "SnappyConstructors")
@TargetClass(value = CompressionType.class, innerClass = "SnappyConstructors", onlyWith = GraalVM20OrEarlier.class)
final class SubstituteSnappy {

@Alias
@@ -36,7 +38,15 @@ final class SubstituteSnappy {

}

@TargetClass(value = CompressionType.class)
final class GraalVM20OrEarlier implements BooleanSupplier {

@Override
public boolean getAsBoolean() {
return Version.getCurrent().compareTo(21) < 0;
}
}

@TargetClass(value = CompressionType.class, onlyWith = GraalVM20OrEarlier.class)
final class FixEnumAccess {

@Substitute
(The remaining changed files are not shown.)
