Skip to content

Commit

Permalink
Throttle and Debounce implementations (#22)
Browse files Browse the repository at this point in the history
* Throttle and Debounce implementations

* Comments
  • Loading branch information
tginsberg authored Aug 22, 2024
1 parent 5a4d9fb commit dc0bbdc
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 12 deletions.
41 changes: 29 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,20 @@ implementation("com.ginsberg:gatherers4j:0.1.0")
# Gatherers In This Library

### Streams
| Function | Purpose |
|---------------------------|-----------------------------------------------------------------------------------------------------------------------|
| `concat(stream)` | Creates a stream which is the concatenation of the source stream and the given stream, which must be of the same type |
| `dedupeConsecutive()` | Remove consecutive duplicates from a stream |
| `dedupeConsecutiveBy(fn)` | Remove consecutive duplicates from a stream as returned by `fn` |
| `distinctBy(fn)` | Emit only distinct elements from the stream, as measured by `fn` |
| `interleave(stream)` | Creates a stream of alternating objects from the input stream and the argument stream |
| `last(n)` | Constrain the stream to the last `n` values |
| `withIndex()` | Maps all elements of the stream as-is along with their 0-based index |
| `withIndexStartingAt(n)` | Maps all elements of the stream as-is along with an index starting at the number specified |
| `zipWith(stream)` | Creates a stream of `Pair` objects whose values come from the input stream and argument stream |
| `zipWithNext()` | Creates a stream of `List` objects via a sliding window of width 2 and stepping 1 |
| Function | Purpose |
|------------------------------|-----------------------------------------------------------------------------------------------------------------------|
| `concat(stream)` | Creates a stream which is the concatenation of the source stream and the given stream, which must be of the same type |
| `debounce(amount, duration)` | Limit stream elements to `amount` elements over `duration`, dropping any elements over the limit until a new `duration` starts |
| `dedupeConsecutive()` | Remove consecutive duplicates from a stream |
| `dedupeConsecutiveBy(fn)` | Remove consecutive duplicates from a stream as returned by `fn` |
| `distinctBy(fn)` | Emit only distinct elements from the stream, as measured by `fn` |
| `interleave(stream)` | Creates a stream of alternating objects from the input stream and the argument stream |
| `last(n)` | Constrain the stream to the last `n` values |
| `throttle(amount, duration)` | Limit stream elements to `amount` elements over `duration`, pausing until a new `duration` period starts |
| `withIndex()` | Maps all elements of the stream as-is along with their 0-based index |
| `withIndexStartingAt(n)` | Maps all elements of the stream as-is along with an index starting at the number specified |
| `zipWith(stream)` | Creates a stream of `Pair` objects whose values come from the input stream and argument stream |
| `zipWithNext()` | Creates a stream of `List` objects via a sliding window of width 2 and stepping 1 |

### Mathematics/Statistics
| Function | Purpose |
Expand Down Expand Up @@ -184,6 +186,21 @@ Stream
// [IndexedValue(0, "A"), IndexedValue(1, "B"), IndexedValue(2, "C")]
```

### Throttle the number of elements consumed in a period

```java
Stream
.of("A", "B", "C")
.gather(Gatherers4j.throttle(2, Duration.ofSeconds(1))) // Two per second
.toList();

// ["A", "B", "C"]
^
|
+----------- Pause
```


#### Zip two streams of together into a `Stream<Pair>`

The left and right streams can be of different types.
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/GathererUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

public class GathererUtils {

public static long NANOS_PER_MILLISECOND = 1_000_000;

static boolean safeEquals(final Object left, final Object right) {
if (left == null && right == null) {
return true;
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/Gatherers4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.ginsberg.gatherers4j;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
Expand All @@ -35,6 +36,19 @@ public static <INPUT> ConcatenationGatherer<INPUT> concat(final Stream<INPUT> co
return new ConcatenationGatherer<>(concatThis);
}

/**
* Limit the number of elements in the stream to some number per period, dropping anything over the
* limit during the period.
*
* @param amount A positive number of elements to allow per period
* @param duration A positive duration for the length of the period
* @return A non-null ThrottlingGatherer<INPUT>
* @param <INPUT> Type of elements in the stream
*/
public static <INPUT> ThrottlingGatherer<INPUT> debounce(final int amount, final Duration duration) {
return new ThrottlingGatherer<>(ThrottlingGatherer.LimitRule.Drop, amount, duration);
}

/**
* <p>Given a stream of objects, filter the objects such that any consecutively appearing
* after the first one are dropped.
Expand Down Expand Up @@ -181,6 +195,19 @@ public static <INPUT> BigDecimalSimpleMovingAverageGatherer<INPUT> simpleMovingA
return new BigDecimalSimpleMovingAverageGatherer<>(mappingFunction, windowSize);
}

/**
* Limit the number of elements in the stream to some number per period. When the limit is reached,
* consumption is paused until a new period starts and the count resets.
*
* @param amount A positive number of elements to allow per period
* @param duration A positive duration for the length of the period
* @return A non-null ThrottlingGatherer<INPUT>
* @param <INPUT> Type of elements in the stream
*/
public static <INPUT> ThrottlingGatherer<INPUT> throttle(final int amount, final Duration duration) {
return new ThrottlingGatherer<>(ThrottlingGatherer.LimitRule.Pause, amount, duration);
}

/**
* Maps all elements of the stream as-is along with their 0-based index.
*/
Expand Down
117 changes: 117 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/ThrottlingGatherer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

import static com.ginsberg.gatherers4j.GathererUtils.NANOS_PER_MILLISECOND;
import static com.ginsberg.gatherers4j.GathererUtils.mustNotBeNull;

public class ThrottlingGatherer<INPUT> implements Gatherer<INPUT, ThrottlingGatherer.State, INPUT> {

public enum LimitRule {
Drop,
Pause
}

private final LimitRule limitRule;
private final Duration duration;
private final int allowed;
private Clock clock = Clock.systemUTC();

ThrottlingGatherer(final LimitRule limitRule, final int allowed, final Duration duration) {
mustNotBeNull(duration, "Duration must not be null");
if (duration.toMillis() < 1) {
throw new IllegalArgumentException("Minimum duration is 1ms");
}
if (allowed <= 0) {
throw new IllegalArgumentException("Allowed must be positive");
}
this.limitRule = limitRule == null ? LimitRule.Pause : limitRule;
this.duration = duration;
this.allowed = allowed;
}

public ThrottlingGatherer<INPUT> withClock(final Clock clock) {
mustNotBeNull(clock, "Clock must not be null");
this.clock = clock;
return this;
}

@Override
public Supplier<State> initializer() {
return () -> new State(limitRule, duration, allowed, clock);
}

@Override
public Integrator<State, INPUT, INPUT> integrator() {
return (state, element, downstream) -> {
if (!downstream.isRejecting() && state.attempt()) {
downstream.push(element);
}
return !downstream.isRejecting();
};
}

public static class State {
final int allowedPerPeriod;
final long periodDurationMillis;
final LimitRule limitRule;
final Clock clock;
long thisPeriodEnd;
int remainingPermits;

State(final LimitRule limitRule, final Duration duration, final int allowed, final Clock clock) {
this.limitRule = limitRule;
this.allowedPerPeriod = allowed;
this.periodDurationMillis = duration.toMillis();
this.clock = clock;
resetPeriod();
}

private void resetPeriod() {
thisPeriodEnd = clock.millis() + periodDurationMillis;
remainingPermits = allowedPerPeriod;
}

// Assuming this is not run in parallel. Gate with a lock if that assumption fails/changes.
boolean attempt() {
final long now = clock.millis();
if(now < thisPeriodEnd) {
// The current period has not ended
if(remainingPermits == 0) {
if(limitRule == LimitRule.Drop) {
return false;
}
// Wait until next period, reset counters, fall through to take permit.
LockSupport.parkNanos((thisPeriodEnd - now) * NANOS_PER_MILLISECOND);
resetPeriod();
}
} else {
// We're in a new period, reset the counters
// and fall through to take permit.
resetPeriod();
}
remainingPermits--;
return true;
}
}
}
164 changes: 164 additions & 0 deletions src/test/java/com/ginsberg/gatherers4j/ThrottlingGathererTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.junit.jupiter.api.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ThrottlingGathererTest {

@Test
void amountIsNegative() {
assertThatThrownBy(() ->
Stream.of("A").gather(Gatherers4j.throttle(-1, Duration.ofSeconds(1)))
).isExactlyInstanceOf(IllegalArgumentException.class);
}

@Test
void amountIsZero() {
assertThatThrownBy(() ->
Stream.of("A").gather(Gatherers4j.throttle(-1, Duration.ofSeconds(1)))
).isExactlyInstanceOf(IllegalArgumentException.class);
}

@Test
void clockMustNotBeNull() {
assertThatThrownBy(() ->
Stream.of("A").gather(Gatherers4j.throttle(1, Duration.ofSeconds(1)).withClock(null))
).isExactlyInstanceOf(IllegalArgumentException.class);
}

@Test
void defaultsRuleToPause() {
// Arrange/Act
final ThrottlingGatherer<String> gatherer = new ThrottlingGatherer<>(null, 1, Duration.ofSeconds(1));

// Assert
assertThat(gatherer).hasFieldOrPropertyWithValue("limitRule", ThrottlingGatherer.LimitRule.Pause);
}

@Test
void durationIsNegative() {
assertThatThrownBy(() ->
Stream.of("A").gather(Gatherers4j.throttle(1, Duration.ofSeconds(-1)))
).isExactlyInstanceOf(IllegalArgumentException.class);
}

@Test
void durationIsNull() {
assertThatThrownBy(() ->
Stream.of("A").gather(Gatherers4j.throttle(1, null))
).isExactlyInstanceOf(IllegalArgumentException.class);
}

@Test
void durationIsZero() {
assertThatThrownBy(() ->
Stream.of("A").gather(Gatherers4j.throttle(1, Duration.ofSeconds(0)))
).isExactlyInstanceOf(IllegalArgumentException.class);
}

@Test
void testThrottlingCrossesPeriod() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "C");
final Duration duration = Duration.ofMillis(100);
final Clock clock = new PredictableClock(0, 0, 0, 101, 0, 0);

// Act
final List<Long> output = input
.gather(Gatherers4j.throttle(2, duration).withClock(clock))
.map(_ -> System.currentTimeMillis())
.toList();

// Assert
assertThat(output.get(1) - output.get(0)).isLessThan(duration.toMillis());
assertThat(output.get(2) - output.get(0)).isGreaterThanOrEqualTo(duration.toMillis());
}

@Test
void testThrottlingWithDrop() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "C");
final Duration duration = Duration.ofMillis(100);

// Act
final List<String> output = input
.gather(Gatherers4j.debounce(2, duration))
.toList();

// Assert
assertThat(output).containsExactly("A", "B");
}

@Test
void testThrottlingWithPause() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "C");
final Duration duration = Duration.ofMillis(100);

// Act
final List<Long> output = input
.gather(Gatherers4j.throttle(2, duration))
.map(_ -> System.currentTimeMillis())
.toList();

// Assert
assertThat(output.get(1) - output.get(0)).isLessThan(duration.toMillis());
assertThat(output.get(2) - output.get(0)).isGreaterThanOrEqualTo(duration.toMillis());
}

private static class PredictableClock extends Clock {

private final int[] pauses;
private int invocation;

private PredictableClock(final int... pauses) {
this.pauses = pauses;
}

@Override
public ZoneId getZone() {
return null;
}

@Override
public Instant instant() {
int when = pauses[invocation];
if (when > 0) {
LockSupport.parkNanos(when * GathererUtils.NANOS_PER_MILLISECOND);
}
invocation = (invocation + 1) % pauses.length;
return Instant.now();
}

@Override
public Clock withZone(ZoneId zone) {
return null;
}
}
}

0 comments on commit dc0bbdc

Please sign in to comment.