Skip to content

Commit

Permalink
Avoid unnecessary rolling updates when replacing custom CA (#10377)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Jul 25, 2024
1 parent 8ef366c commit 5d56e50
Show file tree
Hide file tree
Showing 12 changed files with 33 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public static String entityUserOperatorRoleBinding(String clusterName) {
*
* @return Name of the Cluster Operator certificate secret
*/
public static String secretName(String cluster) {
public static String clusterOperatorCertsSecretName(String cluster) {
return cluster + "-cluster-operator-certs";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ Future<Void> reconcileCas(Clock clock) {
} else if (secretName.equals(KafkaResources.kafkaSecretName(reconciliation.name()))) {
clusterCaSecrets.add(secret);
clientsCaSecrets.add(secret);
} else if (secretName.equals(KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()))) {
// The CO certificate is excluded as it is renewed in a separate cycle
} else {
clusterCaSecrets.add(secret);
}
Expand Down Expand Up @@ -345,7 +347,7 @@ private void checkCustomCaSecret(CertificateAuthority ca, Secret certSecret, Sec
That time is used for checking maintenance windows
*/
Future<Void> reconcileClusterOperatorSecret(Clock clock) {
return secretOperator.getAsync(reconciliation.namespace(), KafkaResources.secretName(reconciliation.name()))
return secretOperator.getAsync(reconciliation.namespace(), KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()))
.compose(oldSecret -> {
coSecret = oldSecret;
if (oldSecret != null && this.isClusterCaNeedFullTrust) {
Expand All @@ -358,15 +360,15 @@ Future<Void> reconcileClusterOperatorSecret(Clock clock) {
clusterCa,
coSecret,
reconciliation.namespace(),
KafkaResources.secretName(reconciliation.name()),
KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()),
"cluster-operator",
"cluster-operator",
clusterOperatorSecretLabels,
ownerRef,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant())
);

return secretOperator.reconcile(reconciliation, reconciliation.namespace(), KafkaResources.secretName(reconciliation.name()), coSecret)
return secretOperator.reconcile(reconciliation, reconciliation.namespace(), KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()), coSecret)
.map((Void) null);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private static Future<PemTrustSet> clusterCaPemTrustSet(Reconciliation reconcili
* @return Future containing the auth identity to use for client authentication.
*/
private static Future<PemAuthIdentity> coPemAuthIdentity(Reconciliation reconciliation, SecretOperator secretOperator) {
return getSecret(secretOperator, reconciliation.namespace(), KafkaResources.secretName(reconciliation.name()))
return getSecret(secretOperator, reconciliation.namespace(), KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()))
.map(PemAuthIdentity::clusterOperator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ private Future<ArgumentCaptor<Secret>> reconcileCa(Vertx vertx, Kafka kafka, Clo
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), c.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.noop(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), c.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.noop(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), c.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.noop(i.getArgument(0))));
when(secretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(NAME)))).thenAnswer(i -> Future.succeededFuture());
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.secretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)))).thenAnswer(i -> Future.succeededFuture());
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));

when(deploymentOps.getAsync(eq(NAMESPACE), any())).thenReturn(Future.succeededFuture());

Expand Down Expand Up @@ -1273,7 +1273,7 @@ public void testCustomLabelsAndAnnotations(Vertx vertx, VertxTestContext context
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), clusterCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.secretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));

when(podOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));
Expand Down Expand Up @@ -1359,7 +1359,7 @@ public void testClusterCASecretsWithoutOwnerReference(Vertx vertx, VertxTestCont
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), clusterCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.secretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));

when(podOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));
Expand Down Expand Up @@ -1439,7 +1439,7 @@ public void testClientsCASecretsWithoutOwnerReference(Vertx vertx, VertxTestCont
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), clusterCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.secretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));

when(podOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));
Expand Down Expand Up @@ -1515,7 +1515,7 @@ public void testClusterCAKeyNotTrusted(Vertx vertx, VertxTestContext context) {
});
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.secretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));

Map<String, String> generationAnnotations =
Expand Down Expand Up @@ -1599,7 +1599,7 @@ public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestC
});
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.secretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0))));
when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));

Map<String, String> generationAnnotations =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void testReconcileReplacesAllDeletedSecrets(VertxTestContext context) {
KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME),
KafkaResources.kafkaSecretName(CLUSTER_NAME),
KafkaResources.zookeeperSecretName(CLUSTER_NAME),
KafkaResources.secretName(CLUSTER_NAME));
KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ private void createCluster(VertxTestContext context, Kafka kafka, List<Secret> s
KafkaResources.clusterCaKeySecretName(kafkaName),
KafkaResources.kafkaSecretName(kafkaName),
KafkaResources.zookeeperSecretName(kafkaName),
KafkaResources.secretName(kafkaName));
KafkaResources.clusterOperatorCertsSecretName(kafkaName));

if (metrics) {
expectedSecrets.add(KafkaExporterResources.secretName(kafkaName));
Expand Down Expand Up @@ -628,7 +628,7 @@ private void createCluster(VertxTestContext context, Kafka kafka, List<Secret> s
when(mockSecretOps.getAsync(kafkaNamespace, KafkaResources.clusterCaCertificateSecretName(kafkaName))).thenAnswer(i ->
Future.succeededFuture(secretsMap.get(i.<String>getArgument(1)))
);
when(mockSecretOps.getAsync(kafkaNamespace, KafkaResources.secretName(kafkaName))).thenAnswer(i ->
when(mockSecretOps.getAsync(kafkaNamespace, KafkaResources.clusterOperatorCertsSecretName(kafkaName))).thenAnswer(i ->
Future.succeededFuture(secretsMap.get(i.<String>getArgument(1)))
);

Expand Down Expand Up @@ -1125,9 +1125,9 @@ private void updateCluster(VertxTestContext context, Kafka originalAssembly, Kaf
.addToData("ca-cert.crt", "cert")
.build())
);
when(mockSecretOps.getAsync(clusterNamespace, KafkaResources.secretName(clusterName))).thenReturn(
when(mockSecretOps.getAsync(clusterNamespace, KafkaResources.clusterOperatorCertsSecretName(clusterName))).thenReturn(
Future.succeededFuture(new SecretBuilder()
.withNewMetadata().withName(KafkaResources.secretName(clusterName)).endMetadata()
.withNewMetadata().withName(KafkaResources.clusterOperatorCertsSecretName(clusterName)).endMetadata()
.addToData("cluster-operator.key", "key")
.addToData("cluster-operator.crt", "cert")
.addToData("cluster-operator.p12", "p12")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testReconcileReplacesAllDeletedSecrets(VertxTestContext context) {
KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME),
KafkaResources.kafkaSecretName(CLUSTER_NAME),
KafkaResources.zookeeperSecretName(CLUSTER_NAME),
KafkaResources.secretName(CLUSTER_NAME));
KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME));

Checkpoint async = context.checkpoint();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testMigrationFromZooKeeperToKRaftPostMigrationState(VertxTestContext
SecretOperator mockSecretOps = supplier.secretOperations;
Secret secret = new Secret();
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));

KafkaMetadataStateManager kafkaMetadataStateManager = new KafkaMetadataStateManager(RECONCILIATION, kafka);

Expand Down Expand Up @@ -219,7 +219,7 @@ public void testMigrationFromKRaftPostMigrationToKRaft(VertxTestContext context)
SecretOperator mockSecretOps = supplier.secretOperations;
Secret secret = new Secret();
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));

KafkaMetadataStateManager kafkaMetadataStateManager = new KafkaMetadataStateManager(RECONCILIATION, kafka);

Expand Down Expand Up @@ -261,7 +261,7 @@ public void testRollbackFromKRaftPostMigrationToKRaftDualWriting(VertxTestContex
SecretOperator mockSecretOps = supplier.secretOperations;
Secret secret = new Secret();
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));

KafkaMetadataStateManager kafkaMetadataStateManager = new KafkaMetadataStateManager(RECONCILIATION, kafka);

Expand Down Expand Up @@ -300,7 +300,7 @@ public void testRollbackFromKRaftDualWritingToZooKeeper(VertxTestContext context
SecretOperator mockSecretOps = supplier.secretOperations;
Secret secret = new Secret();
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));

KafkaMetadataStateManager kafkaMetadataStateManager = new KafkaMetadataStateManager(RECONCILIATION, kafka);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testZookeeperReconcilerWithKRaftMigrationRollback(VertxTestContext c
SecretOperator mockSecretOps = supplier.secretOperations;
Secret secret = new Secret();
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
KafkaVersionChange versionChange = new KafkaVersionChange(VERSIONS.defaultVersion(), VERSIONS.defaultVersion(), VERSIONS.defaultVersion().protocolVersion(), VERSIONS.defaultVersion().messageVersion(), VERSIONS.defaultVersion().metadataVersion());

KafkaMetadataStateManager stateManager = new KafkaMetadataStateManager(RECONCILIATION, patchedKafka);
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testZookeeperReconcilerWithNoKRaftMigrationRollback(VertxTestContext
SecretOperator mockSecretOps = supplier.secretOperations;
Secret secret = new Secret();
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterCaCertificateSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.secretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
when(mockSecretOps.getAsync(eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(CLUSTER_NAME)))).thenReturn(Future.succeededFuture(secret));
KafkaVersionChange versionChange = new KafkaVersionChange(VERSIONS.defaultVersion(), VERSIONS.defaultVersion(), VERSIONS.defaultVersion().protocolVersion(), VERSIONS.defaultVersion().messageVersion(), VERSIONS.defaultVersion().metadataVersion());

KafkaMetadataStateManager stateManager = new KafkaMetadataStateManager(RECONCILIATION, patchedKafka);
Expand Down
Loading

0 comments on commit 5d56e50

Please sign in to comment.