quotes; // <1> + +/** + * Endpoint retrieving the "quotes" Pulsar topic and sending the items to a server sent event. + */ +@GET +@Produces(MediaType.SERVER_SENT_EVENTS) // <2> +public Multistream() { + return quotes; // <3> +} +---- +<1> Injects the `quotes` channel using the `@Channel` qualifier +<2> Indicates that the content is sent using `Server Sent Events` +<3> Returns the stream (_Reactive Stream_) + +No need to configure anything, as Quarkus will automatically associate the `quotes` channel to the `quotes` Pulsar topic. +It will also generate a deserializer for the `Quote` class. + +[TIP] +==== +.Message Schemas in Pulsar +In this example we used JSON Schema with Pulsar messages. +For more options on Pulsar Schemas, see xref:pulsar.adoc#pulsar-schema[Pulsar Reference Guide - Schema]. + +// TODO write about schema +==== + +== The HTML page + +Final touch, the HTML page requesting quotes and displaying the prices obtained over SSE. + +Inside the _pulsar-quickstart-producer_ project, create the `src/main/resources/META-INF/resources/quotes.html` file with the following content: + +[source, html] +---- + + + + +Prices + + + + + +++ + + + +---- + +Nothing spectacular here. +When the user clicks the button, HTTP request is made to request a quote, and a pending quote is added to the list. +On each quote received over SSE, the corresponding item in the list is updated. + +== Get it running + +You just need to run both applications. +In one terminal, run: + +[source,bash] +---- +mvn -f pulsar-quickstart-producer quarkus:dev +---- + +In another terminal, run: + +[source, bash] +---- +mvn -f pulsar-quickstart-processor quarkus:dev +---- + +Quarkus starts a Pulsar broker automatically, configures the application and shares the Pulsar broker instance between different applications. +See xref:pulsar.adoc[Dev Services for Pulsar] for more details. + +Open `http://localhost:8080/quotes.html` in your browser and request some quotes by clicking the button. + +== Running in JVM or Native mode + +When not running in dev or test mode, you will need to start your Pulsar broker. +// TODO +You can follow the instructions from the https://pulsar.apache.org/docs/3.0.x/getting-started-docker/[Run a standalone Pulsar cluster in Docker] or create a `docker-compose.yaml` file with the following content: + +[source, yaml] +---- +version: '3.8' + +services: + + pulsar: + image: apachepulsar/pulsar:3.0.0 + command: [ + "sh", "-c", + "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss" + ] + + ports: + - "6650:6650" + - "8080:8080" + tmpfs: + - /pulsar/data + healthcheck: + test: curl --fail http://localhost:8080/admin/v2/clusters || exit 1 + interval: 10s + timeout: 10s + retries: 5 + start_period: 5s + environment: + PULSAR_PREFIX_advertisedListeners: internal:pulsar://localhost:6650,external:pulsar://pulsar:6650 + PULSAR_PREFIX_transactionCoordinatorEnabled: true + PULSAR_PREFIX_systemTopicEnabled: true + networks: + - pulsar-quickstart-network + + producer: + image: quarkus-quickstarts/pulsar-quickstart-producer:1.0-${QUARKUS_MODE:-jvm} + depends_on: + pulsar: + condition: service_healthy + build: + context: pulsar-quickstart-producer + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + deploy: + restart_policy: + condition: on-failure + environment: + PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650 + ports: + - "8082:8080" + networks: + - pulsar-quickstart-network + + processor: + image: quarkus-quickstarts/pulsar-quickstart-processor:1.0-${QUARKUS_MODE:-jvm} + depends_on: + pulsar: + condition: service_healthy + build: + context: pulsar-quickstart-processor + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + deploy: + restart_policy: + condition: on-failure + environment: + QUARKUS_HTTP_PORT: 8082 + PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650 + ports: + - "8083:8080" + networks: + - pulsar-quickstart-network + +networks: + pulsar-quickstart-network: + name: pulsar-quickstart +---- + +Make sure you first build both applications in JVM mode with: + +[source, bash] +---- +mvn -f pulsar-quickstart-producer package +mvn -f pulsar-quickstart-processor package +---- + +Once packaged, run `docker-compose up`. + +NOTE: This is a development cluster, do not use in production. + +You can also build and run our applications as native executables. +First, compile both applications as native: + +[source, bash] +---- +mvn -f pulsar-quickstart-producer package -Dnative -Dquarkus.native.container-build=true +mvn -f pulsar-quickstart-processor package -Dnative -Dquarkus.native.container-build=true +---- + +Run the system with: + +[source, bash] +---- +export QUARKUS_MODE=native +docker-compose up --build +---- + +== Going further + +This guide has shown how you can interact with Pulsar using Quarkus. +It utilizes https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging] to build data streaming applications. + +For the exhaustive list of features and configuration options, check the xref:pulsar.adoc[Reference guide for Apache Pulsar Extension]. + +[NOTE] +==== +In this guide we explore Smallrye Reactive Messaging framework to interact with Apache Pulsar. +xref:pulsar.adoc#pulsar-clients[using Pulsar clients directly]. +==== diff --git a/docs/src/main/asciidoc/pulsar.adoc b/docs/src/main/asciidoc/pulsar.adoc new file mode 100644 index 0000000000000..b6b43d98e1750 --- /dev/null +++ b/docs/src/main/asciidoc/pulsar.adoc @@ -0,0 +1,1172 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc +//// += Apache Pulsar Reference Guide +include::_attributes.adoc[] +:categories: messaging +:summary: This reference guide provides an in-depth look on Apache Pulsar and Smallrye Reactive Messaging framework. +:numbered: +:sectnums: +:sectnumlevels: 4 + +This reference guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to interact with Apache Pulsar. + +== Introduction + +https://pulsar.apache.org[Apache Pulsar] is an open-source, distributed messaging and streaming platform built for the cloud. +It provides a multi-tenant, high-performance solution to server messaging with tiered storage capabilities. + +Pulsar implements the publish-subscribe pattern: + +- Producers publish messages to _topics_. +- Consumers create _subscriptions_ to those topics to receive and process incoming messages, and send _acknowledgments_ to the broker when processing is finished. +- When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. +The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully. + +A Pulsar cluster consists of + +- One or more _brokers_, which are stateless components. +- A _metadata store_ for maintaining topic metadata, schema, coordination and cluster configuration. +- A set of _bookies_ used for persistent storage of messages. + +== Quarkus Extension for Apache Pulsar + +Quarkus provides support for Apache Pulsar through https://smallrye.io/smallrye-reactive-messaging/[SmallRye Reactive Messaging] framework. +Based on Eclipse MicroProfile Reactive Messaging specification 3.0, it proposes a flexible programming model bridging CDI and event-driven. + +[NOTE] +==== +This guide provides an in-depth look on Apache Pulsar and SmallRye Reactive Messaging framework. +For a quick start take a look at xref:pulsar-getting-started.adoc[Getting Started to SmallRye Reactive Messaging with Apache Pulsar]. +==== + +You can add the `smallrye-reactive-messaging-pulsar` extensions to your project by running the following command in your project base directory: + +:add-extension-extensions: smallrye-reactive-messaging-pulsar +include::{includes}/devtools/extension-add.adoc[] + +This will add the following to your build file: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- +++++Quotes
+ + ++ +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +implementation("io.quarkus:quarkus-smallrye-reactive-messaging-pulsar") +---- + +[NOTE] +==== +The extension includes `pulsar-clients-original` version 3.0.0 as a transitive dependency and is compatible with Pulsar brokers version 2.10.x. +==== + +== Configuring Smallrye Pulsar Connector + +Because Smallrye Reactive Messaging framework supports different messaging backends like Apache Kafka, Apache Pulsar, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary: + +- Applications send and receive *messages*. https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/org/eclipse/microprofile/reactive/messaging/Message.html[`Message`] wraps a _payload_ and can be extended with some _metadata_. +This should not be confused with a Pulsar https://javadoc.io/doc/org.apache.pulsar/pulsar-client-api/latest/org/apache/pulsar/client/api/Message.html[`Message`], which consists of value, key +With the Pulsar connector, a Reactive Messaging _message_ corresponds to a Pulsar _message_. +- Messages transit on *channels*. Application components connect to channels to publish and consume messages. The Pulsar connector maps _channels_ to Pulsar _topics_. +- Channels are connected to message backends using *connectors*. +Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. +Each connector is dedicated to a specific messaging technology. +For example, the connector dealing with Pulsar is named `smallrye-pulsar`. + +A minimal configuration for the Pulsar connector with an incoming channel looks like the following: + +[source, properties] +---- +%prod.pulsar.client.serviceUrl=pulsar:6650 <1> +mp.messaging.incoming.prices.connector=smallrye-pulsar <2> +---- +<1> Configure the Pulsar broker service url for the production profile. +You can configure it globally or per channel using `mp.messaging.incoming.$channel.serviceUrl` property. +In dev mode and when running tests, <io.quarkus +quarkus-smallrye-reactive-messaging-pulsar +> automatically starts a Pulsar broker. + +<2> Configure the connector to manage the prices channel. +By default, the _topic_ name is same as the channel name. + +You can configure the topic attribute to override it. + +NOTE: The `%prod` prefix indicates that the property is only used when the application runs in prod mode (so not in dev or test). Refer to the xref:config-reference.adoc#profiles[Profile documentation] for further details. + +[TIP] +.Connector auto-attachment +==== +If you have a single connector on your classpath, you can omit the `connector` attribute configuration. +Quarkus automatically associates _orphan_ channels to the (unique) connector found on the classpath. +_Orphan_ channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer. + +This auto-attachment can be disabled using: + +[source, properties] +---- +quarkus.reactive-messaging.auto-connector-attachment=false +---- +==== + +For more configuration options see < >. + + +== Receiving messages from Pulsar + +The Pulsar Connector connects to a Pulsar broker using a Pulsar client and creates consumers to +receive messages from Pulsar brokers, and it maps each Pulsar `Message` into Reactive Messaging `Message`. + +=== Example + +Let’s imagine you have a Pulsar broker running, and accessible using the `pulsar:6650` address. +Configure your application to receive Pulsar messages on the `prices` channel as follows: + +[source, properties] +---- +mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 # <1> +mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest # <2> +---- + +1. Configure the Pulsar broker service url. +2. Make sure consumer subscription starts receiving messages from the `Earliest` position. + +[NOTE] +==== +You don’t need to set the Pulsar topic, nor the consumer name. +By default, the connector uses the channel name (`prices`). +You can configure the `topic` and `consumerName` attributes to override them. +==== + +[NOTE] +==== +In Pulsar, consumers need to provide a `subscriptionName` for topic subscriptions. +If not provided the connector generates a unique **subscription name**. +==== + +Then, your application can receive the `double` payload directly: + +[source, java] +---- +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class PriceConsumer { + + @Incoming("prices") + public void consume(double price) { + // process your price. + } + +} +---- + +Or, you can retrieve the Reactive Messaging type `Message `: + +[source, java] +---- +@Incoming("prices") +public CompletionStage consume(Message msg) { + // access record metadata + var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow(); + // process the message payload. + double price = msg.getPayload(); + // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker) + return msg.ack(); +} +---- + +The Reactive Messaging `Message` type lets the consuming method access the incoming message metadata and handle the acknowledgment manually. + +If you want to access the Pulsar message objects directly, use: + +[source, java] +---- +@Incoming("prices") +public void consume(org.apache.pulsar.client.api.Message msg) { + String key = msg.getKey(); + String value = msg.getValue(); + String topic = msg.topicName(); + // ... +} +---- + +`org.apache.pulsar.client.api.Message` is provided by the underlying Pulsar client and can be used directly with the consumer method. + + +Alternatively, your application can inject a `Multi` in your bean, identified with the channel name and subscribe to its events as the following example: + +[source, java] +---- +import io.smallrye.mutiny.Multi; +import org.eclipse.microprofile.reactive.messaging.Channel; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import org.jboss.resteasy.reactive.RestStreamElementType; + +@Path("/prices") +public class PriceResource { + + @Inject + @Channel("prices") + Multi prices; + + @GET + @Path("/prices") + @RestStreamElementType(MediaType.TEXT_PLAIN) + public Multi stream() { + return prices; + } +} +---- + +[NOTE] +==== +When consuming messages with `@Channel`, the application code is responsible for the subscription. +In the example above, the RESTEasy Reactive endpoint handles that for you. +==== + +Following types can be injected as channels: + +[source, java] +---- +@Inject @Channel("prices") Multi streamOfPayloads; + +@Inject @Channel("prices") Multi > streamOfMessages; + +@Inject @Channel("prices") Publisher publisherOfPayloads; + +@Inject @Channel("prices") Publisher > publisherOfMessages; +---- + +As with the previous `Message` example, if your injected channel receives payloads (`Multi `), it acknowledges the message automatically, and support multiple subscribers. +If your injected channel receives Message (`Multi >`), you will be responsible for the acknowledgment and broadcasting. + +=== Pulsar Subscription Types + +Pulsar *subscriptionType* consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing. + +- *Exclusive* subscription type allows specifying a _unique subscription name_ for "fan-out pub-sub messaging". This is the default subscription type. +- *Shared*, *Key_Shared* or *Failover* subscription types allow multiple consumers to share the _same subscription name_, to achieve "message queuing" among consumers. + +If a subscription name is not provided Quarkus generates a unique id. + +=== Deserialization and Pulsar Schema + +The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar consumer. +See the < > for more information. + +=== Acknowledgement Strategies + +When a message produced from a Pulsar Message is *acknowledged*, the connector sends an https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#acknowledgment[acknowledgement request] to the Pulsar broker. +All Reactive Messaging messages need to be *acknowledged*, which is handled automatically in most cases. +Acknowledgement requests can be sent to the Pulsar broker using the following two strategies: + +- **Individual acknowledgement** is the default strategy, an acknowledgement request is to the broker for each message. +- **Cumulative acknowledgement**, configured using `ack-strategy=cumulative`, the consumer only acknowledges the last message it received. +All messages in the stream up to (and including) the provided message are not redelivered to that consumer. + +[NOTE] +==== +By default, the Pulsar consumer does not wait for the acknowledgement confirmation from the broker to validate an acknowledgement. +You can enable this using `ackReceiptEnabled=true`. +==== + +=== Failure Handling Strategies + +If a message produced from a Pulsar message is *nacked*, a failure strategy is applied. +The Quarkus Pulsar extension supports 4 strategies: + +- `nack` *(default)* sends https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#negative-acknowledgment[negative acknowledgment] to the broker, triggering the broker to redeliver this message to the consumer. +The negative acknowledgment can be further configured using `negativeAckRedeliveryDelayMicros` and `negativeAck.redeliveryBackoff` properties. +- `fail` fail the application, no more messages will be processed. +- `ignore` the failure is logged, but the acknowledgement strategy will be applied and the processing will continue. +- `reconsume-later` sends the message to the https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic[retry letter topic] using the `reconsumeLater` API to be reconsumed with a delay. +The delay can be configured using the `reconsumeLater.delay` property and defaults to 3 seconds. +Custom delay or properties per message can be configured by adding an instance of `io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata` to the failure metadata. + +==== Acknowledgement timeout + +Similar to the negative acknowledgement, with the https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#acknowledgment-timeout[acknowledgement timeout] mechanism, the Pulsar client tracks the unacknowledged messages, +for the given *ackTimeout* period and sends *redeliver unacknowledged messages request* to the broker, thus the broker resends the unacknowledged messages to the consumer. + +To configure the timeout and redelivery backoff mechanism you can set `ackTimeoutMillis` and `ackTimeout.redeliveryBackoff` properties. +The `ackTimeout.redeliveryBackoff` value accepts comma separated values of min delay in milliseconds, max delay in milliseconds and multiplier respectively: + +[source, properties] +---- +mp.messaging.incoming.out.failure-strategy=ignore +mp.messaging.incoming.out.ackTimeoutMillis=10000 +mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2 +---- + +==== Reconsume later and retry letter topic + +The https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic[retry letter topic] pushes messages that are not consumed successfully to a dead letter topic and continue message consumption. +Note that dead letter topic can be used in different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic. + +[source, properties] +---- +mp.messaging.incoming.data.failure-strategy=reconsume-later +mp.messaging.incoming.data.reconsumeLater.delay=5000 +mp.messaging.incoming.data.enableRetry=true +mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2 +---- + +==== Dead-letter topic + +The https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#dead-letter-topic[dead letter topic] pushes messages that are not consumed successfully to a dead letter topic an continue message consumption. +Note that dead letter topic can be used in different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic. + +[source, properties] +---- +mp.messaging.incoming.data.failure-strategy=nack +mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2 +mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic +mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription +mp.messaging.incoming.data.subscriptionType=Shared +---- + +[IMPORTANT] +==== +*Negative acknowledgment* or *acknowledgment timeout* methods for redelivery will redeliver the whole batch of messages containing at least an unprocessed message. +See < > for more information. +==== + +=== Receiving Pulsar Messages in Batches + +By default, incoming methods receive each Pulsar message individually. +You can enable batch mode using `batchReceive=true` property, or setting a `batchReceivePolicy` in consumer configuration. + +[source, java] +---- +@Incoming("prices") +public CompletionStage consumeMessage(PulsarIncomingBatchMessage messages) { + for (PulsarMessage msg : messages) { + msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> { + String key = metadata.getKey(); + String topic = metadata.getTopicName(); + long timestamp = metadata.getEventTime(); + //... process messages + }); + } + // ack will commit the latest offsets (per partition) of the batch. + return messages.ack(); +} + +@Incoming("prices") +public void consumeRecords(Messages messages) { + for (Message msg : messages) { + //... process messages + } +} +---- + +Or you can directly receive the list of payloads to the consume method: + +[source, java] +---- +@Incoming("prices") +public void consume(List prices) { + for (double price : prices) { + // process price + } +} +---- + +[NOTE] +==== +Quarkus auto-detects batch types for incoming channels and sets batch configuration automatically. +You can configure batch mode explicitly with `mp.messaging.incoming.$channel.batchReceive` property. +==== + +== Sending messages to Pulsar + +The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message. + +=== Example + +Let’s imagine you have a Pulsar broker running, and accessible using the `pulsar:6650` address. +Configure your application to write the messages from the `prices` channel into a Pulsar Messages as follows: + +[source, properties] +---- +mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 # <1> +---- + +1. Configure the Pulsar broker service url. + +[NOTE] +==== +You don’t need to set the Pulsar topic, nor the producer name. +By default, the connector uses the channel name (`prices`). +You can configure the `topic` and `producerName` attributes to override them. +==== + +Then, your application must send `Message ` to the `prices` +channel. It can use `double` payloads as in the following snippet: + +[source, java] +---- +import io.smallrye.mutiny.Multi; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; +import java.util.Random; + +@ApplicationScoped +public class PulsarPriceProducer { + + private final Random random = new Random(); + + @Outgoing("prices-out") + public Multi generate() { + // Build an infinite stream of random prices + // It emits a price every second + return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) + .map(x -> random.nextDouble()); + } + +} +---- + +Note that the generate method returns a `Multi `, which implements the `Flow.Publisher` interface. +This publisher will be used by the framework to generate messages and send them to the configured Pulsar topic. + +Instead of returning a payload, you can return a `io.smallrye.reactive.messaging.pulsar.OutgoingMessage` to send Pulsar messages: + +[source, java] +---- +@Outgoing("out") +public Multi > generate() { + return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) + .map(x -> OutgoingMessage.of("my-key", random.nextDouble())); +} +---- + +Payload can be wrapped inside `org.eclipse.microprofile.reactive.messaging.Message` to have more control on the written records: + +[source, java] +---- +@Outgoing("generated-price") +public Multi > generate() { + return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) + .map(x -> Message.of(random.nextDouble()) + .addMetadata(PulsarOutgoingMessageMetadata.builder() + .withKey("my-key") + .withProperties(Map.of("property-key", "value")) + .build())); +} +---- + +When sending `Messages`, you can add an instance of +`io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata` to influence how the message is going to be written to Pulsar. + +Other than method signatures returning a `Flow.Publisher`, outgoing method can also return single message. +In this case the producer will use this method as generator to create an infinite stream. + +[source, java] +---- +@Outgoing("prices-out") T generate(); // T excluding void + +@Outgoing("prices-out") Message generate(); + +@Outgoing("prices-out") Uni generate(); + +@Outgoing("prices-out") Uni > generate(); + +@Outgoing("prices-out") CompletionStage generate(); + +@Outgoing("prices-out") CompletionStage > generate(); +---- + +=== Serialization and Pulsar Schema + +The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar producer. +See the < > for more information. + +=== Sending key/value pairs + +In order to send Kev/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a +https://javadoc.io/doc/org.apache.pulsar/pulsar-client-api/latest/org/apache/pulsar/common/schema/KeyValue.html[KeyValue] schema. + +[source, java] +---- +package pulsar.outbound; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.KeyValue; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +public class PulsarKeyValueExample { + + @Identifier("out") + @Produces + Schema > schema = Schema.KeyValue(Schema.STRING, Schema.INT64); + + @Incoming("in") + @Outgoing("out") + public KeyValue process(long in) { + return new KeyValue<>("my-key", in); + } + +} +---- + +If you need more control on the written records, use `PulsarOutgoingMessageMetadata`. + +=== Acknowledgement + +Upon receiving a message from a Producer, a Pulsar broker assigns a `MessageId` to the message and sends it back to the producer, +confirming that the message is published. + +By default, the connector does wait for Pulsar to acknowledge the record +to continue the processing (acknowledging the received `Message`). +You can disable this by setting the `waitForWriteCompletion` attribute to `false`. + +If a record cannot be written, the message is `nacked`. + +[IMPORTANT] +==== +The Pulsar client automatically retries sending messages in case of failure, until the *send timeout* is reached. +The *send timeout* is configurable with `sendTimeoutMs` attribute and by default is 30 seconds. +==== + +=== Back-pressure and inflight records + +The Pulsar outbound connector handles back-pressure, monitoring the number of pending messages waiting to be written to the Pulsar broker. +The number of pending messages is configured using the `maxPendingMessages` attribute and defaults to 1000. + +The connector only sends that amount of messages concurrently. +No other messages will be sent until at least one pending message gets acknowledged by the broker. +Then, the connector writes a new message to Pulsar when one of the broker’s pending messages get acknowledged. + +You can also remove the limit of pending messages by setting `maxPendingMessages` to `0`. +Note that Pulsar also enables to configure the number of pending messages per partition using `maxPendingMessagesAcrossPartitions`. + +[[producer-batching]] +=== Producer Batching + +By default, the Pulsar producer batches individual messages together to be published to the broker. +You can configure batching parameters using `batchingMaxPublishDelayMicros`, `batchingPartitionSwitchFrequencyByPublishDelay`, +`batchingMaxMessages`, `batchingMaxBytes` configuration properties, or disable it completely with `batchingEnabled=false`. + +When using `Key_Shared` consumer subscriptions, the `batcherBuilder` can be configured to `BatcherBuilder.KEY_BASED`. + +== Pulsar Transactions and Exactly-Once Processing + +https://pulsar.apache.org/docs/3.0.x/txn-why/[Pulsar transactions] enable event streaming applications to consume, process, and produce messages in one atomic operation. + +Transactions allow one or multiple producers to send batch of messages to multiple topics where all messages in the batch are eventually visible to any consumer, or none is ever visible to consumers. + +[IMPORTANT] +==== +In order to be used, transaction support needs to be activated on the broker configuration, using `transactionCoordinatorEnabled=true` and `systemTopicEnabled=true` broker configuration. +==== + +On the client side, the transaction support also needs to be enabled on `PulsarClient` configuration: + +[source, properties] +---- +mp.messaging.outgoing.tx-producer.enableTransaction=true +---- + +Pulsar connector provides `PulsarTransactions` custom emitter for writing records inside a transaction. + +It can be used as a regular emitter `@Channel`: + +[source, java] +---- +package pulsar.outbound; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.pulsar.OutgoingMessage; +import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions; + +@ApplicationScoped +public class PulsarTransactionalProducer { + + @Inject + @Channel("tx-out-example") + PulsarTransactions > txProducer; + + @Inject + @Channel("other-producer") + PulsarTransactions producer; + + @Incoming("in") + public Uni emitInTransaction(Message in) { + return txProducer.withTransaction(emitter -> { + emitter.send(OutgoingMessage.of("a", 1)); + emitter.send(OutgoingMessage.of("b", 2)); + emitter.send(OutgoingMessage.of("c", 3)); + producer.send(emitter, "4"); + producer.send(emitter, "5"); + producer.send(emitter, "6"); + return Uni.createFrom().completionStage(in::ack); + }); + } + +} +---- + +The function given to the `withTransaction` method receives a `TransactionalEmitter` for producing records, and returns a `Uni` that provides the result of the transaction. +If the processing completes successfully, the producer is flushed and the transaction is committed. +If the processing throws an exception, returns a failing `Uni`, or marks the `TransactionalEmitter` for abort, the transaction is aborted. + +[NOTE] +==== +Multiple transactional producers can participate in a single transaction. +This ensures all messages are sent using the started transaction and before the transaction is committed, all participating producers are flushed. +==== + +If this method is called on a Vert.x context, the processing function is also called on that context. +Otherwise, it is called on the sending thread of the producer. + +=== Exactly-Once Processing + +Pulsar Transactions API also allows managing consumer offsets inside a transaction, together with produced messages. +This in turn enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, +also known as exactly-once processing. +It means that an application consumes messages, processes them, publishes the results to a topic, and commits offsets of the consumed messages in a transaction. + +The `PulsarTransactions` emitter also provides a way to apply exactly-once processing to an incoming Pulsar message inside a transaction. + +The following example includes a batch of Pulsar messages inside a transaction. + +[source, properties] +---- +mp.messaging.outgoing.tx-out-example.enableTransaction=true +# ... +mp.messaging.incoming.in-channel.enableTransaction=true +mp.messaging.incoming.in-channel.batchReceive=true +---- + +[source, java] +---- +package pulsar.outbound; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; +import io.smallrye.reactive.messaging.pulsar.PulsarMessage; +import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions; + +@ApplicationScoped +public class PulsarExactlyOnceProcessor { + + @Inject + @Channel("tx-out-example") + PulsarTransactions txProducer; + + @Incoming("in-channel") + public Uni emitInTransaction(PulsarIncomingBatchMessage batch) { + return txProducer.withTransactionAndAck(batch, emitter -> { + for (PulsarMessage record : batch) { + emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey())); + } + return Uni.createFrom().voidItem(); + }); + } + +} +---- + +If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed. + +[IMPORTANT] +==== +When using exactly-once processing, messages can only be acked individually rather than cumulatively. +==== + +If the processing needs to abort, the message is nack'ed. One of the failure strategies can be employed in order to retry the processing or simply fail-stop. +Note that the `Uni` returned from the `withTransaction` will yield a failure if the transaction fails and is aborted. + +The application can choose to handle the error case, but for the message consumption to continue, `Uni` returned from the `@Incoming` method must not result in failure. +`PulsarTransactions#withTransactionAndAck` method will ack and nack the message but will not stop the reactive stream. +Ignoring the failure simply resets the consumer to the last committed offsets and resumes the processing from there. + +[IMPORTANT] +==== +In order to avoid duplicates in case of failure, it is recommended to enable message deduplication and batch index level acknowledgment on the broker side: +[source, properties] +---- +quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true +quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000 +quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000 +quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000 + +mp.messaging.incoming.data.batchIndexAckEnabled=true +---- +==== + +[[pulsar-schema-configuration]] +== Pulsar Schema Configuration & Auto Schema Discovery + +Pulsar messages are stored with payloads as unstructured byte array. +A Pulsar **schema** defines how to serialize structured data to the raw message bytes. +The **schema** is applied in producers and consumers to write and read with an enforced data structure. +It serializes data into raw bytes before they are published to a topic and deserializes the raw bytes before they are delivered to consumers. + +Pulsar uses a schema registry as a central repository to store the registered schema information, +which enables producers/consumers to coordinate the schema of a topic's messages through brokers. +By default the Apache BookKeeper is used to store schemas. + +Pulsar API provides built-in schema information for a number of +https://pulsar.apache.org/docs/3.0.x/schema-understand#primitive-type[primitive types] +and https://pulsar.apache.org/docs/3.0.x/schema-understand#complex-type[complex types] such as Key/Value, Avro and Protobuf. + +The Pulsar Connector allows specifying the schema as a primitive type using the `schema` property: + +[source, properties] +---- +mp.messaging.incoming.prices.connector=smallrye-pulsar +mp.messaging.incoming.prices.schema=INT32 + +mp.messaging.outgoing.prices-out.connector=smallrye-pulsar +mp.messaging.outgoing.prices-out.schema=DOUBLE +---- + +If the value for the `schema` property matches a https://javadoc.io/doc/org.apache.pulsar/pulsar-client-api/latest/org/apache/pulsar/common/schema/SchemaType.html[Schema Type] +a simple schema will be created with that type and will be used for that channel. + +The Pulsar Connector allows configuring complex schema types by providing `Schema` beans through CDI, identified with the `@Identifier` qualifier. + +For example the following bean provides an JSON schema and a Key/Value schema: + +[source, java] +---- +package pulsar.configuration; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; + +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +public class PulsarSchemaProvider { + + @Produces + @Identifier("user-schema") + Schema userSchema = Schema.JSON(User.class); + + @Produces + @Identifier("a-channel") + Schema > keyValueSchema() { + return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED); + } + + public static class User { + String name; + int age; + + } +} +---- + +To configure the incoming channel `users` with defined schema, you need to set the `schema` property to the identifier of the schema `user-schema`: + +[source, properties] +---- +mp.messaging.incoming.users.connector=smallrye-pulsar +mp.messaging.incoming.users.schema=user-schema +---- + +If no `schema` property is found, the connector looks for `Schema` beans identified with the channel name. +For example, the outgoing channel `a-channel` will use the key/value schema. + +[source, properties] +---- +mp.messaging.outgoing.a-channel.connector=smallrye-pulsar +---- + +If no schema information is provided incoming channels will use `Schema.AUTO_CONSUME()`, whereas outgoing channels will use `Schema.AUTO_PRODUCE_BYTES()` schemas. + +=== Auto Schema Discovery + +When using SmallRye Reactive Messaging Pulsar (`io.quarkus:quarkus-smallrye-reactive-messaging-pulsar`), Quarkus can often automatically detect the correct Pulsar Schema to configure. +This autodetection is based on declarations of `@Incoming` and `@Outgoing` methods, as well as injected ``@Channel``s. + +For example, if you declare + +[source,java] +---- +@Outgoing("generated-price") +public Multi generate() { + ... +} +---- + +and your configuration indicates that the `generated-price` channel uses the `smallrye-pulsar` connector, then Quarkus will automatically set the `schema` attribute of the `generated-price` channel to Pulsar Schema `INT32`. + +Similarly, if you declare + +[source,java] +---- +@Incoming("my-pulsar-consumer") +public void consume(org.apache.pulsar.api.client.Message record) { + ... +} +---- + +and your configuration indicates that the `my-pulsar-consumer` channel uses the `smallrye-pulsar` connector, then Quarkus will automatically set the `schema` attribute to Pulsar `BYTES` Schema. + +Finally, if you declare + +[source,java] +---- +@Inject +@Channel("price-create") +Emitter priceEmitter; +---- + +and your configuration indicates that the `price-create` channel uses the `smallrye-pulsar` connector, then Quarkus will automatically set the `schema` to Pulsar `INT64` Schema. + +The full set of types supported by the Pulsar Schema autodetection is: + +* `short` and `java.lang.Short` +* `int` and `java.lang.Integer` +* `long` and `java.lang.Long` +* `float` and `java.lang.Float` +* `double` and `java.lang.Double` +* `byte[]` +* `java.time.Instant` +* `java.sql.Timestamp` +* `java.time.LocalDate` +* `java.time.LocalTime` +* `java.time.LocalDateTime` +* `java.nio.ByteBuffer` +* classes generated from Avro schemas, as well as Avro `GenericRecord`, will be configured with `AVRO` schema type +* classes generated from Protobuf schemas, will be configured with `PROTOBUF` schema type +* other classes will automatically be configured with `JSON` schema type + +[NOTE] +==== +Note that `JSON` schema type enforces schema validation. +==== + +In addition to those Pulsar-provided schemas, Quarkus provides following schema implementations _without enforcing validation_ : + +* `io.vertx.core.buffer.Buffer` will be configured with `io.quarkus.pulsar.schema.BufferSchema` schema +* `io.vertx.core.json.JsonObject` will be configured with `io.quarkus.pulsar.schema.JsonObjectSchema` schema +* `io.vertx.core.json.JsonArray` will be configured with `io.quarkus.pulsar.schema.JsonArraySchema` schema +* For schema-less Json serialization, if the `schema` configuration is set to `ObjectMapper `, +a Schema will be generated using the Jackson `ObjectMapper`, without enforcing a Pulsar Schema validation. +`io.quarkus.pulsar.schema.ObjectMapperSchema` can be used to explicitly configure JSON schema without validation. + +If a `schema` is set by configuration, it won't be replaced by the auto-detection. + +In case you have any issues with serializer auto-detection, you can switch it off completely by setting `quarkus.reactive-messaging.pulsar.serializer-autodetection.enabled=false`. +If you find you need to do this, please file a bug in the link:https://github.com/quarkusio/quarkus/issues[Quarkus issue tracker] so we can fix whatever problem you have. + +[[pulsar-dev-services]] +include::pulsar-dev-services.adoc[leveloffset=+1] + +[[configuring-pulsar-clients]] +== Configuring Pulsar clients +Pulsar clients, consumers and producers are very customizable to configure how a Pulsar client application behaves. + +The Pulsar connector creates a Pulsar client and, a consumer or a producer per channel, each with sensible defaults to ease their configuration. +Although the creation is handled, all available configuration options remain configurable through Pulsar channels. + +While idiomatic way of creating `PulsarClient`, `PulsarConsumer` or `PulsarProducer` are through builder APIs, in its essence +those APIs build each time a configuration object, to pass onto the implementation. +Those are https://javadoc.io/doc/org.apache.pulsar/pulsar-client-original/latest/org/apache/pulsar/client/impl/conf/ClientConfigurationData.html[ClientConfigurationData], +https://javadoc.io/doc/org.apache.pulsar/pulsar-client-original/latest/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.html[ConsumerConfigurationData] +and https://javadoc.io/doc/org.apache.pulsar/pulsar-client-original/latest/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.html[ProducerConfigurationData]. + +Pulsar Connector allows receiving properties for those configuration objects directly. +For example, the broker authentication information for `PulsarClient` is received using `authPluginClassName` and `authParams` properties. +In order to configure the authentication for the incoming channel `data` : + +[source, properties] +---- +mp.messaging.incoming.data.connector=smallrye-pulsar +mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650 +mp.messaging.incoming.data.topic=topic +mp.messaging.incoming.data.subscriptionInitialPosition=Earliest +mp.messaging.incoming.data.schema=INT32 +mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic +mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"} +---- + +Note that the Pulsar consumer property `subscriptionInitialPosition` is also configured with the `Earliest` value which represents with enum value `SubscriptionInitialPosition.Earliest`. + +This approach covers most of the configuration cases. +However, non-serializable objects such as `CryptoKeyReader`, `ServiceUrlProvider` etc. cannot be configured this way. +The Pulsar Connector allows taking into account instances of Pulsar configuration data objects – +`ClientConfigurationData`, `ConsumerConfigurationData`, `ProducerConfigurationData`: + +[source, java] +---- +import jakarta.enterprise.inject.Produces; +import io.smallrye.common.annotation.Identifier; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +class PulsarConfig { + + @Produces + @Identifier("my-consumer-options") + public ConsumerConfigurationData getConsumerConfig() { + ConsumerConfigurationData data = new ConsumerConfigurationData<>(); + data.setAckReceiptEnabled(true); + data.setCryptoKeyReader(DefaultCryptoKeyReader.builder() + //... + .build()); + return data; + } +} +---- + +This instance is retrieved and used to configure the client used by the connector. +You need to indicate the name of the client using the `client-configuration`, `consumer-configuration` or `producer-configuration` attributes: + +[source, properties] +---- +mp.messaging.incoming.prices.consumer-configuration=my-consumer-options +---- + +If no `[client|consumer|producer]-configuration` is configured, the connector will look for instances identified with the channel name: + +[source, java] +---- +import jakarta.enterprise.inject.Produces; +import io.smallrye.common.annotation.Identifier; +import org.apache.pulsar.client.impl.AutoClusterFailover; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; + +class PulsarConfig { + + @Produces + @Identifier("prices") + public ClientConfigurationData getClientConfig() { + ClientConfigurationData data = new ClientConfigurationData(); + data.setEnableTransaction(true); + data.setServiceUrlProvider(AutoClusterFailover.builder() + // ... + .build()); + return data; + } +} +---- + +You also can provide a `Map ` containing configuration values by key: + +[source,java] +---- +import jakarta.enterprise.inject.Produces; +import io.smallrye.common.annotation.Identifier; +import org.apache.pulsar.client.api.BatcherBuilder; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl; +import java.util.Map; + +class PulsarConfig { + + @Produces + @Identifier("prices") + public Map getProducerConfig() { + return Map.of( + "batcherBuilder", BatcherBuilder.KEY_BASED, + "sendTimeoutMs", 3000, + "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4)); + } +} +---- + +Different configuration sources are loaded in the following order of precedence, from the least important to the highest: + +1. `Map ` config map produced with default config identifier, `default-pulsar-client`, `default-pulsar-consumer`, `default-pulsar-producer`. +2. `Map ` config map produced with identifier in the configuration or channel name +3. `[Client|Producer|Consuemr]ConfigurationData` object produced with identifier in the channel configuration or the channel name +4. Channel configuration properties named with `[Client|Producer|Consuemr]ConfigurationData` field names. + +See < > for the exhaustive list of configuration options. + +=== Configuring Pulsar Authentication + +Pulsar provides a pluggable authentication framework, and Pulsar brokers/proxies use this mechanism to authenticate clients. + +Clients can be configured in `application.properties` file using `authPluginClassName` and `authParams` attributes: + +[source, properties] +---- +pulsar.client.serviceUrl=pulsar://pulsar:6650 +pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic +pulsar.client.authParams={"userId":"superuser","password":"admin"} +---- + +Or programmatically: + +[source, java] +---- +import java.util.Map; + +import jakarta.enterprise.inject.Produces; +import io.smallrye.common.annotation.Identifier; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.auth.AuthenticationBasic; + +class PulsarConfig { + + @Produces + @Identifier("prices") + public ClientConfigurationData config() { + var data = new ClientConfigurationData(); + var auth = new AuthenticationBasic(); + auth.configure(Map.of("userId", "superuser", "password", "admin")); + data.setAuthentication(auth); + return data; + } +} +---- + +==== Configuring access to Datastax Luna Streaming + +Luna Streaming is a production-ready distribution of Apache Pulsar, with tools and support from DataStax. +After creating your DataStax Luna Pulsar tenant, note the auto generated token, and configure the token authentication: + +[source, properties] +---- +pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651 +pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken +pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ.... +---- + +Make sure to create topics beforehand, or enable the _Auto Topic Creation_ in the namespace configuration. + +Note that the topic configuration needs to reference full name of topics: + +[source, properties] +---- +mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices +---- + +[[pulsar-health-check]] +== Health Checks + +The Quarkus extension reports startup, readiness and liveness of each channel managed by the Pulsar connector. +Health checks rely on the Pulsar client to verify that a connection is established with the broker. + +**Startup** and **Readiness** probes for both inbound and outbound channels report *OK* when the +connection with the broker is established. + +The **Liveness** probe for both inbound and outbound channels reports *OK* when the +connection is established with the broker **AND** that no failures have been caught. + +Note that a message processing failures *nacks* the message which is +then handled by the failure-strategy. It is the responsibility of the +failure-strategy to report the failure and influence the outcome of the +liveness checks. The `fail` failure strategy reports the failure and so +the liveness check will report the failure. + +[[configuration-reference]] +== Configuration Reference + +Following are the list of configuration attributes for the Pulsar connector channels, consumers, producers and clients. +See the < > for more information on how the Pulsar clients are configured. + +=== Incoming channel configuration (receiving from Pulsar) + +The following attributes are configured using: + +[source, properties] +---- +mp.messaging.incoming.your-channel-name.attribute=value +---- + +include::{includes}/smallrye-pulsar-incoming.adoc[] + +You can also configure properties supported by the underlying Pulsar consumer. + +These properties can also be globally configured using `pulsar.consumer` prefix: + +[source, properties] +---- +pulsar.consumer.subscriptionInitialPosition=Earliest +---- + +include::{includes}/smallrye-pulsar-consumer.adoc[] + +=== Outgoing channel configuration (publishing to Pulsar) + +include::{includes}/smallrye-pulsar-outgoing.adoc[] + +You can also configure properties supported by the underlying Pulsar producer. + +These properties can also be globally configured using `pulsar.producer` prefix: + +[source, properties] +---- +pulsar.producer.batchingEnabled=false +---- + +include::{includes}/smallrye-pulsar-producer.adoc[] + +[[pulsar-client-configuration]] +=== Pulsar Client Configuration + +Following is the configuration reference for the underlying `PulsarClient`. +These options can be configured using the channel attribute: + +[source, properties] +---- +mp.messaging.incoming.your-channel-name.numIoThreads=4 +---- + +Or configured globally using `pulsar.client` prefix: + +[source, properties] +---- +pulsar.client.serviceUrl=pulsar://pulsar:6650 +---- + +include::{includes}/smallrye-pulsar-client.adoc[] + +[IMPORTANT] +==== +Configuration properties not configurable in configuration files (non-serializable) is noted in the column `Config file`. +==== + +== Going further + +This guide has shown how you can interact with Pulsar using Quarkus. +It utilizes SmallRye Reactive Messaging to build data streaming applications. + +If you want to go further, check the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging], the implementation used in Quarkus. diff --git a/extensions/pom.xml b/extensions/pom.xml index bb7a1aa80f3fa..e4c9d03f36e7d 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -68,6 +68,7 @@ smallrye-reactive-messaging smallrye-reactive-messaging-kafka smallrye-reactive-messaging-amqp +smallrye-reactive-messaging-pulsar smallrye-reactive-messaging-mqtt smallrye-reactive-messaging-rabbitmq smallrye-context-propagation diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml b/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml new file mode 100644 index 0000000000000..8647639a16569 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml @@ -0,0 +1,138 @@ + ++ diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaDiscoveryState.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaDiscoveryState.java new file mode 100644 index 0000000000000..f4d9c63799fa3 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaDiscoveryState.java @@ -0,0 +1,165 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import static io.quarkus.smallrye.reactivemessaging.deployment.SmallRyeReactiveMessagingProcessor.getChannelPropertyKey; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.AnnotationTarget; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.jandex.IndexView; +import org.jboss.jandex.Type; + +import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection; +import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem; +import io.smallrye.reactive.messaging.pulsar.PulsarConnector; + +class DefaultSchemaDiscoveryState { + private final IndexView index; + + private final Map4.0.0 + ++ + +io.quarkus +quarkus-smallrye-reactive-messaging-pulsar-parent +999-SNAPSHOT +quarkus-smallrye-reactive-messaging-pulsar-deployment +Quarkus - SmallRye Reactive Messaging - Pulsar - Deployment + ++ + ++ +io.quarkus +quarkus-netty-deployment ++ +io.quarkus +quarkus-core-deployment ++ +io.quarkus +quarkus-smallrye-reactive-messaging-deployment ++ +io.quarkus +quarkus-smallrye-reactive-messaging-pulsar ++ +io.quarkus +quarkus-mutiny-reactive-streams-operators-deployment ++ +io.quarkus +quarkus-vertx-deployment ++ +io.quarkus +quarkus-jackson-deployment ++ +io.quarkus +quarkus-devservices-deployment ++ +io.quarkus +quarkus-junit5-internal +test ++ +org.testcontainers +testcontainers ++ ++ +junit +junit ++ +io.quarkus +quarkus-junit4-mock ++ +io.quarkus +quarkus-resteasy-deployment +test ++ +io.rest-assured +rest-assured +test ++ +org.assertj +assertj-core +test ++ +org.mockito +mockito-core +test ++ +org.awaitility +awaitility +test ++ + ++ ++ +maven-compiler-plugin ++ ++ ++ +io.quarkus +quarkus-extension-processor +${project.version} ++ +maven-surefire-plugin ++ +true ++ ++ +test-pulsar ++ ++ +start-containers ++ ++ ++ +maven-surefire-plugin ++ +false +isPulsarConnector = new HashMap<>(); + private final Set alreadyConfigured = new HashSet<>(); + + private Boolean connectorHasSchema; + + DefaultSchemaDiscoveryState(IndexView index) { + this.index = index; + } + + Config getConfig() { + return ConfigProvider.getConfig(); + } + + boolean isPulsarConnector(List channelsManagedByConnectors, boolean incoming, + String channelName) { + // First look in the channelsManagedByConnectors list + Optional match = channelsManagedByConnectors.stream().filter(cn -> cn + .getDirection() == (incoming ? ChannelDirection.INCOMING : ChannelDirection.OUTGOING) + && cn.getName().equalsIgnoreCase(channelName)).findFirst(); + if (match.isPresent()) { + return true; + } + + String channelType = incoming ? "incoming" : "outgoing"; + return isPulsarConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> { + String connectorKey = getChannelPropertyKey(channelName, "connector", incoming); + String connector = getConfig() + .getOptionalValue(connectorKey, String.class) + .orElse("ignored"); + return PulsarConnector.CONNECTOR_NAME.equals(connector); + }); + } + + boolean shouldNotConfigure(String key) { + // if we know at build time that schema is configured on the connector, + // we should NOT emit default configuration for schema on the channel + // (in other words, only a user can explicitly override a connector configuration) + // + // more config properties could possibly be handled in the same way, but these should suffice for now + + if (key.startsWith("mp.messaging.outgoing.") && key.endsWith(".schema")) { + if (connectorHasSchema == null) { + connectorHasSchema = getConfig() + .getOptionalValue("mp.messaging.connector." + PulsarConnector.CONNECTOR_NAME + ".schema", + String.class) + .isPresent(); + } + return connectorHasSchema; + } + + if (key.startsWith("mp.messaging.incoming.") && key.endsWith(".schema")) { + if (connectorHasSchema == null) { + connectorHasSchema = getConfig() + .getOptionalValue("mp.messaging.connector." + PulsarConnector.CONNECTOR_NAME + ".schema", + String.class) + .isPresent(); + } + return connectorHasSchema; + } + + return false; + } + + void ifNotYetConfigured(String key, Runnable runnable) { + if (!alreadyConfigured.contains(key)) { + alreadyConfigured.add(key); + runnable.run(); + } + } + + boolean isAvroGenerated(DotName className) { + ClassInfo clazz = index.getClassByName(className); + return clazz != null && clazz.declaredAnnotation(DotNames.AVRO_GENERATED) != null; + } + + boolean isProtobufGenerated(DotName className) { + ClassInfo clazz = index.getClassByName(className); + return clazz != null && Objects.equals(clazz.superName(), DotNames.PROTOBUF_GENERATED); + } + + boolean hasObjectMapperConfigSchema(Type type, String channelName, boolean incoming) { + String key = getChannelPropertyKey(channelName, "schema", incoming); + Optional schema = getConfig().getOptionalValue(key, String.class); + return schema.isPresent() && schema.get().equals(SyntheticBeanBuilder.objectMapperSchemaId(type)); + } + + List findAnnotationsOnMethods(DotName annotation) { + return index.getAnnotations(annotation) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.METHOD) + .collect(Collectors.toList()); + } + + List findRepeatableAnnotationsOnMethods(DotName annotation) { + return index.getAnnotationsWithRepeatable(annotation, index) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.METHOD) + .collect(Collectors.toList()); + } + + List findAnnotationsOnInjectionPoints(DotName annotation) { + return index.getAnnotations(annotation) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.FIELD + || it.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER) + .collect(Collectors.toList()); + } + + List findProvidedSchemaWithIdentifier(String identifier) { + return index.getAnnotations(DotNames.IDENTIFIER) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.FIELD + || it.target().kind() == AnnotationTarget.Kind.METHOD) + .filter(a -> a.target().hasAnnotation(DotNames.PRODUCES)) + .filter(a -> { + AnnotationTarget target = a.target(); + if (target.kind() == AnnotationTarget.Kind.FIELD) { + return target.asField().type().name().equals(DotNames.PULSAR_SCHEMA); + } + if (target.kind() == AnnotationTarget.Kind.METHOD) { + return target.asMethod().returnType().name().equals(DotNames.PULSAR_SCHEMA); + } + return false; + }) + .filter(a -> Objects.equals(identifier, a.value().asString())) + .collect(Collectors.toList()); + } + + List findImplementedSchemaWithIdentifier(String identifier) { + return index.getAllKnownImplementors(DotNames.PULSAR_SCHEMA) + .stream() + .filter(t -> t.hasAnnotation(DotNames.IDENTIFIER) && + Objects.equals(t.annotation(DotNames.IDENTIFIER).value().asString(), identifier)) + .collect(Collectors.toList()); + } +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java new file mode 100644 index 0000000000000..a5df54ef7f7e7 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java @@ -0,0 +1,51 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import org.jboss.jandex.DotName; + +final class DotNames { + // @formatter:off + static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName()); + static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName()); + static final DotName CHANNEL = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Channel.class.getName()); + + static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName()); + static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName()); + static final DotName PULSAR_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions.class.getName()); + + static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName()); + static final DotName PULSAR_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarMessage.class.getName()); + static final DotName PULSAR_BATCH_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarBatchMessage.class.getName()); + static final DotName PULSAR_API_MESSAGE = DotName.createSimple(org.apache.pulsar.client.api.Message.class.getName()); + static final DotName PULSAR_API_MESSAGES = DotName.createSimple(org.apache.pulsar.client.api.Messages.class.getName()); + static final DotName OUTGOING_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.OutgoingMessage.class.getName()); + + static final DotName COMPLETION_STAGE = DotName.createSimple(java.util.concurrent.CompletionStage.class.getName()); + static final DotName UNI = DotName.createSimple(io.smallrye.mutiny.Uni.class.getName()); + + static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName()); + static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName()); + static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName()); + static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName()); + static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName()); + static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName()); + static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName()); + static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName()); + static final DotName PULSAR_GENERIC_RECORD = DotName.createSimple(org.apache.pulsar.client.api.schema.GenericRecord.class.getName()); + + static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated"); + static final DotName AVRO_GENERIC_RECORD = DotName.createSimple("org.apache.avro.generic.GenericRecord"); + static final DotName PROTOBUF_GENERATED = DotName.createSimple("com.google.protobuf.GeneratedMessageV3"); + static final DotName PULSAR_SCHEMA = DotName.createSimple(org.apache.pulsar.client.api.Schema.class.getName()); + static final DotName PULSAR_AUTHENTICATION = DotName.createSimple(org.apache.pulsar.client.api.Authentication.class.getName()); + + static final DotName LIST = DotName.createSimple(java.util.List.class.getName()); + + static final DotName VERTX_BUFFER = DotName.createSimple(io.vertx.core.buffer.Buffer.class.getName()); + static final DotName VERTX_JSON_ARRAY = DotName.createSimple(io.vertx.core.json.JsonArray.class.getName()); + static final DotName VERTX_JSON_OBJECT = DotName.createSimple(io.vertx.core.json.JsonObject.class.getName()); + static final DotName BYTE_BUFFER = DotName.createSimple(java.nio.ByteBuffer.class.getName()); + + static final DotName PRODUCES = DotName.createSimple(jakarta.enterprise.inject.Produces.class.getName()); + static final DotName IDENTIFIER = DotName.createSimple(io.smallrye.common.annotation.Identifier.class.getName()); + // @formatter:on +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarBuildTimeConfig.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarBuildTimeConfig.java new file mode 100644 index 0000000000000..76faf20c24a07 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarBuildTimeConfig.java @@ -0,0 +1,15 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "pulsar", phase = ConfigPhase.BUILD_TIME) +public class PulsarBuildTimeConfig { + + /** + * Configuration for DevServices. DevServices allows Quarkus to automatically start a Pulsar Container in dev and test mode. + */ + @ConfigItem + public PulsarDevServicesBuildTimeConfig devservices; +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java new file mode 100644 index 0000000000000..1bdd4b93b94b5 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java @@ -0,0 +1,69 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.command.InspectContainerResponse; + +public class PulsarContainer extends GenericContainer { + + public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.0.0"); + + public static final String STARTER_SCRIPT = "/run_pulsar.sh"; + + public static final int BROKER_PORT = 6650; + public static final int BROKER_HTTP_PORT = 8080; + + public PulsarContainer() { + this(PULSAR_IMAGE); + } + + public PulsarContainer(DockerImageName imageName) { + super(imageName); + super.withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT); + super.withStartupTimeout(Duration.ofSeconds(60)); + super.waitingFor(Wait.forLogMessage(".*Created namespace public/default.*", 1)); + super.withCommand("sh", "-c", runStarterScript()); + super.withTmpFs(Collections.singletonMap("/pulsar/data", "rw")); + } + + protected String runStarterScript() { + return "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT; + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + String advertisedListeners = String.format("internal:pulsar://localhost:%s,external:pulsar://%s:%s", + BROKER_PORT, this.getHost(), this.getMappedPort(BROKER_PORT)); + + String command = "#!/bin/bash \n"; + command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n"; + command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"; + copyFileToContainer( + Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), + STARTER_SCRIPT); + } + + public PulsarContainer withPort(final int fixedPort) { + if (fixedPort <= 0) { + throw new IllegalArgumentException("The fixed port must be greater than 0"); + } + addFixedExposedPort(fixedPort, BROKER_PORT); + return self(); + } + + public String getPulsarBrokerUrl() { + return String.format("pulsar://%s:%s", this.getHost(), this.getMappedPort(BROKER_PORT)); + } + + public String getHttpServiceUrl() { + return String.format("http://%s:%s", this.getHost(), this.getMappedPort(BROKER_HTTP_PORT)); + } +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java new file mode 100644 index 0000000000000..8b7d724005748 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java @@ -0,0 +1,72 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import java.util.Map; +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +@ConfigGroup +public class PulsarDevServicesBuildTimeConfig { + + /** + * If Dev Services for Pulsar has been explicitly enabled or disabled. Dev Services are generally enabled + * by default, unless there is an existing configuration present. For Pulsar, Dev Services starts a broker unless + * {@code pulsar.client.serviceUrl} is set or if all the Reactive Messaging Pulsar channel are configured with + * {@code serviceUrl}. + */ + @ConfigItem + public Optional enabled = Optional.empty(); + + /** + * Optional fixed port the dev service will listen to. + * + * If not defined, the port will be chosen randomly. + */ + @ConfigItem + public Optional
port; + + /** + * The image to use. + * Note that only Apache Pulsar images are supported. + * Specifically, the image repository must end with {@code apachepulsar/pulsar}. + * + * Check https://hub.docker.com/r/apachepulsar/pulsar to find the available versions. + */ + @ConfigItem(defaultValue = "apachepulsar/pulsar:3.0.0") + public String imageName; + + /** + * Indicates if the Pulsar broker managed by Quarkus Dev Services is shared. + * When shared, Quarkus looks for running containers using label-based service discovery. + * If a matching container is found, it is used, and so a second one is not started. + * Otherwise, Dev Services for Pulsar starts a new container. + * + * The discovery uses the {@code quarkus-dev-service-pulsar} label. + * The value is configured using the {@code service-name} property. + *
+ * Container sharing is only used in dev mode. + */ + @ConfigItem(defaultValue = "true") + public boolean shared; + + /** + * The value of the {@code quarkus-dev-service-pulsar} label attached to the started container. + * This property is used when {@code shared} is set to {@code true}. + * In this case, before starting a container, Dev Services for Pulsar looks for a container with the + * {@code quarkus-dev-service-pulsar} label + * set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it + * starts a new container with the {@code quarkus-dev-service-pulsar} label set to the specified value. + *
+ * This property is used when you need multiple shared Pulsar brokers. + */ + @ConfigItem(defaultValue = "pulsar") + public String serviceName; + + /** + * Broker config to set on the Pulsar instance + */ + @ConfigItem + public Map
brokerConfig; + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesProcessor.java new file mode 100644 index 0000000000000..6e4ff1fa51a45 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesProcessor.java @@ -0,0 +1,272 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import java.io.Closeable; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.logging.Logger; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; + +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.IsNormal; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.BuildSteps; +import io.quarkus.deployment.builditem.CuratedApplicationShutdownBuildItem; +import io.quarkus.deployment.builditem.DevServicesResultBuildItem; +import io.quarkus.deployment.builditem.DevServicesResultBuildItem.RunningDevService; +import io.quarkus.deployment.builditem.DockerStatusBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.console.ConsoleInstalledBuildItem; +import io.quarkus.deployment.console.StartupLogCompressor; +import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig; +import io.quarkus.deployment.logging.LoggingSetupBuildItem; +import io.quarkus.devservices.common.ContainerLocator; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.configuration.ConfigUtils; + +/** + * Starts a Pulsar broker as dev service if needed. + * It uses https://hub.docker.com/r/apachepulsar/pulsar as image. + */ +@BuildSteps(onlyIfNot = IsNormal.class, onlyIf = GlobalDevServicesConfig.Enabled.class) +public class PulsarDevServicesProcessor { + + private static final Logger log = Logger.getLogger(PulsarDevServicesProcessor.class); + + /** + * Label to add to shared Dev Service for pulsar running in containers. + * This allows other applications to discover the running service and use it instead of starting a new instance. + */ + private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-pulsar"; + + private static final ContainerLocator pulsarContainerLocator = new ContainerLocator(DEV_SERVICE_LABEL, + PulsarContainer.BROKER_PORT); + private static final String PULSAR_CLIENT_SERVICE_URL = "pulsar.client.serviceUrl"; + private static final String PULSAR_ADMIN_SERVICE_URL = "pulsar.admin.serviceUrl"; + static volatile RunningDevService devService; + static volatile PulsarDevServiceCfg cfg; + static volatile boolean first = true; + + @BuildStep + public DevServicesResultBuildItem startPulsarDevService( + DockerStatusBuildItem dockerStatusBuildItem, + LaunchModeBuildItem launchMode, + PulsarBuildTimeConfig pulsarClientBuildTimeConfig, + Optional consoleInstalledBuildItem, + CuratedApplicationShutdownBuildItem closeBuildItem, + LoggingSetupBuildItem loggingSetupBuildItem, + GlobalDevServicesConfig devServicesConfig) { + + PulsarDevServiceCfg configuration = getConfiguration(pulsarClientBuildTimeConfig); + + if (devService != null) { + boolean shouldShutdownTheBroker = !configuration.equals(cfg); + if (!shouldShutdownTheBroker) { + return devService.toBuildItem(); + } + shutdownBroker(); + cfg = null; + } + + StartupLogCompressor compressor = new StartupLogCompressor( + (launchMode.isTest() ? "(test) " : "") + "Pulsar Dev Services Starting:", consoleInstalledBuildItem, + loggingSetupBuildItem); + try { + RunningDevService newDevService = startPulsarContainer(dockerStatusBuildItem, configuration, launchMode, + devServicesConfig.timeout); + if (newDevService != null) { + devService = newDevService; + Map config = devService.getConfig(); + if (newDevService.isOwner()) { + log.info("Dev Services for Pulsar started."); + log.infof("Other Quarkus applications in dev mode will find the " + + "broker automatically. For Quarkus applications in production mode, you can connect to" + + " this by starting your application with -Dpulsar.client.serviceUrl=%s", + config.get(PULSAR_CLIENT_SERVICE_URL)); + } + } + if (devService == null) { + compressor.closeAndDumpCaptured(); + } else { + compressor.close(); + } + } catch (Throwable t) { + compressor.closeAndDumpCaptured(); + throw new RuntimeException(t); + } + + if (devService == null) { + return null; + } + + // Configure the watch dog + if (first) { + first = false; + Runnable closeTask = () -> { + if (devService != null) { + shutdownBroker(); + + log.info("Dev Services for Pulsar shut down."); + } + first = true; + devService = null; + cfg = null; + }; + closeBuildItem.addCloseTask(closeTask, true); + } + cfg = configuration; + return devService.toBuildItem(); + } + + private void shutdownBroker() { + if (devService != null) { + try { + devService.close(); + } catch (Throwable e) { + log.error("Failed to stop the Pulsar broker", e); + } finally { + devService = null; + } + } + } + + private RunningDevService startPulsarContainer(DockerStatusBuildItem dockerStatusBuildItem, PulsarDevServiceCfg config, + LaunchModeBuildItem launchMode, Optional timeout) { + if (!config.devServicesEnabled) { + // explicitly disabled + log.debug("Not starting Dev Services for Pulsar, as it has been disabled in the config."); + return null; + } + + // Check if pulsar.serviceUrl is set + if (ConfigUtils.isPropertyPresent(PULSAR_CLIENT_SERVICE_URL)) { + log.debug("Not starting Dev Services for Pulsar, the pulsar.serviceUrl is configured."); + return null; + } + + // Verify that we have Pulsar channels without host and port + if (!hasPulsarChannelWithoutHostAndPort()) { + log.debug("Not starting Dev Services for Pulsar, all the channels are configured."); + return null; + } + + if (!dockerStatusBuildItem.isDockerAvailable()) { + log.warn("Docker isn't working, please configure the Pulsar broker location."); + return null; + } + + final Supplier defaultPulsarBrokerSupplier = () -> { + // Starting the broker + PulsarContainer container = new PulsarContainer(DockerImageName.parse(config.imageName) + .asCompatibleSubstituteFor("apachepulsar/pulsar")) + .withNetwork(Network.SHARED); + config.brokerConfig.forEach((key, value) -> container.addEnv("PULSAR_PREFIX_" + key, value)); + if (launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT) { // Only adds the label in dev mode. + container.withLabel(DEV_SERVICE_LABEL, config.serviceName); + } + if (config.fixedExposedPort != 0) { + container.withPort(config.fixedExposedPort); + } + timeout.ifPresent(container::withStartupTimeout); + container.start(); + + return getRunningService(container.getContainerId(), container::close, container.getPulsarBrokerUrl(), + container.getHttpServiceUrl()); + }; + + return pulsarContainerLocator.locateContainer(config.serviceName, config.shared, launchMode.getLaunchMode()) + .map(containerAddress -> getRunningService(containerAddress.getId(), null, + getServiceUrl(containerAddress.getHost(), containerAddress.getPort()), + getHttpServiceUrl(containerAddress.getHost(), + pulsarContainerLocator.locatePublicPort(config.serviceName, config.shared, + launchMode.getLaunchMode(), PulsarContainer.BROKER_HTTP_PORT).orElse(8080)))) + .orElseGet(defaultPulsarBrokerSupplier); + } + + private String getServiceUrl(String host, int port) { + return String.format("pulsar://%s:%d", host, port); + } + + private String getHttpServiceUrl(String host, int port) { + return String.format("http://%s:%d", host, port); + } + + private RunningDevService getRunningService(String containerId, Closeable closeable, String pulsarBrokerUrl, + String httpServiceUrl) { + Map configMap = new HashMap<>(); + configMap.put(PULSAR_CLIENT_SERVICE_URL, pulsarBrokerUrl); + configMap.put(PULSAR_ADMIN_SERVICE_URL, httpServiceUrl); + return new RunningDevService(Feature.SMALLRYE_REACTIVE_MESSAGING_PULSAR.getName(), containerId, closeable, configMap); + } + + private boolean hasPulsarChannelWithoutHostAndPort() { + Config config = ConfigProvider.getConfig(); + for (String name : config.getPropertyNames()) { + boolean isIncoming = name.startsWith("mp.messaging.incoming."); + boolean isOutgoing = name.startsWith("mp.messaging.outgoing."); + boolean isConnector = name.endsWith(".connector"); + boolean isConfigured = false; + if ((isIncoming || isOutgoing) && isConnector) { + String connectorValue = config.getValue(name, String.class); + boolean isPulsar = connectorValue.equalsIgnoreCase("smallrye-pulsar"); + boolean hasServiceUrl = ConfigUtils.isPropertyPresent(name.replace(".connector", ".serviceUrl")); + isConfigured = isPulsar && hasServiceUrl; + } + + if (!isConfigured) { + return true; + } + } + return false; + } + + private PulsarDevServiceCfg getConfiguration(PulsarBuildTimeConfig cfg) { + PulsarDevServicesBuildTimeConfig devServicesConfig = cfg.devservices; + return new PulsarDevServiceCfg(devServicesConfig); + } + + private static final class PulsarDevServiceCfg { + private final boolean devServicesEnabled; + private final String imageName; + private final Integer fixedExposedPort; + private final boolean shared; + private final String serviceName; + private final Map brokerConfig; + + public PulsarDevServiceCfg(PulsarDevServicesBuildTimeConfig devServicesConfig) { + this.devServicesEnabled = devServicesConfig.enabled.orElse(true); + this.imageName = devServicesConfig.imageName; + this.fixedExposedPort = devServicesConfig.port.orElse(0); + this.shared = devServicesConfig.shared; + this.serviceName = devServicesConfig.serviceName; + this.brokerConfig = devServicesConfig.brokerConfig; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PulsarDevServiceCfg that = (PulsarDevServiceCfg) o; + return devServicesEnabled == that.devServicesEnabled && Objects.equals(imageName, that.imageName) + && Objects.equals(fixedExposedPort, that.fixedExposedPort) + && Objects.equals(brokerConfig, that.brokerConfig); + } + + @Override + public int hashCode() { + return Objects.hash(devServicesEnabled, imageName, fixedExposedPort, brokerConfig); + } + } + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java new file mode 100644 index 0000000000000..d816493bc180e --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java @@ -0,0 +1,584 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import static io.quarkus.smallrye.reactivemessaging.deployment.SmallRyeReactiveMessagingProcessor.getChannelPropertyKey; + +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import org.apache.pulsar.common.schema.SchemaType; +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.DotName; +import org.jboss.jandex.MethodInfo; +import org.jboss.jandex.MethodParameterInfo; +import org.jboss.jandex.Type; +import org.jboss.logging.Logger; + +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; +import io.quarkus.arc.processor.KotlinUtils; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.deployment.recording.RecorderContext; +import io.quarkus.pulsar.SchemaProviderRecorder; +import io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames; +import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem; + +public class PulsarSchemaDiscoveryProcessor { + + static Logger log = Logger.getLogger(PulsarSchemaDiscoveryProcessor.class); + + /** + * Handles the serializer/deserializer detection and whether the graceful shutdown should be used in dev mode. + */ + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + public void defaultChannelConfiguration( + ReactiveMessagingPulsarBuildTimeConfig buildTimeConfig, + CombinedIndexBuildItem combinedIndex, + List channelsManagedByConnectors, + BuildProducer defaultConfigProducer, + BuildProducer syntheticBean, + RecorderContext recorderContext, + SchemaProviderRecorder recorder) { + if (buildTimeConfig.schemaAutodetectionEnabled) { + DefaultSchemaDiscoveryState discoveryState = new DefaultSchemaDiscoveryState(combinedIndex.getIndex()); + discoverDefaultSerdeConfig(discoveryState, channelsManagedByConnectors, defaultConfigProducer, + buildTimeConfig.schemaGenerationEnabled ? new SyntheticBeanBuilder(syntheticBean, recorder, recorderContext) + : null); + } + } + + // visible for testing + void discoverDefaultSerdeConfig(DefaultSchemaDiscoveryState discovery, + List channelsManagedByConnectors, + BuildProducer config, + SyntheticBeanBuilder syntheticBean) { + for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) { + String channelName = annotation.value().asString(); + if (!discovery.isPulsarConnector(channelsManagedByConnectors, true, channelName)) { + continue; + } + + MethodInfo method = annotation.target().asMethod(); + + Type incomingType = getIncomingTypeFromMethod(method); + processIncomingType(discovery, config, incomingType, channelName, syntheticBean); + } + + for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.OUTGOING)) { + String channelName = annotation.value().asString(); + if (!discovery.isPulsarConnector(channelsManagedByConnectors, false, channelName)) { + continue; + } + + MethodInfo method = annotation.target().asMethod(); + + Type outgoingType = getOutgoingTypeFromMethod(method); + processOutgoingType(discovery, config, outgoingType, channelName, syntheticBean); + } + + for (AnnotationInstance annotation : discovery.findAnnotationsOnInjectionPoints(DotNames.CHANNEL)) { + String channelName = annotation.value().asString(); + if (!discovery.isPulsarConnector(channelsManagedByConnectors, false, channelName) + && !discovery.isPulsarConnector(channelsManagedByConnectors, true, channelName)) { + continue; + } + + Type injectionPointType = getInjectionPointType(annotation); + if (injectionPointType == null) { + continue; + } + + Type incomingType = getIncomingTypeFromChannelInjectionPoint(injectionPointType); + processIncomingType(discovery, config, incomingType, channelName, syntheticBean); + processPulsarTransactions(discovery, config, channelName, injectionPointType); + + Type outgoingType = getOutgoingTypeFromChannelInjectionPoint(injectionPointType); + processOutgoingType(discovery, config, outgoingType, channelName, syntheticBean); + } + } + + private static String outgoingSchemaKey(String channelName) { + return getChannelPropertyKey(channelName, "schema", false); + } + + private void processPulsarTransactions(DefaultSchemaDiscoveryState discovery, + BuildProducer config, + String channelName, + Type injectionPointType) { + if (injectionPointType != null && isPulsarEmitter(injectionPointType)) { + String enableTransactionKey = getChannelPropertyKey(channelName, "enableTransaction", false); + log.infof("Transactional producer detected for channel '%s', setting following default config values: " + + "'" + enableTransactionKey + "=true'", channelName); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, enableTransactionKey, "true"); + } + } + + private void processIncomingType(DefaultSchemaDiscoveryState discovery, + BuildProducer config, + Type incomingType, + String channelName, + SyntheticBeanBuilder syntheticBean) { + extractValueType(incomingType, (value, isBatch) -> { + if (discovery.findProvidedSchemaWithIdentifier(channelName).isEmpty()) { + if (discovery.hasObjectMapperConfigSchema(value, channelName, true)) { + objectMapperSchemaFor(SyntheticBeanBuilder.objectMapperSchemaId(value), value, syntheticBean); + } else { + String schema = schemaFor(discovery, value, syntheticBean); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, incomingSchemaKey(channelName), schema); + } + } + if (Boolean.TRUE.equals(isBatch)) { + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + getChannelPropertyKey(channelName, "batchReceive", true), "true"); + } + }); + } + + private static String incomingSchemaKey(String channelName) { + return getChannelPropertyKey(channelName, "schema", true); + } + + private Type getInjectionPointType(AnnotationInstance annotation) { + switch (annotation.target().kind()) { + case FIELD: + return annotation.target().asField().type(); + case METHOD_PARAMETER: + MethodParameterInfo parameter = annotation.target().asMethodParameter(); + return parameter.method().parameterType(parameter.position()); + default: + return null; + } + } + + private void produceRuntimeConfigurationDefaultBuildItem(DefaultSchemaDiscoveryState discovery, + BuildProducer config, + String key, String value) { + if (value == null) { + return; + } + + if (discovery.shouldNotConfigure(key)) { + return; + } + + discovery.ifNotYetConfigured(key, () -> config.produce(new RunTimeConfigurationDefaultBuildItem(key, value))); + } + + private Type getIncomingTypeFromMethod(MethodInfo method) { + List parameterTypes = method.parameterTypes(); + int parametersCount = parameterTypes.size(); + Type returnType = method.returnType(); + + Type incomingType = null; + + // @Incoming + if ((isVoid(returnType) && parametersCount >= 1) + || (isCompletionStage(returnType) && parametersCount >= 1) + || (isUni(returnType) && parametersCount >= 1)) { + incomingType = parameterTypes.get(0); + } else if ((isSubscriber(returnType) && parametersCount == 0) + || (isSubscriberBuilder(returnType) && parametersCount == 0)) { + incomingType = returnType.asParameterizedType().arguments().get(0); + } else if (KotlinUtils.isKotlinSuspendMethod(method)) { + incomingType = parameterTypes.get(0); + } + + // @Incoming @Outgoing + if (method.hasAnnotation(DotNames.OUTGOING)) { + if ((isCompletionStage(returnType) && parametersCount >= 1) + || (isUni(returnType) && parametersCount >= 1) + || (isPublisher(returnType) && parametersCount == 1) + || (isPublisherBuilder(returnType) && parametersCount == 1) + || (isFlowPublisher(returnType) && parametersCount == 1) + || (isMulti(returnType) && parametersCount == 1)) { + incomingType = parameterTypes.get(0); + } else if ((isProcessor(returnType) && parametersCount == 0) + || (isProcessorBuilder(returnType) && parametersCount == 0)) { + incomingType = returnType.asParameterizedType().arguments().get(0); + } else if (parametersCount >= 1) { + incomingType = parameterTypes.get(0); + } else if (KotlinUtils.isKotlinSuspendMethod(method)) { + incomingType = parameterTypes.get(0); + } + + // @Incoming @Outgoing stream manipulation + if (incomingType != null + && (isPublisher(incomingType) || isPublisherBuilder(incomingType) + || isMulti(incomingType) || isFlowPublisher(incomingType))) { + incomingType = incomingType.asParameterizedType().arguments().get(0); + } + } + return incomingType; + } + + private Type getIncomingTypeFromChannelInjectionPoint(Type injectionPointType) { + if (injectionPointType == null) { + return null; + } + + if (isPublisher(injectionPointType) || isPublisherBuilder(injectionPointType) + || isMulti(injectionPointType) || isFlowPublisher(injectionPointType)) { + return injectionPointType.asParameterizedType().arguments().get(0); + } else { + return null; + } + } + + private Type getOutgoingTypeFromMethod(MethodInfo method) { + List parameterTypes = method.parameterTypes(); + int parametersCount = parameterTypes.size(); + Type returnType = method.returnType(); + + Type outgoingType = null; + + // @Outgoing + if ((isPublisher(returnType) && parametersCount == 0) + || (isPublisherBuilder(returnType) && parametersCount == 0) + || (isMulti(returnType) && parametersCount == 0) + || (isFlowPublisher(returnType) && parametersCount == 0) + || (isCompletionStage(returnType) && parametersCount == 0) + || (isUni(returnType) && parametersCount == 0)) { + outgoingType = returnType.asParameterizedType().arguments().get(0); + } else if (parametersCount == 0) { + outgoingType = returnType; + } else if (KotlinUtils.isKotlinSuspendMethod(method)) { + outgoingType = getReturnTypeFromKotlinSuspendMethod(method); + } + + // @Incoming @Outgoing + if (method.hasAnnotation(DotNames.INCOMING)) { + if ((isCompletionStage(returnType) && parametersCount == 1) + || (isUni(returnType) && parametersCount == 1) + || (isPublisher(returnType) && parametersCount == 1) + || (isPublisherBuilder(returnType) && parametersCount == 1) + || (isFlowPublisher(returnType) && parametersCount == 1) + || (isMulti(returnType) && parametersCount == 1)) { + outgoingType = returnType.asParameterizedType().arguments().get(0); + } else if ((isProcessor(returnType) && parametersCount == 0) + || (isProcessorBuilder(returnType) && parametersCount == 0)) { + outgoingType = returnType.asParameterizedType().arguments().get(1); + } else if (parametersCount == 1) { + outgoingType = returnType; + } else if (KotlinUtils.isKotlinSuspendMethod(method)) { + outgoingType = getReturnTypeFromKotlinSuspendMethod(method); + } + + // @Incoming @Outgoing stream manipulation + if (outgoingType != null + && (isPublisher(outgoingType) || isPublisherBuilder(outgoingType) + || isMulti(outgoingType) || isFlowPublisher(outgoingType))) { + outgoingType = outgoingType.asParameterizedType().arguments().get(0); + } + } + return outgoingType; + } + + private static Type getReturnTypeFromKotlinSuspendMethod(MethodInfo method) { + Type continuationReturnType = method.parameterType(method.parametersCount() - 1); + + if (continuationReturnType.kind() == Type.Kind.PARAMETERIZED_TYPE) { + Type firstGenericType = continuationReturnType.asParameterizedType().arguments().get(0); + if (firstGenericType.kind() == Type.Kind.WILDCARD_TYPE) { + return firstGenericType.asWildcardType().superBound(); + } + } + + return null; + } + + private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) { + if (injectionPointType == null) { + return null; + } + + if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isPulsarEmitter(injectionPointType)) { + return injectionPointType.asParameterizedType().arguments().get(0); + } else { + return null; + } + } + + private void processOutgoingType(DefaultSchemaDiscoveryState discovery, + BuildProducer config, + Type outgoingType, + String channelName, SyntheticBeanBuilder syntheticBean) { + extractValueType(outgoingType, (value, isBatch) -> { + if (discovery.findProvidedSchemaWithIdentifier(channelName).isEmpty()) { + if (discovery.hasObjectMapperConfigSchema(value, channelName, false)) { + objectMapperSchemaFor(SyntheticBeanBuilder.objectMapperSchemaId(value), value, syntheticBean); + } else { + String schema = schemaFor(discovery, value, syntheticBean); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, outgoingSchemaKey(channelName), schema); + } + } + }); + } + + private void extractValueType(Type type, BiConsumer schemaAcceptor) { + if (type == null) { + return; + } + + if (isMessage(type)) { + List typeArguments = type.asParameterizedType().arguments(); + Type messageTypeParameter = typeArguments.get(0); + if (isList(messageTypeParameter)) { + List messageListTypeArguments = messageTypeParameter.asParameterizedType().arguments(); + schemaAcceptor.accept(messageListTypeArguments.get(0), true); + } else { + schemaAcceptor.accept(messageTypeParameter, false); + } + } else if (isList(type)) { + List typeArguments = type.asParameterizedType().arguments(); + schemaAcceptor.accept(typeArguments.get(0), true); + } else if (isPulsarMessage(type) || isPulsarApiMessage(type) || isOutgoingMessage(type)) { + List typeArguments = type.asParameterizedType().arguments(); + schemaAcceptor.accept(typeArguments.get(0), false); + } else if (isPulsarApiMessages(type) || isPulsarBatchMessage(type)) { + List typeArguments = type.asParameterizedType().arguments(); + schemaAcceptor.accept(typeArguments.get(0), true); + } else if (isKeyedMulti(type)) { + List typeArguments = type.asParameterizedType().arguments(); + schemaAcceptor.accept(typeArguments.get(1), false); + } else if (isRawMessage(type)) { + schemaAcceptor.accept(type, false); + } + } + + // --- + + private static boolean isVoid(Type type) { + return type.kind() == Type.Kind.VOID; + } + + private static boolean isCompletionStage(Type type) { + // raw type CompletionStage is wrong, must be CompletionStage + return DotNames.COMPLETION_STAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isUni(Type type) { + // raw type Uni is wrong, must be Uni + return DotNames.UNI.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isMulti(Type type) { + // raw type Multi is wrong, must be Multi + return DotNames.MULTI.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isFlowPublisher(Type type) { + // raw type Flow.Publisher is wrong, must be Multi + return DotNames.FLOW_PUBLISHER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isSubscriber(Type type) { + // raw type Subscriber is wrong, must be Subscriber + return DotNames.SUBSCRIBER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isSubscriberBuilder(Type type) { + // raw type SubscriberBuilder is wrong, must be SubscriberBuilder + return DotNames.SUBSCRIBER_BUILDER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isPublisher(Type type) { + // raw type Publisher is wrong, must be Publisher + return DotNames.PUBLISHER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPublisherBuilder(Type type) { + // raw type PublisherBuilder is wrong, must be PublisherBuilder + return DotNames.PUBLISHER_BUILDER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isProcessor(Type type) { + // raw type Processor is wrong, must be Processor + return DotNames.PROCESSOR.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isProcessorBuilder(Type type) { + // raw type ProcessorBuilder is wrong, must be ProcessorBuilder + return DotNames.PROCESSOR_BUILDER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + // --- + + private static boolean isEmitter(Type type) { + // raw type Emitter is wrong, must be Emitter + return DotNames.EMITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isMutinyEmitter(Type type) { + // raw type MutinyEmitter is wrong, must be MutinyEmitter + return DotNames.MUTINY_EMITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPulsarEmitter(Type type) { + // raw type PulsarTransactions is wrong, must be PulsarTransactions + return DotNames.PULSAR_EMITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + // --- + + private static boolean isMessage(Type type) { + // raw type Message is wrong, must be Message + return DotNames.MESSAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPulsarMessage(Type type) { + // raw type PulsarMessage is wrong, must be PulsarMessage + return DotNames.PULSAR_MESSAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isOutgoingMessage(Type type) { + // raw type Record is wrong, must be Record + return DotNames.OUTGOING_MESSAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPulsarApiMessage(Type type) { + // raw type ConsumerRecord is wrong, must be ConsumerRecord + return DotNames.PULSAR_API_MESSAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isKeyedMulti(Type type) { + return ReactiveMessagingDotNames.KEYED_MULTI.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isList(Type type) { + return DotNames.LIST.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPulsarBatchMessage(Type type) { + return DotNames.PULSAR_BATCH_MESSAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPulsarApiMessages(Type type) { + return DotNames.PULSAR_API_MESSAGES.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isRawMessage(Type type) { + switch (type.kind()) { + case PRIMITIVE: + case CLASS: + case ARRAY: + return true; + default: + return false; + } + } + + private String knownSchemaFor(Type type) { + if (type == null) { + return null; + } + DotName typeName = type.name(); + + // statically known schema + if (KNOWN_SCHEMA.containsKey(typeName)) { + return KNOWN_SCHEMA.get(typeName); + } + + // unknown + return null; + } + + private void objectMapperSchemaFor(String schemaId, Type type, SyntheticBeanBuilder syntheticBean) { + if (syntheticBean != null) { + syntheticBean.produceObjectMapperSchemaBean(schemaId, type); + } + } + + private String schemaFor(DefaultSchemaDiscoveryState discovery, Type type, SyntheticBeanBuilder syntheticBean) { + String result = knownSchemaFor(type); + if (result == null && type != null && syntheticBean != null && type.kind() == Type.Kind.CLASS) { + String schemaId = syntheticBean.produceSchemaBean(discovery, type); + if (schemaId != null) { + log.infof("Generating Schema for type %s %s", type.name().toString(), schemaId); + } + result = schemaId; + } + return result; + } + + // --- + + // @formatter:off + private static final Map KNOWN_SCHEMA = Map.ofEntries( + // primitives + Map.entry(DotName.createSimple("byte"), SchemaType.INT8.name()), + Map.entry(DotName.createSimple("short"), SchemaType.INT16.name()), + Map.entry(DotName.createSimple("int"), SchemaType.INT32.name()), + Map.entry(DotName.createSimple("long"), SchemaType.INT64.name()), + Map.entry(DotName.createSimple("float"), SchemaType.FLOAT.name()), + Map.entry(DotName.createSimple("double"), SchemaType.DOUBLE.name()), + Map.entry(DotName.createSimple("boolean"), SchemaType.BOOLEAN.name()), + // primitive wrappers + Map.entry(DotName.createSimple(Byte.class.getName()), SchemaType.INT8.name()), + Map.entry(DotName.createSimple(Short.class.getName()), SchemaType.INT16.name()), + Map.entry(DotName.createSimple(Integer.class.getName()), SchemaType.INT32.name()), + Map.entry(DotName.createSimple(Long.class.getName()), SchemaType.INT64.name()), + Map.entry(DotName.createSimple(Float.class.getName()), SchemaType.FLOAT.name()), + Map.entry(DotName.createSimple(Double.class.getName()), SchemaType.DOUBLE.name()), + Map.entry(DotName.createSimple(Boolean.class.getName()), SchemaType.BOOLEAN.name()), + // arrays + Map.entry(DotName.createSimple("[B"), SchemaType.BYTES.name()), + // other + Map.entry(DotName.createSimple(String.class.getName()), SchemaType.STRING.name()), + Map.entry(DotName.createSimple(java.time.Instant.class.getName()), SchemaType.INSTANT.name()), + Map.entry(DotName.createSimple(java.sql.Timestamp.class.getName()), SchemaType.TIMESTAMP.name()), + Map.entry(DotName.createSimple(java.time.LocalDate.class.getName()), SchemaType.LOCAL_DATE.name()), + Map.entry(DotName.createSimple(java.time.LocalTime.class.getName()), SchemaType.LOCAL_TIME.name()), + Map.entry(DotName.createSimple(java.time.LocalDateTime.class.getName()), SchemaType.LOCAL_DATE_TIME.name()), + // Pulsar + Map.entry(DotNames.PULSAR_GENERIC_RECORD, SchemaType.AUTO_CONSUME.name()) + ); + // @formatter:on + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/ReactiveMessagingPulsarBuildTimeConfig.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/ReactiveMessagingPulsarBuildTimeConfig.java new file mode 100644 index 0000000000000..34cde49902c8f --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/ReactiveMessagingPulsarBuildTimeConfig.java @@ -0,0 +1,21 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "reactive-messaging.pulsar", phase = ConfigPhase.BUILD_TIME) +public class ReactiveMessagingPulsarBuildTimeConfig { + /** + * Whether or not Pulsar Schema auto-detection is enabled. + */ + @ConfigItem(name = "schema-autodetection.enabled", defaultValue = "true") + public boolean schemaAutodetectionEnabled; + + /** + * Whether Pulsar Schema generation is enabled. + * When no Schema are found and not set, Quarkus generates a JSON Schema. + */ + @ConfigItem(name = "schema-generation.enabled", defaultValue = "true") + public boolean schemaGenerationEnabled; +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java new file mode 100644 index 0000000000000..2e3096fbdcbf7 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java @@ -0,0 +1,189 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.logging.Level; + +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.jboss.jandex.ClassInfo; +import org.jboss.logging.Logger; +import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.FieldVisitor; +import org.objectweb.asm.MethodVisitor; +import org.objectweb.asm.Opcodes; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.BytecodeTransformerBuildItem; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.IndexDependencyBuildItem; +import io.quarkus.deployment.builditem.LogCategoryBuildItem; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem; +import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.gizmo.Gizmo; +import io.quarkus.pulsar.PulsarRuntimeConfigProducer; + +public class SmallRyeReactiveMessagingPulsarProcessor { + + static Logger log = Logger.getLogger(SmallRyeReactiveMessagingPulsarProcessor.class); + + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(Feature.SMALLRYE_REACTIVE_MESSAGING_PULSAR); + } + + @BuildStep + public AdditionalBeanBuildItem runtimeConfig() { + return AdditionalBeanBuildItem.builder() + .addBeanClass(PulsarRuntimeConfigProducer.class) + .setUnremovable() + .build(); + } + + @BuildStep + void logging(BuildProducer log) { + log.produce(new LogCategoryBuildItem("org.apache.pulsar.common.util.netty.DnsResolverUtil", Level.OFF)); + } + + @BuildStep + void disableStatsLogging(BuildProducer runtimeConfig) { + runtimeConfig.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.connector.smallrye-pulsar.statsIntervalSeconds", "0")); + } + + @BuildStep + NativeImageResourceBuildItem nativeImageResources() { + return new NativeImageResourceBuildItem( + "org/asynchttpclient/config/ahc-default.properties", + "org/asynchttpclient/config/ahc.properties"); + } + + @BuildStep + void bytecodeTransformer(BuildProducer producer) { + String klass = "org.asynchttpclient.request.body.multipart.FileLikePart"; + + producer.produce(new BytecodeTransformerBuildItem(klass, new BiFunction () { + @Override + public ClassVisitor apply(String cls, ClassVisitor classVisitor) { + return new FileLikePartJavaxRemover(Gizmo.ASM_API_VERSION, classVisitor); + } + + })); + } + + private class FileLikePartJavaxRemover extends ClassVisitor { + public FileLikePartJavaxRemover(int version, ClassVisitor cv) { + super(version, cv); + log.debug("Removing javax.activation from FileLikePart"); + } + + @Override + public FieldVisitor visitField(int access, String name, String descriptor, String signature, Object value) { + if (name.equals("MIME_TYPES_FILE_TYPE_MAP")) { + return null; + } + return super.visitField(access, name, descriptor, signature, value); + } + + // invoked for every method + @Override + public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) { + MethodVisitor visitor = super.visitMethod(access, name, desc, signature, exceptions); + if (visitor == null) { + return null; + } + if (name.equals(" ")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + mv.visitCode(); + mv.visitInsn(Opcodes.RETURN);// our new code + } + }; + } + if (name.equals("computeContentType")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, visitor) { + @Override + public void visitCode() { + super.visitCode(); + visitVarInsn(Opcodes.ALOAD, 1); // first param + visitInsn(Opcodes.ARETURN); + } + }; + } + visitor.visitMaxs(0, 0); + return visitor; + } + + } + + @BuildStep + IndexDependencyBuildItem indexPulsar() { + return new IndexDependencyBuildItem("org.apache.pulsar", "pulsar-client-original"); + } + + @BuildStep + public NativeImageConfigBuildItem pulsarRuntimeInitialized( + CombinedIndexBuildItem combinedIndex, + BuildProducer reflectiveClass, + BuildProducer nativeSslSupport) { + nativeSslSupport.produce(new ExtensionSslNativeSupportBuildItem(Feature.SMALLRYE_REACTIVE_MESSAGING_PULSAR)); + reflectiveClass.produce(ReflectiveClassBuildItem + .builder(ClientConfigurationData.class.getName(), + ProducerConfigurationData.class.getName(), + ConsumerConfigurationData.class.getName(), + "com.google.protobuf.GeneratedMessageV3", + "org.apache.pulsar.common.protocol.schema.ProtobufNativeSchemaData", + "org.apache.pulsar.client.impl.schema.ProtobufNativeSchema$ProtoBufParsingInfo", + "org.apache.pulsar.client.impl.schema.ProtobufSchema$ProtoBufParsingInfo", + "org.apache.pulsar.common.schema.KeyValue") + .fields(true) + .methods(true) + .constructors(true) + .build()); + reflectiveClass.produce(ReflectiveClassBuildItem + .builder("org.apache.pulsar.client.util.SecretsSerializer") + .constructors().build()); + + Collection authPluginClasses = combinedIndex.getIndex() + .getAllKnownImplementors(DotNames.PULSAR_AUTHENTICATION); + for (ClassInfo authPluginClass : authPluginClasses) { + reflectiveClass.produce(ReflectiveClassBuildItem.builder(authPluginClass.name().toString()) + .constructors().build()); + } + + return NativeImageConfigBuildItem.builder() + .addNativeImageSystemProperty("io.netty.handler.ssl.noOpenSsl", "true") + .addRuntimeInitializedClass("org.apache.pulsar.common.allocator.PulsarByteBufAllocator") + .addRuntimeInitializedClass("org.apache.pulsar.common.protocol.Commands") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.crypto.MessageCryptoBc") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.Backoff") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.ConnectionPool") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.ControlledClusterFailover") + .addRuntimeInitializedClass("org.apache.pulsar.client.impl.HttpClient") + .addRuntimeInitializedClass("org.apache.pulsar.client.util.WithSNISslEngineFactory") + .addRuntimeInitializedClass("com.yahoo.sketches.quantiles.DoublesSketch") + .addRuntimeInitializedClass("io.netty.buffer.PooledByteBufAllocator") + .addRuntimeInitializedClass("io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledHeapByteBuf") + .addRuntimeInitializedClass("io.netty.incubator.channel.uring.IOUringEventLoopGroup") + .addRuntimeInitializedClass("io.netty.incubator.channel.uring.Native") + .addRuntimeInitializedClass("io.netty.incubator.channel.uring.IOUring") + .addRuntimeInitializedClass("org.asynchttpclient.RequestBuilderBase") + .addRuntimeInitializedClass("org.asynchttpclient.RequestBuilder") + .addRuntimeInitializedClass("org.asynchttpclient.BoundRequestBuilder") + .addRuntimeInitializedClass("org.asynchttpclient.ntlm.NtlmEngine") + .addRuntimeInitializedClass("sun.awt.dnd.SunDropTargetContextPeer$EventDispatcher") + .build(); + } + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SyntheticBeanBuilder.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SyntheticBeanBuilder.java new file mode 100644 index 0000000000000..bbce1efc73f3d --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SyntheticBeanBuilder.java @@ -0,0 +1,124 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Supplier; + +import jakarta.enterprise.context.Dependent; + +import org.jboss.jandex.ParameterizedType; +import org.jboss.jandex.Type; + +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.recording.RecorderContext; +import io.quarkus.pulsar.SchemaProviderRecorder; +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.util.HashUtil; +import io.smallrye.common.annotation.Identifier; + +public class SyntheticBeanBuilder { + final BuildProducer syntheticBeanBuildItem; + final SchemaProviderRecorder recorder; + final RecorderContext recorderContext; + final Map alreadyGeneratedSchema; + + public SyntheticBeanBuilder(BuildProducer syntheticBeanBuildItem, + SchemaProviderRecorder recorder, + RecorderContext recorderContext) { + this.syntheticBeanBuildItem = syntheticBeanBuildItem; + this.recorder = recorder; + this.recorderContext = recorderContext; + this.alreadyGeneratedSchema = new HashMap<>(); + } + + static String objectMapperSchemaId(Type type) { + return "ObjectMapper<" + type.name() + ">"; + } + + String schemaIdFor(Type type) { + return alreadyGeneratedSchema.get(type.toString()); + } + + void produceObjectMapperSchemaBean(String schemaId, Type type) { + if (!alreadyGeneratedSchema.containsKey(type.toString()) + || alreadyGeneratedSchema.get(type.toString()).equals(schemaId)) { + var runtimeValue = recorder.createObjectMapperSchema(recorderContext.classProxy(type.name().toString())); + produceSyntheticBeanSchema(syntheticBeanBuildItem, runtimeValue, schemaId, type); + alreadyGeneratedSchema.put(type.toString(), schemaId); + } + } + + public String produceSchemaBean(DefaultSchemaDiscoveryState discovery, Type type) { + if (syntheticBeanBuildItem != null && type.kind() == Type.Kind.CLASS) { + String schemaId = schemaIdFor(type); + if (schemaId == null) { + if (discovery.isAvroGenerated(type.name()) || DotNames.AVRO_GENERIC_RECORD.equals(type.name())) { + schemaId = generateId(type, "AVRO"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, + recorder.createAvroSchema(recorderContext.classProxy(type.name().toString())), schemaId, type); + } else if (discovery.isProtobufGenerated(type.name())) { + schemaId = generateId(type, "PROTOBUF"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, + recorder.createProtoBufSchema(recorderContext.classProxy(type.name().toString())), schemaId, type); + } else if (type.name().equals(DotNames.VERTX_JSON_OBJECT)) { + schemaId = generateId(type, "JSON_OBJECT"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, recorder.createJsonObjectSchema(), schemaId, type); + } else if (type.name().equals(DotNames.VERTX_JSON_ARRAY)) { + schemaId = generateId(type, "JSON_ARRAY"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, recorder.createJsonArraySchema(), schemaId, type); + } else if (type.name().equals(DotNames.VERTX_BUFFER)) { + schemaId = generateId(type, "BUFFER"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, recorder.createBufferSchema(), schemaId, type); + } else if (type.name().equals(DotNames.BYTE_BUFFER)) { + schemaId = generateId(type, "BYTE_BUFFER"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, recorder.createByteBufferSchema(), schemaId, type); + } else { + schemaId = generateId(type, "JSON"); + produceSyntheticBeanSchema(syntheticBeanBuildItem, + recorder.createJsonSchema(recorderContext.classProxy(type.name().toString())), schemaId, type); + } + alreadyGeneratedSchema.put(type.toString(), schemaId); + } + return schemaId; + } + return null; + } + + void produceSyntheticBeanSchema(BuildProducer syntheticBeanBuildItem, + RuntimeValue> runtimeValue, + String schemaId, + Type type) { + ParameterizedType providerType = ParameterizedType.create(DotNames.PULSAR_SCHEMA, type); + syntheticBeanBuildItem.produce(SyntheticBeanBuildItem.configure(Object.class) + .providerType(providerType) + .addType(providerType) + .addQualifier().annotation(Identifier.class).addValue("value", schemaId).done() + .scope(Dependent.class) + .runtimeValue(runtimeValue) + .unremovable() + .done()); + } + + void produceSyntheticBeanSchema(BuildProducer syntheticBeanBuildItem, + Supplier> supplier, + String schemaId, + Type type) { + ParameterizedType providerType = ParameterizedType.create(DotNames.PULSAR_SCHEMA, type); + syntheticBeanBuildItem.produce(SyntheticBeanBuildItem.configure(Object.class) + .providerType(providerType) + .addType(providerType) + .addQualifier().annotation(Identifier.class).addValue("value", schemaId).done() + .scope(Dependent.class) + .supplier(supplier) + .unremovable() + .done()); + } + + String generateId(Type type, String targetType) { + String baseName = type.name().withoutPackagePrefix(); + return baseName + "_" + targetType + "Schema_" + + HashUtil.sha1(Long.toString(UUID.randomUUID().getMostSignificantBits())); + } +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/ConsumingBean.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/ConsumingBean.java new file mode 100644 index 0000000000000..b77a144550a63 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/ConsumingBean.java @@ -0,0 +1,21 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; + +@ApplicationScoped +public class ConsumingBean { + + volatile long last = -1; + + @Incoming("in") + public void consume(long content) { + last = content; + } + + public long get() { + return last; + } + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/ProducingBean.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/ProducingBean.java new file mode 100644 index 0000000000000..26b2f9ae350d1 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/ProducingBean.java @@ -0,0 +1,25 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar; + +import java.time.Duration; +import java.util.concurrent.Flow; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +@ApplicationScoped +public class ProducingBean { + + @Outgoing("source") + public Flow.Publisher generate() { + return Multi.createFrom().range(1, 11) + .map(Integer::longValue) + .map(i -> i * 2) + .onItem() + .transformToUniAndConcatenate(l -> Uni.createFrom().item(l).onItem().delayIt().by(Duration.ofMillis(10))); + } + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/PulsarAuthTest.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/PulsarAuthTest.java new file mode 100644 index 0000000000000..d4205ddbc3d30 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/PulsarAuthTest.java @@ -0,0 +1,28 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar; + +import static org.awaitility.Awaitility.await; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class PulsarAuthTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addClasses(ConsumingBean.class, ProducingBean.class, TestResource.class)) + .withConfigurationResource("application-secured.properties"); + + @Test + public void test() { + await().until(() -> { + String value = RestAssured.get("/last").asString(); + return value.equalsIgnoreCase("20"); + }); + } +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/TestResource.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/TestResource.java new file mode 100644 index 0000000000000..492ee53a8fdb7 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/TestResource.java @@ -0,0 +1,18 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +@Path("/last") +public class TestResource { + + @Inject + ConsumingBean bean; + + @GET + public long getLast() { + return bean.get(); + } + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java new file mode 100644 index 0000000000000..76ae6bfd31286 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java @@ -0,0 +1,2042 @@ +package io.quarkus.smallrye.reactivemessaging.pulsar.deployment; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.groups.Tuple.tuple; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; + +import org.apache.avro.specific.AvroGenerated; +import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.client.api.Schema; +import org.assertj.core.groups.Tuple; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.spi.ConfigProviderResolver; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.jboss.jandex.IndexView; +import org.jboss.jandex.Indexer; +import org.jboss.jandex.Type; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.deployment.recording.RecorderContext; +import io.quarkus.pulsar.SchemaProviderRecorder; +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem; +import io.smallrye.common.annotation.Identifier; +import io.smallrye.config.SmallRyeConfigBuilder; +import io.smallrye.config.common.MapBackedConfigSource; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.pulsar.OutgoingMessage; +import io.smallrye.reactive.messaging.pulsar.PulsarBatchMessage; +import io.smallrye.reactive.messaging.pulsar.PulsarMessage; +import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +public class DefaultSchemaConfigTest { + private static void doTest(Tuple[] expectations, Class>... classesToIndex) { + doTest(null, expectations, Map.of(), classesToIndex); + } + + private static void doTest(Tuple[] expectations, Map generatedSchemas, Class>... classesToIndex) { + doTest(null, expectations, generatedSchemas, classesToIndex); + } + + private static void doTest(Config customConfig, Tuple[] expectations, Map generatedSchemas, + Class>... classesToIndex) { + List syntheticBeans = new ArrayList<>(); + List configs = new ArrayList<>(); + + List > classes = new ArrayList<>(Arrays.asList(classesToIndex)); + classes.add(Incoming.class); + DefaultSchemaDiscoveryState discovery = new DefaultSchemaDiscoveryState(index(classes)) { + @Override + Config getConfig() { + return customConfig != null ? customConfig : super.getConfig(); + } + + @Override + boolean isPulsarConnector(List list, boolean incoming, String channelName) { + return true; + } + }; + RecorderContext rcMock = Mockito.mock(RecorderContext.class); + Mockito.when(rcMock.classProxy(Mockito.anyString())).thenAnswer(a -> Class.forName(a.getArgument(0))); + SyntheticBeanBuilder syntheticBean = new SyntheticBeanBuilder(syntheticBeans::add, + new SchemaProviderRecorder(), rcMock) { + @Override + void produceSyntheticBeanSchema(BuildProducer syntheticBeanBuildItem, + RuntimeValue> runtimeValue, String schemaId, Type type) { + // no-op + } + + @Override + void produceSyntheticBeanSchema(BuildProducer syntheticBeanBuildItem, + Supplier> supplier, String schemaId, Type type) { + // no-op + } + + @Override + String generateId(Type type, String targetType) { + // remove the random bits + return type.name().withoutPackagePrefix() + targetType + "Schema"; + } + }; + + try { + new PulsarSchemaDiscoveryProcessor().discoverDefaultSerdeConfig(discovery, + Collections.emptyList(), + configs::add, syntheticBean); + + assertThat(configs) + .extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue) + .containsExactlyInAnyOrder(expectations); + + assertThat(syntheticBean.alreadyGeneratedSchema).containsExactlyInAnyOrderEntriesOf(generatedSchemas); + } finally { + // must not leak the Config instance associated to the system classloader + if (customConfig == null) { + ConfigProviderResolver.instance().releaseConfig(discovery.getConfig()); + } + } + } + + private static IndexView index(List > classes) { + Indexer indexer = new Indexer(); + for (Class> clazz : classes) { + try { + try (InputStream stream = DefaultSchemaConfigTest.class.getClassLoader() + .getResourceAsStream(clazz.getName().replace('.', '/') + ".class")) { + indexer.index(stream); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return indexer.complete(); + } + + // --- + + @Test + public void stringInLongOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.schema", "INT64"), + tuple("mp.messaging.outgoing.channel2.schema", "INT64"), + tuple("mp.messaging.outgoing.channel3.schema", "INT64"), + tuple("mp.messaging.outgoing.channel4.schema", "INT64"), + tuple("mp.messaging.outgoing.channel5.schema", "INT64"), + tuple("mp.messaging.outgoing.channel6.schema", "INT64"), + tuple("mp.messaging.outgoing.channel7.schema", "INT64"), + tuple("mp.messaging.outgoing.channel8.schema", "INT64"), + tuple("mp.messaging.outgoing.channel9.schema", "INT64"), + tuple("mp.messaging.outgoing.channel10.schema", "INT64"), + tuple("mp.messaging.outgoing.channel11.schema", "INT64"), + tuple("mp.messaging.outgoing.channel12.schema", "INT64"), + + tuple("mp.messaging.incoming.channel13.schema", "STRING"), + tuple("mp.messaging.incoming.channel14.schema", "STRING"), + tuple("mp.messaging.incoming.channel15.schema", "STRING"), + tuple("mp.messaging.incoming.channel16.schema", "STRING"), + tuple("mp.messaging.incoming.channel17.schema", "STRING"), + tuple("mp.messaging.incoming.channel18.schema", "STRING"), + tuple("mp.messaging.incoming.channel19.schema", "STRING"), + tuple("mp.messaging.incoming.channel20.schema", "STRING"), + tuple("mp.messaging.incoming.channel21.schema", "STRING"), + + tuple("mp.messaging.incoming.channel22.schema", "STRING"), + tuple("mp.messaging.outgoing.channel23.schema", "INT64"), + tuple("mp.messaging.incoming.channel24.schema", "STRING"), + tuple("mp.messaging.outgoing.channel25.schema", "INT64"), + tuple("mp.messaging.incoming.channel26.schema", "STRING"), + tuple("mp.messaging.outgoing.channel27.schema", "INT64"), + tuple("mp.messaging.incoming.channel28.schema", "STRING"), + tuple("mp.messaging.outgoing.channel29.schema", "INT64"), + tuple("mp.messaging.incoming.channel30.schema", "STRING"), + tuple("mp.messaging.outgoing.channel31.schema", "INT64"), + tuple("mp.messaging.incoming.channel32.schema", "STRING"), + tuple("mp.messaging.outgoing.channel33.schema", "INT64"), + tuple("mp.messaging.incoming.channel34.schema", "STRING"), + tuple("mp.messaging.outgoing.channel35.schema", "INT64"), + tuple("mp.messaging.incoming.channel36.schema", "STRING"), + tuple("mp.messaging.outgoing.channel37.schema", "INT64"), + tuple("mp.messaging.incoming.channel38.schema", "STRING"), + tuple("mp.messaging.outgoing.channel39.schema", "INT64"), + tuple("mp.messaging.incoming.channel40.schema", "STRING"), + tuple("mp.messaging.outgoing.channel41.schema", "INT64"), + tuple("mp.messaging.incoming.channel42.schema", "STRING"), + tuple("mp.messaging.outgoing.channel43.schema", "INT64"), + tuple("mp.messaging.incoming.channel44.schema", "STRING"), + tuple("mp.messaging.outgoing.channel45.schema", "INT64"), + tuple("mp.messaging.incoming.channel46.schema", "STRING"), + tuple("mp.messaging.outgoing.channel47.schema", "INT64"), + tuple("mp.messaging.incoming.channel48.schema", "STRING"), + tuple("mp.messaging.outgoing.channel49.schema", "INT64"), + tuple("mp.messaging.incoming.channel50.schema", "STRING"), + tuple("mp.messaging.outgoing.channel51.schema", "INT64"), + tuple("mp.messaging.incoming.channel52.schema", "STRING"), + tuple("mp.messaging.outgoing.channel53.schema", "INT64"), + + tuple("mp.messaging.incoming.channel54.schema", "STRING"), + tuple("mp.messaging.outgoing.channel55.schema", "INT64"), + tuple("mp.messaging.incoming.channel56.schema", "STRING"), + tuple("mp.messaging.outgoing.channel57.schema", "INT64"), + tuple("mp.messaging.incoming.channel58.schema", "STRING"), + tuple("mp.messaging.outgoing.channel59.schema", "INT64"), + tuple("mp.messaging.incoming.channel60.schema", "STRING"), + tuple("mp.messaging.outgoing.channel61.schema", "INT64"), + tuple("mp.messaging.incoming.channel62.schema", "STRING"), + tuple("mp.messaging.outgoing.channel63.schema", "INT64"), + tuple("mp.messaging.incoming.channel64.schema", "STRING"), + tuple("mp.messaging.outgoing.channel65.schema", "INT64"), + }; + // @formatter:on + + doTest(expectations, StringInLongOut.class); + } + + private static class StringInLongOut { + // @Outgoing + + @Outgoing("channel1") + Publisher > method1() { + return null; + } + + @Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder > method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder method4() { + return null; + } + + @Outgoing("channel5") + Multi > method5() { + return null; + } + + @Outgoing("channel6") + Multi method6() { + return null; + } + + @Outgoing("channel7") + Message method7() { + return null; + } + + @Outgoing("channel8") + Long method8() { + return null; + } + + @Outgoing("channel9") + CompletionStage > method9() { + return null; + } + + @Outgoing("channel10") + CompletionStage method10() { + return null; + } + + @Outgoing("channel11") + Uni > method11() { + return null; + } + + @Outgoing("channel12") + Uni method12() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber > method13() { + return null; + } + + @Incoming("channel14") + Subscriber method14() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder , Void> method15() { + return null; + } + + @Incoming("channel16") + SubscriberBuilder method16() { + return null; + } + + @Incoming("channel17") + void method17(String msg) { + } + + @Incoming("channel18") + CompletionStage> method18(Message msg) { + return null; + } + + @Incoming("channel19") + CompletionStage> method19(String payload) { + return null; + } + + @Incoming("channel20") + Uni> method20(Message msg) { + return null; + } + + @Incoming("channel21") + Uni> method21(String payload) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor , Message > method22() { + return null; + } + + @Incoming("channel24") + @Outgoing("channel25") + Processor method23() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder , Message > method24() { + return null; + } + + @Incoming("channel28") + @Outgoing("channel29") + ProcessorBuilder method25() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher > method26(Message msg) { + return null; + } + + @Incoming("channel32") + @Outgoing("channel33") + Publisher method27(String payload) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder > method28(Message msg) { + return null; + } + + @Incoming("channel36") + @Outgoing("channel37") + PublisherBuilder method29(String payload) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi > method30(Message msg) { + return null; + } + + @Incoming("channel40") + @Outgoing("channel41") + Multi method31(String payload) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Message method32(Message msg) { + return null; + } + + @Incoming("channel44") + @Outgoing("channel45") + Long method33(String payload) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage > method34(Message msg) { + return null; + } + + @Incoming("channel48") + @Outgoing("channel49") + CompletionStage method35(String payload) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni > method36(Message msg) { + return null; + } + + @Incoming("channel52") + @Outgoing("channel53") + Uni method37(String payload) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher > method38(Publisher > msg) { + return null; + } + + @Incoming("channel56") + @Outgoing("channel57") + Publisher method39(Publisher payload) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder > method40(PublisherBuilder > msg) { + return null; + } + + @Incoming("channel60") + @Outgoing("channel61") + PublisherBuilder method41(PublisherBuilder payload) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi > method42(Multi > msg) { + return null; + } + + @Incoming("channel64") + @Outgoing("channel65") + Multi method43(Multi payload) { + return null; + } + } + + // --- + + @Test + public void byteArrayInAvroDtoOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel2.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel3.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel4.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel5.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel6.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel7.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel8.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel9.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel10.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel11.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.outgoing.channel12.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + + tuple("mp.messaging.incoming.channel13.schema", "BYTES"), + tuple("mp.messaging.incoming.channel14.schema", "BYTES"), + tuple("mp.messaging.incoming.channel15.schema", "BYTES"), + tuple("mp.messaging.incoming.channel16.schema", "BYTES"), + tuple("mp.messaging.incoming.channel17.schema", "BYTES"), + tuple("mp.messaging.incoming.channel18.schema", "BYTES"), + tuple("mp.messaging.incoming.channel19.schema", "BYTES"), + tuple("mp.messaging.incoming.channel20.schema", "BYTES"), + tuple("mp.messaging.incoming.channel21.schema", "BYTES"), + + tuple("mp.messaging.incoming.channel22.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel23.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel24.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel25.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel26.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel27.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel28.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel29.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel30.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel31.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel32.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel33.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel34.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel35.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel36.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel37.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel38.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel39.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel40.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel41.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel42.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel43.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel44.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel45.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel46.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel47.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel48.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel49.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel50.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel51.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel52.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel53.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + + tuple("mp.messaging.incoming.channel54.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel55.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel56.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel57.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel58.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel59.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel60.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel61.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel62.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel63.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + tuple("mp.messaging.incoming.channel64.schema", "BYTES"), + tuple("mp.messaging.outgoing.channel65.schema", "DefaultSchemaConfigTest$AvroDtoAVROSchema"), + }; + Map generatedSchemas = Map.of( + "io.quarkus.smallrye.reactivemessaging.pulsar.deployment.DefaultSchemaConfigTest$AvroDto", + "DefaultSchemaConfigTest$AvroDtoAVROSchema" + ); + // @formatter:on + + doTest(expectations, generatedSchemas, AvroDto.class, ByteArrayInAvroDtoOut.class); + } + + // simulating an Avro-generated class, the autodetection code only looks for this annotation + @AvroGenerated + private static class AvroDto { + } + + private static class ByteArrayInAvroDtoOut { + // @Outgoing + + @Outgoing("channel1") + Publisher > method1() { + return null; + } + + @Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder > method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder method4() { + return null; + } + + @Outgoing("channel5") + Multi > method5() { + return null; + } + + @Outgoing("channel6") + Multi method6() { + return null; + } + + @Outgoing("channel7") + Message method7() { + return null; + } + + @Outgoing("channel8") + AvroDto method8() { + return null; + } + + @Outgoing("channel9") + CompletionStage > method9() { + return null; + } + + @Outgoing("channel10") + CompletionStage method10() { + return null; + } + + @Outgoing("channel11") + Uni > method11() { + return null; + } + + @Outgoing("channel12") + Uni method12() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber > method13() { + return null; + } + + @Incoming("channel14") + Subscriber method14() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder , Void> method15() { + return null; + } + + @Incoming("channel16") + SubscriberBuilder method16() { + return null; + } + + @Incoming("channel17") + void method17(byte[] msg) { + } + + @Incoming("channel18") + CompletionStage> method18(Message msg) { + return null; + } + + @Incoming("channel19") + CompletionStage> method19(byte[] payload) { + return null; + } + + @Incoming("channel20") + Uni> method20(Message msg) { + return null; + } + + @Incoming("channel21") + Uni> method21(byte[] payload) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor , Message > method22() { + return null; + } + + @Incoming("channel24") + @Outgoing("channel25") + Processor method23() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder , Message > method24() { + return null; + } + + @Incoming("channel28") + @Outgoing("channel29") + ProcessorBuilder method25() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher > method26(Message msg) { + return null; + } + + @Incoming("channel32") + @Outgoing("channel33") + Publisher method27(byte[] payload) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder > method28(Message msg) { + return null; + } + + @Incoming("channel36") + @Outgoing("channel37") + PublisherBuilder method29(byte[] payload) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi > method30(Message msg) { + return null; + } + + @Incoming("channel40") + @Outgoing("channel41") + Multi method31(byte[] payload) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Message method32(Message msg) { + return null; + } + + @Incoming("channel44") + @Outgoing("channel45") + AvroDto method33(byte[] payload) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage > method34(Message msg) { + return null; + } + + @Incoming("channel48") + @Outgoing("channel49") + CompletionStage method35(byte[] payload) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni > method36(Message msg) { + return null; + } + + @Incoming("channel52") + @Outgoing("channel53") + Uni method37(byte[] payload) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher > method38(Publisher > msg) { + return null; + } + + @Incoming("channel56") + @Outgoing("channel57") + Publisher method39(Publisher payload) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder > method40(PublisherBuilder > msg) { + return null; + } + + @Incoming("channel60") + @Outgoing("channel61") + PublisherBuilder method41(PublisherBuilder payload) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi > method42(Multi > msg) { + return null; + } + + @Incoming("channel64") + @Outgoing("channel65") + Multi method43(Multi payload) { + return null; + } + } + + // --- + + @Test + public void jacksonDtoInVertxJsonObjectOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel2.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel3.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel4.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel5.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel6.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel7.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel8.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel9.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel10.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel11.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.outgoing.channel12.schema", "JsonObjectJSON_OBJECTSchema"), + + tuple("mp.messaging.incoming.channel13.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel14.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel15.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel16.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel17.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel18.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel19.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel20.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.incoming.channel21.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + + tuple("mp.messaging.incoming.channel22.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel23.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel24.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel25.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel26.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel27.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel28.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel29.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel30.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel31.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel32.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel33.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel34.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel35.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel36.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel37.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel38.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel39.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel40.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel41.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel42.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel43.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel44.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel45.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel46.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel47.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel48.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel49.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel50.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel51.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel52.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel53.schema", "JsonObjectJSON_OBJECTSchema"), + + tuple("mp.messaging.incoming.channel54.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel55.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel56.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel57.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel58.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel59.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel60.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel61.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel62.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel63.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel64.schema", "DefaultSchemaConfigTest$JacksonDtoJSONSchema"), + tuple("mp.messaging.outgoing.channel65.schema", "JsonObjectJSON_OBJECTSchema"), + }; + var generatedSchemas = Map.of( + "io.quarkus.smallrye.reactivemessaging.pulsar.deployment.DefaultSchemaConfigTest$JacksonDto", + "DefaultSchemaConfigTest$JacksonDtoJSONSchema", + "io.vertx.core.json.JsonObject", "JsonObjectJSON_OBJECTSchema" + ); + // @formatter:on + + doTest(expectations, generatedSchemas, JacksonDto.class, JacksonDtoInVertxJsonObjectOut.class); + + } + + static class JacksonDto { + } + + private static class JacksonDtoInVertxJsonObjectOut { + // @Outgoing + + @Outgoing("channel1") + Publisher > method1() { + return null; + } + + @Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder > method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder