Skip to content

Commit

Permalink
Added canary service account configuration to be authenticated via OA…
Browse files Browse the repository at this point in the history
…UTH over PLAIN (#482)

* Configured internal TLS listener with OAUTH over PLAIN authentication
Added the serviceAccounts to the ManagedKafka spec and status
Added canary configuration with service account

Signed-off-by: Paolo Patierno <[email protected]>

* Fixed unit tests

Signed-off-by: Paolo Patierno <[email protected]>

* Removed useless annotation on SA manager

Signed-off-by: Paolo Patierno <[email protected]>

* Removed service accounts manager

Signed-off-by: Paolo Patierno <[email protected]>
  • Loading branch information
ppatierno authored Aug 20, 2021
1 parent c947e2b commit b5e2708
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import io.sundr.builder.annotations.BuildableReference;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
Expand Down Expand Up @@ -74,6 +76,24 @@ public void setPlacementId(String placementId) {
getOrCreateAnnotations().put(PLACEMENT_ID, placementId);
}

/**
* Get a specific service account information from the ManagedKafka instance
*
* @param name name/type of service account to look for
* @return service account related information
*/
public Optional<ServiceAccount> getServiceAccount(ServiceAccount.ServiceAccountName name) {
List<ServiceAccount> serviceAccounts = this.spec.getServiceAccounts();
if (serviceAccounts != null && !serviceAccounts.isEmpty()) {
Optional<ServiceAccount> serviceAccount =
serviceAccounts.stream()
.filter(sa -> name.toValue().equals(sa.getName()))
.findFirst();
return serviceAccount;
}
return Optional.empty();
}

/**
* Effectively a template for creating default {@link ManagedKafka} instances.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class ManagedKafkaSpec {
private Versions versions;
private boolean deleted;
private List<String> owners = new ArrayList<>();
private List<ServiceAccount> serviceAccounts = new ArrayList<>();

/**
* Never null
Expand Down Expand Up @@ -85,4 +86,12 @@ public List<String> getOwners() {
public void setOwners(List<String> owners) {
this.owners = owners;
}

public List<ServiceAccount> getServiceAccounts() {
return serviceAccounts;
}

public void setServiceAccounts(List<ServiceAccount> serviceAccounts) {
this.serviceAccounts = serviceAccounts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ManagedKafkaStatus {
private Versions versions;
private String adminServerURI;
private String updatedTimestamp;
private List<ServiceAccount> serviceAccounts;

public List<ManagedKafkaCondition> getConditions() {
return conditions;
Expand Down Expand Up @@ -62,4 +63,12 @@ public String getUpdatedTimestamp() {
public void setUpdatedTimestamp(String updatedTimestamp) {
this.updatedTimestamp = updatedTimestamp;
}

public List<ServiceAccount> getServiceAccounts() {
return serviceAccounts;
}

public void setServiceAccounts(List<ServiceAccount> serviceAccounts) {
this.serviceAccounts = serviceAccounts;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.bf2.operator.resources.v1alpha1;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.sundr.builder.annotations.Buildable;
import lombok.ToString;

import javax.validation.constraints.NotNull;

/**
* Define a service account to be used by a specific Kafka instance component (i.e. canary)
* to authenticate to Kafka brokers through the authentication service (i.e. Keycloak)
*/
@Buildable(
builderPackage = "io.fabric8.kubernetes.api.builder",
editableEnabled = false
)
@ToString
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ServiceAccount {

public enum ServiceAccountName {
Canary;

public static ServiceAccountName forValue(String value) {
switch (value) {
case "canary":
return Canary;
default:
return null;
}
}

public String toValue() {
switch (this) {
case Canary:
return "canary";
default:
return null;
}
}
}

// using String and not the enum because fabric8 CRD generator doesn't allow to serialize as "canary" using a @JsonProperty for example
// opened GitHub issue for this: https://github.com/fabric8io/kubernetes-client/issues/3411
@NotNull
private String name;
@NotNull
private String principal;
@NotNull
private String password;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getPrincipal() {
return principal;
}

public void setPrincipal(String principal) {
this.principal = principal;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private void updateManagedKafkaStatus(ManagedKafka managedKafka) {
// just keep the current version
}
status.setAdminServerURI(kafkaInstance.getAdminServer().uri(managedKafka));
status.setServiceAccounts(managedKafka.getSpec().getServiceAccounts());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ protected ArrayOrObjectKafkaListeners buildListeners(ManagedKafka managedKafka)
.withPort(9093)
.withType(KafkaListenerType.INTERNAL)
.withTls(true)
.withAuth(plainOverOauthAuthenticationListener)
.build(),
new GenericKafkaListenerBuilder()
.withName("external")
Expand Down
11 changes: 10 additions & 1 deletion operator/src/main/java/org/bf2/operator/operands/Canary.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.quarkus.arc.DefaultBean;
import org.bf2.common.OperandUtils;
import org.bf2.operator.resources.v1alpha1.ManagedKafka;
import org.bf2.operator.resources.v1alpha1.ServiceAccount;
import org.bf2.operator.secrets.ImagePullSecretManager;
import org.bf2.operator.secrets.SecuritySecretManager;
import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand All @@ -35,6 +36,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Provides same functionalities to get a Canary deployment from a ManagedKafka one
Expand Down Expand Up @@ -160,7 +162,7 @@ private Map<String, String> buildLabels(String canaryName) {
}

private List<EnvVar> buildEnvVar(ManagedKafka managedKafka) {
List<EnvVar> envVars = new ArrayList<>(3);
List<EnvVar> envVars = new ArrayList<>(9);
envVars.add(new EnvVarBuilder().withName("KAFKA_BOOTSTRAP_SERVERS").withValue(managedKafka.getMetadata().getName() + "-kafka-bootstrap:9093").build());
envVars.add(new EnvVarBuilder().withName("RECONCILE_INTERVAL_MS").withValue("5000").build());
envVars.add(new EnvVarBuilder().withName("EXPECTED_CLUSTER_SIZE").withValue(String.valueOf(this.config.getKafka().getReplicas())).build());
Expand Down Expand Up @@ -189,6 +191,13 @@ private List<EnvVar> buildEnvVar(ManagedKafka managedKafka) {

envVars.add(new EnvVarBuilder().withName("SARAMA_LOG_ENABLED").withValueFrom(saramaLogEnabled).build());
envVars.add(new EnvVarBuilder().withName("VERBOSITY_LOG_LEVEL").withValueFrom(verbosityLogLevel).build());

Optional<ServiceAccount> canaryServiceAccount = managedKafka.getServiceAccount(ServiceAccount.ServiceAccountName.Canary);
if (canaryServiceAccount.isPresent()) {
envVars.add(new EnvVarBuilder().withName("SASL_MECHANISM").withValue("PLAIN").build());
envVars.add(new EnvVarBuilder().withName("SASL_USER").withValue(canaryServiceAccount.get().getPrincipal()).build());
envVars.add(new EnvVarBuilder().withName("SASL_PASSWORD").withValue(canaryServiceAccount.get().getPassword()).build());
}
return envVars;
}

Expand Down
18 changes: 18 additions & 0 deletions operator/src/test/resources/expected/custom-config-strimzi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ spec:
port: 9093
type: "internal"
tls: true
authentication: !<oauth>
clientId: "clientId"
clientSecret:
secretName: "test-mk-sso-secret"
key: "ssoClientSecret"
validIssuerUri: "https://validIssuerEndpointURI"
checkIssuer: true
jwksEndpointUri: "https://jwksEndpointURI"
userNameClaim: "userNameClaim"
checkAccessTokenType: true
accessTokenIsJwt: true
tlsTrustedCertificates:
- secretName: "test-mk-sso-cert"
certificate: "keycloak.crt"
enablePlain: true
tokenEndpointUri: "https://tokenEndpointURI"
enableOauthBearer: true
type: "oauth"
- name: "external"
port: 9094
type: "ingress"
Expand Down
18 changes: 18 additions & 0 deletions operator/src/test/resources/expected/strimzi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ spec:
port: 9093
type: "internal"
tls: true
authentication: !<oauth>
clientId: "clientId"
clientSecret:
secretName: "test-mk-sso-secret"
key: "ssoClientSecret"
validIssuerUri: "https://validIssuerEndpointURI"
checkIssuer: true
jwksEndpointUri: "https://jwksEndpointURI"
userNameClaim: "userNameClaim"
checkAccessTokenType: true
accessTokenIsJwt: true
tlsTrustedCertificates:
- secretName: "test-mk-sso-cert"
certificate: "keycloak.crt"
enablePlain: true
tokenEndpointUri: "https://tokenEndpointURI"
enableOauthBearer: true
type: "oauth"
- name: "external"
port: 9094
type: "ingress"
Expand Down

0 comments on commit b5e2708

Please sign in to comment.