From a3a5afa291a4e7bb346caf10e834e3d68c14cf74 Mon Sep 17 00:00:00 2001 From: Sanjay Vasandani Date: Fri, 12 Jan 2024 08:30:46 -0800 Subject: [PATCH] Pass Reporting EventGroup message descriptor to CelEnvProvider impls. (#1417) This allows CelEnvProvider to be used for multiple Reporting API versions. Fixes #1406 (cherry picked from commit 2306a1675585aa3139e3c38b79a28b19cd52a4a4) --- .../common/reporting/InProcessReportingServer.kt | 2 ++ .../common/reporting/v2/InProcessReportingServer.kt | 2 ++ .../deploy/common/server/V1AlphaPublicApiServer.kt | 2 ++ .../v2/common/server/V2AlphaPublicApiServer.kt | 2 ++ .../measurement/reporting/service/api/BUILD.bazel | 1 - .../reporting/service/api/CelEnvProvider.kt | 13 ++++++------- .../measurement/reporting/service/api/BUILD.bazel | 1 + .../reporting/service/api/CelEnvProviderTest.kt | 7 +++++++ .../service/api/v1alpha/EventGroupsServiceTest.kt | 2 ++ .../service/api/v2alpha/EventGroupsServiceTest.kt | 3 +++ 10 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/InProcessReportingServer.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/InProcessReportingServer.kt index 3db358aa04b..b3aeb7a4307 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/InProcessReportingServer.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/InProcessReportingServer.kt @@ -52,6 +52,7 @@ import org.wfanet.measurement.reporting.service.api.InMemoryEncryptionKeyPairSto import org.wfanet.measurement.reporting.service.api.v1alpha.EventGroupsService import org.wfanet.measurement.reporting.service.api.v1alpha.ReportingSetsService import org.wfanet.measurement.reporting.service.api.v1alpha.ReportsService +import org.wfanet.measurement.reporting.v1alpha.EventGroup /** TestRule that starts and stops all Reporting Server gRPC services. */ class InProcessReportingServer( @@ -123,6 +124,7 @@ class InProcessReportingServer( publicKingdomEventGroupMetadataDescriptorsClient.withAuthenticationKey( measurementConsumerConfig.apiKey ), + EventGroup.getDescriptor(), Duration.ofSeconds(5), Dispatchers.Default, ) diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt index 82721be1ad4..accc171fcdf 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/reporting/v2/InProcessReportingServer.kt @@ -68,6 +68,7 @@ import org.wfanet.measurement.reporting.service.api.v2alpha.MetadataPrincipalSer import org.wfanet.measurement.reporting.service.api.v2alpha.MetricsService import org.wfanet.measurement.reporting.service.api.v2alpha.ReportingSetsService import org.wfanet.measurement.reporting.service.api.v2alpha.ReportsService +import org.wfanet.measurement.reporting.v2alpha.EventGroup import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt.MetricsCoroutineStub as PublicMetricsCoroutineStub /** TestRule that starts and stops all Reporting Server gRPC services. */ @@ -178,6 +179,7 @@ class InProcessReportingServer( publicKingdomEventGroupMetadataDescriptorsClient.withAuthenticationKey( measurementConsumerConfig.apiKey ), + EventGroup.getDescriptor(), Duration.ofSeconds(5), Dispatchers.Default, ) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/common/server/V1AlphaPublicApiServer.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/common/server/V1AlphaPublicApiServer.kt index 435be4d2d27..c9c9cfc580d 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/common/server/V1AlphaPublicApiServer.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/common/server/V1AlphaPublicApiServer.kt @@ -51,6 +51,7 @@ import org.wfanet.measurement.reporting.service.api.v1alpha.ReportingPrincipal import org.wfanet.measurement.reporting.service.api.v1alpha.ReportingSetsService import org.wfanet.measurement.reporting.service.api.v1alpha.ReportsService import org.wfanet.measurement.reporting.service.api.v1alpha.withPrincipalsFromX509AuthorityKeyIdentifiers +import org.wfanet.measurement.reporting.v1alpha.EventGroup import picocli.CommandLine private const val SERVER_NAME = "V1AlphaPublicApiServer" @@ -108,6 +109,7 @@ private fun run( CelEnvCacheProvider( KingdomEventGroupMetadataDescriptorsCoroutineStub(kingdomChannel) .withAuthenticationKey(apiKey), + EventGroup.getDescriptor(), reportingApiServerFlags.eventGroupMetadataDescriptorCacheDuration, Dispatchers.Default, ) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt index 916a605f72e..cfa8c6ce515 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/deploy/v2/common/server/V2AlphaPublicApiServer.kt @@ -71,6 +71,7 @@ import org.wfanet.measurement.reporting.service.api.v2alpha.ReportingPrincipal import org.wfanet.measurement.reporting.service.api.v2alpha.ReportingSetsService import org.wfanet.measurement.reporting.service.api.v2alpha.ReportsService import org.wfanet.measurement.reporting.service.api.v2alpha.withPrincipalsFromX509AuthorityKeyIdentifiers +import org.wfanet.measurement.reporting.v2alpha.EventGroup import org.wfanet.measurement.reporting.v2alpha.MetricsGrpcKt.MetricsCoroutineStub import picocli.CommandLine @@ -153,6 +154,7 @@ private fun run( CelEnvCacheProvider( KingdomEventGroupMetadataDescriptorsCoroutineStub(kingdomChannel) .withAuthenticationKey(apiKey), + EventGroup.getDescriptor(), reportingApiServerFlags.eventGroupMetadataDescriptorCacheDuration, Dispatchers.Default, ) diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel index 0c770ee7c07..1f3034bff0f 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel @@ -18,7 +18,6 @@ kt_jvm_library( "//imports/java/org/projectnessie/cel", "//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptor_kt_jvm_proto", "//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptors_service_kt_jvm_grpc_proto", - "//src/main/proto/wfa/measurement/reporting/v1alpha:event_group_kt_jvm_proto", "@wfa_common_jvm//imports/java/com/google/protobuf", "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:jdk8", diff --git a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt index 079610b394a..0ef41db2dfc 100644 --- a/src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt +++ b/src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt @@ -49,7 +49,6 @@ import org.wfanet.measurement.api.v2alpha.EventGroupMetadataDescriptor import org.wfanet.measurement.api.v2alpha.EventGroupMetadataDescriptorsGrpcKt import org.wfanet.measurement.api.v2alpha.listEventGroupMetadataDescriptorsRequest import org.wfanet.measurement.common.ProtoReflection -import org.wfanet.measurement.reporting.v1alpha.EventGroup private const val METADATA_FIELD = "metadata.metadata" private const val MAX_PAGE_SIZE = 1000 @@ -66,6 +65,8 @@ interface CelEnvProvider { class CelEnvCacheProvider( private val eventGroupsMetadataDescriptorsStub: EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub, + /** Protobuf descriptor of Reporting EventGroup message type. */ + private val reportingEventGroupDescriptor: Descriptors.Descriptor, private val cacheRefreshInterval: Duration, coroutineContext: CoroutineContext, private val numRetriesInitialSync: Int = 3, @@ -142,22 +143,20 @@ class CelEnvCacheProvider( // Build CEL ProtoTypeRegistry. val celTypeRegistry = ProtoTypeRegistry.newRegistry() descriptors.forEach { celTypeRegistry.registerDescriptor(it.file) } - - celTypeRegistry.registerMessage(EventGroup.getDefaultInstance()) + celTypeRegistry.registerDescriptor(reportingEventGroupDescriptor.file) // Build CEL Env. - val eventGroupDescriptor = EventGroup.getDescriptor() val env = Env.newEnv( - EnvOption.container(eventGroupDescriptor.fullName), + EnvOption.container(reportingEventGroupDescriptor.fullName), EnvOption.customTypeProvider(celTypeRegistry), EnvOption.customTypeAdapter(celTypeRegistry), EnvOption.declarations( - eventGroupDescriptor.fields + reportingEventGroupDescriptor.fields .map { Decls.newVar( it.name, - celTypeRegistry.findFieldType(eventGroupDescriptor.fullName, it.name).type + celTypeRegistry.findFieldType(reportingEventGroupDescriptor.fullName, it.name).type ) } // TODO(projectnessie/cel-java#295): Remove when fixed. diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel index 8166234f80f..bde9f53abb7 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/BUILD.bazel @@ -9,6 +9,7 @@ kt_jvm_test( deps = [ "//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptors_service_kt_jvm_grpc_proto", "//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:test_metadata_messages_kt_jvm_proto", + "//src/main/proto/wfa/measurement/reporting/v1alpha:event_group_kt_jvm_proto", "@wfa_common_jvm//imports/java/com/google/common/truth", "@wfa_common_jvm//imports/kotlin/kotlin/test", "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt index 83ec5f7d0cc..52cd3b57c79 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt @@ -96,6 +96,7 @@ class CelEnvProviderTest { EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub( grpcTestServerRule.channel ), + REPORTING_EVENT_GROUP_DESCRIPTOR, Duration.ofMinutes(5), coroutineContext ) @@ -132,6 +133,7 @@ class CelEnvProviderTest { EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub( grpcTestServerRule.channel ), + REPORTING_EVENT_GROUP_DESCRIPTOR, Duration.ofMinutes(5), coroutineContext, 1 @@ -171,6 +173,7 @@ class CelEnvProviderTest { EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub( grpcTestServerRule.channel ), + REPORTING_EVENT_GROUP_DESCRIPTOR, Duration.ofMinutes(5), coroutineContext, numRetries @@ -201,6 +204,7 @@ class CelEnvProviderTest { EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub( grpcTestServerRule.channel ), + REPORTING_EVENT_GROUP_DESCRIPTOR, Duration.ofMinutes(5), coroutineContext, numRetries @@ -245,6 +249,7 @@ class CelEnvProviderTest { EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub( grpcTestServerRule.channel ), + REPORTING_EVENT_GROUP_DESCRIPTOR, cacheRefreshInterval, coroutineContext ) @@ -270,6 +275,8 @@ class CelEnvProviderTest { } companion object { + private val REPORTING_EVENT_GROUP_DESCRIPTOR = EventGroup.getDescriptor() + private fun verifyTypeRegistryAndEnv(typeRegistryAndEnv: CelEnvProvider.TypeRegistryAndEnv) { val eventGroup = eventGroup { metadata = diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsServiceTest.kt index 704f4166a98..0214cf698c2 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsServiceTest.kt @@ -66,6 +66,7 @@ import org.wfanet.measurement.consent.client.common.toEncryptionPublicKey import org.wfanet.measurement.consent.client.dataprovider.encryptMetadata import org.wfanet.measurement.reporting.service.api.CelEnvCacheProvider import org.wfanet.measurement.reporting.service.api.InMemoryEncryptionKeyPairStore +import org.wfanet.measurement.reporting.v1alpha.EventGroup import org.wfanet.measurement.reporting.v1alpha.EventGroupKt.metadata import org.wfanet.measurement.reporting.v1alpha.eventGroup import org.wfanet.measurement.reporting.v1alpha.listEventGroupsRequest @@ -192,6 +193,7 @@ class EventGroupsServiceTest { val celEnvCacheProvider = CelEnvCacheProvider( EventGroupMetadataDescriptorsCoroutineStub(grpcTestServerRule.channel), + EventGroup.getDescriptor(), Duration.ofSeconds(5), Dispatchers.Default, ) diff --git a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/EventGroupsServiceTest.kt b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/EventGroupsServiceTest.kt index 3829a0a34d2..cdda02df839 100644 --- a/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/EventGroupsServiceTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/EventGroupsServiceTest.kt @@ -68,6 +68,7 @@ import org.wfanet.measurement.consent.client.common.toEncryptionPublicKey import org.wfanet.measurement.consent.client.dataprovider.encryptMetadata import org.wfanet.measurement.reporting.service.api.CelEnvCacheProvider import org.wfanet.measurement.reporting.service.api.InMemoryEncryptionKeyPairStore +import org.wfanet.measurement.reporting.v2alpha.EventGroup import org.wfanet.measurement.reporting.v2alpha.EventGroupKt import org.wfanet.measurement.reporting.v2alpha.eventGroup import org.wfanet.measurement.reporting.v2alpha.listEventGroupsRequest @@ -108,6 +109,7 @@ class EventGroupsServiceTest { val celEnvCacheProvider = CelEnvCacheProvider( EventGroupMetadataDescriptorsCoroutineStub(grpcTestServerRule.channel), + EventGroup.getDescriptor(), Duration.ofSeconds(5), Dispatchers.Default, ) @@ -508,6 +510,7 @@ class EventGroupsServiceTest { val celEnvCacheProvider = CelEnvCacheProvider( EventGroupMetadataDescriptorsCoroutineStub(grpcTestServerRule.channel), + EventGroup.getDescriptor(), Duration.ofSeconds(5), Dispatchers.Default, )