diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index fcb44b4479..6d91fc25fc 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -20,8 +20,10 @@ import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -32,6 +34,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.locals.ContextOperator; import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor; @@ -115,6 +118,14 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, Multi> receiveMulti = Multi.createBy().repeating() .completionStage(consumer::receiveAsync) .until(m -> closed.get()) + .plug(msgMulti -> { + // Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched + if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) { + return msgMulti.onItem().call(msg -> Uni.createFrom().item(msg::getValue)); + } else { + return msgMulti; + } + }) .emitOn(command -> context.runOnContext(event -> command.run())) .onItem().transform(message -> new PulsarIncomingMessage<>(message, ackHandler, failureHandler)) .onFailure(throwable -> isEndOfStream(client, throwable)).recoverWithCompletion() @@ -133,6 +144,17 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, .completionStage(consumer::batchReceiveAsync) .until(m -> closed.get()) .filter(m -> m.size() > 0) + .plug(msgMulti -> { + // Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched + if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) { + return msgMulti.onItem().call(msg -> Uni.createFrom().item(() -> { + msg.forEach(m -> m.getValue()); + return null; + })); + } else { + return msgMulti; + } + }) .emitOn(command -> context.runOnContext(event -> command.run())) .onItem().transform(m -> new PulsarIncomingBatchMessage<>(m, ackHandler, failureHandler)) .onFailure(throwable -> isEndOfStream(client, throwable)).recoverWithCompletion() diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/SchemaResolver.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/SchemaResolver.java index b7b49883ed..5fc7f87faa 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/SchemaResolver.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/SchemaResolver.java @@ -14,6 +14,7 @@ import org.apache.pulsar.common.schema.SchemaType; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; +import io.smallrye.reactive.messaging.providers.helpers.Validation; @ApplicationScoped public class SchemaResolver { @@ -53,6 +54,10 @@ private Schema getSchema(PulsarConnectorCommonConfiguration configuration, Sc public static String getSchemaName(Schema schema) { SchemaInfo schemaInfo = schema.getSchemaInfo(); - return schemaInfo != null ? schemaInfo.getName() : schema.getClass().getSimpleName(); + if (schemaInfo == null || Validation.isBlank(schemaInfo.getName())) { + return schema.getClass().getSimpleName(); + } else { + return schemaInfo.getName(); + } } } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/auth/PulsarAuthenticationTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/auth/PulsarAuthenticationTest.java index 24c533f765..207d32e4d3 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/auth/PulsarAuthenticationTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/auth/PulsarAuthenticationTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.utility.MountableFile; import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.base.PulsarContainer; @@ -42,11 +41,11 @@ public class PulsarAuthenticationTest extends WeldTestBaseWithoutExtension { @BeforeAll static void beforeAll() throws PulsarClientException, InterruptedException { container = new PulsarContainer() - .withCopyToContainer(MountableFile.forClasspathResource("htpasswd"), "/pulsar/conf/.htpasswd") .withEnv("PULSAR_PREFIX_authenticationEnabled", "true") .withEnv("PULSAR_PREFIX_authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderBasic") - .withEnv("PULSAR_PREFIX_basicAuthConf", "file:///pulsar/conf/.htpasswd") + // base64 of htpasswd + .withEnv("PULSAR_PREFIX_basicAuthConf", "c3VwZXJ1c2VyOiRhcHIxJHpMOHY4VTZsJGNHTWdkckVja25RNHkzeC9ndWROajE=") .withEnv("PULSAR_PREFIX_brokerClientAuthenticationEnabled", "true") .withEnv("PULSAR_PREFIX_brokerClientAuthenticationPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationBasic") diff --git a/smallrye-reactive-messaging-pulsar/src/test/resources/htpasswd b/smallrye-reactive-messaging-pulsar/src/test/resources/htpasswd deleted file mode 100644 index 7a1e841170..0000000000 --- a/smallrye-reactive-messaging-pulsar/src/test/resources/htpasswd +++ /dev/null @@ -1 +0,0 @@ -superuser:$apr1$zL8v8U6l$cGMgdrEcknQ4y3x/gudNj1