Skip to content

Commit

Permalink
Force fetch SchemaInfo on Pulsar thread for AutoConsumeSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Jun 16, 2023
1 parent d8ade2c commit 924be8c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
Expand All @@ -32,6 +34,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.locals.ContextOperator;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor;
Expand Down Expand Up @@ -115,6 +118,14 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
Multi<PulsarIncomingMessage<T>> receiveMulti = Multi.createBy().repeating()
.completionStage(consumer::receiveAsync)
.until(m -> closed.get())
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) {
return msgMulti.onItem().call(msg -> Uni.createFrom().item(msg::getValue));
} else {
return msgMulti;
}
})
.emitOn(command -> context.runOnContext(event -> command.run()))
.onItem().transform(message -> new PulsarIncomingMessage<>(message, ackHandler, failureHandler))
.onFailure(throwable -> isEndOfStream(client, throwable)).recoverWithCompletion()
Expand All @@ -133,6 +144,17 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.completionStage(consumer::batchReceiveAsync)
.until(m -> closed.get())
.filter(m -> m.size() > 0)
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) {
return msgMulti.onItem().call(msg -> Uni.createFrom().item(() -> {
msg.forEach(m -> m.getValue());
return null;
}));
} else {
return msgMulti;
}
})
.emitOn(command -> context.runOnContext(event -> command.run()))
.onItem().transform(m -> new PulsarIncomingBatchMessage<>(m, ackHandler, failureHandler))
.onFailure(throwable -> isEndOfStream(client, throwable)).recoverWithCompletion()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.pulsar.common.schema.SchemaType;

import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.Validation;

@ApplicationScoped
public class SchemaResolver {
Expand Down Expand Up @@ -53,6 +54,10 @@ private Schema<?> getSchema(PulsarConnectorCommonConfiguration configuration, Sc

public static String getSchemaName(Schema<?> schema) {
SchemaInfo schemaInfo = schema.getSchemaInfo();
return schemaInfo != null ? schemaInfo.getName() : schema.getClass().getSimpleName();
if (schemaInfo == null || Validation.isBlank(schemaInfo.getName())) {
return schema.getClass().getSimpleName();
} else {
return schemaInfo.getName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.utility.MountableFile;

import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.base.PulsarContainer;
Expand All @@ -42,11 +41,11 @@ public class PulsarAuthenticationTest extends WeldTestBaseWithoutExtension {
@BeforeAll
static void beforeAll() throws PulsarClientException, InterruptedException {
container = new PulsarContainer()
.withCopyToContainer(MountableFile.forClasspathResource("htpasswd"), "/pulsar/conf/.htpasswd")
.withEnv("PULSAR_PREFIX_authenticationEnabled", "true")
.withEnv("PULSAR_PREFIX_authenticationProviders",
"org.apache.pulsar.broker.authentication.AuthenticationProviderBasic")
.withEnv("PULSAR_PREFIX_basicAuthConf", "file:///pulsar/conf/.htpasswd")
// base64 of htpasswd
.withEnv("PULSAR_PREFIX_basicAuthConf", "c3VwZXJ1c2VyOiRhcHIxJHpMOHY4VTZsJGNHTWdkckVja25RNHkzeC9ndWROajE=")
.withEnv("PULSAR_PREFIX_brokerClientAuthenticationEnabled", "true")
.withEnv("PULSAR_PREFIX_brokerClientAuthenticationPlugin",
"org.apache.pulsar.client.impl.auth.AuthenticationBasic")
Expand Down

This file was deleted.

0 comments on commit 924be8c

Please sign in to comment.