Skip to content

Commit

Permalink
Adding Bulkhead integration with Ratpack and Bulkhead call finished m…
Browse files Browse the repository at this point in the history
…etric (ReactiveX#169)

* Adding Bulkhead integration with Ratpack and Bulkhead call finished metric

* fixup! Adding Bulkhead integration with Ratpack and Bulkhead call finished metric

* Adding documentation for bulkhead event and integration with Ratpack

* fixup! Adding Bulkhead integration with Ratpack and Bulkhead call finished metric

* fixup! Adding Bulkhead integration with Ratpack and Bulkhead call finished metric

* fixup! Adding Bulkhead integration with Ratpack and Bulkhead call finished metric

* fixup! Adding Bulkhead integration with Ratpack and Bulkhead call finished metric
  • Loading branch information
Jan Sykora authored and RobWin committed Dec 11, 2017
1 parent 831d066 commit 73da955
Show file tree
Hide file tree
Showing 26 changed files with 1,989 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.github.resilience4j.bulkhead;

import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
import io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead;
Expand Down Expand Up @@ -417,5 +418,6 @@ interface EventPublisher extends io.github.resilience4j.core.EventPublisher<Bulk

EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> eventConsumer);

EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> eventConsumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,7 @@ enum Type {
CALL_PERMITTED,
/** A BulkheadEvent which informs that a call was rejected due to bulkhead being full */
CALL_REJECTED,
/** A BulkheadEvent which informs that a call was finished(success and failure is indistinguishable) */
CALL_FINISHED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2017 Jan Sykora
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.resilience4j.bulkhead.event;

/**
* A BulkheadEvent which informs that a call has been finished. The event doesn't
* provide any information on whether the call's execution was successful or not.
*/
public class BulkheadOnCallFinishedEvent extends AbstractBulkheadEvent {

public BulkheadOnCallFinishedEvent(String bulkheadName) {
super(bulkheadName);
}

@Override
public Type getEventType() {
return Type.CALL_FINISHED;
}

@Override
public String toString() {
return String.format(
"%s: Bulkhead '%s' has finished a call.",
getCreationTime(),
getBulkheadName()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,4 @@ public String toString() {
getBulkheadName()
);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallFinishedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallPermittedEvent;
import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
import io.github.resilience4j.core.EventConsumer;
Expand All @@ -34,7 +35,7 @@
/**
* A Bulkhead implementation based on a semaphore.
*/
public class SemaphoreBulkhead implements Bulkhead{
public class SemaphoreBulkhead implements Bulkhead {

private final String name;
private final Semaphore semaphore;
Expand Down Expand Up @@ -71,7 +72,7 @@ public SemaphoreBulkhead(String name) {
/**
* Create a bulkhead using a configuration supplier
*
* @param name the name of this bulkhead
* @param name the name of this bulkhead
* @param configSupplier BulkheadConfig supplier
*/
public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) {
Expand All @@ -95,6 +96,7 @@ public boolean isCallPermitted() {
@Override
public void onComplete() {
semaphore.release();
publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
}

@Override
Expand Down Expand Up @@ -132,6 +134,12 @@ public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent>
return this;
}

@Override
public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
return this;
}

@Override
public void consumeEvent(BulkheadEvent event) {
super.processEvent(event);
Expand All @@ -150,12 +158,10 @@ boolean tryEnterBulkhead() {

if (timeout == 0) {
callPermitted = semaphore.tryAcquire();
}
else {
} else {
try {
callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ex) {
} catch (InterruptedException ex) {
callPermitted = false;
}
}
Expand All @@ -164,7 +170,7 @@ boolean tryEnterBulkhead() {
}

private void publishBulkheadEvent(Supplier<BulkheadEvent> eventSupplier) {
if(eventProcessor.hasConsumers()) {
if (eventProcessor.hasConsumers()) {
eventProcessor.consumeEvent(eventSupplier.get());
}
}
Expand All @@ -178,5 +184,4 @@ public int getAvailableConcurrentCalls() {
return semaphore.availablePermits();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public void shouldReturnTheSameConsumer() {

@Test
public void shouldConsumeOnCallPermittedEvent() {

// Given
Bulkhead bulkhead = Bulkhead.of("test", config);
BDDMockito.given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
Expand All @@ -79,7 +78,6 @@ public void shouldConsumeOnCallPermittedEvent() {

@Test
public void shouldConsumeOnCallRejectedEvent() {

// Given
Bulkhead bulkhead = Bulkhead.of("test", config);

Expand All @@ -96,5 +94,35 @@ public void shouldConsumeOnCallRejectedEvent() {
then(logger).should(times(1)).info("CALL_REJECTED");
}

@Test
public void shouldConsumeOnCallFinishedEventWhenExecutionIsFinished() throws Exception {
// Given
Bulkhead bulkhead = Bulkhead.of("test", config);

// When
bulkhead.getEventPublisher()
.onCallFinished(event ->
logger.info(event.getEventType().toString()));

Try.ofSupplier(Bulkhead.decorateSupplier(bulkhead,helloWorldService::returnHelloWorld));

// Then
then(logger).should(times(1)).info("CALL_FINISHED");
}

@Test
public void shouldConsumeOnCallFinishedEventOnComplete() throws Exception {
// Given
Bulkhead bulkhead = Bulkhead.of("test", config);

// When
bulkhead.getEventPublisher()
.onCallFinished(event ->
logger.info(event.getEventType().toString()));

bulkhead.onComplete();

// Then
then(logger).should(times(1)).info("CALL_FINISHED");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public void testBulkhead() throws InterruptedException {

bulkhead.isCallPermitted();

testSubscriber.assertValueCount(4)
.assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_PERMITTED);
testSubscriber.assertValueCount(6)
.assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED, CALL_FINISHED, CALL_PERMITTED);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ dependencies {

==== Basic Usage

Installing the `Resilience4jModule` module provides a `CircuitBreakerRegistry`, `RateLimiterRegistry`,
Installing the `Resilience4jModule` module provides a `BulkheadRegistry` `CircuitBreakerRegistry`, `RateLimiterRegistry`,
and `RetryRegistry` with the default configurations. It also install the Guice method interceptors
for CircuitBreakers, RateLimiters, and Retries. Finally, it allows configuration of metrics
and even the building of CircuitBreakers, RateLimiters, and Retries. See below for configuration details.
for Bulkheads, CircuitBreakers, RateLimiters, and Retries. Finally, it allows configuration of metrics
and even the building of Bulkheads, CircuitBreakers, RateLimiters, and Retries. See below for configuration details.

Note: If you don't register a `CircuitBreakerRegistry` or `RateLimiterRegistry` or `RetryRegistry`, the defaults
will be used.
Expand Down Expand Up @@ -93,6 +93,18 @@ Ratpack promises provide the means by which an application can become fully non-
Resilience4j provides transformers that can be applied to Promises. This is ideal when promising a value
that is coming from some sort of I/O source.

===== Bulkhead

You can easily apply a Bulkhead to any Ratpack Promise.

[source,java]
----
public Promise<String> methodWhichReturnsAPromise() {
return backendBConnector.methodWhichReturnsAPromise()
.transform(BulkheadTransformer.of(bulkhead).recover(t -> "recovered"));
}
----

===== CircuitBreaker

You can easily apply a CircuitBreaker to any Ratpack Promise.
Expand Down Expand Up @@ -141,6 +153,23 @@ methods returning types:
* Flowable
* Single

===== Bulkhead
The demo shows how to use the `Bulkhead` annotation to have your Ratpack application limiting number of method calls.
You can either annotate a class in order to protect all public methods or just some specific methods.
For example:

[source,java]
----
@Bulkhead(name = "backendA", recovery = MyRecoveryFunction.class)
@Singleton
public class BackendAConnector implements Connector {
...
}
----
Where `MyRecoveryFunction` is implements `io.github.resilience4j.ratpack.RecoveryFunction` and provides
a fallback value that is returned when the bulkhead identified by `name` is full or call ends in exception.


===== CircuitBreaker
The demo shows how to use the `CircuitBreaker` annotation to make your Ratpack application more fault tolerant.
You can either annotate a class in order to protect all public methods or just some specific methods.
Expand Down Expand Up @@ -191,7 +220,7 @@ a fallback value that is returned when the rate limiter rate limit identified by

==== Functional style

You can still use a functional programming style for CircuitBreaker, Retry, and RateLimiter. For example:
You can still use a functional programming style for Bulkhead, CircuitBreaker, Retry, and RateLimiter. For example:

[source,java]
----
Expand All @@ -212,7 +241,7 @@ public class BusinessBService implements BusinessService {
}
----

==== Adding CircuitBreakers, RateLimiters, and Retries
==== Adding Bulkheads, CircuitBreakers, RateLimiters, and Retries
These can be defined in the module configuration or in an external configuration.
Note that the module only provide default registries, which you can replace by
binding your own.
Expand All @@ -227,7 +256,12 @@ public class MyModule extends AbstractModule {
protected void configure() {
Resilience4jModule module = new Resilience4jModule();
module.configure(c -> c
.circuitBreaker("test1", cb -> cb
.bulkhead("test1", b -> b
.defaults(true)
).bulkhead("test2", b -> b
.maxConcurrentCalls(100)
.maxWaitTime(1000)
).circuitBreaker("test1", cb -> cb
.defaults(true)
).circuitBreaker("test2", cb -> cb
.failureRateThreshold(50)
Expand Down Expand Up @@ -275,6 +309,12 @@ ratpack {
[source,yaml]
----
resilience4j:
bulkheads:
test1:
defaults: true
test2:
maxConcurrentCalls: 100
maxWaitTime: 1000
circuitBreakers:
test1:
defaults: true
Expand All @@ -300,7 +340,7 @@ resilience4j:

==== Metrics
Both dropwizard and prometheus metrics can be auto configured and enabled for all registered
circuitbreaker instances, ratelimiter instances, and retry instances.
bulkhead instances, circuitbreaker instances, ratelimiter instances, and retry instances.

For dropwizard metrics to work, add a compile dependency on resilience4j-metrics and
bind a MetricRegistry instance.
Expand Down Expand Up @@ -342,6 +382,41 @@ public class MyModule extends AbstractModule {

==== Event Monitoring

===== Bulkhead
These are the same endpoints as implemented for Bulkhead,
so for detailed documentation please refer to previous sections.

List of available endpoints:

* `/bulkhead/events`
* `/bulkhead/stream/events`
* `/bulkhead/events/{bulkheadName}`
* `/bulkhead/stream/events/{bulkheadName}`
* `/bulkhead/events/{bulkheadName}/{eventType}`
* `/bulkhead/stream/events/{bulkheadName}/{eventType}`

Example of response:
----
{
"bulkheadEvents": [
{
"bulkheadName": "backendA",
"type": "CALL_PERMITTED",
"creationTime": "2017-05-05T21:29:40.463+03:00[Europe/Uzhgorod]"
},
{
"bulkheadName": "backendA",
"type": "CALL_REJECTED",
"creationTime": "2017-05-05T21:29:40.469+03:00[Europe/Uzhgorod]"
},
{
"bulkheadName": "backendA",
"type": "CALL_FINISHED",
"creationTime": "2017-05-05T21:29:41.268+03:00[Europe/Uzhgorod]"
}
]
}
===== CircuitBreaker
The emitted CircuitBreaker events are stored in a separate circular event consumer buffers. The size of a event consumer buffer can be configured per CircuitBreaker in the application.yml file (eventConsumerBufferSize).
Expand Down Expand Up @@ -535,4 +610,4 @@ Example of response:
}
]
}
----
----
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ include::../../../../../resilience4j-bulkhead/src/test/java/io/github/resilience

===== Consume emitted BulkheadEvents

The BulkHead emits a stream of BulkHeadEvents. There are two types of events emitted: permitted execution & rejected execution. If you want to consume these events, you have to register an event consumer.
The BulkHead emits a stream of BulkHeadEvents. There are two types of events emitted: permitted execution, rejected execution & finished execution. If you want to consume these events, you have to register an event consumer.

[source,java]
----
bulkhead.getEventPublisher()
.onCallPermitted(event -> logger.info(...))
.onCallRejected(event -> logger.info(...));
.onCallRejected(event -> logger.info(...))
.onCallFinished(event -> logger.info(...));
----

==== Monitoring
Expand Down
Loading

0 comments on commit 73da955

Please sign in to comment.