Skip to content

Commit

Permalink
Wait for Kingdom restart in EmptyClusterPanelMatchCorrectnessTest.
Browse files Browse the repository at this point in the history
This fixes a race condition where the port forwarding may be pointing to the Kingdom pod used for resource setup.
  • Loading branch information
SanjayVas committed Mar 7, 2024
1 parent e62feab commit 33ff31e
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 77 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ jobs:
- name: Delete Kubernetes Resources
continue-on-error: true
run: |
kubectl delete all --namespace=default --all
kubectl delete networkpolicies --all-namespaces --all
kubectl delete all --namespace=default --all --cascade=foreground --wait
kubectl delete networkpolicies --namespace=default --all
- name: Run panelmatch correctness test
id: run-panelmatch-correctness-test
Expand Down
17 changes: 16 additions & 1 deletion src/main/k8s/local/testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package(
default_testonly = True,
default_visibility = [
"//src/test/kotlin/org/wfanet/measurement/integration/k8s:__pkg__",
"//src/test/kotlin/org/wfanet/panelmatch/integration/k8s:__pkg__",
],
)

Expand Down Expand Up @@ -75,7 +76,6 @@ kustomization_dir(

kustomization_dir(
name = "cmms",
testonly = True,
srcs = [
":duchies",
":edp_simulators",
Expand All @@ -91,3 +91,18 @@ kustomization_dir(
"//src/main/k8s/testing/secretfiles:kustomization",
],
)

# Minimal CMMS for panel match.
kustomization_dir(
name = "cmms_for_panelmatch",
srcs = [
"//src/main/k8s/local:kingdom",
"//src/main/k8s/local:panelmatch_emulators",
],
generate_kustomization = True,
tags = ["manual"],
deps = [
":config_files",
"//src/main/k8s/testing/secretfiles:kustomization",
],
)
1 change: 0 additions & 1 deletion src/main/k8s/panelmatch/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ kustomization_dir(
name = "edp_daemon",
srcs = [
":forwarded_storage_edp_daemon",
"//src/main/k8s/local:kingdom",
],
generate_kustomization = True,
tags = ["manual"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.wfanet.panelmatch.integration.k8s
import com.google.privatemembership.batch.Shared
import com.google.protobuf.Message
import com.google.protobuf.TypeRegistry
import io.kubernetes.client.openapi.models.V1Deployment
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.util.ClientBuilder
import java.net.InetSocketAddress
import java.nio.file.Path
Expand Down Expand Up @@ -147,27 +145,11 @@ abstract class AbstractPanelMatchCorrectnessTest(private val localSystem: PanelM
)
}

@JvmStatic
protected suspend fun KubernetesClient.waitUntilDeploymentReady(name: String): V1Deployment {
logger.info { "Waiting for Deployment $name to be ready..." }
return waitUntilDeploymentReady(name, timeout = READY_TIMEOUT).also {
logger.info { "Deployment $name ready" }
}
}

fun getRuntimePath(workspaceRelativePath: Path): Path {
return checkNotNull(
org.wfanet.measurement.common.getRuntimePath(WORKSPACE_PATH.resolve(workspaceRelativePath))
)
}

@JvmStatic
protected suspend fun getPod(deploymentName: String): V1Pod {
return k8sClient
.listPodsByMatchLabels(k8sClient.waitUntilDeploymentReady(deploymentName))
.items
.first()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ java_test(
"//src/main/docker:push_all_local_images",
"//src/main/docker/panel_exchange_client:push_all_images",
"//src/main/k8s/local:kingdom_for_panelmatch_setup.tar",
"//src/main/k8s/local/testing:cmms_for_panelmatch.tar",
"//src/main/k8s/panelmatch/local:edp_daemon.tar",
"//src/main/k8s/panelmatch/local:mp_daemon.tar",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.grpc.ManagedChannel
import io.kubernetes.client.common.KubernetesObject
import io.kubernetes.client.openapi.Configuration
import io.kubernetes.client.openapi.models.V1Deployment
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.util.ClientBuilder
import java.io.File
import java.net.InetSocketAddress
Expand Down Expand Up @@ -158,28 +159,31 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
// https://github.com/kubernetes/kubernetes/issues/66689.
k8sClient.waitForServiceAccount("default", timeout = READY_TIMEOUT)

loadKingdomForPanelMatch()
loadKingdomForSetup()
val dataProviderEntityContent = withContext(Dispatchers.IO) { createEntityContent("edp1") }
val modelProviderEntityContent = withContext(Dispatchers.IO) { createEntityContent("mp1") }
return runResourceSetup(dataProviderEntityContent, modelProviderEntityContent)
val resourceSetupOutput =
runResourceSetup(dataProviderEntityContent, modelProviderEntityContent)
loadCmms(resourceSetupOutput.akidPrincipalMap)

return resourceSetupOutput.entitiesData
}

/** Runs resource setup, leaving port forwarding active for forwarded storage. */
private suspend fun runResourceSetup(
dataProviderContent: EntityContent,
modelProviderContent: EntityContent,
): EntitiesData {
): ResourceSetupOutput {
val outputDir = withContext(Dispatchers.IO) { tempDir.newFolder("resource-setup") }

val kingdomInternalPod = getPod(KINGDOM_INTERNAL_DEPLOYMENT_NAME)
val mpPrivateStoragePod = getPod(MP_PRIVATE_STORAGE_DEPLOYMENT_NAME)
val dpPrivateStoragePod = getPod(DP_PRIVATE_STORAGE_DEPLOYMENT_NAME)
val sharedStoragePod = getPod(SHARED_STORAGE_DEPLOYMENT_NAME)
var entitiesData: EntitiesData

PortForwarder(kingdomInternalPod, SERVER_PORT).use { internalForward ->
return PortForwarder(kingdomInternalPod, SERVER_PORT).use { internalForward ->
val internalAddress: InetSocketAddress =
withContext(Dispatchers.IO) { internalForward.start() }
.also { portForwarders.add(internalForward) }
val internalChannel =
buildMutualTlsChannel(internalAddress.toTarget(), KINGDOM_SIGNING_CERTS).also {
channels.add(it)
Expand All @@ -193,14 +197,14 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
outputDir,
)
val panelMatchResourceKey =
withContext(Dispatchers.IO) {
panelMatchResourceSetup.process(
panelMatchResourceSetup
.process(
dataProviderContent = dataProviderContent,
modelProviderContent = modelProviderContent,
exchangeDate = EXCHANGE_DATE.toProtoDate(),
exchangeSchedule = SCHEDULE,
)
}
.also { internalChannel.shutdown() }

dataProviderKey = panelMatchResourceKey.dataProviderKey
modelProviderKey = panelMatchResourceKey.modelProviderKey
Expand Down Expand Up @@ -287,6 +291,7 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
}

val akidPrincipalMap = outputDir.resolve(PanelMatchResourceSetup.AKID_PRINCIPAL_MAP_FILE)

loadDpDaemonForPanelMatch(
k8sClient,
panelMatchResourceKey.dataProviderKey,
Expand All @@ -298,16 +303,17 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
akidPrincipalMap,
)

entitiesData =
ResourceSetupOutput(
EntitiesData(
apiIdToExternalId(panelMatchResourceKey.dataProviderKey.dataProviderId),
apiIdToExternalId(panelMatchResourceKey.modelProviderKey.modelProviderId),
)
),
akidPrincipalMap,
)
}
return entitiesData
}

private suspend fun loadKingdomForPanelMatch() {
private suspend fun loadKingdomForSetup() {
val appliedObjects: List<KubernetesObject> =
withContext(Dispatchers.IO) {
val outputDir = tempDir.newFolder("kingdom-for-panelmatch-setup")
Expand All @@ -327,67 +333,100 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
kubectlApply(config, k8sClient)
}

appliedObjects.filterIsInstance<V1Deployment>().forEach {
k8sClient.waitUntilDeploymentReady(checkNotNull(it.metadata?.name))
}
waitUntilDeploymentsReady(appliedObjects)
}

private suspend fun loadCmms(akidPrincipalMap: File) {
val appliedObjects: List<KubernetesObject> =
withContext(Dispatchers.IO) {
val outputDir = tempDir.newFolder("cmms")
extractTar(
getRuntimePath(LOCAL_K8S_TESTING_PATH.resolve("cmms_for_panelmatch.tar")).toFile(),
outputDir,
)

val configFilesDir = outputDir.toPath().resolve(CONFIG_FILES_PATH).toFile()
logger.info("Copying $akidPrincipalMap to $CONFIG_FILES_PATH")
akidPrincipalMap.copyTo(configFilesDir.resolve(akidPrincipalMap.name))

val configFile: File = outputDir.resolve("config.yaml")
kustomize(
outputDir
.toPath()
.resolve(LOCAL_K8S_TESTING_PATH)
.resolve("cmms_for_panelmatch")
.toFile(),
configFile,
)

kubectlApply(configFile, k8sClient)
}

waitUntilDeploymentsReady(appliedObjects)
}

private suspend fun loadDpDaemonForPanelMatch(
k8sClient: KubernetesClient,
dataProviderKey: DataProviderKey,
akidPrincipalMap: File,
) {
withContext(Dispatchers.IO) {
val outputDir = tempDir.newFolder("edp_daemon")
extractTar(
getRuntimePath(LOCAL_K8S_PANELMATCH_PATH.resolve("edp_daemon.tar")).toFile(),
outputDir,
)
val appliedObjects: List<KubernetesObject> =
withContext(Dispatchers.IO) {
val outputDir = tempDir.newFolder("edp_daemon")
extractTar(
getRuntimePath(LOCAL_K8S_PANELMATCH_PATH.resolve("edp_daemon.tar")).toFile(),
outputDir,
)

val configFilesDir = outputDir.toPath().resolve(CONFIG_FILES_PATH).toFile()
akidPrincipalMap.copyTo(configFilesDir.resolve(akidPrincipalMap.name))
val configTemplate: File = outputDir.resolve("config.yaml")
kustomize(
outputDir.toPath().resolve(LOCAL_K8S_PANELMATCH_PATH).resolve("edp_daemon").toFile(),
configTemplate,
)
val configFilesDir = outputDir.toPath().resolve(PANELMATCH_CONFIG_FILES_PATH).toFile()
akidPrincipalMap.copyTo(configFilesDir.resolve(akidPrincipalMap.name))
val configTemplate: File = outputDir.resolve("config.yaml")
kustomize(
outputDir.toPath().resolve(LOCAL_K8S_PANELMATCH_PATH).resolve("edp_daemon").toFile(),
configTemplate,
)

val configContent =
configTemplate
.readText(StandardCharsets.UTF_8)
.replace("{party_name}", dataProviderKey.dataProviderId)
val configContent =
configTemplate
.readText(StandardCharsets.UTF_8)
.replace("{party_name}", dataProviderKey.dataProviderId)

kubectlApply(configContent, k8sClient)
}
kubectlApply(configContent, k8sClient)
}

waitUntilDeploymentsReady(appliedObjects)
}

private suspend fun loadMpDaemonForPanelMatch(
k8sClient: KubernetesClient,
modelProviderKey: ModelProviderKey,
akidPrincipalMap: File,
) {
withContext(Dispatchers.IO) {
val outputDir = tempDir.newFolder("mp_daemon")
extractTar(
getRuntimePath(LOCAL_K8S_PANELMATCH_PATH.resolve("mp_daemon.tar")).toFile(),
outputDir,
)
val appliedObjects: List<KubernetesObject> =
withContext(Dispatchers.IO) {
val outputDir = tempDir.newFolder("mp_daemon")
extractTar(
getRuntimePath(LOCAL_K8S_PANELMATCH_PATH.resolve("mp_daemon.tar")).toFile(),
outputDir,
)

val configFilesDir = outputDir.toPath().resolve(CONFIG_FILES_PATH).toFile()
akidPrincipalMap.copyTo(configFilesDir.resolve(akidPrincipalMap.name))
val configFilesDir = outputDir.toPath().resolve(PANELMATCH_CONFIG_FILES_PATH).toFile()
akidPrincipalMap.copyTo(configFilesDir.resolve(akidPrincipalMap.name))

val configTemplate: File = outputDir.resolve("config.yaml")
kustomize(
outputDir.toPath().resolve(LOCAL_K8S_PANELMATCH_PATH).resolve("mp_daemon").toFile(),
configTemplate,
)
val configContent =
configTemplate
.readText(StandardCharsets.UTF_8)
.replace("{party_name}", modelProviderKey.modelProviderId)
val configTemplate: File = outputDir.resolve("config.yaml")
kustomize(
outputDir.toPath().resolve(LOCAL_K8S_PANELMATCH_PATH).resolve("mp_daemon").toFile(),
configTemplate,
)
val configContent =
configTemplate
.readText(StandardCharsets.UTF_8)
.replace("{party_name}", modelProviderKey.modelProviderId)

kubectlApply(configContent, k8sClient)
}
kubectlApply(configContent, k8sClient)
}

waitUntilDeploymentsReady(appliedObjects)
}

private suspend fun createTestHarness(entitiesData: EntitiesData): PanelMatchSimulator {
Expand Down Expand Up @@ -436,6 +475,11 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
portForwarder.stop()
}
}

private data class ResourceSetupOutput(
val entitiesData: EntitiesData,
val akidPrincipalMap: File,
)
}

companion object {
Expand All @@ -449,8 +493,10 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
var modelProviderKey: ModelProviderKey? = null

private val LOCAL_K8S_PATH = Paths.get("src", "main", "k8s", "local")
private val LOCAL_K8S_TESTING_PATH = LOCAL_K8S_PATH.resolve("testing")
private val CONFIG_FILES_PATH = LOCAL_K8S_TESTING_PATH.resolve("config_files")
private val LOCAL_K8S_PANELMATCH_PATH = Paths.get("src", "main", "k8s", "panelmatch", "local")
private val CONFIG_FILES_PATH = LOCAL_K8S_PANELMATCH_PATH.resolve("config_files")
private val PANELMATCH_CONFIG_FILES_PATH = LOCAL_K8S_PANELMATCH_PATH.resolve("config_files")

private const val KINGDOM_INTERNAL_DEPLOYMENT_NAME = "gcp-kingdom-data-server-deployment"
private const val KINGDOM_PUBLIC_DEPLOYMENT_NAME = "v2alpha-public-api-server-deployment"
Expand Down Expand Up @@ -513,5 +559,27 @@ class EmptyClusterPanelMatchCorrectnessTest : AbstractPanelMatchCorrectnessTest(
private fun extractTar(archive: File, outputDirectory: File) {
Processes.runCommand("tar", "-xf", archive.toString(), "-C", outputDirectory.toString())
}

private suspend fun KubernetesClient.waitUntilDeploymentReady(name: String): V1Deployment {
logger.info { "Waiting for Deployment $name to be ready..." }
return waitUntilDeploymentReady(name, timeout = READY_TIMEOUT).also {
logger.info { "Deployment $name ready" }
}
}

private suspend fun getPod(deploymentName: String): V1Pod {
return k8sClient
.listPodsByMatchLabels(
k8sClient.waitUntilDeploymentReady(deploymentName, timeout = READY_TIMEOUT)
)
.items
.first()
}

private suspend fun waitUntilDeploymentsReady(appliedObjects: Iterable<KubernetesObject>) {
appliedObjects.filterIsInstance<V1Deployment>().forEach {
k8sClient.waitUntilDeploymentReady(checkNotNull(it.metadata?.name))
}
}
}
}

0 comments on commit 33ff31e

Please sign in to comment.