Skip to content

Commit

Permalink
Add classes and configs to be run for measurement retention policy (#955
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jcorilla authored Apr 28, 2023
1 parent 10c44b1 commit a99bf24
Show file tree
Hide file tree
Showing 11 changed files with 715 additions and 5 deletions.
10 changes: 10 additions & 0 deletions src/main/docker/images.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ COMMON_IMAGES = [
image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/server:gcp_kingdom_data_server_image",
repository = _PREFIX + "/kingdom/data-server",
),
struct(
name = "kingdom_completed_measurements_deletion_image",
image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:completed_measurements_deletion_image",
repository = _PREFIX + "/kingdom/completed-measurements-deletion",
),
struct(
name = "kingdom_pending_measurements_cancellation_image",
image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:pending_measurements_cancellation_image",
repository = _PREFIX + "/kingdom/pending-measurements-cancellation",
),
struct(
name = "kingdom_system_api_server_image",
image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server:system_api_server_image",
Expand Down
6 changes: 5 additions & 1 deletion src/main/k8s/base.cue
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,11 @@ objects: [ for objectSet in objectSets for object in objectSet {object}]
spec: {
backoffLimit: uint | *0
template: {
metadata: labels: app: _name + "-app"
metadata: {
labels: {
app: _name + "-app"
}
}
spec: #PodSpec & {
if _secretName != _|_ {
_mounts: "\(_name)-files": {
Expand Down
1 change: 1 addition & 0 deletions src/main/k8s/dev/kingdom_gke.cue
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ objectSets: [
kingdom.deployments,
kingdom.services,
kingdom.networkPolicies,
kingdom.cronjobs,
]

kingdom: #Kingdom & {
Expand Down
79 changes: 75 additions & 4 deletions src/main/k8s/kingdom.cue
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package k8s

import ("strings")

#Kingdom: {
_verboseGrpcServerLogging: bool | *false
_verboseGrpcClientLogging: bool | *false
Expand All @@ -22,12 +24,19 @@ package k8s

_kingdom_secret_name: string

_completedMeasurementsTimeToLive: string | *"180d"
_completedMeasurementsDryRun: bool | *false
_pendingMeasurementsTimeToLive: string | *"15d"
_pendingMeasurementsDryRun: bool | *false

_imageSuffixes: [string]: string
_imageSuffixes: {
"gcp-kingdom-data-server": string | *"kingdom/data-server"
"system-api-server": string | *"kingdom/system-api"
"v2alpha-public-api-server": string | *"kingdom/v2alpha-public-api"
"update-kingdom-schema": string | *"kingdom/spanner-update-schema"
"gcp-kingdom-data-server": string | *"kingdom/data-server"
"system-api-server": string | *"kingdom/system-api"
"v2alpha-public-api-server": string | *"kingdom/v2alpha-public-api"
"update-kingdom-schema": string | *"kingdom/spanner-update-schema"
"completed-measurements-deletion": string | *"kingdom/completed-measurements-deletion"
"pending-measurements-cancellation": string | *"kingdom/pending-measurements-cancellation"
}
_imageConfigs: [string]: #ImageConfig
_imageConfigs: {
Expand Down Expand Up @@ -58,6 +67,12 @@ package k8s

_open_id_redirect_uri_flag: "--open-id-redirect-uri=https://localhost:2048"

_kingdomCompletedMeasurementsTimeToLiveFlag: "--time-to-live=\(_completedMeasurementsTimeToLive)"
_kingdomCompletedMeasurementsDryRunRetentionPolicyFlag: "--dry-run=\(_completedMeasurementsDryRun)"
_kingdomPendingMeasurementsTimeToLiveFlag: "--time-to-live=\(_pendingMeasurementsTimeToLive)"
_kingdomPendingMeasurementsDryRunRetentionPolicyFlag: "--dry-run=\(_pendingMeasurementsDryRun)"
_otlpEndpoint: "--otel-exporter-otlp-endpoint=\(#OpenTelemetryCollectorEndpoint)"

services: [Name=_]: #GrpcService & {
_name: Name
_system: "kingdom"
Expand Down Expand Up @@ -154,6 +169,48 @@ package k8s
}
}

cronjobs: [Name=_]: #CronJob & {
_name: strings.TrimSuffix(Name, "-cronjob")
_secretName: _kingdom_secret_name
_system: "kingdom"
_container: {
image: _images[_name]
}
}

cronjobs: {
"completed-measurements-deletion": Cronjob={
_container: args: [
_internal_api_target_flag,
_internal_api_cert_host_flag,
_kingdom_tls_cert_file_flag,
_kingdom_tls_key_file_flag,
_kingdom_cert_collection_file_flag,
_kingdomCompletedMeasurementsTimeToLiveFlag,
_kingdomCompletedMeasurementsDryRunRetentionPolicyFlag,
_debug_verbose_grpc_client_logging_flag,
_otlpEndpoint,
"--otel-service-name=\(Cronjob.metadata.name)",
]
spec: schedule: "15 * * * *" // Hourly, 15 minutes past the hour
}
"pending-measurements-cancellation": Cronjob={
_container: args: [
_internal_api_target_flag,
_internal_api_cert_host_flag,
_kingdom_tls_cert_file_flag,
_kingdom_tls_key_file_flag,
_kingdom_cert_collection_file_flag,
_kingdomPendingMeasurementsTimeToLiveFlag,
_kingdomPendingMeasurementsDryRunRetentionPolicyFlag,
_debug_verbose_grpc_client_logging_flag,
_otlpEndpoint,
"--otel-service-name=\(Cronjob.metadata.name)",
]
spec: schedule: "45 * * * *" // Hourly, 45 minutes past the hour
}
}

networkPolicies: [Name=_]: #NetworkPolicy & {
_name: Name
}
Expand All @@ -165,6 +222,8 @@ package k8s
"v2alpha-public-api-server-app",
"system-api-server-app",
"resource-setup-app",
"completed-measurements-deletion-app",
"pending-measurements-cancellation-app",
]
_egresses: {
// Need to send external traffic to Spanner.
Expand Down Expand Up @@ -203,5 +262,17 @@ package k8s
"opentelemetry-collector-app",
]
}
"completed-measurements-deletion": {
_app_label: "completed-measurements-deletion-app"
_destinationMatchLabels: [
"gcp-kingdom-data-server-app",
]
}
"pending-measurements-cancellation": {
_app_label: "pending-measurements-cancellation-app"
_destinationMatchLabels: [
"gcp-kingdom-data-server-app",
]
}
}
}
28 changes: 28 additions & 0 deletions src/main/kotlin/org/wfanet/measurement/kingdom/batch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library")

package(default_visibility = [
"//src/main/kotlin/org/wfanet/measurement/kingdom/batch/spanner:__subpackages__",
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:__subpackages__",
])

kt_jvm_library(
name = "pending_measurements_cancellation",
srcs = ["PendingMeasurementsCancellation.kt"],
deps = [
"//imports/java/io/opentelemetry/api",
"//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
],
)

kt_jvm_library(
name = "completed_measurements_deletion",
srcs = ["CompletedMeasurementsDeletion.kt"],
deps = [
"//imports/java/io/opentelemetry/api",
"//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.kingdom.batch

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.metrics.LongCounter
import io.opentelemetry.api.metrics.Meter
import java.time.Clock
import java.time.Duration
import java.util.logging.Logger
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.wfanet.measurement.common.toProtoTime
import org.wfanet.measurement.internal.kingdom.DeleteMeasurementRequest
import org.wfanet.measurement.internal.kingdom.Measurement
import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub
import org.wfanet.measurement.internal.kingdom.StreamMeasurementsRequestKt
import org.wfanet.measurement.internal.kingdom.batchDeleteMeasurementsRequest
import org.wfanet.measurement.internal.kingdom.deleteMeasurementRequest
import org.wfanet.measurement.internal.kingdom.streamMeasurementsRequest

private val COMPLETED_MEASUREMENT_STATES =
listOf(Measurement.State.SUCCEEDED, Measurement.State.FAILED, Measurement.State.CANCELLED)
private const val MAX_BATCH_DELETE = 1000

class CompletedMeasurementsDeletion(
private val measurementsService: MeasurementsCoroutineStub,
private val timeToLive: Duration,
private val dryRun: Boolean = false,
private val clock: Clock = Clock.systemUTC(),
private val openTelemetry: OpenTelemetry = GlobalOpenTelemetry.get()
) {
private val meter: Meter = openTelemetry.getMeter(CompletedMeasurementsDeletion::class.java.name)
private val completedMeasurementDeletionCounter: LongCounter =
meter
.counterBuilder("completed_measurements_deletion_total")
.setDescription("Total number of completed measurements deleted under retention policy")
.build()
fun run() {
if (timeToLive.toMillis() == 0L) {
logger.warning("Time to live cannot be 0. TTL=$timeToLive")
}
val currentTime = clock.instant()
runBlocking {
var measurementsToDelete: List<Measurement> =
measurementsService
.streamMeasurements(
streamMeasurementsRequest {
filter =
StreamMeasurementsRequestKt.filter {
states += COMPLETED_MEASUREMENT_STATES
updatedBefore = currentTime.minus(timeToLive).toProtoTime()
}
}
)
.toList()

if (dryRun) {
logger.info { "Measurements that would have been deleted: $measurementsToDelete" }
} else {
while (measurementsToDelete.isNotEmpty()) {
val batchMeasurementsToDelete = measurementsToDelete.take(MAX_BATCH_DELETE)
val deleteRequests: List<DeleteMeasurementRequest> =
batchMeasurementsToDelete.map {
deleteMeasurementRequest {
externalMeasurementId = it.externalMeasurementId
externalMeasurementConsumerId = it.externalMeasurementConsumerId
etag = it.etag
}
}
measurementsService.batchDeleteMeasurements(
batchDeleteMeasurementsRequest { requests += deleteRequests }
)
completedMeasurementDeletionCounter.add(deleteRequests.size.toLong())

measurementsToDelete =
measurementsService
.streamMeasurements(
streamMeasurementsRequest {
filter =
StreamMeasurementsRequestKt.filter {
states += COMPLETED_MEASUREMENT_STATES
updatedBefore = currentTime.minus(timeToLive).toProtoTime()
}
}
)
.toList()
}
}
}
}

companion object {
private val logger: Logger = Logger.getLogger(this::class.java.name)
}
}
Loading

0 comments on commit a99bf24

Please sign in to comment.