Skip to content

Commit

Permalink
Merge pull request smallrye#542 from aureamunoz/upgrade-k8s-client
Browse files Browse the repository at this point in the history
Update Kubernetes Client version to 6.5.1
  • Loading branch information
cescoffier authored Apr 25, 2023
2 parents 2196394 + e339781 commit 7831ab0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 68 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<!-- override the testcontainers version with the newest one -->
<version.testcontainers>1.18.0</version.testcontainers>

<kubernetes-client.version>6.4.1</kubernetes-client.version>
<kubernetes-client.version>6.5.1</kubernetes-client.version>
<!-- we cannot update to 2.x as jboss-logging does not support SLF4J 2 for now -->
<version.slf4j>1.7.36</version.slf4j>
<version.mockito>5.2.0</version.mockito>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,22 @@
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import jakarta.inject.Inject;

import org.awaitility.core.ConditionTimeoutException;
import org.hamcrest.Matchers;
import org.jboss.weld.junit5.WeldInitiator;
import org.jboss.weld.junit5.WeldJunit5Extension;
import org.jboss.weld.junit5.WeldSetup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -428,67 +426,29 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt
@Test
void shouldFetchInstancesFromTheCache() throws InterruptedException {

// Given an endpoint registered in the cluster
// Stork gather the cache from the cluster
// When an expectation is configured to throw an Error the next time we contact the cluster to get the endpoints and
// Stork is called to get service instances
// Stork get the instances from the cache: the error is not thrown because the cluster is not contacted.

config.addServiceConfig("svc", null, "kubernetes",
null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"));
Stork stork = StorkTestUtils.getNewStorkInstance();
// Stork gathers the cache from the cluster
// Configure the mock cluster for recordings calls to a specific path
// Stork is called twice to get service instances
// Stork get the instances from the cache: assert that only 1 call to the cluster has been done

String serviceName = "svc";
String[] ips = { "10.96.96.231" };

registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231");

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Service service = stork.getService(serviceName);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231");

//Recording k8s cluster calls and build the endpoints as response
AtomicInteger serverHit = new AtomicInteger(0);
server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc")
.andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "{}").once();

service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231");
}

@Test
void shouldGetInstancesFromTheCluster() throws InterruptedException {

// Given an endpoint registered in the cluster
// Stork gather the cache from the cluster
// When an expectation in the cluster is configured to throw an Error the next time we try to get the endpoints and
// When the endpoint is removed (this invalidates the cache)
// Stork is called to get service instances again
// Stork gets the instances from the cache: the error is not thrown because the cluster is not contacted.
.andReply(200, r -> {
serverHit.incrementAndGet();
Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, defaultNamespace, true, ips);
Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, defaultNamespace, true, ips[0]))
.collect(Collectors.toList());
return endpoints;
}).always();

config.addServiceConfig("svc", null, "kubernetes",
null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"));
Stork stork = StorkTestUtils.getNewStorkInstance();

String serviceName = "svc";

registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231");

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Service service = stork.getService(serviceName);
Expand All @@ -499,24 +459,17 @@ void shouldGetInstancesFromTheCluster() throws InterruptedException {
await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231");

server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc")
.andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "{}").once();

client.endpoints().withName(serviceName).delete();
assertThat(serverHit.get()).isEqualTo(1);

//second try to get instances, instances should be fetched from cache
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);

Assertions.assertThrows(ConditionTimeoutException.class,
() -> await()
.atMost(Duration.ofSeconds(5))
.until(() -> instances.get().isEmpty()));
await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(serverHit.get()).isEqualTo(1);
}

private void registerKubernetesResources(String serviceName, String namespace, String... ips) {
Expand Down

0 comments on commit 7831ab0

Please sign in to comment.