Skip to content

Commit

Permalink
test: fixed race condition with endpoint deletion
Browse files Browse the repository at this point in the history
client.endpoints().inNamespace(defaultNamespace).withName(serviceName).delete()

doesn't wait for the operation to complete in the cluster.

We either need to wait for a condition where these endpoint
is null.
Or use the convenience .withTimeout DSL method so that
Kubernetes client blocks the thread for us in a best effort
attempt to wait until the referenced Kubernetes object no longer
exists.
  • Loading branch information
manusa committed Oct 24, 2023
1 parent 963bd4c commit 483789f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.tuple;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
Expand All @@ -12,19 +13,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.vertx.core.impl.ConcurrentHashSet;
import jakarta.inject.Inject;

import org.hamcrest.Matchers;
import org.jboss.weld.junit5.WeldInitiator;
import org.jboss.weld.junit5.WeldJunit5Extension;
import org.jboss.weld.junit5.WeldSetup;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
Expand Down Expand Up @@ -607,7 +610,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt
}

@Test
void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedException {
void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() {

// Given a service with 3 instances registered in the cluster in any namespace
// Stork gather the cache from the cluster
Expand All @@ -622,31 +625,39 @@ void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedExc

registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233");

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();
Set<ServiceInstance> instances = new ConcurrentHashSet<>();

Service service = stork.getService(serviceName);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);
.subscribe().with(instances::addAll);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);
.until(() -> !instances.isEmpty());

assertThat(instances.get()).hasSize(3);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231",
"10.96.96.232", "10.96.96.233");
assertThat(instances)
.hasSize(3)
.extracting(ServiceInstance::getPort, ServiceInstance::getHost)
.containsExactlyInAnyOrder(
tuple(8080, "10.96.96.231"),
tuple(8080, "10.96.96.232"),
tuple(8080, "10.96.96.233"));

client.endpoints().inNamespace(defaultNamespace).delete();
client.endpoints().inNamespace(defaultNamespace).withName(serviceName)
.withTimeout(100, TimeUnit.MILLISECONDS)
.delete();

service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);
.subscribe().with(received -> {
instances.clear();
instances.addAll(received);
});

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get().isEmpty());
.until(instances::isEmpty);

assertThat(instances.get()).hasSize(0);
assertThat(instances).isEmpty();
}

private Endpoints registerKubernetesResources(String serviceName, String namespace, String... ips) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,27 @@
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.tuple;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.vertx.core.impl.ConcurrentHashSet;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
Expand Down Expand Up @@ -593,7 +598,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt
}

@Test
void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedException {
void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() {

// Given a service with 3 instances registered in the cluster in any namespace
// Stork gather the cache from the cluster
Expand All @@ -608,31 +613,39 @@ void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedExc

registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233");

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();
Set<ServiceInstance> instances = new ConcurrentHashSet<>();

Service service = stork.getService(serviceName);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);
.subscribe().with(instances::addAll);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);
.until(() -> !instances.isEmpty());

assertThat(instances.get()).hasSize(3);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231",
"10.96.96.232", "10.96.96.233");
assertThat(instances)
.hasSize(3)
.extracting(ServiceInstance::getPort, ServiceInstance::getHost)
.containsExactlyInAnyOrder(
tuple(8080, "10.96.96.231"),
tuple(8080, "10.96.96.232"),
tuple(8080, "10.96.96.233"));

client.endpoints().inNamespace(defaultNamespace).withName(serviceName).delete();
client.endpoints().inNamespace(defaultNamespace).withName(serviceName)
.withTimeout(100, TimeUnit.MILLISECONDS)
.delete();

service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);
.subscribe().with(received -> {
instances.clear();
instances.addAll(received);
});

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get().isEmpty());
.until(instances::isEmpty);

assertThat(instances.get()).hasSize(0);
assertThat(instances).isEmpty();
}

private Endpoints registerKubernetesResources(String serviceName, String namespace, String... ips) {
Expand Down Expand Up @@ -690,15 +703,11 @@ private Endpoints buildAndRegisterKubernetesService(String applicationName, Stri
}

private Pod buildAndRegisterBackendPod(String name, String namespace, boolean register, String ip) {

Map<String, String> serviceLabels = new HashMap<>();
serviceLabels.put("app.kubernetes.io/name", name);
serviceLabels.put("app.kubernetes.io/version", "1.0");

Map<String, String> podLabels = new HashMap<>(serviceLabels);
podLabels.put("ui", "ui-" + ipAsSuffix(ip));
Pod backendPod = new PodBuilder().withNewMetadata().withName(name + "-" + ipAsSuffix(ip))
.withLabels(podLabels)
Pod backendPod = new PodBuilder().withNewMetadata()
.withName(name + "-" + ipAsSuffix(ip))
.addToLabels("app.kubernetes.io/name", name)
.addToLabels("app.kubernetes.io/version", "1.0")
.addToLabels("ui", "ui-" + ipAsSuffix(ip))
.withNamespace(namespace)
.endMetadata()
.build();
Expand Down

0 comments on commit 483789f

Please sign in to comment.