Skip to content

Commit

Permalink
Kafka Companion Consume from pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Mar 22, 2024
1 parent a083daf commit d2b5012
Showing 1 changed file with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -710,6 +711,29 @@ public ConsumerTask<K, V> fromTopics(Set<String> topics,
}));
}

/**
* Create {@link ConsumerTask} for consuming records from the topics matching the given pattern.
* <p>
* The plug function is used to modify the {@link Multi} generating the records, ex. limiting the number of records
* produced.
* <p>
* The task will run until a failure occurs or is explicitly stopped (subscription to the {@link Multi} cancelled).
*
* @param topicsPattern Pattern to subscribe to
* @param plugFunction the function to apply to the resulting multi
* @return the {@link ConsumerTask}
*/
public ConsumerTask<K, V> fromTopics(Pattern topicsPattern,
Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> plugFunction) {
return new ConsumerTask<>(Multi.createFrom().deferred(() -> {
if (!polling.compareAndSet(false, true)) {
return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
}
getOrCreateConsumer().subscribe(topicsPattern, this);
return getConsumeMulti().plug(plugFunction);
}));
}

/**
* Create {@link ConsumerTask} for consuming records from the given topic.
* <p>
Expand Down Expand Up @@ -742,6 +766,19 @@ public ConsumerTask<K, V> fromTopics(String... topics) {
return fromTopics(new HashSet<>(Arrays.asList(topics)));
}

/**
* Create {@link ConsumerTask} for consuming records from the topics matching the given pattern.
* <p>
* The resulting {@link ConsumerTask} will be already started.
* The task will run until a failure occurs or is explicitly stopped (subscription to the {@link Multi} cancelled).
*
* @param topicsPattern Pattern to subscribe to
* @return the {@link ConsumerTask}
*/
public ConsumerTask<K, V> fromTopics(Pattern topicsPattern) {
return fromTopics(topicsPattern, Function.identity());
}

/**
* Create {@link ConsumerTask} for consuming records from the given topics.
* <p>
Expand Down Expand Up @@ -769,6 +806,20 @@ public ConsumerTask<K, V> fromTopics(Set<String> topics, long numberOfRecords) {
return fromTopics(topics, until(numberOfRecords));
}

/**
* Create {@link ConsumerTask} for consuming records from the topics matching the given pattern.
* <p>
* The resulting {@link ConsumerTask} will be already started.
* The task will run until the given number of records consumed.
*
* @param topicsPattern Pattern to subscribe to
* @param numberOfRecords the number of records to produce
* @return the {@link ConsumerTask}
*/
public ConsumerTask<K, V> fromTopics(Pattern topicsPattern, long numberOfRecords) {
return fromTopics(topicsPattern, until(numberOfRecords));
}

/**
* Create {@link ConsumerTask} for consuming records from the given topics.
* <p>
Expand All @@ -783,6 +834,20 @@ public ConsumerTask<K, V> fromTopics(Set<String> topics, Duration during) {
return fromTopics(topics, until(during));
}

/**
* Create {@link ConsumerTask} for consuming records from the topics matching the given pattern.
* <p>
* The resulting {@link ConsumerTask} will be already started.
* The task will run during the given duration.
*
* @param topicsPattern Pattern to subscribe to
* @param during the duration of the producing task to run
* @return the {@link ConsumerTask}
*/
public ConsumerTask<K, V> fromTopics(Pattern topicsPattern, Duration during) {
return fromTopics(topicsPattern, until(during));
}

/**
* Create {@link ConsumerTask} for consuming records from the given topic.
* <p>
Expand Down

0 comments on commit d2b5012

Please sign in to comment.