Skip to content

Commit

Permalink
ReactiveX#212 Add more tests checking we get the right exception (Rea…
Browse files Browse the repository at this point in the history
…ctiveX#214)

Check that we are getting the resilience exception even when the source
will throw an exception (not on subscribe)
  • Loading branch information
madgnome authored and RobWin committed Mar 16, 2018
1 parent 5e850e9 commit 9bbc73f
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,16 @@ public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSub
).expectError(CircuitBreakerOpenException.class)
.verify(Duration.ofSeconds(1));
}

@Test
public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
circuitBreaker.transitionToOpenState();
StepVerifier.create(
Flux.error(new IOException("BAM!"), true)
.transform(CircuitBreakerOperator.of(circuitBreaker))
.transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
.transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
).expectError(CircuitBreakerOpenException.class)
.verify(Duration.ofSeconds(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,18 @@ public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() {

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
bulkhead.isCallPermitted();

StepVerifier.create(
Flux.error(new IOException("BAM!"), true)
.transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
.expectSubscription()
.expectError(BulkheadFullException.class)
.verify(Duration.ofSeconds(1));

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,18 @@ public void shouldEmitBulkheadFullExceptionEvenWhenErrorDuringSubscribe() {

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

@Test
public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
bulkhead.isCallPermitted();

StepVerifier.create(
Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
.transform(BulkheadOperator.of(bulkhead, Schedulers.immediate())))
.expectSubscription()
.expectError(BulkheadFullException.class)
.verify(Duration.ofSeconds(1));

assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ public void shouldPropagateError() {
assertSingleFailedCall();
}

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
circuitBreaker.transitionToForcedOpenState();
StepVerifier.create(
Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
}

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
circuitBreaker.transitionToForcedOpenState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void shouldPropagateError() {
}

@Test
public void shouldEmitErrorWithBulkheadFullException() {
public void shouldEmitRequestNotPermittedException() {
saturateRateLimiter();

StepVerifier.create(
Expand All @@ -63,4 +63,32 @@ public void shouldEmitErrorWithBulkheadFullException() {

assertNoPermitLeft();
}

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe() {
saturateRateLimiter();

StepVerifier.create(
Flux.error(new IOException("BAM!"))
.transform(RateLimiterOperator.of(rateLimiter)))
.expectSubscription()
.expectError(RequestNotPermitted.class)
.verify(Duration.ofSeconds(1));

assertNoPermitLeft();
}

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorNotOnSubscribe() {
saturateRateLimiter();

StepVerifier.create(
Flux.error(new IOException("BAM!"), true)
.transform(RateLimiterOperator.of(rateLimiter)))
.expectSubscription()
.expectError(RequestNotPermitted.class)
.verify(Duration.ofSeconds(1));

assertNoPermitLeft();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shouldEmitErrorWithBulkheadFullException() {
}

@Test
public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe() {
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorDuringSubscribe() {
saturateRateLimiter();
StepVerifier.create(
Mono.error(new IOException("BAM!"))
Expand All @@ -74,4 +74,16 @@ public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe()

assertNoPermitLeft();
}

@Test
public void shouldEmitRequestNotPermittedExceptionEvenWhenErrorNotOnSubscribe() {
saturateRateLimiter();
StepVerifier.create(
Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
.transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate())))
.expectError(RequestNotPermitted.class)
.verify(Duration.ofSeconds(1));

assertNoPermitLeft();
}
}

0 comments on commit 9bbc73f

Please sign in to comment.