Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try to transform throttle into a reusable transformation #3409

Closed
wants to merge 6 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,13 @@ public Mono<Task.Result> fixMessageInconsistencies(Context context, RunningOptio
}

private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInImapUid)
.window(runningOptions.getMessagesPerSecond(), PERIOD)
.throttle(messageIdToImapUidDAO.retrieveAllMessages())
.doOnNext(any -> context.incrementProcessedImapUidEntries())
.flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
return messageIdToImapUidDAO.retrieveAllMessages()
.transform(ReactorUtils.<ComposedMessageIdWithMetaData, Task.Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(PERIOD)
.forOperation(metaData -> detectInconsistencyInImapUid(metaData)
.doOnNext(any -> context.incrementProcessedImapUidEntries())
.flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO))));
}

private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
Expand Down Expand Up @@ -469,11 +471,13 @@ private Mono<Inconsistency> detectOrphanImapUidEntry(ComposedMessageIdWithMetaDa
}

private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInMessageId)
.window(runningOptions.getMessagesPerSecond(), PERIOD)
.throttle(messageIdDAO.retrieveAllMessages())
.doOnNext(any -> context.incrementMessageIdEntries())
.flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
return messageIdDAO.retrieveAllMessages()
.transform(ReactorUtils.<ComposedMessageIdWithMetaData, Task.Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(PERIOD)
.forOperation(metadata -> detectInconsistencyInMessageId(metadata)
.doOnNext(any -> context.incrementMessageIdEntries())
.flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO))));
}

private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntryForUid(Mailbox mai
}

private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, Task.Result>forOperation(
entry -> reIndex(entry, reprocessingContext))
.window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
.throttle(entriesToIndex)
return entriesToIndex.transform(ReactorUtils.<Either<Failure, ReIndexingEntry>, Task.Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(Duration.ofSeconds(1))
.forOperation(entry -> reIndex(entry, reprocessingContext)))
.reduce(Task::combine)
.switchIfEmpty(Mono.just(Result.COMPLETED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RecomputeCurrentQuotasService {
Expand Down Expand Up @@ -164,10 +163,11 @@ public RecomputeCurrentQuotasService(UsersRepository usersRepository,

public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
try {
Flux<Username> users = Iterators.toFlux(usersRepository.list());
return ReactorUtils.Throttler.<Username, Task.Result>forOperation(username -> recomputeUserCurrentQuotas(context, username))
.window(runningOptions.getUsersPerSecond(), Duration.ofSeconds(1))
.throttle(users)
return Iterators.toFlux(usersRepository.list())
.transform(ReactorUtils.<Username, Task.Result>throttle()
.elements(runningOptions.getUsersPerSecond())
.per(Duration.ofSeconds(1))
.forOperation(username -> recomputeUserCurrentQuotas(context, username)))
.reduce(Task.Result.COMPLETED, Task::combine);
} catch (UsersRepositoryException e) {
LOGGER.error("Error while accessing users from repository", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,35 @@ public class ReactorUtils {

public static final String MDC_KEY_PREFIX = "MDC-";

public static class Throttler<T, U> {
private static final Duration DELAY = Duration.ZERO;

public static <T, U> RequiresWindowingParameters<T, U> forOperation(Function<T, Publisher<U>> operation) {
return (maxSize, duration) -> new Throttler<>(operation, maxSize, duration);
}

@FunctionalInterface
public interface RequiresWindowingParameters<T, U> {
Throttler<T, U> window(int maxSize, Duration duration);
}

private Throttler(Function<T, Publisher<U>> operation, int windowMaxSize, Duration windowDuration) {
Preconditions.checkArgument(windowMaxSize > 0, "'windowMaxSize' must be strictly positive");
Preconditions.checkArgument(!windowDuration.isNegative(), "'windowDuration' must be strictly positive");
Preconditions.checkArgument(!windowDuration.isZero(), "'windowDuration' must be strictly positive");
private static final Duration DELAY = Duration.ZERO;

public static <T, U> RequiresQuantity<T, U> throttle() {
return elements -> duration -> operation -> {
Preconditions.checkArgument(elements > 0, "'windowMaxSize' must be strictly positive");
Preconditions.checkArgument(!duration.isNegative(), "'windowDuration' must be strictly positive");
Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");

return flux -> flux
.windowTimeout(elements, duration)
.zipWith(Flux.interval(DELAY, duration))
.flatMap(Tuple2::getT1, elements, elements)
.flatMap(operation, elements);
};
}

this.operation = operation;
this.windowMaxSize = windowMaxSize;
this.windowDuration = windowDuration;
}
@FunctionalInterface
public interface RequiresQuantity<T, U> {
RequiresPeriod<T, U> elements(int maxSize);
}

private final Function<T, Publisher<U>> operation;
private final int windowMaxSize;
private final Duration windowDuration;
@FunctionalInterface
public interface RequiresPeriod<T, U> {
RequiresOperation<T, U> per(Duration duration);
}

public Flux<U> throttle(Flux<T> flux) {
return flux.windowTimeout(windowMaxSize, windowDuration)
.zipWith(Flux.interval(DELAY, windowDuration))
.flatMap(Tuple2::getT1)
.flatMap(operation, windowMaxSize);
}
@FunctionalInterface
public interface RequiresOperation<T, U> {
Function<Flux<T>, Flux<U>> forOperation(Function<T, Publisher<U>> operation);
}

public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.james.util;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.ByteArrayInputStream;
Expand All @@ -34,7 +35,6 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.james.util.ReactorUtils.Throttler;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
Expand All @@ -56,29 +56,37 @@ class ReactorUtilsTest {
class Throttling {
@Test
void windowShouldThrowWhenMaxSizeIsNegative() {
assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
.window(-1, Duration.ofSeconds(1)))
assertThatThrownBy(() -> ReactorUtils.<Integer, Integer>throttle()
.elements(-1)
.per(Duration.ofSeconds(1))
.forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void windowShouldThrowWhenMaxSizeIsZero() {
assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
.window(0, Duration.ofSeconds(1)))
assertThatThrownBy(() -> ReactorUtils.throttle()
.elements(0)
.per(Duration.ofSeconds(1))
.forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void windowShouldThrowWhenDurationIsNegative() {
assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
.window(1, Duration.ofSeconds(-1)))
assertThatThrownBy(() -> ReactorUtils.throttle()
.elements(1)
.per(Duration.ofSeconds(-1))
.forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void windowShouldThrowWhenDurationIsZero() {
assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
.window(1, Duration.ZERO))
assertThatThrownBy(() -> ReactorUtils.throttle()
.elements(1)
.per(Duration.ofSeconds(0))
.forOperation(Mono::just))
.isInstanceOf(IllegalArgumentException.class);
}

Expand All @@ -89,20 +97,36 @@ void throttleShouldApplyMaxSize() {

Stopwatch stopwatch = Stopwatch.createUnstarted();

Flux<Integer> originalFlux = Flux.range(0, 10);
ImmutableList<Long> windowMembership = Throttler.<Integer, Long>forOperation(
i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS)))
.window(windowMaxSize, windowDuration)
.throttle(originalFlux)
.doOnSubscribe(signal -> stopwatch.start())
ImmutableList<Long> windowMembership = Flux.range(0, 10)
.transform(ReactorUtils.<Integer, Long>throttle()
.elements(windowMaxSize)
.per(windowDuration)
.forOperation(i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS))))
.map(i -> i / 100)
.doOnSubscribe(signal -> stopwatch.start())
.collect(Guavate.toImmutableList())
.block();

assertThat(windowMembership)
.containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L);
}

@Test
void largeWindowShouldNotOverrunIntermediateBuffers() {
// windowMaxSize exceeds Queues.SMALL_BUFFER_SIZE & Queues.SMALL_BUFFER_SIZE (256 by default)
// Combined with slow operations, this ensures we are not filling up intermediate buffers.
int windowMaxSize = 3_000;
Duration windowDuration = Duration.ofMillis(100);

assertThatCode(() -> Flux.range(0, 10_000)
.transform(ReactorUtils.<Integer, Long>throttle()
.elements(windowMaxSize)
.per(windowDuration)
.forOperation(i -> Mono.delay(windowDuration.multipliedBy(2))))
.blockLast())
.doesNotThrowAnyException();
}

@Test
void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() {
int windowMaxSize = 3;
Expand All @@ -116,9 +140,11 @@ void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() {
.flatMap(i -> Mono.delay(windowDuration.multipliedBy(2)).thenReturn(i))
.flatMap(i -> Mono.fromRunnable(ongoingProcessing::decrementAndGet).thenReturn(i));

ImmutableList<Integer> ongoingProcessingUponComputationStart = Throttler.forOperation(longRunningOperation)
.window(windowMaxSize, windowDuration)
.throttle(originalFlux)
ImmutableList<Integer> ongoingProcessingUponComputationStart = originalFlux
.transform(ReactorUtils.<Integer, Integer>throttle()
.elements(windowMaxSize)
.per(windowDuration)
.forOperation(longRunningOperation))
.collect(Guavate.toImmutableList())
.block();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ private Mono<Result> correctProjection(ProjectionEntry entry, Progress progress)
}

private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
return ReactorUtils.Throttler.<ProjectionEntry, Result>forOperation(entry -> correctProjection(entry, progress))
.window(runningOptions.getMessagesPerSecond(), PERIOD)
.throttle(entries)
return entries.transform(ReactorUtils.<ProjectionEntry, Task.Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(PERIOD)
.forOperation(entry -> correctProjection(entry, progress)))
.reduce(Task::combine)
.switchIfEmpty(Mono.just(Result.COMPLETED));
}
Expand Down