Skip to content

Commit

Permalink
Issue ReactiveX#61 first cycle permissions fix
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM committed Mar 24, 2017
1 parent 1420473 commit 12f94cb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
permissionsPerCycle = rateLimiterConfig.getLimitForPeriod();

waitingThreads = new AtomicInteger(0);
state = new AtomicReference<>(new State(0, 0, 0));
state = new AtomicReference<>(new State(0, permissionsPerCycle, 0));

PublishProcessor<RateLimiterEvent> publisher = PublishProcessor.create();
this.eventPublisher = publisher.toSerialized();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AtomicRateLimiterTest {
private static final String LIMITER_NAME = "test";
private static final long CYCLE_IN_NANOS = 500_000_000L;
private static final long POLL_INTERVAL_IN_NANOS = 2_000_000L;
private static final int PERMISSIONS_RER_CYCLE = 1;
private RateLimiterConfig rateLimiterConfig;
private AtomicRateLimiter rateLimiter;
private AtomicRateLimiter.AtomicRateLimiterMetrics metrics;
Expand All @@ -68,7 +69,7 @@ private void setTimeOnNanos(long nanoTime) throws Exception {
@Before
public void setup() {
rateLimiterConfig = RateLimiterConfig.custom()
.limitForPeriod(1)
.limitForPeriod(PERMISSIONS_RER_CYCLE)
.limitRefreshPeriod(Duration.ofNanos(CYCLE_IN_NANOS))
.timeoutDuration(Duration.ZERO)
.build();
Expand All @@ -78,6 +79,14 @@ public void setup() {
eventStream = rateLimiter.getEventStream();
}

@Test
public void permissionsInFirstCycle() throws Exception {
setTimeOnNanos(CYCLE_IN_NANOS - 10);
RateLimiter.Metrics metrics = rateLimiter.getMetrics();
int availablePermissions = metrics.getAvailablePermissions();
then(availablePermissions).isEqualTo(PERMISSIONS_RER_CYCLE);
}

@Test
public void acquireAndRefreshWithEventPublishing() throws Exception {
CompletableFuture<ArrayList<String>> events = subscribeOnAllEventsDescriptions(4);
Expand Down

0 comments on commit 12f94cb

Please sign in to comment.