-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add type registry and cel env caching. #983
Add type registry and cel env caching. #983
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 7 files at r1, 2 of 4 files at r2, all commit messages.
Reviewable status: 3 of 7 files reviewed, 5 unresolved discussions (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/deploy/common/server/ReportingApiServerFlags.kt
line 36 at r2 (raw file):
@CommandLine.Option( names = ["--list-event-groups-cache-refresh-interval"],
nit: maybe --event-group-metadata-descriptor-cache-duration
, since this is really answering "How long do I cache EventGroupMetadataDescriptors for (before refreshing)?"
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 229 at r2 (raw file):
} private class ListEventGroupsCache(
This is technically a cache of the CEL environment (and by extension, the TypeRegistry used to build the environment which in turn comes from EventGroupMetadataDescriptors). The caching functionality isn't the salient point to the service, but rather the fact that it provides the environment.
I'd refactor this as follows: Extract an interface for the CEL env provider that this class implements. The service should take in an instance of the interface, since that's all it cares about. This means moving the implementation so it's not a nested class and the caller of the service constructor can pick the implementation.
Finally, write a test for the caching provider implementation.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 238 at r2 (raw file):
) private lateinit var typeRegistryAndEnv: TypeRegistryAndEnv
Access to this needs to be guarded for thread safety so e.g. it doesn't get a ready in the middle of being set. A kotlinx.coroutines.sync.Mutex should do the job. See https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#mutual-exclusion
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 241 at r2 (raw file):
init { CoroutineScope(Dispatchers.Default + SupervisorJob()).launch {
Take in the CoroutineContext (dispatcher) as a constructor parameter.
Code quote:
Dispatchers.Default
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 242 at r2 (raw file):
init { CoroutineScope(Dispatchers.Default + SupervisorJob()).launch { MinimumIntervalThrottler(Clock.systemUTC(), cacheRefreshInterval).loopOnReady {
Take in the Clock instance as a constructor parameter.
Code quote:
Clock.systemUTC()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 15 files reviewed, 4 unresolved discussions (waiting on @SanjayVas)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 229 at r2 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
This is technically a cache of the CEL environment (and by extension, the TypeRegistry used to build the environment which in turn comes from EventGroupMetadataDescriptors). The caching functionality isn't the salient point to the service, but rather the fact that it provides the environment.
I'd refactor this as follows: Extract an interface for the CEL env provider that this class implements. The service should take in an instance of the interface, since that's all it cares about. This means moving the implementation so it's not a nested class and the caller of the service constructor can pick the implementation.
Finally, write a test for the caching provider implementation.
Done.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 238 at r2 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Access to this needs to be guarded for thread safety so e.g. it doesn't get a ready in the middle of being set. A kotlinx.coroutines.sync.Mutex should do the job. See https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#mutual-exclusion
Done.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 241 at r2 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Take in the CoroutineContext (dispatcher) as a constructor parameter.
Done.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v1alpha/EventGroupsService.kt
line 242 at r2 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Take in the Clock instance as a constructor parameter.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 7 files at r1, 14 of 14 files at r3, all commit messages.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 65 at r3 (raw file):
EventGroupMetadataDescriptorsGrpcKt.EventGroupMetadataDescriptorsCoroutineStub, private val cacheRefreshInterval: Duration, coroutineDispatcher: CoroutineDispatcher,
The caller should be able to pass in an entire context. It just happens that the main useful thing to pass is the dispatcher, which is a context element and therefore also implements CoroutineContext.
Suggestion:
coroutineContext: CoroutineContext
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 80 at r3 (raw file):
val updateFlow = flow<Unit> { setTypeRegistryAndEnv() } launch { updateFlow.retry(numRefreshAttempts) { e -> e is java.lang.RuntimeException }.collect()
It's unsafe to just retry on any runtime exception. Actually check whether this is an exception that we'd consider retriable (this may involve wrapping the StatusException with our own Exception type which indicates this).
See https://grpc.github.io/grpc/core/md_doc_statuscodes.html for information on which codes are safe to retry on. At first glance since we're calling an idempotent method I'd say UNAVAILABLE and DEADLINE_EXCEEDED.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 182 at r3 (raw file):
.withDescription("Error retrieving EventGroupMetadataDescriptors") .withCause(e) .asRuntimeException()
Only service methods throw StatusRuntimeException. This is being called in a background coroutine and not directly by a service method.
src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt
line 171 at r3 (raw file):
) delay(800)
Avoid introducing real delays in test code. See https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-test/kotlinx.coroutines.test/run-test.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 9 of 15 files reviewed, 4 unresolved discussions (waiting on @SanjayVas)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 65 at r3 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
The caller should be able to pass in an entire context. It just happens that the main useful thing to pass is the dispatcher, which is a context element and therefore also implements CoroutineContext.
Done.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 80 at r3 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
It's unsafe to just retry on any runtime exception. Actually check whether this is an exception that we'd consider retriable (this may involve wrapping the StatusException with our own Exception type which indicates this).
See https://grpc.github.io/grpc/core/md_doc_statuscodes.html for information on which codes are safe to retry on. At first glance since we're calling an idempotent method I'd say UNAVAILABLE and DEADLINE_EXCEEDED.
Done.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 182 at r3 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Only service methods throw StatusRuntimeException. This is being called in a background coroutine and not directly by a service method.
Done.
src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt
line 171 at r3 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Avoid introducing real delays in test code. See https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-test/kotlinx.coroutines.test/run-test.html
Got rid of delays and can get the tests to work individually, but they time out when the entire test class is run at once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r5, 3 of 5 files at r7, all commit messages.
Reviewable status: 13 of 15 files reviewed, 2 unresolved discussions (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 65 at r3 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
Done.
Not the CoroutineScope, but the CoroutineContext. The scope should be managed by CelEnvCacheProvider
, which should implement AutoCloseable
and cancel the coroutine scope on close
.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProvider.kt
line 80 at r3 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
Done.
StatusException should always be caught at the RPC site. Check the conditions there and wrap in a custom exception to indicate that it's retriable.
Another way to think of this is that the caller of setTypeRegistryAndEnv
has no idea what RPCs may be called and therefore whether a given status code is retriable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 15 files reviewed, 1 unresolved discussion (waiting on @SanjayVas)
src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt
line 153 at r12 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Okay, so then we have a behavioral question: Should
getTypeRegistryAndEnv()
always wait for any pending sync operations to complete? If so, then it's simple: just wrap a mutex around both the set and get.If we don't want this behavior (i.e.
getTypeRegistryAndEnv()
should just return whatever it can without waiting aside from the initial sync), then we'll need a separate method that specifically waits for any pending sync to complete.
Ending up having to use real delays for the third test.
This adds a waitForSync method to CelEnvProvider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 3 files at r15, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @tristanvuong2021)
src/test/kotlin/org/wfanet/measurement/reporting/service/api/CelEnvProviderTest.kt
line 153 at r12 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
Ending up having to use real delays for the third test.
I pushed a commit that gets it working without real delays. I ended up dropping MinimumIntervalThrottler so I could access the Mutex. I figured we don't need the minimum interval guarantee and can just wait the full interval each time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 7 files at r1, 7 of 14 files at r3, 1 of 3 files at r15, 3 of 3 files at r16, 2 of 3 files at r17, all commit messages.
Reviewable status: 14 of 15 files reviewed, all discussions resolved (waiting on @Marco-Premier and @SanjayVas)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r17, 1 of 1 files at r18, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @Marco-Premier)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 7 files at r1, 7 of 14 files at r3, 1 of 3 files at r15, 2 of 3 files at r16, 1 of 1 files at r18, 3 of 3 files at r19, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @tristanvuong2021)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 7 files at r1, 6 of 14 files at r3, 1 of 3 files at r15, 2 of 3 files at r16, 1 of 1 files at r18, 3 of 3 files at r19, 1 of 1 files at r20, 2 of 2 files at r21, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @tristanvuong2021)
Co-authored-by: Sanjay Vasandani <[email protected]>
Co-authored-by: Sanjay Vasandani <[email protected]>
No description provided.