Skip to content

Commit

Permalink
Implement Observability support
Browse files Browse the repository at this point in the history
cescoffier authored and aureamunoz committed Oct 11, 2023

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent a62d668 commit 5a76dfc
Showing 22 changed files with 945 additions and 54 deletions.
10 changes: 9 additions & 1 deletion api/revapi.json
Original file line number Diff line number Diff line change
@@ -27,7 +27,15 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.stork.api.Service::<init>(java.lang.String, io.smallrye.stork.api.LoadBalancer, io.smallrye.stork.api.ServiceDiscovery, io.smallrye.stork.api.ServiceRegistrar, boolean)",
"new": "method void io.smallrye.stork.api.Service::<init>(java.lang.String, java.lang.String, java.lang.String, io.smallrye.stork.api.observability.ObservationCollector, io.smallrye.stork.api.LoadBalancer, io.smallrye.stork.api.ServiceDiscovery, io.smallrye.stork.api.ServiceRegistrar<?>, boolean)",
"justification": "Implementing observability"
}
]
}
}, {
"extension" : "revapi.reporter.json",
64 changes: 56 additions & 8 deletions api/src/main/java/io/smallrye/stork/api/Service.java
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@
import java.util.concurrent.Semaphore;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.StorkObservation;

/**
* Represents a <em>Service</em>.
@@ -16,23 +18,34 @@ public class Service {
private final Semaphore instanceSelectionLock;
private final LoadBalancer loadBalancer;
private final ServiceDiscovery serviceDiscovery;
private final ServiceRegistrar serviceRegistrar;
private final ServiceRegistrar<?> serviceRegistrar;
private final String serviceName;
private final String serviceDiscoveryType;
private final String serviceSelectionType;
private final ObservationCollector observations;

/**
* Creates a new Service.
*
* @param serviceName the name, must not be {@code null}, must not be blank
* @param serviceDiscoveryType the type of the service discovery (for observability purpose)
* @param serviceSelectionType the type of the service selection (for observability purpose)
* @param collector the observation collector, must not be {@code null}
* @param loadBalancer the load balancer, can be {@code null}
* @param serviceDiscovery the service discovery, must not be {@code null}
* @param serviceRegistrar the service registrar, can be {@code null}
* @param requiresStrictRecording whether strict recording must be enabled
*/
public Service(String serviceName, LoadBalancer loadBalancer, ServiceDiscovery serviceDiscovery,
ServiceRegistrar serviceRegistrar, boolean requiresStrictRecording) {
public Service(String serviceName,
String serviceSelectionType, String serviceDiscoveryType, ObservationCollector collector,
LoadBalancer loadBalancer, ServiceDiscovery serviceDiscovery,
ServiceRegistrar<?> serviceRegistrar, boolean requiresStrictRecording) {
this.loadBalancer = loadBalancer;
this.serviceDiscovery = serviceDiscovery;
this.serviceRegistrar = serviceRegistrar;
this.serviceDiscoveryType = serviceDiscoveryType;
this.serviceSelectionType = serviceSelectionType;
this.observations = collector;
this.serviceName = serviceName;
this.instanceSelectionLock = requiresStrictRecording ? new Semaphore(1) : null;
}
@@ -41,21 +54,37 @@ public Service(String serviceName, LoadBalancer loadBalancer, ServiceDiscovery s
* Selects a service instance.
* <p>
* The selection looks for the service instances and select the one to use using the load balancer.
*
* <p>
* <b>Note:</b> this method doesn't record a start of an operation using this load balancer and does not
* synchronize load balancer invocations even if the load balancer is not thread safe
*
* @return a Uni with a ServiceInstance, or with {@link NoServiceInstanceFoundException} if the load balancer failed to find
* a service instance capable of handling a call
*/
public Uni<ServiceInstance> selectInstance() {
StorkObservation observationPoints = observations.create(serviceName, serviceDiscoveryType,
serviceSelectionType);
return serviceDiscovery.getServiceInstances()
.map(this::selectInstance);
.onItemOrFailure().invoke((list, failure) -> {
if (failure != null) {
observationPoints.onServiceDiscoveryFailure(failure);
} else {
observationPoints.onServiceDiscoverySuccess(list);
}
})
.map(this::selectInstance)
.onItemOrFailure().invoke((selected, failure) -> {
if (failure != null) {
observationPoints.onServiceSelectionFailure(failure);
} else {
observationPoints.onServiceSelectionSuccess(selected.getId());
}
});
}

/**
* Using the underlying load balancer, select a service instance from the collection of service instances.
*
* <p>
* <b>Note:</b> this method doesn't record a start of an operation using this load balancer and does not
* synchronize load balancer invocations even if the load balancer is not thread safe
*
@@ -80,8 +109,23 @@ public ServiceInstance selectInstance(Collection<ServiceInstance> instances) {
* @see LoadBalancer#requiresStrictRecording()
*/
public Uni<ServiceInstance> selectInstanceAndRecordStart(boolean measureTime) {
return serviceDiscovery.getServiceInstances()
.map(list -> selectInstanceAndRecordStart(list, measureTime));
StorkObservation observationPoints = observations.create(serviceName, serviceDiscoveryType,
serviceSelectionType);
return serviceDiscovery.getServiceInstances().onItemOrFailure().invoke((list, failure) -> {
if (failure != null) {
observationPoints.onServiceDiscoveryFailure(failure);
} else {
observationPoints.onServiceDiscoverySuccess(list);
}
})
.map(list -> selectInstanceAndRecordStart(list, measureTime))
.onItemOrFailure().invoke((selected, failure) -> {
if (failure != null) {
observationPoints.onServiceSelectionFailure(failure);
} else {
observationPoints.onServiceSelectionSuccess(selected.getId());
}
});
}

/**
@@ -158,6 +202,10 @@ public ServiceRegistrar getServiceRegistrar() {
return serviceRegistrar;
}

public ObservationCollector getObservations() {
return observations;
}

/**
* @return the service name.
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.stork.api.observability;

import java.util.List;

import io.smallrye.stork.api.ServiceInstance;

public class NoopObservationCollector implements ObservationCollector {

private static final StorkEventHandler NOOP_HANDLER = ev -> {
// NOOP
};

public static final StorkObservation NOOP_STORK_EVENT = new StorkObservation(
null, null,
null, NOOP_HANDLER) {
@Override
public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
// Noop
}

@Override
public void onServiceDiscoveryFailure(Throwable throwable) {
// Noop
}

@Override
public void onServiceSelectionSuccess(long id) {
// Noop
}

@Override
public void onServiceSelectionFailure(Throwable throwable) {
// Noop
}
};

@Override
public StorkObservation create(String serviceName, String serviceDiscoveryType,
String serviceSelectionType) {
return NOOP_STORK_EVENT;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.smallrye.stork.api.observability;

public interface ObservationCollector {

StorkObservation create(String serviceName, String serviceDiscoveryType, String serviceSelectionType);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.smallrye.stork.api.observability;

public interface StorkEventHandler {
void complete(StorkObservation event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.smallrye.stork.api.observability;

import java.time.Duration;
import java.util.List;

import io.smallrye.stork.api.ServiceInstance;

public class StorkObservation {
// Handler / Reporter
private final StorkEventHandler handler;

// Metadata
private final String serviceName;
private final String serviceDiscoveryType;
private final String serviceSelectionType;

// Time
private final long begin;
private volatile long endOfServiceDiscovery;
private volatile long endOfServiceSelection;

// Service discovery data
private volatile int instancesCount = -1;

// Service selection data
private volatile long selectedInstanceId = -1L;

// Overall status
private volatile boolean done;
private volatile boolean serviceDiscoverySuccessful = false;
private volatile Throwable failure;

public StorkObservation(String serviceName, String serviceDiscoveryType, String serviceSelectionType,
StorkEventHandler handler) {
this.handler = handler;
this.serviceName = serviceName;
this.serviceDiscoveryType = serviceDiscoveryType;
this.serviceSelectionType = serviceSelectionType;
this.begin = System.nanoTime();
}

public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
this.endOfServiceDiscovery = System.nanoTime();
this.serviceDiscoverySuccessful = true;
if (instances != null) {
this.instancesCount = instances.size();
} else {
this.instancesCount = 0;
}
}

public void onServiceDiscoveryFailure(Throwable throwable) {
this.endOfServiceDiscovery = System.nanoTime();
this.failure = throwable;
}

public void onServiceSelectionSuccess(long id) {
this.endOfServiceSelection = System.nanoTime();
this.selectedInstanceId = id;
this.done = true;
this.handler.complete(this);
}

public void onServiceSelectionFailure(Throwable throwable) {
this.endOfServiceSelection = System.nanoTime();
if (failure != throwable) {
this.failure = throwable;
}
this.handler.complete(this);
}

public boolean isDone() {
return done || failure != null;
}

public Duration getOverallDuration() {
if (!isDone()) {
return null;
}
return Duration.ofNanos(endOfServiceSelection - begin);
}

public Duration getServiceDiscoveryDuration() {
return Duration.ofNanos(endOfServiceDiscovery - begin);
}

public Duration getServiceSelectionDuration() {
if (!isDone()) {
return null;
}
return Duration.ofNanos(endOfServiceSelection - endOfServiceDiscovery);
}

public String getServiceName() {
return serviceName;
}

public String getServiceDiscoveryType() {
return serviceDiscoveryType;
}

public String getServiceSelectionType() {
return serviceSelectionType;
}

public int getDiscoveredInstancesCount() {
return instancesCount;
}

public Throwable failure() {
return failure;
}

public boolean isServiceDiscoverySuccessful() {
return serviceDiscoverySuccessful;
}
}
Original file line number Diff line number Diff line change
@@ -2,6 +2,9 @@

import java.util.function.Supplier;

import io.smallrye.stork.api.observability.NoopObservationCollector;
import io.smallrye.stork.api.observability.ObservationCollector;

/**
* A provider for "utility" objects used by service discovery and load balancer implementations.
*
@@ -24,4 +27,8 @@ public interface StorkInfrastructure {
* @throws NullPointerException if utilityClass or defaultSupplier are null
*/
<T> T get(Class<T> utilityClass, Supplier<T> defaultSupplier);

default ObservationCollector getObservationCollector() {
return new NoopObservationCollector();
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/io/smallrye/stork/Stork.java
Original file line number Diff line number Diff line change
@@ -207,13 +207,15 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader

final var loadBalancerConfig = serviceConfig.loadBalancer();
final LoadBalancer loadBalancer;
String loadBalancerType;
if (loadBalancerConfig == null) {
// no load balancer, use round-robin
LOGGER.debug("No load balancer configured for type {}, using {}", serviceDiscoveryType,
RoundRobinLoadBalancerProvider.ROUND_ROBIN_TYPE);
loadBalancerType = RoundRobinLoadBalancerProvider.ROUND_ROBIN_TYPE;
loadBalancer = new RoundRobinLoadBalancer();
} else {
String loadBalancerType = loadBalancerConfig.type();
loadBalancerType = loadBalancerConfig.type();
final var loadBalancerProvider = loadBalancerLoaders.get(loadBalancerType);
if (loadBalancerProvider == null) {
throw new IllegalArgumentException("No LoadBalancerProvider for type " + loadBalancerType);
@@ -223,7 +225,7 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader
}

final var serviceRegistrarConfig = serviceConfig.serviceRegistrar();
ServiceRegistrar serviceRegistrar = null;
ServiceRegistrar<?> serviceRegistrar = null;
if (serviceRegistrarConfig == null) {
LOGGER.debug("No service registrar configured for service {}", serviceConfig.serviceName());
} else {
@@ -237,7 +239,9 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader
serviceConfig.serviceName(), infrastructure);
}

return new Service(serviceConfig.serviceName(), loadBalancer, serviceDiscovery, serviceRegistrar,
return new Service(serviceConfig.serviceName(),
loadBalancerType, serviceDiscoveryType, infrastructure.getObservationCollector(),
loadBalancer, serviceDiscovery, serviceRegistrar,
loadBalancer.requiresStrictRecording());
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.smallrye.stork.integration;

import io.smallrye.stork.api.observability.ObservationCollector;

public class ObservableStorkInfrastructure extends DefaultStorkInfrastructure {

private final ObservationCollector observationCollector;

public ObservableStorkInfrastructure(ObservationCollector observationCollector) {
this.observationCollector = observationCollector;
}

@Override
public ObservationCollector getObservationCollector() {
return observationCollector;
}
}
23 changes: 23 additions & 0 deletions core/src/test/java/io/smallrye/stork/FakeObservationCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.smallrye.stork;

import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.StorkEventHandler;
import io.smallrye.stork.api.observability.StorkObservation;

public class FakeObservationCollector implements ObservationCollector {

private static final StorkEventHandler FAKE_HANDLER = ev -> {
// FAKE
};

public static StorkObservation FAKE_STORK_EVENT;

@Override
public StorkObservation create(String serviceName, String serviceDiscoveryType,
String serviceSelectionType) {
FAKE_STORK_EVENT = new StorkObservation(
serviceName, serviceDiscoveryType, serviceSelectionType,
FAKE_HANDLER);
return FAKE_STORK_EVENT;
}
}
48 changes: 45 additions & 3 deletions core/src/test/java/io/smallrye/stork/FakeServiceConfig.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package io.smallrye.stork;

import java.util.Collections;
import java.util.Map;

import io.smallrye.stork.api.config.ConfigWithType;
import io.smallrye.stork.api.config.ServiceConfig;

public class FakeServiceConfig implements ServiceConfig {

private final String name;
private final String serviceName;
private final ConfigWithType lb;
private final ConfigWithType sd;
private final ConfigWithType sr;

public FakeServiceConfig(String name, ConfigWithType sd, ConfigWithType lb, ConfigWithType sr) {
this.name = name;
this.serviceName = name;
this.lb = lb;
this.sd = sd;
this.sr = sr;
}

@Override
public String serviceName() {
return name;
return serviceName;
}

@Override
@@ -41,4 +44,43 @@ public ConfigWithType serviceRegistrar() {
public boolean secure() {
return false;
}

public static final ConfigWithType FAKE_SERVICE_DISCOVERY_CONFIG = new ConfigWithType() {

@Override
public String type() {
return "fake";
}

@Override
public Map<String, String> parameters() {
return Collections.emptyMap();
}
};

public static final ConfigWithType FAKE_SECURE_SERVICE_DISCOVERY_CONFIG = new ConfigWithType() {

@Override
public String type() {
return "fake";
}

@Override
public Map<String, String> parameters() {
return Map.of("secure", "true");
}
};

public static final ConfigWithType FAKE_LOAD_BALANCER_CONFIG = new ConfigWithType() {

@Override
public String type() {
return "fake-selector";
}

@Override
public Map<String, String> parameters() {
return Collections.emptyMap();
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.smallrye.stork;

import static org.mockito.Mockito.mock;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.stork.api.ServiceDiscovery;
import io.smallrye.stork.api.config.ServiceConfig;
import io.smallrye.stork.api.config.ServiceDiscoveryAttribute;
import io.smallrye.stork.api.config.ServiceDiscoveryType;
import io.smallrye.stork.spi.ServiceDiscoveryProvider;
import io.smallrye.stork.spi.StorkInfrastructure;

@ServiceDiscoveryType("mock")
@ServiceDiscoveryAttribute(name = "failure", description = "indicates if service discovery should fail")
@ApplicationScoped
public class MockServiceDiscoveryProvider implements ServiceDiscoveryProvider<MockConfiguration> {

@Override
public ServiceDiscovery createServiceDiscovery(MockConfiguration config, String serviceName, ServiceConfig serviceConfig,
StorkInfrastructure storkInfrastructure) {
return mock(ServiceDiscovery.class);
}
}
314 changes: 314 additions & 0 deletions core/src/test/java/io/smallrye/stork/ObservationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
package io.smallrye.stork;

import static io.smallrye.stork.FakeServiceConfig.FAKE_SERVICE_DISCOVERY_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.LoadBalancer;
import io.smallrye.stork.api.NoServiceInstanceFoundException;
import io.smallrye.stork.api.Service;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.api.observability.StorkObservation;
import io.smallrye.stork.integration.ObservableStorkInfrastructure;
import io.smallrye.stork.spi.config.ConfigProvider;

public class ObservationTest {

@BeforeEach
public void setUp() throws IOException {
Stork.shutdown();
AnchoredServiceDiscoveryProvider.services.clear();
TestEnv.clearSPIs();
TestEnv.configurations.clear();
}

@AfterEach
public void cleanup() throws IOException {
Stork.shutdown();
AnchoredServiceDiscoveryProvider.services.clear();
TestEnv.clearSPIs();
TestEnv.configurations.clear();
}

@Test
void shouldGetMetricsWhenSelectingInstanceHappyPath() {
//Given a configuration service using a SD and default LB
TestEnv.configurations.add(new FakeServiceConfig("my-service",
FAKE_SERVICE_DISCOVERY_CONFIG, null, null));

ServiceInstance instance = mock(ServiceInstance.class);
AnchoredServiceDiscoveryProvider.services.add(instance);
TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");

//When we try to get service instances
assertThat(service.selectInstance().await().indefinitely()).isEqualTo(instance);

//One instance is found and metrics are also gathered accordingly
assertThat(service.getObservations()).isNotNull();
StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isNull();
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(1);
assertThat(metrics.isServiceDiscoverySuccessful()).isTrue();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("fake");
assertThat(metrics.getServiceSelectionType()).isEqualTo("round-robin");

assertDurations(metrics);

}

private static void assertDurations(StorkObservation metrics) {
Duration overallDuration = metrics.getOverallDuration();
Duration serviceDiscoveryDuration = metrics.getServiceDiscoveryDuration();
Duration serviceSelectionDuration = metrics.getServiceSelectionDuration();
assertThat(overallDuration).isNotNull();
assertThat(serviceDiscoveryDuration).isNotNull();
assertThat(serviceSelectionDuration).isNotNull();
assertThat(overallDuration).isGreaterThanOrEqualTo(serviceDiscoveryDuration.plus(serviceSelectionDuration));
}

@Test
void shouldGetMetricsAfterSelectingInstanceWhenServiceDiscoveryFails() {
//Given a configuration service using a failing SD and default LB
FakeServiceConfig e = new FakeServiceConfig("my-service",
new MockConfiguration(), null, null);
TestEnv.configurations.add(e);

TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

//When we try to get service instances
Service service = stork.getService("my-service");

when(service.getServiceDiscovery().getServiceInstances())
.thenReturn(Uni.createFrom().failure(new RuntimeException("Service Discovery induced failure")));

//An error is thrown and metrics are also gathered accordingly
Exception exception = assertThrows(RuntimeException.class, () -> {
service.selectInstance().await().indefinitely();
});

assertThat(exception.getMessage()).isEqualTo("Service Discovery induced failure");
assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isEqualTo(exception);
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(-1);
assertThat(metrics.isServiceDiscoverySuccessful()).isFalse();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("mock");
assertThat(metrics.getServiceSelectionType()).isEqualTo("round-robin");
assertDurations(metrics);

}

@Test
void shouldGetMetricsWhenSelectingInstanceFails() {
TestEnv.configurations.add(new FakeServiceConfig("my-service",
FAKE_SERVICE_DISCOVERY_CONFIG, new FakeSelectorConfiguration(), null));

ServiceInstance instance = mock(ServiceInstance.class);
AnchoredServiceDiscoveryProvider.services.add(instance);
TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");
LoadBalancer loadBalancer = service.getLoadBalancer();

when(loadBalancer.selectServiceInstance(any(Collection.class)))
.thenThrow(new RuntimeException("Load Balancer induced failure"));

Exception exception = assertThrows(RuntimeException.class, () -> {
service.selectInstance().await().indefinitely();
});

assertThat(exception.getMessage()).isEqualTo("Load Balancer induced failure");
assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isEqualTo(exception);
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(1);
assertThat(metrics.isServiceDiscoverySuccessful()).isTrue();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("fake");
assertThat(metrics.getServiceSelectionType()).isEqualTo("fake-selector");
assertDurations(metrics);

}

@Test
void shouldGetMetricsAfterSelectingInstanceWhenNoServicesDiscovered() {
TestEnv.configurations.add(new FakeServiceConfig("my-service",
FAKE_SERVICE_DISCOVERY_CONFIG, null, null));

TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");
Exception exception = assertThrows(NoServiceInstanceFoundException.class, () -> {
service.selectInstance().await().indefinitely();
});

assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isNotNull();
assertThat(metrics.failure()).isEqualTo(exception);
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(0);
assertThat(metrics.isServiceDiscoverySuccessful()).isTrue();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("fake");
assertThat(metrics.getServiceSelectionType()).isEqualTo("round-robin");
assertDurations(metrics);
}

// From here, same tests but using the selectInstanceAndRecordStart method

@Test
void shouldGetMetricsWhenSelectingInstanceWithRecordAndStartHappyPath() {
TestEnv.configurations.add(new FakeServiceConfig("my-service",
FAKE_SERVICE_DISCOVERY_CONFIG, null, null));

ServiceInstance instance = mock(ServiceInstance.class);
AnchoredServiceDiscoveryProvider.services.add(instance);
TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");
assertThat(service.selectInstanceAndRecordStart(true).await().indefinitely()).isEqualTo(instance);
assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isNull();
assertThat(metrics.getOverallDuration()).isNotNull();
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(1);
assertThat(metrics.isServiceDiscoverySuccessful()).isTrue();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("fake");
assertThat(metrics.getServiceSelectionType()).isEqualTo("round-robin");
assertDurations(metrics);

}

@Test
void shouldGetMetricsAfterSelectingInstanceWithMonitoringWhenServiceDiscoveryFails() {
FakeServiceConfig e = new FakeServiceConfig("my-service",
new MockConfiguration(), null, null);
TestEnv.configurations.add(e);

TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");

when(service.getServiceDiscovery().getServiceInstances())
.thenReturn(Uni.createFrom().failure(new RuntimeException("Service Discovery induced failure")));

Exception exception = assertThrows(RuntimeException.class, () -> {
service.selectInstanceAndRecordStart(true).await().indefinitely();
});

assertThat(exception.getMessage()).isEqualTo("Service Discovery induced failure");
assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isEqualTo(exception);
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(-1);
assertThat(metrics.isServiceDiscoverySuccessful()).isFalse();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("mock");
assertThat(metrics.getServiceSelectionType()).isEqualTo("round-robin");
assertDurations(metrics);

}

@Test
void shouldGetMetricsWhenSelectingInstanceWithMonitoringFails() {
TestEnv.configurations.add(new FakeServiceConfig("my-service",
FAKE_SERVICE_DISCOVERY_CONFIG, new FakeSelectorConfiguration(), null));

ServiceInstance instance = mock(ServiceInstance.class);
AnchoredServiceDiscoveryProvider.services.add(instance);
TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");
LoadBalancer loadBalancer = service.getLoadBalancer();

when(loadBalancer.selectServiceInstance(any(Collection.class)))
.thenThrow(new RuntimeException("Load Balancer induced failure"));

Exception exception = assertThrows(RuntimeException.class, () -> {
service.selectInstanceAndRecordStart(true).await().indefinitely();
});

assertThat(exception.getMessage()).isEqualTo("Load Balancer induced failure");
assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isEqualTo(exception);
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(1);
assertThat(metrics.isServiceDiscoverySuccessful()).isTrue();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("fake");
assertThat(metrics.getServiceSelectionType()).isEqualTo("fake-selector");
assertDurations(metrics);

}

@Test
void shouldGetMetricsAfterSelectingInstanceWithMonitoringWhenWhenNoServicesDiscovered() {
TestEnv.configurations.add(new FakeServiceConfig("my-service",
FAKE_SERVICE_DISCOVERY_CONFIG, null, null));

TestEnv.install(ConfigProvider.class, TestEnv.AnchoredConfigProvider.class);
Stork stork = getNewObservableStork();

Service service = stork.getService("my-service");
Exception exception = assertThrows(NoServiceInstanceFoundException.class, () -> {
service.selectInstanceAndRecordStart(true).await().indefinitely();
});

assertThat(service.getObservations()).isNotNull();

StorkObservation metrics = FakeObservationCollector.FAKE_STORK_EVENT;
assertThat(metrics.getServiceName()).isEqualTo("my-service");
assertThat(metrics.isDone()).isTrue();
assertThat(metrics.failure()).isNotNull();
assertThat(metrics.failure()).isEqualTo(exception);
assertThat(metrics.getDiscoveredInstancesCount()).isEqualTo(0);
assertThat(metrics.isServiceDiscoverySuccessful()).isTrue();
assertThat(metrics.getServiceDiscoveryType()).isEqualTo("fake");
assertThat(metrics.getServiceSelectionType()).isEqualTo("round-robin");
assertDurations(metrics);

}

private static Stork getNewObservableStork() {
Stork.initialize(new ObservableStorkInfrastructure(new FakeObservationCollector()));
return Stork.getInstance();
}

}
42 changes: 3 additions & 39 deletions core/src/test/java/io/smallrye/stork/StorkTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.smallrye.stork;

import static io.smallrye.stork.FakeServiceConfig.FAKE_LOAD_BALANCER_CONFIG;
import static io.smallrye.stork.FakeServiceConfig.FAKE_SECURE_SERVICE_DISCOVERY_CONFIG;
import static io.smallrye.stork.FakeServiceConfig.FAKE_SERVICE_DISCOVERY_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
@@ -26,32 +29,6 @@
@SuppressWarnings("unchecked")
public class StorkTest {

private static final ConfigWithType FAKE_SERVICE_DISCOVERY_CONFIG = new ConfigWithType() {

@Override
public String type() {
return "fake";
}

@Override
public Map<String, String> parameters() {
return Collections.emptyMap();
}
};

private static final ConfigWithType FAKE_SECURE_SERVICE_DISCOVERY_CONFIG = new ConfigWithType() {

@Override
public String type() {
return "fake";
}

@Override
public Map<String, String> parameters() {
return Map.of("secure", "true");
}
};

private static final ConfigWithType SERVICE_DISCOVERY_CONFIG_WITH_INVALID_PROVIDER = new ConfigWithType() {

@Override
@@ -65,19 +42,6 @@ public Map<String, String> parameters() {
}
};

private static final ConfigWithType FAKE_LOAD_BALANCER_CONFIG = new ConfigWithType() {

@Override
public String type() {
return "fake-selector";
}

@Override
public Map<String, String> parameters() {
return Collections.emptyMap();
}
};

private static final ConfigWithType LOAD_BALANCER_WITH_INVALID_PROVIDER = new ConfigWithType() {

@Override
24 changes: 24 additions & 0 deletions docs/docs/diagrams/observability_sequence.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
@startuml

!include diagrams/includes/themes/light.puml

skinparam sequenceMessageAlign center
autonumber "<b>(0)"


participant Application
participant ObservableStorkInfrastructure
participant ObservationCollector
participant Stork
participant Service

Application -> ObservableStorkInfrastructure : instantiates
ObservableStorkInfrastructure -> ObservationCollector : instantiates
ObservationCollector -> ObservableStorkInfrastructure: ObservationCollector
ObservableStorkInfrastructure -> Application: ObservableStorkInfrastructure

... ...

Application -> Stork : initialize(observableInfrastructure)
Stork -> Service : instantiates (..., ObservationCollector, ...)
@enduml
37 changes: 37 additions & 0 deletions docs/docs/diagrams/observation_sequence.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
@startuml

!include diagrams/includes/themes/light.puml

skinparam sequenceMessageAlign center
autonumber "<b>(0)"


participant Service
participant ObservationCollector
participant StorkObservation
participant StorkEventHandler

Service -> ObservationCollector : create("serviceName", sd type, ss type)
ObservationCollector -> StorkObservation : instantiates
StorkObservation -> StorkObservation: Registers start time
ObservationCollector -> Service : StorkObservation

... ...

Service -> StorkObservation : onServiceDiscoverySuccess(List<ServiceInstance>)
StorkObservation -> StorkObservation : Registers end service\ndiscovery time.\nRegisters instances count\n
... ...

Service -> StorkObservation : onServiceDiscoveryFailure(Throwable)
StorkObservation -> StorkObservation : Registers end service\ndiscovery time.\nRegisters failure cause\n
... ...

Service -> StorkObservation : onServiceSelectionSuccess(instanceId)
StorkObservation -> StorkObservation : Registers end service\nselection time.\nRegisters instance id\nRegisters overall duration\n
StorkObservation -> StorkEventHandler: complete(this)
... ...

Service -> StorkObservation : onServiceSelectionFailure(Throwable))
StorkObservation -> StorkObservation : Registers overall duration.\nRegisters failure cause\n
StorkObservation -> StorkEventHandler: complete(this)
@enduml
122 changes: 122 additions & 0 deletions docs/docs/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Stork Observability API

Stork proposes an observability API that automatically observes some parameters to show how the Stork service discovery and selection are behaving.

For any _observation_ to happen, you need to provide your own implementation of an `ObservationCollector.` By default, Stork provides a no-op implementation.

The `ObservationCollector` is responsible for instantiating the `StorkObservation`.

The `StorkObservation` reacts to Stork events thanks to a `StorkEventHandler`.

You can extend the metrics collection by extending the `StorkEventHandler` interface.

The following sequence diagram shows how the observability is initialized :


![observability initialization](target/observability_sequence.svg#only-light)
![observability initialization](target/observability_sequence_dark.svg#only-dark)



The `StorkObservation` registers times, number of discovered instances, the selected instance and failures by reacting to the lifecycle of a Stork event such as:

- start : Observation has been started.
The beginning time is registered.
It happens when the `ObservationCollector#create()` method gets called.
- service discovery success: a collection of instances has been successfully discovered for a service.
The end discovery time and number of instances are recorded.
It happens when the `StorkObservation#onServiceDiscoverySuccess` gets called.
- service discovery error: an error occurs when discovering a service.
The end discovery time and failure cause are captured.
It happens when the `StorkObservation#onServiceDiscoveryFailure` gets called.
- service selection success: an instance has been successfully selected from the collection.
The end selection time and selected instance ID are registered.
It happens when the `StorkObservation#onServiceSelectionSuccess` gets called.
- service selection error: an error occurred during selecting the instance.
End selection time and failure cause are registered.
It happens when the `StorkObservation#onServiceSelectionFailure` gets called.
- end: Observation has finished. Overall duration is registered.
It happens when the `StorkObservation#onServiceSelectionSuccess` gets called.

The following sequence diagram represents the described observation process above:


![observation_process](target/observation_sequence.svg#only-light)
![observation_process](target/observation_sequence_dark.svg#only-dark)



## Implementing an observation collector

An `ObservationCollector` implementation must override the `create` method to provide an instance of StorkObservation.
In addition, the user can access and enrich the observation data through the `StorkEventHandler`.

A custom observation collector class should look as follows:

```java linenums="1"
{{ insert('examples/AcmeObservationCollector.java') }}
```

The next step is to initialize Stork with an `ObservableStorkInfrastructure`, taking an instance of your `ObservationCollector` as parameter.

```java linenums="1"
{{ insert('examples/ObservableInitializationExample.java') }}
```

Then, Stork uses your implementation to register metrics.


## Observing service discovery and selection behaviours

To access metrics registered by `StorkObservation`, use the following code:

```java linenums="1"
{{ insert('examples/ObservationExample.java') }}
```

# Stork Observability with Quarkus

Stork metrics are automatically enabled when using Stork together with the Micrometer extension in a Quarkus application.

Micrometer collects the metrics of the rest and grpc client using Stork, as well as when using the Stork API.

As an example, if you export the metrics to Prometheus, you will get:

````text
# HELP stork_load_balancer_failures_total The number of failures during service selection.
# TYPE stork_load_balancer_failures_total counter
stork_load_balancer_failures_total{service_name="hello-service",} 0.0
# HELP stork_service_selection_duration_seconds The duration of the selection operation
# TYPE stork_service_selection_duration_seconds summary
stork_service_selection_duration_seconds_count{service_name="hello-service",} 13.0
stork_service_selection_duration_seconds_sum{service_name="hello-service",} 0.001049291
# HELP stork_service_selection_duration_seconds_max The duration of the selection operation
# TYPE stork_service_selection_duration_seconds_max gauge
stork_service_selection_duration_seconds_max{service_name="hello-service",} 0.0
# HELP stork_overall_duration_seconds_max The total duration of the Stork service discovery and selection operations
# TYPE stork_overall_duration_seconds_max gauge
stork_overall_duration_seconds_max{service_name="hello-service",} 0.0
# HELP stork_overall_duration_seconds The total duration of the Stork service discovery and selection operations
# TYPE stork_overall_duration_seconds summary
stork_overall_duration_seconds_count{service_name="hello-service",} 13.0
stork_overall_duration_seconds_sum{service_name="hello-service",} 0.001049291
# HELP stork_service_discovery_failures_total The number of failures during service discovery
# TYPE stork_service_discovery_failures_total counter
stork_service_discovery_failures_total{service_name="hello-service",} 0.0
# HELP stork_service_discovery_duration_seconds_max The duration of the discovery operation
# TYPE stork_service_discovery_duration_seconds_max gauge
stork_service_discovery_duration_seconds_max{service_name="hello-service",} 0.0
# HELP stork_service_discovery_duration_seconds The duration of the discovery operation
# TYPE stork_service_discovery_duration_seconds summary
stork_service_discovery_duration_seconds_count{service_name="hello-service",} 13.0
stork_service_discovery_duration_seconds_sum{service_name="hello-service",} 6.585046209
# HELP stork_instances_count_total The number of service instances discovered
# TYPE stork_instances_count_total counter
stork_instances_count_total{service_name="hello-service",} 26.0
````






Binary file not shown.
1 change: 1 addition & 0 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ nav:
- Overview: 'index.md'
- Concepts: 'concepts.md'
- Programmatic API: 'programmatic-api.md'
- Observability: 'observability.md'
- Javadoc: 'https://javadoc.io/doc/io.smallrye.stork/smallrye-stork-api/latest/index.html'
- Using Stork with Quarkus: './quarkus.md'
- Service Discovery:
36 changes: 36 additions & 0 deletions docs/snippets/examples/AcmeObservationCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package examples;

import io.smallrye.stork.Stork;
import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.StorkEventHandler;
import io.smallrye.stork.api.observability.StorkObservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AcmeObservationCollector implements ObservationCollector {

private static final Logger LOGGER = LoggerFactory.getLogger(AcmeObservationCollector.class);

private static final StorkEventHandler ACME_HANDLER = event -> {
//This is the terminal event. Put here your custom logic to extend the metrics collection.

//E.g. Expose metrics to Micrometer, additional logs....
LOGGER.info( "Service discovery took " + event.getServiceDiscoveryDuration() + ".");
LOGGER.info( event.getDiscoveredInstancesCount() + " have been discovered for " + event.getServiceName() + ".");
LOGGER.info( "Service selection took " + event.getServiceSelectionDuration() + ".");

// ...

};

public static StorkObservation ACME_STORK_EVENT;

@Override
public StorkObservation create(String serviceName, String serviceDiscoveryType,
String serviceSelectionType) {
ACME_STORK_EVENT = new StorkObservation(
serviceName, serviceDiscoveryType, serviceSelectionType,
ACME_HANDLER);
return ACME_STORK_EVENT;
}
}
13 changes: 13 additions & 0 deletions docs/snippets/examples/ObservableInitializationExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package examples;

import io.smallrye.stork.Stork;
import io.smallrye.stork.integration.ObservableStorkInfrastructure;

public class ObservableInitializationExample {

public static void main(String[] args) {
Stork.initialize(new ObservableStorkInfrastructure(new AcmeObservationCollector()));
Stork stork = Stork.getInstance();
// ...
}
}
35 changes: 35 additions & 0 deletions docs/snippets/examples/ObservationExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package examples;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.Stork;
import io.smallrye.stork.api.Service;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.StorkObservation;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import static examples.AcmeObservationCollector.*;

public class ObservationExample {

public static void example(Stork stork) {
Service service = stork.getService("my-service");

ObservationCollector observations = service.getObservations();

// Gets the time spent in service discovery and service selection even if any error happens
Duration overallDuration = ACME_STORK_EVENT.getOverallDuration();

// Gets the total number of instances discovered
int discoveredInstancesCount = ACME_STORK_EVENT.getDiscoveredInstancesCount();

// Gets the error raised during the process
Throwable failure = ACME_STORK_EVENT.failure();

// ...

}
}

0 comments on commit 5a76dfc

Please sign in to comment.