Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type registry and cel env caching. #983

Merged
merged 63 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
4dc9ab6
Add type registry and cel env caching.
tristanvuong2021 May 4, 2023
13c3322
lint fix
tristanvuong2021 May 8, 2023
bdda8fb
Major refactoring
tristanvuong2021 May 8, 2023
d9e58d3
lint fix
tristanvuong2021 May 8, 2023
49a6f61
Add unit tests
tristanvuong2021 May 9, 2023
d4ce4ea
lint fix
tristanvuong2021 May 9, 2023
e2e2550
lint fix
tristanvuong2021 May 9, 2023
73e2627
Change exception handling
tristanvuong2021 May 9, 2023
bbadc0e
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 10, 2023
ace8f63
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 11, 2023
b889948
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 15, 2023
ec35bde
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 15, 2023
f1f72ff
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 17, 2023
3390b97
Tests now work individually.
tristanvuong2021 May 18, 2023
6765ed3
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 18, 2023
36034e5
lint fix
tristanvuong2021 May 18, 2023
9208c18
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 18, 2023
8e74c2f
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 19, 2023
89527c7
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 22, 2023
e084b7e
Use coroutineScope instead of coroutineContext
tristanvuong2021 May 22, 2023
f4c6887
Lint fix
tristanvuong2021 May 22, 2023
f446322
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 22, 2023
f21886c
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 22, 2023
9fab9eb
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 23, 2023
5cae2b2
Back to coroutineContext
tristanvuong2021 May 24, 2023
f289d1c
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 24, 2023
bb1a164
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 24, 2023
8f2b2ea
lint fix
tristanvuong2021 May 24, 2023
5019dec
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 24, 2023
3e19e2f
Change experimental annotation
tristanvuong2021 May 24, 2023
1713491
Add close
tristanvuong2021 May 24, 2023
ae885cf
Fix tests
tristanvuong2021 May 25, 2023
0cd208e
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 25, 2023
5b01cac
lint fix
tristanvuong2021 May 25, 2023
cb21427
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 25, 2023
f526bcb
Refactor
tristanvuong2021 May 25, 2023
f427d48
lint fix
tristanvuong2021 May 25, 2023
d421eb1
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 25, 2023
c269a33
More verification
tristanvuong2021 May 26, 2023
b6d21f1
lint fix
tristanvuong2021 May 26, 2023
bee0857
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 May 31, 2023
384ae47
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 1, 2023
55ff0f9
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 5, 2023
5075034
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 6, 2023
52a4846
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 6, 2023
68c8531
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 7, 2023
eab3d86
Add additional unit test
tristanvuong2021 Jun 7, 2023
a43ef51
Lint fix
tristanvuong2021 Jun 7, 2023
be878f8
Update test to not use real delays.
SanjayVas Jun 8, 2023
64ef53c
Fix removal of clock
tristanvuong2021 Jun 8, 2023
f667b35
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 8, 2023
9ccb870
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 8, 2023
9a3cc8d
Fix isActive check and mark test as flaky.
tristanvuong2021 Jun 8, 2023
f4afa77
lint fix
tristanvuong2021 Jun 8, 2023
3f2c803
Remove unconfined test dispatcher
tristanvuong2021 Jun 8, 2023
54447bb
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 8, 2023
08b4314
Fix local deployment
tristanvuong2021 Jun 8, 2023
58a1fe3
Lint fix
tristanvuong2021 Jun 8, 2023
c1e4f3a
Add more test cases.
SanjayVas Jun 8, 2023
01c5ece
Merge branch 'main' into tristanvuong-periodically-update-cel-env-cache
tristanvuong2021 Jun 9, 2023
a66f707
Managed prometheus no longer gcloud beta.
tristanvuong2021 Jun 9, 2023
0c41fc4
Fix list event groups
tristanvuong2021 Jun 10, 2023
4aa5282
Lint fix
tristanvuong2021 Jun 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/gke/metrics-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ This can be done via the Google Cloud Console under "Features", or using the
gcloud CLI. For example, assuming a cluster named "kingdom":

```shell
gcloud beta container clusters update kingdom --enable-managed-prometheus
gcloud container clusters update kingdom --enable-managed-prometheus
```

## Service Accounts
Expand Down
1 change: 1 addition & 0 deletions src/main/k8s/reporting.cue
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ package k8s
_encryptionKeyPairConfigFileFlag,
"--port=8443",
"--health-port=8080",
"--event-group-metadata-descriptor-cache-duration=1h",
] + _tlsArgs + _internalApiTarget.args + _kingdomApiTarget.args

spec: template: spec: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ kt_jvm_library(
deps = [
"//src/main/kotlin/org/wfanet/measurement/integration/common/reporting/identity:metadata_principal_interceptor",
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/common/server:reporting_data_server",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api:cel_env_provider",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha:event_groups_service",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha:reporting_sets_service",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha:reports_service",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt
import org.wfanet.measurement.api.v2alpha.MeasurementConsumer
import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt
import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt
import org.wfanet.measurement.api.v2alpha.batchGetEventGroupMetadataDescriptorsResponse
import org.wfanet.measurement.api.v2alpha.certificate
import org.wfanet.measurement.api.v2alpha.dataProvider
import org.wfanet.measurement.api.v2alpha.eventGroup
import org.wfanet.measurement.api.v2alpha.eventGroupMetadataDescriptor
import org.wfanet.measurement.api.v2alpha.listEventGroupMetadataDescriptorsResponse
import org.wfanet.measurement.api.v2alpha.listEventGroupsResponse
import org.wfanet.measurement.api.v2alpha.measurementConsumer
import org.wfanet.measurement.common.crypto.SigningKeyHandle
Expand Down Expand Up @@ -122,9 +122,9 @@ abstract class InProcessLifeOfAReportIntegrationTest {
private val publicKingdomEventGroupMetadataDescriptorsMock:
EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineImplBase =
mockService {
onBlocking { batchGetEventGroupMetadataDescriptors(any()) }
onBlocking { listEventGroupMetadataDescriptors(any()) }
.thenReturn(
batchGetEventGroupMetadataDescriptorsResponse {
listEventGroupMetadataDescriptorsResponse {
eventGroupMetadataDescriptors += EVENT_GROUP_METADATA_DESCRIPTOR
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import io.grpc.Channel
import java.io.File
import java.security.SecureRandom
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.logging.Logger
import kotlinx.coroutines.Dispatchers
import org.junit.rules.TestRule
import org.junit.runner.Description
import org.junit.runners.model.Statement
Expand All @@ -29,6 +31,7 @@ import org.wfanet.measurement.api.v2alpha.EventGroupMetadataDescriptorsGrpcKt.Ev
import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt.EventGroupsCoroutineStub as PublicKingdomEventGroupsCoroutineStub
import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub as PublicKingdomMeasurementConsumersCoroutineStub
import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt.MeasurementsCoroutineStub as PublicKingdomMeasurementsCoroutineStub
import org.wfanet.measurement.api.withAuthenticationKey
import org.wfanet.measurement.common.crypto.tink.loadPrivateKey
import org.wfanet.measurement.common.grpc.testing.GrpcTestServerRule
import org.wfanet.measurement.common.grpc.withVerboseLogging
Expand All @@ -42,6 +45,7 @@ import org.wfanet.measurement.internal.reporting.ReportingSetsGrpcKt.ReportingSe
import org.wfanet.measurement.internal.reporting.ReportsGrpcKt.ReportsCoroutineStub as InternalReportsCoroutineStub
import org.wfanet.measurement.reporting.deploy.common.server.ReportingDataServer
import org.wfanet.measurement.reporting.deploy.common.server.ReportingDataServer.Companion.toList
import org.wfanet.measurement.reporting.service.api.CelEnvCacheProvider
import org.wfanet.measurement.reporting.service.api.InMemoryEncryptionKeyPairStore
import org.wfanet.measurement.reporting.service.api.v1alpha.EventGroupsService
import org.wfanet.measurement.reporting.service.api.v1alpha.ReportingSetsService
Expand Down Expand Up @@ -112,11 +116,20 @@ class InProcessReportingServer(
)
)

val celEnvCacheProvider =
CelEnvCacheProvider(
publicKingdomEventGroupMetadataDescriptorsClient.withAuthenticationKey(
measurementConsumerConfig.apiKey
),
Duration.ofSeconds(5),
Dispatchers.Default,
)

listOf(
EventGroupsService(
publicKingdomEventGroupsClient,
publicKingdomEventGroupMetadataDescriptorsClient,
encryptionKeyPairStore
encryptionKeyPairStore,
celEnvCacheProvider,
)
.withMetadataPrincipalIdentities(measurementConsumerConfig),
ReportingSetsService(internalReportingSetsClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/common/api:memoizing_principal_lookup",
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/common:encryption_key_pair_map",
"//src/main/kotlin/org/wfanet/measurement/reporting/deploy/common:kingdom_flags",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api:cel_env_provider",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api:encryption_key_pair_store",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha:akid_principal_lookup",
"//src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha:event_groups_service",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.wfanet.measurement.reporting.deploy.common.server

import java.time.Duration
import kotlin.properties.Delegates
import org.wfanet.measurement.reporting.deploy.common.InternalApiFlags
import picocli.CommandLine
Expand All @@ -30,4 +31,16 @@ class ReportingApiServerFlags {
)
var debugVerboseGrpcClientLogging by Delegates.notNull<Boolean>()
private set

@CommandLine.Option(
names = ["--event-group-metadata-descriptor-cache-duration"],
description =
[
"How long the event group metadata descriptors are cached for before refreshing in format 1d1h1m1s1ms1ns"
],
defaultValue = "1h",
required = false,
)
lateinit var eventGroupMetadataDescriptorCacheDuration: Duration
private set
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,29 @@ import io.grpc.Channel
import io.grpc.ServerServiceDefinition
import java.io.File
import java.security.SecureRandom
import kotlinx.coroutines.Dispatchers
import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt.CertificatesCoroutineStub as KingdomCertificatesCoroutineStub
import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt.DataProvidersCoroutineStub as KingdomDataProvidersCoroutineStub
import org.wfanet.measurement.api.v2alpha.EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub as KingdomEventGroupMetadataDescriptorsCoroutineStub
import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt.EventGroupsCoroutineStub as KingdomEventGroupsCoroutineStub
import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub as KingdomMeasurementConsumersCoroutineStub
import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt.MeasurementsCoroutineStub as KingdomMeasurementsCoroutineStub
import org.wfanet.measurement.api.withAuthenticationKey
import org.wfanet.measurement.common.api.PrincipalLookup
import org.wfanet.measurement.common.api.memoizing
import org.wfanet.measurement.common.commandLineMain
import org.wfanet.measurement.common.crypto.SigningCerts
import org.wfanet.measurement.common.grpc.CommonServer
import org.wfanet.measurement.common.grpc.buildMutualTlsChannel
import org.wfanet.measurement.common.grpc.withVerboseLogging
import org.wfanet.measurement.common.parseTextProto
import org.wfanet.measurement.config.reporting.MeasurementConsumerConfigs
import org.wfanet.measurement.internal.reporting.MeasurementsGrpcKt.MeasurementsCoroutineStub as InternalMeasurementsCoroutineStub
import org.wfanet.measurement.internal.reporting.ReportingSetsGrpcKt.ReportingSetsCoroutineStub as InternalReportingSetsCoroutineStub
import org.wfanet.measurement.internal.reporting.ReportsGrpcKt.ReportsCoroutineStub as InternalReportsCoroutineStub
import org.wfanet.measurement.reporting.deploy.common.EncryptionKeyPairMap
import org.wfanet.measurement.reporting.deploy.common.KingdomApiFlags
import org.wfanet.measurement.reporting.service.api.CelEnvCacheProvider
import org.wfanet.measurement.reporting.service.api.InMemoryEncryptionKeyPairStore
import org.wfanet.measurement.reporting.service.api.v1alpha.AkidPrincipalLookup
import org.wfanet.measurement.reporting.service.api.v1alpha.EventGroupsService
Expand Down Expand Up @@ -76,7 +81,11 @@ private fun run(
.withVerboseLogging(reportingApiServerFlags.debugVerboseGrpcClientLogging)

val kingdomChannel: Channel =
buildMutualTlsChannel(kingdomApiFlags.target, clientCerts, kingdomApiFlags.target)
buildMutualTlsChannel(
target = kingdomApiFlags.target,
clientCerts = clientCerts,
hostName = kingdomApiFlags.certHost
)
.withVerboseLogging(reportingApiServerFlags.debugVerboseGrpcClientLogging)

val principalLookup: PrincipalLookup<ReportingPrincipal, ByteString> =
Expand All @@ -86,12 +95,27 @@ private fun run(
)
.memoizing()

val measurementConsumerConfigs =
parseTextProto(
v1AlphaFlags.measurementConsumerConfigFile,
MeasurementConsumerConfigs.getDefaultInstance()
)

val apiKey = measurementConsumerConfigs.configsMap.values.first().apiKey
val celEnvCacheProvider =
CelEnvCacheProvider(
KingdomEventGroupMetadataDescriptorsCoroutineStub(kingdomChannel)
.withAuthenticationKey(apiKey),
reportingApiServerFlags.eventGroupMetadataDescriptorCacheDuration,
Dispatchers.Default,
)

val services: List<ServerServiceDefinition> =
listOf(
EventGroupsService(
KingdomEventGroupsCoroutineStub(kingdomChannel),
KingdomEventGroupMetadataDescriptorsCoroutineStub(kingdomChannel),
InMemoryEncryptionKeyPairStore(encryptionKeyPairMap.keyPairs)
InMemoryEncryptionKeyPairStore(encryptionKeyPairMap.keyPairs),
celEnvCacheProvider,
)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ReportingSetsService(InternalReportingSetsCoroutineStub(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,18 @@ kt_jvm_library(
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/crypto/tink",
],
)

kt_jvm_library(
name = "cel_env_provider",
srcs = ["CelEnvProvider.kt"],
deps = [
"//imports/java/org/projectnessie/cel",
"//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptor_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:event_group_metadata_descriptors_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/reporting/v1alpha:event_group_kt_jvm_proto",
"@wfa_common_jvm//imports/java/com/google/protobuf",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:jdk8",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
],
)
Loading