diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java index 5561e30f..b6bc73f3 100644 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java +++ b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java @@ -4,6 +4,7 @@ import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.tuple; import static org.awaitility.Awaitility.await; import java.time.Duration; @@ -12,11 +13,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import io.vertx.core.impl.ConcurrentHashSet; import jakarta.inject.Inject; import org.hamcrest.Matchers; @@ -24,7 +28,6 @@ import org.jboss.weld.junit5.WeldJunit5Extension; import org.jboss.weld.junit5.WeldSetup; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -607,7 +610,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt } @Test - void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedException { + void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() { // Given a service with 3 instances registered in the cluster in any namespace // Stork gather the cache from the cluster @@ -622,31 +625,39 @@ void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedExc registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); - AtomicReference> instances = new AtomicReference<>(); + Set instances = new ConcurrentHashSet<>(); Service service = stork.getService(serviceName); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); + .subscribe().with(instances::addAll); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); + .until(() -> !instances.isEmpty()); - assertThat(instances.get()).hasSize(3); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", - "10.96.96.232", "10.96.96.233"); + assertThat(instances) + .hasSize(3) + .extracting(ServiceInstance::getPort, ServiceInstance::getHost) + .containsExactlyInAnyOrder( + tuple(8080, "10.96.96.231"), + tuple(8080, "10.96.96.232"), + tuple(8080, "10.96.96.233")); - client.endpoints().inNamespace(defaultNamespace).delete(); + client.endpoints().inNamespace(defaultNamespace).withName(serviceName) + .withTimeout(100, TimeUnit.MILLISECONDS) + .delete(); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); + .subscribe().with(received -> { + instances.clear(); + instances.addAll(received); + }); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get().isEmpty()); + .until(instances::isEmpty); - assertThat(instances.get()).hasSize(0); + assertThat(instances).isEmpty(); } private Endpoints registerKubernetesResources(String serviceName, String namespace, String... ips) { diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java index 1960df6c..d67b6181 100644 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java +++ b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java @@ -4,22 +4,27 @@ import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.tuple; import static org.awaitility.Awaitility.await; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import io.vertx.core.impl.ConcurrentHashSet; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -593,7 +598,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt } @Test - void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedException { + void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() { // Given a service with 3 instances registered in the cluster in any namespace // Stork gather the cache from the cluster @@ -608,31 +613,39 @@ void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedExc registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); - AtomicReference> instances = new AtomicReference<>(); + Set instances = new ConcurrentHashSet<>(); Service service = stork.getService(serviceName); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); + .subscribe().with(instances::addAll); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); + .until(() -> !instances.isEmpty()); - assertThat(instances.get()).hasSize(3); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", - "10.96.96.232", "10.96.96.233"); + assertThat(instances) + .hasSize(3) + .extracting(ServiceInstance::getPort, ServiceInstance::getHost) + .containsExactlyInAnyOrder( + tuple(8080, "10.96.96.231"), + tuple(8080, "10.96.96.232"), + tuple(8080, "10.96.96.233")); - client.endpoints().inNamespace(defaultNamespace).withName(serviceName).delete(); + client.endpoints().inNamespace(defaultNamespace).withName(serviceName) + .withTimeout(100, TimeUnit.MILLISECONDS) + .delete(); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); + .subscribe().with(received -> { + instances.clear(); + instances.addAll(received); + }); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get().isEmpty()); + .until(instances::isEmpty); - assertThat(instances.get()).hasSize(0); + assertThat(instances).isEmpty(); } private Endpoints registerKubernetesResources(String serviceName, String namespace, String... ips) { @@ -690,15 +703,11 @@ private Endpoints buildAndRegisterKubernetesService(String applicationName, Stri } private Pod buildAndRegisterBackendPod(String name, String namespace, boolean register, String ip) { - - Map serviceLabels = new HashMap<>(); - serviceLabels.put("app.kubernetes.io/name", name); - serviceLabels.put("app.kubernetes.io/version", "1.0"); - - Map podLabels = new HashMap<>(serviceLabels); - podLabels.put("ui", "ui-" + ipAsSuffix(ip)); - Pod backendPod = new PodBuilder().withNewMetadata().withName(name + "-" + ipAsSuffix(ip)) - .withLabels(podLabels) + Pod backendPod = new PodBuilder().withNewMetadata() + .withName(name + "-" + ipAsSuffix(ip)) + .addToLabels("app.kubernetes.io/name", name) + .addToLabels("app.kubernetes.io/version", "1.0") + .addToLabels("ui", "ui-" + ipAsSuffix(ip)) .withNamespace(namespace) .endMetadata() .build();