diff --git a/src/main/docker/images.bzl b/src/main/docker/images.bzl index 0ea1c147af8..6a042f29b5c 100644 --- a/src/main/docker/images.bzl +++ b/src/main/docker/images.bzl @@ -46,6 +46,16 @@ COMMON_IMAGES = [ image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/server:gcp_kingdom_data_server_image", repository = _PREFIX + "/kingdom/data-server", ), + struct( + name = "kingdom_completed_measurements_deletion_image", + image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:completed_measurements_deletion_image", + repository = _PREFIX + "/kingdom/completed-measurements-deletion", + ), + struct( + name = "kingdom_pending_measurements_cancellation_image", + image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:pending_measurements_cancellation_image", + repository = _PREFIX + "/kingdom/pending-measurements-cancellation", + ), struct( name = "kingdom_system_api_server_image", image = "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server:system_api_server_image", diff --git a/src/main/k8s/base.cue b/src/main/k8s/base.cue index b460f2b2062..f427d99b5b4 100644 --- a/src/main/k8s/base.cue +++ b/src/main/k8s/base.cue @@ -548,7 +548,11 @@ objects: [ for objectSet in objectSets for object in objectSet {object}] spec: { backoffLimit: uint | *0 template: { - metadata: labels: app: _name + "-app" + metadata: { + labels: { + app: _name + "-app" + } + } spec: #PodSpec & { if _secretName != _|_ { _mounts: "\(_name)-files": { diff --git a/src/main/k8s/dev/kingdom_gke.cue b/src/main/k8s/dev/kingdom_gke.cue index 1fccebc8480..2392077e747 100644 --- a/src/main/k8s/dev/kingdom_gke.cue +++ b/src/main/k8s/dev/kingdom_gke.cue @@ -50,6 +50,7 @@ objectSets: [ kingdom.deployments, kingdom.services, kingdom.networkPolicies, + kingdom.cronjobs, ] kingdom: #Kingdom & { diff --git a/src/main/k8s/kingdom.cue b/src/main/k8s/kingdom.cue index 98e202bfe8a..5cf1f414f18 100644 --- a/src/main/k8s/kingdom.cue +++ b/src/main/k8s/kingdom.cue @@ -14,6 +14,8 @@ package k8s +import ("strings") + #Kingdom: { _verboseGrpcServerLogging: bool | *false _verboseGrpcClientLogging: bool | *false @@ -22,12 +24,19 @@ package k8s _kingdom_secret_name: string + _completedMeasurementsTimeToLive: string | *"180d" + _completedMeasurementsDryRun: bool | *false + _pendingMeasurementsTimeToLive: string | *"15d" + _pendingMeasurementsDryRun: bool | *false + _imageSuffixes: [string]: string _imageSuffixes: { - "gcp-kingdom-data-server": string | *"kingdom/data-server" - "system-api-server": string | *"kingdom/system-api" - "v2alpha-public-api-server": string | *"kingdom/v2alpha-public-api" - "update-kingdom-schema": string | *"kingdom/spanner-update-schema" + "gcp-kingdom-data-server": string | *"kingdom/data-server" + "system-api-server": string | *"kingdom/system-api" + "v2alpha-public-api-server": string | *"kingdom/v2alpha-public-api" + "update-kingdom-schema": string | *"kingdom/spanner-update-schema" + "completed-measurements-deletion": string | *"kingdom/completed-measurements-deletion" + "pending-measurements-cancellation": string | *"kingdom/pending-measurements-cancellation" } _imageConfigs: [string]: #ImageConfig _imageConfigs: { @@ -58,6 +67,12 @@ package k8s _open_id_redirect_uri_flag: "--open-id-redirect-uri=https://localhost:2048" + _kingdomCompletedMeasurementsTimeToLiveFlag: "--time-to-live=\(_completedMeasurementsTimeToLive)" + _kingdomCompletedMeasurementsDryRunRetentionPolicyFlag: "--dry-run=\(_completedMeasurementsDryRun)" + _kingdomPendingMeasurementsTimeToLiveFlag: "--time-to-live=\(_pendingMeasurementsTimeToLive)" + _kingdomPendingMeasurementsDryRunRetentionPolicyFlag: "--dry-run=\(_pendingMeasurementsDryRun)" + _otlpEndpoint: "--otel-exporter-otlp-endpoint=\(#OpenTelemetryCollectorEndpoint)" + services: [Name=_]: #GrpcService & { _name: Name _system: "kingdom" @@ -154,6 +169,48 @@ package k8s } } + cronjobs: [Name=_]: #CronJob & { + _name: strings.TrimSuffix(Name, "-cronjob") + _secretName: _kingdom_secret_name + _system: "kingdom" + _container: { + image: _images[_name] + } + } + + cronjobs: { + "completed-measurements-deletion": Cronjob={ + _container: args: [ + _internal_api_target_flag, + _internal_api_cert_host_flag, + _kingdom_tls_cert_file_flag, + _kingdom_tls_key_file_flag, + _kingdom_cert_collection_file_flag, + _kingdomCompletedMeasurementsTimeToLiveFlag, + _kingdomCompletedMeasurementsDryRunRetentionPolicyFlag, + _debug_verbose_grpc_client_logging_flag, + _otlpEndpoint, + "--otel-service-name=\(Cronjob.metadata.name)", + ] + spec: schedule: "15 * * * *" // Hourly, 15 minutes past the hour + } + "pending-measurements-cancellation": Cronjob={ + _container: args: [ + _internal_api_target_flag, + _internal_api_cert_host_flag, + _kingdom_tls_cert_file_flag, + _kingdom_tls_key_file_flag, + _kingdom_cert_collection_file_flag, + _kingdomPendingMeasurementsTimeToLiveFlag, + _kingdomPendingMeasurementsDryRunRetentionPolicyFlag, + _debug_verbose_grpc_client_logging_flag, + _otlpEndpoint, + "--otel-service-name=\(Cronjob.metadata.name)", + ] + spec: schedule: "45 * * * *" // Hourly, 45 minutes past the hour + } + } + networkPolicies: [Name=_]: #NetworkPolicy & { _name: Name } @@ -165,6 +222,8 @@ package k8s "v2alpha-public-api-server-app", "system-api-server-app", "resource-setup-app", + "completed-measurements-deletion-app", + "pending-measurements-cancellation-app", ] _egresses: { // Need to send external traffic to Spanner. @@ -203,5 +262,17 @@ package k8s "opentelemetry-collector-app", ] } + "completed-measurements-deletion": { + _app_label: "completed-measurements-deletion-app" + _destinationMatchLabels: [ + "gcp-kingdom-data-server-app", + ] + } + "pending-measurements-cancellation": { + _app_label: "pending-measurements-cancellation-app" + _destinationMatchLabels: [ + "gcp-kingdom-data-server-app", + ] + } } } diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/batch/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/kingdom/batch/BUILD.bazel new file mode 100644 index 00000000000..a1ef233e567 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/batch/BUILD.bazel @@ -0,0 +1,28 @@ +load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library") + +package(default_visibility = [ + "//src/main/kotlin/org/wfanet/measurement/kingdom/batch/spanner:__subpackages__", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:__subpackages__", +]) + +kt_jvm_library( + name = "pending_measurements_cancellation", + srcs = ["PendingMeasurementsCancellation.kt"], + deps = [ + "//imports/java/io/opentelemetry/api", + "//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + ], +) + +kt_jvm_library( + name = "completed_measurements_deletion", + srcs = ["CompletedMeasurementsDeletion.kt"], + deps = [ + "//imports/java/io/opentelemetry/api", + "//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + ], +) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/batch/CompletedMeasurementsDeletion.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/batch/CompletedMeasurementsDeletion.kt new file mode 100644 index 00000000000..585aefc0058 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/batch/CompletedMeasurementsDeletion.kt @@ -0,0 +1,111 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.kingdom.batch + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.metrics.LongCounter +import io.opentelemetry.api.metrics.Meter +import java.time.Clock +import java.time.Duration +import java.util.logging.Logger +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.wfanet.measurement.common.toProtoTime +import org.wfanet.measurement.internal.kingdom.DeleteMeasurementRequest +import org.wfanet.measurement.internal.kingdom.Measurement +import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub +import org.wfanet.measurement.internal.kingdom.StreamMeasurementsRequestKt +import org.wfanet.measurement.internal.kingdom.batchDeleteMeasurementsRequest +import org.wfanet.measurement.internal.kingdom.deleteMeasurementRequest +import org.wfanet.measurement.internal.kingdom.streamMeasurementsRequest + +private val COMPLETED_MEASUREMENT_STATES = + listOf(Measurement.State.SUCCEEDED, Measurement.State.FAILED, Measurement.State.CANCELLED) +private const val MAX_BATCH_DELETE = 1000 + +class CompletedMeasurementsDeletion( + private val measurementsService: MeasurementsCoroutineStub, + private val timeToLive: Duration, + private val dryRun: Boolean = false, + private val clock: Clock = Clock.systemUTC(), + private val openTelemetry: OpenTelemetry = GlobalOpenTelemetry.get() +) { + private val meter: Meter = openTelemetry.getMeter(CompletedMeasurementsDeletion::class.java.name) + private val completedMeasurementDeletionCounter: LongCounter = + meter + .counterBuilder("completed_measurements_deletion_total") + .setDescription("Total number of completed measurements deleted under retention policy") + .build() + fun run() { + if (timeToLive.toMillis() == 0L) { + logger.warning("Time to live cannot be 0. TTL=$timeToLive") + } + val currentTime = clock.instant() + runBlocking { + var measurementsToDelete: List = + measurementsService + .streamMeasurements( + streamMeasurementsRequest { + filter = + StreamMeasurementsRequestKt.filter { + states += COMPLETED_MEASUREMENT_STATES + updatedBefore = currentTime.minus(timeToLive).toProtoTime() + } + } + ) + .toList() + + if (dryRun) { + logger.info { "Measurements that would have been deleted: $measurementsToDelete" } + } else { + while (measurementsToDelete.isNotEmpty()) { + val batchMeasurementsToDelete = measurementsToDelete.take(MAX_BATCH_DELETE) + val deleteRequests: List = + batchMeasurementsToDelete.map { + deleteMeasurementRequest { + externalMeasurementId = it.externalMeasurementId + externalMeasurementConsumerId = it.externalMeasurementConsumerId + etag = it.etag + } + } + measurementsService.batchDeleteMeasurements( + batchDeleteMeasurementsRequest { requests += deleteRequests } + ) + completedMeasurementDeletionCounter.add(deleteRequests.size.toLong()) + + measurementsToDelete = + measurementsService + .streamMeasurements( + streamMeasurementsRequest { + filter = + StreamMeasurementsRequestKt.filter { + states += COMPLETED_MEASUREMENT_STATES + updatedBefore = currentTime.minus(timeToLive).toProtoTime() + } + } + ) + .toList() + } + } + } + } + + companion object { + private val logger: Logger = Logger.getLogger(this::class.java.name) + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/batch/PendingMeasurementsCancellation.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/batch/PendingMeasurementsCancellation.kt new file mode 100644 index 00000000000..9766165d397 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/batch/PendingMeasurementsCancellation.kt @@ -0,0 +1,117 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.kingdom.batch + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.metrics.LongCounter +import io.opentelemetry.api.metrics.Meter +import java.time.Clock +import java.time.Duration +import java.util.logging.Logger +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.wfanet.measurement.common.toProtoTime +import org.wfanet.measurement.internal.kingdom.CancelMeasurementRequest +import org.wfanet.measurement.internal.kingdom.Measurement +import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub +import org.wfanet.measurement.internal.kingdom.StreamMeasurementsRequestKt +import org.wfanet.measurement.internal.kingdom.batchCancelMeasurementsRequest +import org.wfanet.measurement.internal.kingdom.cancelMeasurementRequest +import org.wfanet.measurement.internal.kingdom.streamMeasurementsRequest + +private val PENDING_MEASUREMENT_STATES = + listOf( + Measurement.State.PENDING_COMPUTATION, + Measurement.State.PENDING_PARTICIPANT_CONFIRMATION, + Measurement.State.PENDING_REQUISITION_FULFILLMENT, + Measurement.State.PENDING_REQUISITION_PARAMS + ) +private const val MAX_BATCH_CANCEL = 1000 + +class PendingMeasurementsCancellation( + private val measurementsService: MeasurementsCoroutineStub, + private val timeToLive: Duration, + private val dryRun: Boolean = false, + private val clock: Clock = Clock.systemUTC(), + private val openTelemetry: OpenTelemetry = GlobalOpenTelemetry.get() +) { + private val meter: Meter = + openTelemetry.getMeter(PendingMeasurementsCancellation::class.java.name) + private val pendingMeasurementCancellationCounter: LongCounter = + meter + .counterBuilder("pending_measurements_cancellation_total") + .setDescription("Total number of pending measurements cancelled under retention policy") + .build() + fun run() { + if (timeToLive.toMillis() == 0L) { + logger.warning("Time to live cannot be 0. TTL=$timeToLive") + } + val currentTime = clock.instant() + runBlocking { + var measurementsToCancel: List = + measurementsService + .streamMeasurements( + streamMeasurementsRequest { + filter = + StreamMeasurementsRequestKt.filter { + states += PENDING_MEASUREMENT_STATES + createdBefore = currentTime.minus(timeToLive).toProtoTime() + } + } + ) + .toList() + + if (dryRun) { + logger.info { "Measurements that would have been cancelled: $measurementsToCancel" } + } else { + while (measurementsToCancel.isNotEmpty()) { + val batchMeasurementsToCancel = measurementsToCancel.take(MAX_BATCH_CANCEL) + val cancelRequests: List = + batchMeasurementsToCancel.map { + cancelMeasurementRequest { + externalMeasurementId = it.externalMeasurementId + externalMeasurementConsumerId = it.externalMeasurementConsumerId + etag = it.etag + } + } + measurementsService.batchCancelMeasurements( + batchCancelMeasurementsRequest { requests += cancelRequests } + ) + pendingMeasurementCancellationCounter.add(cancelRequests.size.toLong()) + + measurementsToCancel = + measurementsService + .streamMeasurements( + streamMeasurementsRequest { + filter = + StreamMeasurementsRequestKt.filter { + states += PENDING_MEASUREMENT_STATES + createdBefore = currentTime.minus(timeToLive).toProtoTime() + } + } + ) + .toList() + } + } + } + } + + companion object { + private val logger: Logger = Logger.getLogger(this::class.java.name) + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/BUILD.bazel new file mode 100644 index 00000000000..d77f3229e46 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/BUILD.bazel @@ -0,0 +1,71 @@ +load("@rules_java//java:defs.bzl", "java_binary") +load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library") +load("@io_bazel_rules_docker//java:image.bzl", "java_image") + +kt_jvm_library( + name = "completed_measurements_deletion_job", + srcs = ["CompletedMeasurementsDeletionJob.kt"], + runtime_deps = ["@wfa_common_jvm//imports/java/io/grpc/netty"], + deps = [ + "//imports/java/io/opentelemetry/api", + "//imports/java/io/opentelemetry/exporter/otlp", + "//imports/java/io/opentelemetry/sdk", + "//imports/java/io/opentelemetry/sdk:common", + "//imports/java/io/opentelemetry/sdk:metrics", + "//imports/java/io/opentelemetry/semconv", + "//src/main/kotlin/org/wfanet/measurement/kingdom/batch:completed_measurements_deletion", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server:utils", + "//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto:signing_certs", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc", + ], +) + +java_binary( + name = "CompletedMeasurementsDeletionJob", + main_class = "org.wfanet.measurement.kingdom.deploy.common.job.CompletedMeasurementsDeletionJobKt", + runtime_deps = [":completed_measurements_deletion_job"], +) + +java_image( + name = "completed_measurements_deletion_image", + main_class = "org.wfanet.measurement.kingdom.deploy.common.job.CompletedMeasurementsDeletionJobKt", + visibility = ["//src:docker_image_deployment"], + runtime_deps = [":completed_measurements_deletion_job"], +) + +kt_jvm_library( + name = "pending_measurements_cancellation_job", + srcs = ["PendingMeasurementsCancellationJob.kt"], + runtime_deps = ["@wfa_common_jvm//imports/java/io/grpc/netty"], + deps = [ + "//imports/java/io/opentelemetry/api", + "//imports/java/io/opentelemetry/exporter/otlp", + "//imports/java/io/opentelemetry/sdk", + "//imports/java/io/opentelemetry/sdk:common", + "//imports/java/io/opentelemetry/sdk:metrics", + "//imports/java/io/opentelemetry/semconv", + "//src/main/kotlin/org/wfanet/measurement/kingdom/batch:pending_measurements_cancellation", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server:utils", + "//src/main/proto/wfa/measurement/internal/kingdom:measurements_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto:signing_certs", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc", + ], +) + +java_binary( + name = "PendingMeasurementsCancellationJob", + main_class = "org.wfanet.measurement.kingdom.deploy.common.job.PendingMeasurementsCancellationJobKt", + runtime_deps = [":pending_measurements_cancellation_job"], +) + +java_image( + name = "pending_measurements_cancellation_image", + main_class = "org.wfanet.measurement.kingdom.deploy.common.job.PendingMeasurementsCancellationJobKt", + visibility = ["//src:docker_image_deployment"], + runtime_deps = [":pending_measurements_cancellation_job"], +) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/CompletedMeasurementsDeletionJob.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/CompletedMeasurementsDeletionJob.kt new file mode 100644 index 00000000000..98ff4a4bef7 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/CompletedMeasurementsDeletionJob.kt @@ -0,0 +1,148 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.kingdom.deploy.common.job + +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import java.time.Clock +import java.time.Duration +import kotlin.properties.Delegates +import org.wfanet.measurement.common.commandLineMain +import org.wfanet.measurement.common.crypto.SigningCerts +import org.wfanet.measurement.common.grpc.TlsFlags +import org.wfanet.measurement.common.grpc.buildMutualTlsChannel +import org.wfanet.measurement.common.grpc.withDefaultDeadline +import org.wfanet.measurement.common.grpc.withVerboseLogging +import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub +import org.wfanet.measurement.kingdom.batch.CompletedMeasurementsDeletion +import org.wfanet.measurement.kingdom.deploy.common.server.KingdomApiServerFlags +import picocli.CommandLine + +private class Flags { + @CommandLine.Mixin + lateinit var tlsFlags: TlsFlags + private set + + @CommandLine.Mixin + lateinit var kingdomApiServerFlags: KingdomApiServerFlags + private set + + @CommandLine.Option( + names = ["--time-to-live"], + defaultValue = "100d", + description = + [ + "Time to live (TTL) for a completed Measurement since latest update time. After " + + "completion, a Measurement won't live longer than this duration." + ], + ) + lateinit var measurementsTimeToLive: Duration + private set + + @CommandLine.Option( + names = ["--otel-exporter-otlp-endpoint"], + description = ["Endpoint for OpenTelemetry Collector."], + required = true, + ) + lateinit var otelExporterOtlpEndpoint: String + private set + + @CommandLine.Option( + names = ["--otel-service-name"], + description = ["Service name to label cronjob metrics with."], + required = true, + ) + lateinit var otelServiceName: String + private set + + @set:CommandLine.Option( + names = ["--dry-run"], + description = + [ + "Option to print the Measurements that would be deleted instead of performing the deletion." + ], + required = false, + defaultValue = "false" + ) + var dryRun by Delegates.notNull() + private set +} + +@CommandLine.Command( + name = "CompletedMeasurementsDeletionJob", + mixinStandardHelpOptions = true, + showDefaultValues = true +) +private fun run(@CommandLine.Mixin flags: Flags) { + val clientCerts = + SigningCerts.fromPemFiles( + certificateFile = flags.tlsFlags.certFile, + privateKeyFile = flags.tlsFlags.privateKeyFile, + trustedCertCollectionFile = flags.tlsFlags.certCollectionFile + ) + + val channel = + buildMutualTlsChannel( + flags.kingdomApiServerFlags.internalApiFlags.target, + clientCerts, + flags.kingdomApiServerFlags.internalApiFlags.certHost + ) + .withVerboseLogging(flags.kingdomApiServerFlags.debugVerboseGrpcClientLogging) + .withDefaultDeadline(flags.kingdomApiServerFlags.internalApiFlags.defaultDeadlineDuration) + + val internalMeasurementsClient = MeasurementsCoroutineStub(channel) + + val otlpEndpoint = flags.otelExporterOtlpEndpoint + val otelServiceName = flags.otelServiceName + val resource: Resource = + Resource.getDefault() + .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, otelServiceName))) + val meterProvider = + SdkMeterProvider.builder() + .setResource(resource) + .registerMetricReader( + PeriodicMetricReader.builder( + OtlpGrpcMetricExporter.builder() + .setTimeout(Duration.ofSeconds(30L)) + .setEndpoint(otlpEndpoint) + .build() + ) + .setInterval(Duration.ofSeconds(60L)) + .build() + ) + .build() + val openTelemetry: OpenTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build() + + val completedMeasurementsDeletion = + CompletedMeasurementsDeletion( + internalMeasurementsClient, + flags.measurementsTimeToLive, + flags.dryRun, + Clock.systemUTC(), + openTelemetry + ) + completedMeasurementsDeletion.run() +} + +fun main(args: Array) = commandLineMain(::run, args) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/PendingMeasurementsCancellationJob.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/PendingMeasurementsCancellationJob.kt new file mode 100644 index 00000000000..6ce5bdc79c2 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job/PendingMeasurementsCancellationJob.kt @@ -0,0 +1,148 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.kingdom.deploy.common.job + +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import java.time.Clock +import java.time.Duration +import kotlin.properties.Delegates +import org.wfanet.measurement.common.commandLineMain +import org.wfanet.measurement.common.crypto.SigningCerts +import org.wfanet.measurement.common.grpc.TlsFlags +import org.wfanet.measurement.common.grpc.buildMutualTlsChannel +import org.wfanet.measurement.common.grpc.withDefaultDeadline +import org.wfanet.measurement.common.grpc.withVerboseLogging +import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub +import org.wfanet.measurement.kingdom.batch.PendingMeasurementsCancellation +import org.wfanet.measurement.kingdom.deploy.common.server.KingdomApiServerFlags +import picocli.CommandLine + +private class Flags { + @CommandLine.Mixin + lateinit var tlsFlags: TlsFlags + private set + + @CommandLine.Mixin + lateinit var kingdomApiServerFlags: KingdomApiServerFlags + private set + + @CommandLine.Option( + names = ["--time-to-live"], + defaultValue = "30d", + description = + [ + "Time to live (TTL) for a pending Measurement since created time. A measurement will be." + + "cancelled if it remains in a pending state longer than this duration." + ], + ) + lateinit var measurementsTimeToLive: Duration + private set + + @CommandLine.Option( + names = ["--otel-exporter-otlp-endpoint"], + description = ["Endpoint for OpenTelemetry Collector."], + required = true + ) + lateinit var otelExporterOtlpEndpoint: String + private set + + @CommandLine.Option( + names = ["--otel-service-name"], + description = ["Service name to label cronjob metrics with."], + required = true + ) + lateinit var otelServiceName: String + private set + + @set:CommandLine.Option( + names = ["--dry-run"], + description = + [ + "Option to print Measurements that would be cancelled instead of performing the cancellation." + ], + required = false, + defaultValue = "false" + ) + var dryRun by Delegates.notNull() + private set +} + +@CommandLine.Command( + name = "PendingMeasurementsCancellationJob", + mixinStandardHelpOptions = true, + showDefaultValues = true +) +private fun run(@CommandLine.Mixin flags: Flags) { + val clientCerts = + SigningCerts.fromPemFiles( + certificateFile = flags.tlsFlags.certFile, + privateKeyFile = flags.tlsFlags.privateKeyFile, + trustedCertCollectionFile = flags.tlsFlags.certCollectionFile + ) + + val channel = + buildMutualTlsChannel( + flags.kingdomApiServerFlags.internalApiFlags.target, + clientCerts, + flags.kingdomApiServerFlags.internalApiFlags.certHost + ) + .withVerboseLogging(flags.kingdomApiServerFlags.debugVerboseGrpcClientLogging) + .withDefaultDeadline(flags.kingdomApiServerFlags.internalApiFlags.defaultDeadlineDuration) + + val internalMeasurementsClient = MeasurementsCoroutineStub(channel) + + val otlpEndpoint = flags.otelExporterOtlpEndpoint + val otelServiceName = flags.otelServiceName + val resource: Resource = + Resource.getDefault() + .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, otelServiceName))) + val meterProvider = + SdkMeterProvider.builder() + .setResource(resource) + .registerMetricReader( + PeriodicMetricReader.builder( + OtlpGrpcMetricExporter.builder() + .setTimeout(Duration.ofSeconds(30L)) + .setEndpoint(otlpEndpoint) + .build() + ) + .setInterval(Duration.ofSeconds(60L)) + .build() + ) + .build() + val openTelemetry: OpenTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build() + + val pendingMeasurementsCancellation = + PendingMeasurementsCancellation( + internalMeasurementsClient, + flags.measurementsTimeToLive, + flags.dryRun, + Clock.systemUTC(), + openTelemetry + ) + pendingMeasurementsCancellation.run() +} + +fun main(args: Array) = commandLineMain(::run, args) diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel index 11426a33f33..090b4d8201a 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/server/BUILD.bazel @@ -21,6 +21,7 @@ kt_jvm_library( kt_jvm_library( name = "utils", srcs = ["Utils.kt"], + visibility = ["//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/job:__subpackages__"], deps = [ "//src/main/kotlin/org/wfanet/measurement/common/identity", "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common:flags",