diff --git a/src/main/docker/images.bzl b/src/main/docker/images.bzl index 9192ef9e910..1157e0f2373 100644 --- a/src/main/docker/images.bzl +++ b/src/main/docker/images.bzl @@ -186,10 +186,44 @@ REPORTING_GKE_IMAGES = [ ), ] -ALL_GKE_IMAGES = COMMON_IMAGES + GKE_IMAGES + REPORTING_COMMON_IMAGES + REPORTING_GKE_IMAGES +REPORTING_V2_COMMON_IMAGES = [ + struct( + name = "reporting_v2alpha_public_api_server_image", + image = "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server:v2alpha_public_api_server_image", + repository = _PREFIX + "/reporting/v2/v2alpha-public-api", + ), +] + +REPORTING_V2_LOCAL_IMAGES = [ + struct( + name = "internal_reporting_v2_server_image", + image = "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server:postgres_internal_reporting_server_image", + repository = _PREFIX + "/reporting/v2/local-postgres-internal", + ), + struct( + name = "reporting_v2_postgres_update_schema_image", + image = "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/tools:update_schema_image", + repository = _PREFIX + "/reporting/v2/local-postgres-update-schema", + ), +] + +REPORTING_V2_GKE_IMAGES = [ + struct( + name = "gcloud_reporting_v2_internal_server_image", + image = "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server:gcloud_postgres_internal_reporting_server_image", + repository = _PREFIX + "/reporting/v2/postgres-internal-server", + ), + struct( + name = "gcloud_reporting_v2_postgres_update_schema_image", + image = "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/tools:update_schema_image", + repository = _PREFIX + "/reporting/v2/postgres-update-schema", + ), +] + +ALL_GKE_IMAGES = COMMON_IMAGES + GKE_IMAGES + REPORTING_COMMON_IMAGES + REPORTING_GKE_IMAGES + REPORTING_V2_COMMON_IMAGES + REPORTING_V2_GKE_IMAGES -ALL_LOCAL_IMAGES = COMMON_IMAGES + LOCAL_IMAGES + REPORTING_COMMON_IMAGES + REPORTING_LOCAL_IMAGES +ALL_LOCAL_IMAGES = COMMON_IMAGES + LOCAL_IMAGES + REPORTING_COMMON_IMAGES + REPORTING_LOCAL_IMAGES + REPORTING_V2_COMMON_IMAGES + REPORTING_V2_LOCAL_IMAGES -ALL_IMAGES = COMMON_IMAGES + LOCAL_IMAGES + GKE_IMAGES + REPORTING_COMMON_IMAGES + REPORTING_LOCAL_IMAGES + REPORTING_GKE_IMAGES +ALL_IMAGES = COMMON_IMAGES + LOCAL_IMAGES + GKE_IMAGES + REPORTING_COMMON_IMAGES + REPORTING_LOCAL_IMAGES + REPORTING_GKE_IMAGES + REPORTING_V2_COMMON_IMAGES + REPORTING_V2_LOCAL_IMAGES + REPORTING_V2_GKE_IMAGES -ALL_REPORTING_GKE_IMAGES = REPORTING_COMMON_IMAGES + REPORTING_GKE_IMAGES +ALL_REPORTING_GKE_IMAGES = REPORTING_COMMON_IMAGES + REPORTING_GKE_IMAGES + REPORTING_V2_COMMON_IMAGES + REPORTING_V2_GKE_IMAGES diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/BUILD.bazel new file mode 100644 index 00000000000..a6116e41dff --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/BUILD.bazel @@ -0,0 +1,34 @@ +load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library") + +package( + default_visibility = [ + "//src/main/kotlin/org/wfanet/measurement/reporting:__subpackages__", + "//src/test/kotlin/org/wfanet/measurement/reporting:__subpackages__", + ], +) + +kt_jvm_library( + name = "encryption_key_pair_map", + srcs = ["EncryptionKeyPairMap.kt"], + deps = [ + "//src/main/proto/wfa/measurement/config/reporting:encryption_key_pair_config_kt_jvm_proto", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/tink", + ], +) + +kt_jvm_library( + name = "flags", + srcs = ["InternalApiFlags.kt"], + deps = [ + "@wfa_common_jvm//imports/java/picocli", + ], +) + +kt_jvm_library( + name = "kingdom_flags", + srcs = ["KingdomApiFlags.kt"], + deps = [ + "@wfa_common_jvm//imports/java/picocli", + ], +) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/EncryptionKeyPairMap.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/EncryptionKeyPairMap.kt new file mode 100644 index 00000000000..061c2d0ece4 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/EncryptionKeyPairMap.kt @@ -0,0 +1,57 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.common + +import com.google.protobuf.ByteString +import java.io.File +import org.wfanet.measurement.common.crypto.PrivateKeyHandle +import org.wfanet.measurement.common.crypto.tink.loadPrivateKey +import org.wfanet.measurement.common.parseTextProto +import org.wfanet.measurement.common.readByteString +import org.wfanet.measurement.config.reporting.encryptionKeyPairConfig +import picocli.CommandLine.Option + +class EncryptionKeyPairMap { + @Option( + names = ["--key-pair-dir"], + description = ["Path to the directory of MeasurementConsumer's encryption keys"], + ) + private lateinit var keyFilesDirectory: File + + @Option( + names = ["--key-pair-config-file"], + description = ["Path to the textproto file of EncryptionKeyPairConfig that contains key pairs"], + required = true + ) + private lateinit var keyPairConfigFile: File + + private fun loadKeyPairs(): Map>> { + val keyPairConfig = + parseTextProto(keyPairConfigFile, encryptionKeyPairConfig {}).principalKeyPairsList + return keyPairConfig.associate { config -> + val keyPairs = + config.keyPairsList.map { keyPair -> + val publicKey = keyFilesDirectory.resolve(keyPair.publicKeyFile).readByteString() + val privateKey = loadPrivateKey(keyFilesDirectory.resolve(keyPair.privateKeyFile)) + publicKey to privateKey + } + checkNotNull(config.principal) to keyPairs + } + } + + val keyPairs: Map>> by lazy { loadKeyPairs() } +} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/InternalApiFlags.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/InternalApiFlags.kt new file mode 100644 index 00000000000..9556b2c7a8b --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/InternalApiFlags.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.common + +import picocli.CommandLine + +class InternalApiFlags { + @set:CommandLine.Option( + names = ["--internal-api-target"], + description = ["gRPC target (authority) of the Reporting internal API server"], + required = true, + ) + lateinit var target: String + + @CommandLine.Option( + names = ["--internal-api-cert-host"], + description = + [ + "Expected hostname (DNS-ID) in the Reporting internal API server's TLS certificate.", + "This overrides derivation of the TLS DNS-ID from --internal-api-target.", + ], + required = false, + ) + var certHost: String? = null + private set +} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/KingdomApiFlags.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/KingdomApiFlags.kt new file mode 100644 index 00000000000..559caa7c9b4 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/KingdomApiFlags.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.common + +import picocli.CommandLine + +class KingdomApiFlags { + @set:CommandLine.Option( + names = ["--kingdom-api-target"], + description = ["gRPC target (authority) of the Kingdom public API server"], + required = true, + ) + lateinit var target: String + + @CommandLine.Option( + names = ["--kingdom-api-cert-host"], + description = + [ + "Expected hostname (DNS-ID) in the Kingdom public API server's TLS certificate.", + "This overrides derivation of the TLS DNS-ID from --kingdom-api-target.", + ], + required = false, + ) + var certHost: String? = null + private set +} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/BUILD.bazel index a4b276fcd09..f8784866a48 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/BUILD.bazel @@ -1,4 +1,6 @@ load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library") +load("@rules_java//java:defs.bzl", "java_binary") +load("//src/main/docker:macros.bzl", "java_image") kt_jvm_library( name = "internal_reporting_server", @@ -17,3 +19,59 @@ kt_jvm_library( "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc", ], ) + +kt_jvm_library( + name = "reporting_api_server_flags", + srcs = ["ReportingApiServerFlags.kt"], + deps = [ + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common:flags", + "@wfa_common_jvm//imports/java/picocli", + ], +) + +kt_jvm_library( + name = "v2alpha_public_api_server", + srcs = ["V2AlphaPublicApiServer.kt"], + runtime_deps = ["@wfa_common_jvm//imports/java/io/grpc/netty"], + deps = [ + ":reporting_api_server_flags", + "//src/main/kotlin/org/wfanet/measurement/common/api:memoizing_principal_lookup", + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/common:encryption_key_pair_map", + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/common:kingdom_flags", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api:cel_env_provider", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api:encryption_key_pair_store", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha:akid_principal_lookup", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha:event_groups_service", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha:metrics_service", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha:reporting_sets_service", + "//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha:reports_service", + "//src/main/proto/wfa/measurement/api/v2alpha:certificates_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:data_providers_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptors_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:event_groups_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:measurement_consumers_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/api/v2alpha:measurements_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/internal/reporting/v2:measurement_consumers_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/internal/reporting/v2:measurements_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/internal/reporting/v2:metrics_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/internal/reporting/v2:reporting_sets_service_kt_jvm_grpc_proto", + "//src/main/proto/wfa/measurement/internal/reporting/v2:reports_service_kt_jvm_grpc_proto", + "@wfa_common_jvm//imports/java/io/grpc:api", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc", + ], +) + +java_binary( + name = "V2AlphaPublicApiServer", + main_class = "org.wfanet.measurement.reporting.deploy.v2.common.server.V2AlphaPublicApiServerKt", + runtime_deps = [":v2alpha_public_api_server"], +) + +java_image( + name = "v2alpha_public_api_server_image", + binary = ":V2AlphaPublicApiServer", + main_class = "org.wfanet.measurement.reporting.deploy.v2.common.server.V2AlphaPublicApiServerKt", + visibility = ["//src:docker_image_deployment"], +) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/ReportingApiServerFlags.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/ReportingApiServerFlags.kt new file mode 100644 index 00000000000..694f8c54016 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/ReportingApiServerFlags.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.common.server + +import java.time.Duration +import kotlin.properties.Delegates +import org.wfanet.measurement.reporting.deploy.v2.common.InternalApiFlags +import picocli.CommandLine + +class ReportingApiServerFlags { + @CommandLine.Mixin + lateinit var internalApiFlags: InternalApiFlags + private set + + @set:CommandLine.Option( + names = ["--debug-verbose-grpc-client-logging"], + description = ["Enables full gRPC request and response logging for outgoing gRPCs"], + defaultValue = "false" + ) + var debugVerboseGrpcClientLogging by Delegates.notNull() + private set + + @CommandLine.Option( + names = ["--event-group-metadata-descriptor-cache-duration"], + description = + [ + "How long the event group metadata descriptors are cached for before refreshing in format 1d1h1m1s1ms1ns" + ], + defaultValue = "1h", + required = false, + ) + lateinit var eventGroupMetadataDescriptorCacheDuration: Duration + private set +} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt new file mode 100644 index 00000000000..d0f709838d3 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt @@ -0,0 +1,258 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.common.server + +import com.google.protobuf.ByteString +import io.grpc.Channel +import io.grpc.ServerServiceDefinition +import io.grpc.Status +import io.grpc.StatusException +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import java.io.File +import java.security.SecureRandom +import java.util.concurrent.ExecutorService +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineStub as KingdomCertificatesCoroutineStub +import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCoroutineStub as KingdomDataProvidersCoroutineStub +import org.wfanet.measurement.api.v2alpha.EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub as KingdomEventGroupMetadataDescriptorsCoroutineStub +import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt.EventGroupsCoroutineStub as KingdomEventGroupsCoroutineStub +import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey +import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub as KingdomMeasurementConsumersCoroutineStub +import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt.MeasurementsCoroutineStub as KingdomMeasurementsCoroutineStub +import org.wfanet.measurement.api.withAuthenticationKey +import org.wfanet.measurement.common.api.PrincipalLookup +import org.wfanet.measurement.common.api.memoizing +import org.wfanet.measurement.common.commandLineMain +import org.wfanet.measurement.common.crypto.SigningCerts +import org.wfanet.measurement.common.grpc.CommonServer +import org.wfanet.measurement.common.grpc.ErrorLoggingServerInterceptor +import org.wfanet.measurement.common.grpc.LoggingServerInterceptor +import org.wfanet.measurement.common.grpc.buildMutualTlsChannel +import org.wfanet.measurement.common.grpc.withVerboseLogging +import org.wfanet.measurement.common.parseTextProto +import org.wfanet.measurement.config.reporting.MeasurementConsumerConfigs +import org.wfanet.measurement.config.reporting.MetricSpecConfig +import org.wfanet.measurement.internal.reporting.v2.MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub as InternalMeasurementConsumersCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.MeasurementsGrpcKt.MeasurementsCoroutineStub as InternalMeasurementsCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.MetricsGrpcKt.MetricsCoroutineStub as InternalMetricsCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.ReportingSetsGrpcKt.ReportingSetsCoroutineStub as InternalReportingSetsCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.ReportsGrpcKt.ReportsCoroutineStub as InternalReportsCoroutineStub +import org.wfanet.measurement.internal.reporting.v2.measurementConsumer +import org.wfanet.measurement.reporting.deploy.common.EncryptionKeyPairMap +import org.wfanet.measurement.reporting.deploy.common.KingdomApiFlags +import org.wfanet.measurement.reporting.service.api.CelEnvCacheProvider +import org.wfanet.measurement.reporting.service.api.InMemoryEncryptionKeyPairStore +import org.wfanet.measurement.reporting.service.api.v2alpha.AkidPrincipalLookup +import org.wfanet.measurement.reporting.service.api.v2alpha.EventGroupsService +import org.wfanet.measurement.reporting.service.api.v2alpha.MetadataPrincipalServerInterceptor.Companion.withMetadataPrincipalIdentities +import org.wfanet.measurement.reporting.service.api.v2alpha.MetricsService +import org.wfanet.measurement.reporting.service.api.v2alpha.ReportingPrincipal +import org.wfanet.measurement.reporting.service.api.v2alpha.ReportingSetsService +import org.wfanet.measurement.reporting.service.api.v2alpha.ReportsService +import org.wfanet.measurement.reporting.service.api.v2alpha.withPrincipalsFromX509AuthorityKeyIdentifiers +import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt.MetricsCoroutineStub +import picocli.CommandLine + +private const val SERVER_NAME = "V2AlphaPublicApiServer" + +@CommandLine.Command( + name = SERVER_NAME, + description = ["Server daemon for Reporting v2alpha public API services."], + mixinStandardHelpOptions = true, + showDefaultValues = true +) +private fun run( + @CommandLine.Mixin reportingApiServerFlags: ReportingApiServerFlags, + @CommandLine.Mixin kingdomApiFlags: KingdomApiFlags, + @CommandLine.Mixin commonServerFlags: CommonServer.Flags, + @CommandLine.Mixin v2AlphaFlags: V2AlphaFlags, + @CommandLine.Mixin encryptionKeyPairMap: EncryptionKeyPairMap, +) { + val clientCerts = + SigningCerts.fromPemFiles( + certificateFile = commonServerFlags.tlsFlags.certFile, + privateKeyFile = commonServerFlags.tlsFlags.privateKeyFile, + trustedCertCollectionFile = commonServerFlags.tlsFlags.certCollectionFile + ) + val channel: Channel = + buildMutualTlsChannel( + reportingApiServerFlags.internalApiFlags.target, + clientCerts, + reportingApiServerFlags.internalApiFlags.certHost + ) + .withVerboseLogging(reportingApiServerFlags.debugVerboseGrpcClientLogging) + + val kingdomChannel: Channel = + buildMutualTlsChannel( + target = kingdomApiFlags.target, + clientCerts = clientCerts, + hostName = kingdomApiFlags.certHost + ) + .withVerboseLogging(reportingApiServerFlags.debugVerboseGrpcClientLogging) + + val principalLookup: PrincipalLookup = + AkidPrincipalLookup( + v2AlphaFlags.authorityKeyIdentifierToPrincipalMapFile, + v2AlphaFlags.measurementConsumerConfigFile + ) + .memoizing() + + val measurementConsumerConfigs = + parseTextProto( + v2AlphaFlags.measurementConsumerConfigFile, + MeasurementConsumerConfigs.getDefaultInstance() + ) + + val internalMeasurementConsumersCoroutineStub = InternalMeasurementConsumersCoroutineStub(channel) + runBlocking { + measurementConsumerConfigs.configsMap.keys.forEach { + val measurementConsumerKey = + MeasurementConsumerKey.fromName(it) + ?: throw IllegalArgumentException("measurement_consumer_config is invalid") + try { + internalMeasurementConsumersCoroutineStub.createMeasurementConsumer( + measurementConsumer { + cmmsMeasurementConsumerId = measurementConsumerKey.measurementConsumerId + } + ) + } catch (e: StatusException) { + when (e.status.code) { + Status.Code.ALREADY_EXISTS -> {} + else -> throw e + } + } + } + } + + val metricSpecConfig = + parseTextProto(v2AlphaFlags.metricSpecConfigFile, MetricSpecConfig.getDefaultInstance()) + + val apiKey = measurementConsumerConfigs.configsMap.values.first().apiKey + val celEnvCacheProvider = + CelEnvCacheProvider( + KingdomEventGroupMetadataDescriptorsCoroutineStub(kingdomChannel) + .withAuthenticationKey(apiKey), + reportingApiServerFlags.eventGroupMetadataDescriptorCacheDuration, + Dispatchers.Default, + ) + + val metricsService = + MetricsService( + metricSpecConfig, + InternalReportingSetsCoroutineStub(channel), + InternalMetricsCoroutineStub(channel), + InternalMeasurementsCoroutineStub(channel), + KingdomDataProvidersCoroutineStub(kingdomChannel), + KingdomMeasurementsCoroutineStub(kingdomChannel), + KingdomCertificatesCoroutineStub(kingdomChannel), + KingdomMeasurementConsumersCoroutineStub(kingdomChannel), + InMemoryEncryptionKeyPairStore(encryptionKeyPairMap.keyPairs), + SecureRandom(), + v2AlphaFlags.signingPrivateKeyStoreDir, + commonServerFlags.tlsFlags.signingCerts.trustedCertificates, + Dispatchers.IO + ) + + val inProcessServerName = InProcessServerBuilder.generateName() + + val executor: ExecutorService = + ThreadPoolExecutor( + 1, + commonServerFlags.threadPoolSize, + 60L, + TimeUnit.SECONDS, + LinkedBlockingQueue() + ) + + InProcessServerBuilder.forName(inProcessServerName) + .apply { + executor(executor) + addService(metricsService.withMetadataPrincipalIdentities(measurementConsumerConfigs)) + if (commonServerFlags.debugVerboseGrpcLogging) { + intercept(LoggingServerInterceptor) + } else { + intercept(ErrorLoggingServerInterceptor) + } + } + .build() + .start() + + val inProcessChannel = + InProcessChannelBuilder.forName(inProcessServerName).directExecutor().build() + + val services: List = + listOf( + EventGroupsService( + KingdomEventGroupsCoroutineStub(kingdomChannel), + InMemoryEncryptionKeyPairStore(encryptionKeyPairMap.keyPairs), + celEnvCacheProvider, + ) + .withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup), + metricsService.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup), + ReportingSetsService(InternalReportingSetsCoroutineStub(channel)) + .withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup), + ReportsService( + InternalReportsCoroutineStub(channel), + MetricsCoroutineStub(inProcessChannel), + metricSpecConfig + ) + .withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup), + ) + CommonServer.fromFlags(commonServerFlags, SERVER_NAME, services).start().blockUntilShutdown() +} + +fun main(args: Array) = commandLineMain(::run, args) + +/** Flags specific to the V2Alpha API version. */ +private class V2AlphaFlags { + @CommandLine.Option( + names = ["--authority-key-identifier-to-principal-map-file"], + description = ["File path to a AuthorityKeyToPrincipalMap textproto"], + required = true, + ) + lateinit var authorityKeyIdentifierToPrincipalMapFile: File + private set + + @CommandLine.Option( + names = ["--measurement-consumer-config-file"], + description = ["File path to a MeasurementConsumerConfig textproto"], + required = true, + ) + lateinit var measurementConsumerConfigFile: File + private set + + @CommandLine.Option( + names = ["--signing-private-key-store-dir"], + description = ["File path to the signing private key store directory"], + required = true, + ) + lateinit var signingPrivateKeyStoreDir: File + private set + + @CommandLine.Option( + names = ["--metric-spec-config-file"], + description = ["File path to a MetricSpecConfig textproto"], + required = true, + ) + lateinit var metricSpecConfigFile: File + private set +} diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server/BUILD.bazel new file mode 100644 index 00000000000..5b801afa211 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server/BUILD.bazel @@ -0,0 +1,32 @@ +load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library") +load("@rules_java//java:defs.bzl", "java_binary") +load("//src/main/docker:macros.bzl", "java_image") + +kt_jvm_library( + name = "gcloud_postgres_internal_reporting_server", + srcs = ["GCloudPostgresInternalReportingServer.kt"], + runtime_deps = ["@wfa_common_jvm//imports/java/com/google/cloud/sql/postgres:r2dbc"], + deps = [ + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server:internal_reporting_server", + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/postgres:services", + "@wfa_common_jvm//imports/java/io/r2dbc", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc/postgres", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/gcloud/postgres:factories", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/gcloud/postgres:flags", + ], +) + +java_binary( + name = "GCloudPostgresInternalReportingServer", + main_class = "org.wfanet.measurement.reporting.deploy.v2.gcloud.postgres.server.GCloudPostgresInternalReportingServerKt", + runtime_deps = [":gcloud_postgres_internal_reporting_server"], +) + +java_image( + name = "gcloud_postgres_internal_reporting_server_image", + binary = ":GCloudPostgresInternalReportingServer", + main_class = "org.wfanet.measurement.reporting.deploy.v2.gcloud.postgres.server.GCloudPostgresInternalReportingServerKt", + visibility = ["//src:docker_image_deployment"], +) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server/GCloudPostgresInternalReportingServer.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server/GCloudPostgresInternalReportingServer.kt new file mode 100644 index 00000000000..1b5a4a3cd19 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/server/GCloudPostgresInternalReportingServer.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.gcloud.postgres.server + +import java.time.Clock +import kotlinx.coroutines.runBlocking +import org.wfanet.measurement.common.commandLineMain +import org.wfanet.measurement.common.db.r2dbc.postgres.PostgresDatabaseClient +import org.wfanet.measurement.common.identity.RandomIdGenerator +import org.wfanet.measurement.gcloud.postgres.PostgresConnectionFactories +import org.wfanet.measurement.gcloud.postgres.PostgresFlags as GCloudPostgresFlags +import org.wfanet.measurement.reporting.deploy.v2.common.server.InternalReportingServer +import org.wfanet.measurement.reporting.deploy.v2.common.server.postgres.PostgresServices +import picocli.CommandLine + +/** Implementation of [InternalReportingServer] using Google Cloud Postgres. */ +@CommandLine.Command( + name = "GCloudPostgresInternalReportingServer", + description = ["Start the internal Reporting data-layer services in a single blocking server."], + mixinStandardHelpOptions = true, + showDefaultValues = true +) +class GCloudPostgresInternalReportingServer : InternalReportingServer() { + @CommandLine.Mixin private lateinit var gCloudPostgresFlags: GCloudPostgresFlags + + override fun run() = runBlocking { + val clock = Clock.systemUTC() + val idGenerator = RandomIdGenerator(clock) + + val factory = PostgresConnectionFactories.buildConnectionFactory(gCloudPostgresFlags) + val client = PostgresDatabaseClient.fromConnectionFactory(factory) + + run(PostgresServices.create(idGenerator, client)) + } +} + +fun main(args: Array) = commandLineMain(GCloudPostgresInternalReportingServer(), args) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/tools/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/tools/BUILD.bazel new file mode 100644 index 00000000000..ac27f0c7f0e --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud/postgres/tools/BUILD.bazel @@ -0,0 +1,20 @@ +load("@rules_java//java:defs.bzl", "java_binary") +load("//src/main/docker:macros.bzl", "java_image") + +java_binary( + name = "UpdateSchema", + args = ["--changelog=reporting/postgres/changelog-v2.yaml"], + main_class = "org.wfanet.measurement.gcloud.postgres.tools.UpdateSchema", + resources = ["//src/main/resources/reporting/postgres"], + runtime_deps = [ + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/gcloud/postgres/tools:update_schema", + ], +) + +java_image( + name = "update_schema_image", + args = ["--changelog=reporting/postgres/changelog-v2.yaml"], + binary = ":UpdateSchema", + main_class = "org.wfanet.measurement.gcloud.postgres.tools.UpdateSchema", + visibility = ["//src:docker_image_deployment"], +) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server/BUILD.bazel new file mode 100644 index 00000000000..a415ccbba4e --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server/BUILD.bazel @@ -0,0 +1,29 @@ +load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library") +load("@rules_java//java:defs.bzl", "java_binary") +load("//src/main/docker:macros.bzl", "java_image") + +kt_jvm_library( + name = "postgres_internal_reporting_server", + srcs = ["PostgresInternalReportingServer.kt"], + deps = [ + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server:internal_reporting_server", + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/postgres:services", + "@wfa_common_jvm//imports/java/picocli", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/postgres:flags", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc/postgres", + ], +) + +java_binary( + name = "PostgresInternalReportingServer", + main_class = "org.wfanet.measurement.reporting.deploy.v2.postgres.server.PostgresInternalReportingServerKt", + runtime_deps = [":postgres_internal_reporting_server"], +) + +java_image( + name = "postgres_internal_reporting_server_image", + binary = ":PostgresInternalReportingServer", + main_class = "org.wfanet.measurement.reporting.deploy.v2.postgres.server.PostgresInternalReportingServerKt", + visibility = ["//src:docker_image_deployment"], +) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server/PostgresInternalReportingServer.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server/PostgresInternalReportingServer.kt new file mode 100644 index 00000000000..a440dfd9e89 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/server/PostgresInternalReportingServer.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2023 The Cross-Media Measurement Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.wfanet.measurement.reporting.deploy.v2.postgres.server + +import java.time.Clock +import kotlinx.coroutines.runBlocking +import org.wfanet.measurement.common.commandLineMain +import org.wfanet.measurement.common.db.postgres.PostgresFlags +import org.wfanet.measurement.common.db.r2dbc.postgres.PostgresDatabaseClient +import org.wfanet.measurement.common.identity.RandomIdGenerator +import org.wfanet.measurement.reporting.deploy.v2.common.server.InternalReportingServer +import org.wfanet.measurement.reporting.deploy.v2.common.server.postgres.PostgresServices +import picocli.CommandLine + +/** Implementation of [InternalReportingServer] using Postgres. */ +@CommandLine.Command( + name = "PostgresInternalReportingServer", + description = ["Start the internal Reporting data-layer services in a single blocking server."], + mixinStandardHelpOptions = true, + showDefaultValues = true +) +class PostgresInternalReportingServer : InternalReportingServer() { + @CommandLine.Mixin private lateinit var postgresFlags: PostgresFlags + + override fun run() = runBlocking { + val clock = Clock.systemUTC() + val idGenerator = RandomIdGenerator(clock) + + val client = PostgresDatabaseClient.fromFlags(postgresFlags) + + run(PostgresServices.create(idGenerator, client)) + } +} + +fun main(args: Array) = commandLineMain(PostgresInternalReportingServer(), args) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/tools/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/tools/BUILD.bazel new file mode 100644 index 00000000000..6a36ce4fbd7 --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres/tools/BUILD.bazel @@ -0,0 +1,20 @@ +load("@rules_java//java:defs.bzl", "java_binary") +load("//src/main/docker:macros.bzl", "java_image") + +java_binary( + name = "UpdateSchema", + args = ["--changelog=reporting/postgres/changelog-v2.yaml"], + main_class = "org.wfanet.measurement.common.db.postgres.tools.UpdateSchema", + resources = ["//src/main/resources/reporting/postgres"], + runtime_deps = [ + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/postgres/tools:update_schema", + ], +) + +java_image( + name = "update_schema_image", + args = ["--changelog=reporting/postgres/changelog-v2.yaml"], + binary = ":UpdateSchema", + main_class = "org.wfanet.measurement.common.db.postgres.tools.UpdateSchema", + visibility = ["//src:docker_image_deployment"], +) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportingSetsService.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportingSetsService.kt index ab11e00b653..11bff84f7ed 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportingSetsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportingSetsService.kt @@ -90,7 +90,7 @@ class ReportingSetsService(private val internalReportingSetsStub: ReportingSetsC throw when (e.status.code) { Status.Code.ALREADY_EXISTS -> Status.ALREADY_EXISTS.withDescription( - "Metric with ID ${request.reportingSetId} already exists under ${request.parent}" + "ReportingSet with ID ${request.reportingSetId} already exists under ${request.parent}" ) Status.Code.NOT_FOUND -> Status.NOT_FOUND.withDescription("Child ReportingSet not found.") Status.Code.FAILED_PRECONDITION -> diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt index aad6c103ef8..2d7fc1b1f64 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt @@ -229,7 +229,7 @@ class ReportsService( throw when (e.status.code) { Status.Code.INVALID_ARGUMENT -> Status.INVALID_ARGUMENT.withDescription(e.message) Status.Code.PERMISSION_DENIED -> Status.PERMISSION_DENIED.withDescription(e.message) - else -> Status.UNKNOWN.withDescription("Unable to create Metrics.") + else -> Status.UNKNOWN.withDescription("Unable to get Metrics.") } .withCause(e) .asRuntimeException() diff --git a/src/main/resources/reporting/postgres/BUILD.bazel b/src/main/resources/reporting/postgres/BUILD.bazel index 481ceea5b48..2ccee916065 100644 --- a/src/main/resources/reporting/postgres/BUILD.bazel +++ b/src/main/resources/reporting/postgres/BUILD.bazel @@ -1,6 +1,7 @@ package(default_visibility = [ "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/gcloud/postgres:__subpackages__", "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/postgres:__subpackages__", + "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/gcloud:__subpackages__", "//src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres:__subpackages__", "//src/test/kotlin/org/wfanet/measurement/reporting/deploy/postgres:__subpackages__", "//src/test/kotlin/org/wfanet/measurement/reporting/deploy/v2/postgres:__subpackages__",