diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java index 857b45b6d..77b0b6b3f 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.java @@ -19,6 +19,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -710,6 +711,29 @@ public ConsumerTask fromTopics(Set topics, })); } + /** + * Create {@link ConsumerTask} for consuming records from the topics matching the given pattern. + *

+ * The plug function is used to modify the {@link Multi} generating the records, ex. limiting the number of records + * produced. + *

+ * The task will run until a failure occurs or is explicitly stopped (subscription to the {@link Multi} cancelled). + * + * @param topicsPattern Pattern to subscribe to + * @param plugFunction the function to apply to the resulting multi + * @return the {@link ConsumerTask} + */ + public ConsumerTask fromTopics(Pattern topicsPattern, + Function>, Multi>> plugFunction) { + return new ConsumerTask<>(Multi.createFrom().deferred(() -> { + if (!polling.compareAndSet(false, true)) { + return Multi.createFrom().failure(new IllegalStateException("Consumer already in use")); + } + getOrCreateConsumer().subscribe(topicsPattern, this); + return getConsumeMulti().plug(plugFunction); + })); + } + /** * Create {@link ConsumerTask} for consuming records from the given topic. *

@@ -742,6 +766,19 @@ public ConsumerTask fromTopics(String... topics) { return fromTopics(new HashSet<>(Arrays.asList(topics))); } + /** + * Create {@link ConsumerTask} for consuming records from the topics matching the given pattern. + *

+ * The resulting {@link ConsumerTask} will be already started. + * The task will run until a failure occurs or is explicitly stopped (subscription to the {@link Multi} cancelled). + * + * @param topicsPattern Pattern to subscribe to + * @return the {@link ConsumerTask} + */ + public ConsumerTask fromTopics(Pattern topicsPattern) { + return fromTopics(topicsPattern, Function.identity()); + } + /** * Create {@link ConsumerTask} for consuming records from the given topics. *

@@ -769,6 +806,20 @@ public ConsumerTask fromTopics(Set topics, long numberOfRecords) { return fromTopics(topics, until(numberOfRecords)); } + /** + * Create {@link ConsumerTask} for consuming records from the topics matching the given pattern. + *

+ * The resulting {@link ConsumerTask} will be already started. + * The task will run until the given number of records consumed. + * + * @param topicsPattern Pattern to subscribe to + * @param numberOfRecords the number of records to produce + * @return the {@link ConsumerTask} + */ + public ConsumerTask fromTopics(Pattern topicsPattern, long numberOfRecords) { + return fromTopics(topicsPattern, until(numberOfRecords)); + } + /** * Create {@link ConsumerTask} for consuming records from the given topics. *

@@ -783,6 +834,20 @@ public ConsumerTask fromTopics(Set topics, Duration during) { return fromTopics(topics, until(during)); } + /** + * Create {@link ConsumerTask} for consuming records from the topics matching the given pattern. + *

+ * The resulting {@link ConsumerTask} will be already started. + * The task will run during the given duration. + * + * @param topicsPattern Pattern to subscribe to + * @param during the duration of the producing task to run + * @return the {@link ConsumerTask} + */ + public ConsumerTask fromTopics(Pattern topicsPattern, Duration during) { + return fromTopics(topicsPattern, until(during)); + } + /** * Create {@link ConsumerTask} for consuming records from the given topic. *