Skip to content

Commit

Permalink
Allow selecting a port by name in service discovery kubernetes
Browse files Browse the repository at this point in the history
This is specially necessary when having endpoints that expose multiple ports. 
At the moment, when having multiple ports, Stork does not select any and uses the port 80 which is problemmatic when the application does not use this port. 

Moreover, it improves the logic to matching a port to also check the pod spec (if there is only one container with one port, it will use it). 

Relates quarkusio/quarkus#33131
  • Loading branch information
Sgitario committed May 5, 2023
1 parent 72b13e3 commit 83ac9c9
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 4 deletions.
12 changes: 11 additions & 1 deletion service-discovery/kubernetes/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,17 @@
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [

{
"ignore": true,
"code": "java.annotation.attributeValueChanged",
"old": "class io.smallrye.stork.servicediscovery.kubernetes.KubernetesServiceDiscoveryProvider",
"new": "class io.smallrye.stork.servicediscovery.kubernetes.KubernetesServiceDiscoveryProvider",
"annotationType": "io.smallrye.stork.api.config.ServiceDiscoveryAttributes",
"attribute": "value",
"oldValue": "{@io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"k8s-host\", description = \"The Kubernetes API host.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"k8s-namespace\", description = \"The namespace of the service. Use all to discover all namespaces.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"application\", description = \"The Kubernetes application Id; if not defined Stork service name will be used.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"refresh-period\", description = \"Service discovery cache refresh period.\", defaultValue = \"5M\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"secure\", description = \"Whether the connection with the service should be encrypted with TLS.\")}",
"newValue": "{@io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"k8s-host\", description = \"The Kubernetes API host.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"k8s-namespace\", description = \"The namespace of the service. Use all to discover all namespaces.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"application\", description = \"The Kubernetes application Id; if not defined Stork service name will be used.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"port-name\", description = \"The Kubernetes application port name. When exposing multiple ports, users do need to specify one; otherwise it will use the port 80.\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"refresh-period\", description = \"Service discovery cache refresh period.\", defaultValue = \"5M\"), @io.smallrye.stork.api.config.ServiceDiscoveryAttribute(name = \"secure\", description = \"Whether the connection with the service should be encrypted with TLS.\")}",
"justification": "Added the 'port-name' attribute. Should not impact users."
}
]
}
}, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
Expand Down Expand Up @@ -46,6 +47,7 @@ public class KubernetesServiceDiscovery extends CachingServiceDiscovery {
static final String METADATA_NAME = "metadata.name";
private final KubernetesClient client;
private final String application;
private final String portName;
private final boolean allNamespaces;
private final String namespace;
private final boolean secure;
Expand All @@ -68,6 +70,7 @@ public KubernetesServiceDiscovery(String serviceName, KubernetesConfiguration co
String masterUrl = config.getK8sHost() == null ? base.getMasterUrl() : config.getK8sHost();
this.application = config.getApplication() == null ? serviceName : config.getApplication();
this.namespace = config.getK8sNamespace() == null ? base.getNamespace() : config.getK8sNamespace();
this.portName = config.getPortName();

allNamespaces = namespace != null && namespace.equalsIgnoreCase("all");

Expand Down Expand Up @@ -187,6 +190,14 @@ private List<ServiceInstance> toStorkServiceInstances(Map<Endpoints, List<Pod>>
if (endpointPorts.size() == 1) {
port = endpointPorts.get(0).getPort();
protocol = endpointPorts.get(0).getProtocol();
} else if (portName != null) {
for (EndpointPort endpointPort : endpointPorts) {
if (portName.equals(endpointPort.getName())) {
port = endpointPort.getPort();
protocol = endpointPort.getProtocol();
break;
}
}
}

ServiceInstance matching = ServiceInstanceUtils.findMatching(previousInstances, hostname, port);
Expand All @@ -200,12 +211,22 @@ private List<ServiceInstance> toStorkServiceInstances(Map<Endpoints, List<Pod>>
.findFirst();
String podNamespace = namespace;
if (maybePod.isPresent()) {
ObjectMeta metadata = maybePod.get().getMetadata();
Pod pod = maybePod.get();
ObjectMeta metadata = pod.getMetadata();
podNamespace = metadata.getNamespace();
Map<String, String> podLabels = metadata.getLabels();
for (Map.Entry<String, String> label : podLabels.entrySet()) {
labels.putIfAbsent(label.getKey(), label.getValue());
}

// If we couldn't find the matching port, we try to select it from the pod
if (port == 0
&& pod.getSpec().getContainers().size() == 1
&& pod.getSpec().getContainers().get(0).getPorts().size() == 1) {
ContainerPort containerPort = pod.getSpec().getContainers().get(0).getPorts().get(0);
port = containerPort.getContainerPort();
protocol = containerPort.getProtocol();
}
}
//TODO add some useful metadata?
Metadata<KubernetesMetadataKey> k8sMetadata = Metadata.of(KubernetesMetadataKey.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
@ServiceDiscoveryAttribute(name = "k8s-host", description = "The Kubernetes API host.")
@ServiceDiscoveryAttribute(name = "k8s-namespace", description = "The namespace of the service. Use all to discover all namespaces.")
@ServiceDiscoveryAttribute(name = "application", description = "The Kubernetes application Id; if not defined Stork service name will be used.")
@ServiceDiscoveryAttribute(name = "port-name", description = "The Kubernetes application port name. When exposing multiple ports, users do need to specify one; otherwise it will use the port 80.")
@ServiceDiscoveryAttribute(name = "refresh-period", description = "Service discovery cache refresh period.", defaultValue = CachingServiceDiscovery.DEFAULT_REFRESH_INTERVAL)
@ServiceDiscoveryAttribute(name = "secure", description = "Whether the connection with the service should be encrypted with TLS.")
@ApplicationScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointAddressBuilder;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointPortBuilder;
import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder;
import io.fabric8.kubernetes.api.model.Endpoints;
Expand Down Expand Up @@ -289,6 +290,51 @@ void shouldGetServiceFromSpecificNamespace() {
}
}

@Test
void shouldGetServiceWithMultiplePortsFromSpecificNamespace() {
String serviceName = "svc";
String specificNs = "ns1";

TestConfigProvider.addServiceConfig(serviceName, null, "kubernetes",
null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "ns1", "port-name", "http1"));
Stork stork = StorkTestUtils.getNewStorkInstance();

String[] ips = new String[] { "10.96.96.231", "10.96.96.232", "10.96.96.233" };
EndpointPort[] ports = new EndpointPort[] {
new EndpointPortBuilder().withName("http1").withPort(8080).withProtocol("TCP").build(),
new EndpointPortBuilder().withName("http2").withPort(8081).withProtocol("TCP").build() };
buildAndRegisterKubernetesService(serviceName, specificNs, true, ports, ips);
Arrays.stream(ips).forEach(ip -> buildAndRegisterBackendPod(serviceName, specificNs, true, ip));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Service service = stork.getService(serviceName);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(3);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231",
"10.96.96.232", "10.96.96.233");
instances.get().stream().map(ServiceInstance::getLabels)
.forEach(serviceInstanceLabels -> assertThat(serviceInstanceLabels)
.contains(entry("app.kubernetes.io/name", "svc"), entry("app.kubernetes.io/version", "1.0")));
instances.get().stream().map(ServiceInstance::getMetadata).forEach(metadata -> {
Metadata<KubernetesMetadataKey> k8sMetadata = (Metadata<KubernetesMetadataKey>) metadata;
assertThat(k8sMetadata.getMetadata()).containsKey(META_K8S_SERVICE_ID);
});
for (ServiceInstance serviceInstance : instances.get()) {
Map<String, String> labels = serviceInstance.getLabels();
assertThat(labels).contains(entry("app.kubernetes.io/name", "svc"),
entry("app.kubernetes.io/version", "1.0"),
entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost())));
}
}

@Test
void shouldGetServiceFromAllNamespace() {

Expand Down Expand Up @@ -499,7 +545,7 @@ void shouldFetchInstancesFromTheCache() throws InterruptedException {
private void registerKubernetesResources(String serviceName, String namespace, String... ips) {
Assert.checkNotNullParam("ips", ips);
buildAndRegisterKubernetesService(serviceName, namespace, true, ips);
Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip)).collect(Collectors.toList());
Arrays.stream(ips).forEach(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip));
}

private Map<String, Long> mapHostnameToIds(List<ServiceInstance> serviceInstances) {
Expand All @@ -512,6 +558,12 @@ private Map<String, Long> mapHostnameToIds(List<ServiceInstance> serviceInstance

private Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, boolean register,
String... ipAdresses) {
EndpointPort[] ports = new EndpointPort[] { new EndpointPortBuilder().withPort(8080).withProtocol("TCP").build() };
return buildAndRegisterKubernetesService(applicationName, namespace, register, ports, ipAdresses);
}

private Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, boolean register,
EndpointPort[] ports, String... ipAdresses) {

Map<String, String> serviceLabels = new HashMap<>();
serviceLabels.put("app.kubernetes.io/name", applicationName);
Expand All @@ -528,7 +580,7 @@ private Endpoints buildAndRegisterKubernetesService(String applicationName, Stri
Endpoints endpoint = new EndpointsBuilder()
.withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata()
.addToSubsets(new EndpointSubsetBuilder().withAddresses(endpointAddresses)
.addToPorts(new EndpointPortBuilder().withPort(8080).withProtocol("TCP").build())
.addToPorts(ports)
.build())
.build();

Expand Down

0 comments on commit 83ac9c9

Please sign in to comment.