diff --git a/pom.xml b/pom.xml index 2f8f68e8..eaf347c8 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ 1.18.0 - 6.4.1 + 6.5.1 1.7.36 5.2.0 diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java index 3f2a2796..4fef5c93 100644 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java +++ b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java @@ -6,24 +6,22 @@ import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; -import java.net.HttpURLConnection; import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import jakarta.inject.Inject; -import org.awaitility.core.ConditionTimeoutException; import org.hamcrest.Matchers; import org.jboss.weld.junit5.WeldInitiator; import org.jboss.weld.junit5.WeldJunit5Extension; import org.jboss.weld.junit5.WeldSetup; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -428,67 +426,29 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt @Test void shouldFetchInstancesFromTheCache() throws InterruptedException { - // Given an endpoint registered in the cluster - // Stork gather the cache from the cluster - // When an expectation is configured to throw an Error the next time we contact the cluster to get the endpoints and - // Stork is called to get service instances - // Stork get the instances from the cache: the error is not thrown because the cluster is not contacted. - - config.addServiceConfig("svc", null, "kubernetes", - null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); - Stork stork = StorkTestUtils.getNewStorkInstance(); + // Stork gathers the cache from the cluster + // Configure the mock cluster for recordings calls to a specific path + // Stork is called twice to get service instances + // Stork get the instances from the cache: assert that only 1 call to the cluster has been done String serviceName = "svc"; + String[] ips = { "10.96.96.231" }; - registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231"); - - AtomicReference> instances = new AtomicReference<>(); - - Service service = stork.getService(serviceName); - service.getServiceDiscovery().getServiceInstances() - .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); - - await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); - - assertThat(instances.get()).hasSize(1); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); - + //Recording k8s cluster calls and build the endpoints as response + AtomicInteger serverHit = new AtomicInteger(0); server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") - .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "{}").once(); - - service.getServiceDiscovery().getServiceInstances() - .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); - - await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); - - assertThat(instances.get()).hasSize(1); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); - } - - @Test - void shouldGetInstancesFromTheCluster() throws InterruptedException { - - // Given an endpoint registered in the cluster - // Stork gather the cache from the cluster - // When an expectation in the cluster is configured to throw an Error the next time we try to get the endpoints and - // When the endpoint is removed (this invalidates the cache) - // Stork is called to get service instances again - // Stork gets the instances from the cache: the error is not thrown because the cluster is not contacted. + .andReply(200, r -> { + serverHit.incrementAndGet(); + Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, defaultNamespace, true, ips); + Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, defaultNamespace, true, ips[0])) + .collect(Collectors.toList()); + return endpoints; + }).always(); config.addServiceConfig("svc", null, "kubernetes", null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); Stork stork = StorkTestUtils.getNewStorkInstance(); - String serviceName = "svc"; - - registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231"); - AtomicReference> instances = new AtomicReference<>(); Service service = stork.getService(serviceName); @@ -499,24 +459,17 @@ void shouldGetInstancesFromTheCluster() throws InterruptedException { await().atMost(Duration.ofSeconds(5)) .until(() -> instances.get() != null); - assertThat(instances.get()).hasSize(1); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); - - server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") - .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "{}").once(); - - client.endpoints().withName(serviceName).delete(); + assertThat(serverHit.get()).isEqualTo(1); + //second try to get instances, instances should be fetched from cache service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) .subscribe().with(instances::set); - Assertions.assertThrows(ConditionTimeoutException.class, - () -> await() - .atMost(Duration.ofSeconds(5)) - .until(() -> instances.get().isEmpty())); + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + assertThat(serverHit.get()).isEqualTo(1); } private void registerKubernetesResources(String serviceName, String namespace, String... ips) {