Refactor SubmitBatchRequests to use Coroutines #1467
Reviewed 6 of 6 files at r2, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 59 at r2 (raw file):
  callRpc: suspend (List<ITEM>) -> RESP,
  parseResponse: (RESP) -> List<RESULT>,
): List<RESULT> {
Thanks for bringing up Semaphore. I wonder if there is a particular reason to use List. When I wrote this implementation, I chose Flow to avoid blocking based on the review feedback.
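For readers following along, the List-returning shape in question can be sketched roughly as below. This is a minimal sketch, not the PR's code: submitBatchesSketch, batchSize, and maxConcurrency are hypothetical names, and the lambda passed as callRpc is a stand-in for a real RPC. It assumes a Semaphore is used to cap the number of in-flight calls.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: splits items into batches and runs at most maxConcurrency RPCs at once.
suspend fun <ITEM, RESP, RESULT> submitBatchesSketch(
  items: Collection<ITEM>,
  batchSize: Int,
  maxConcurrency: Int,
  callRpc: suspend (List<ITEM>) -> RESP,
  parseResponse: (RESP) -> List<RESULT>,
): List<RESULT> = coroutineScope {
  val semaphore = Semaphore(maxConcurrency)
  items
    .chunked(batchSize)
    // Each batch gets its own coroutine; the semaphore bounds how many call the RPC at once.
    .map { batch -> async { semaphore.withPermit { parseResponse(callRpc(batch)) } } }
    // awaitAll keeps the batches in their original order.
    .awaitAll()
    .flatten()
}

fun main() = runBlocking {
  val doubled =
    submitBatchesSketch<Int, List<Int>, Int>(
      items = (1..5).toList(),
      batchSize = 2,
      maxConcurrency = 2,
      callRpc = { batch -> batch.map { it * 2 } }, // Stand-in for a real RPC.
      parseResponse = { it },
    )
  println(doubled) // [2, 4, 6, 8, 10]
}
```

The whole result is materialized before the function returns, which is the property the List vs Flow question is about.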
Reviewable status: 0 of 6 files reviewed, 1 unresolved discussion (waiting on @riemanli)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 59 at r2 (raw file):
Previously, riemanli (Rieman) wrote…
Thanks for bringing up Semaphore. I wonder if there is a particular reason to use List. When I wrote this implementation, I chose Flow to avoid blocking based on the review feedback.
I changed it to a flow. Realized a way to make use of flow.
Reviewed 1 of 6 files at r3, all commit messages.
Reviewable status: 1 of 6 files reviewed, 2 unresolved discussions (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 59 at r2 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
I changed it to a flow. Realized a way to make use of flow.
Would it be possible to just output Flow<RESULT> if you use items.chunked(limit).flatMapConcat, or something that works similarly, instead of items.chunked(limit).map?
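The difference between the two output shapes can be sketched as follows. This is an illustration under assumptions, not the code under review: fakeRpc is a hypothetical stand-in for callRpc plus parseResponse, and asFlow() is added because chunked on a collection returns a List of batches rather than a Flow.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for callRpc + parseResponse.
suspend fun fakeRpc(batch: List<Int>): List<String> = batch.map { "r$it" }

// map keeps one List per batch, so the result is Flow<List<String>>.
fun batchedResults(items: List<Int>, limit: Int): Flow<List<String>> =
  items.chunked(limit).asFlow().map { batch -> fakeRpc(batch) }

// flatMapConcat flattens each batch's results in order, so the result is Flow<String>.
// The opt-in marker for flatMapConcat differs between kotlinx.coroutines versions.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun flatResults(items: List<Int>, limit: Int): Flow<String> =
  items.chunked(limit).asFlow().flatMapConcat { batch -> fakeRpc(batch).asFlow() }

fun main() = runBlocking {
  println(batchedResults(listOf(1, 2, 3), 2).toList()) // [[r1, r2], [r3]]
  println(flatResults(listOf(1, 2, 3), 2).toList()) // [r1, r2, r3]
}
```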
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 54 at r3 (raw file):
/** Submits multiple RPCs by dividing the input items to batches. */
suspend fun <ITEM, RESP, RESULT> submitBatchRequests(
  items: Collection<ITEM>,
Same question about List vs Flow here for the input.
Reviewable status: 0 of 6 files reviewed, 2 unresolved discussions (waiting on @riemanli)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 59 at r2 (raw file):
Previously, riemanli (Rieman) wrote…
Would it be possible to just output Flow<RESULT> if you use items.chunked(limit).flatMapConcat, or something that works similarly, instead of items.chunked(limit).map?
I made the Flow emit Lists corresponding to the API requests on purpose. Emitting individual elements requires iterating through each List, and callers of this method then have to iterate through the elements again. That additional iteration adds up when considering performance.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 54 at r3 (raw file):
Previously, riemanli (Rieman) wrote…
Same question about List vs Flow here for the input.
I don't see how making the input a Flow helps. Both a Collection and a Flow require iterating through the elements sequentially to build the batch requests. Given the way this method is used, a Flow input will generally require converting a Collection to a Flow instead of just using the Collection directly.
Reviewable status: 0 of 6 files reviewed, 2 unresolved discussions (waiting on @riemanli)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 54 at r3 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
I don't see how making the input a Flow helps. Both a Collection and a Flow require iterating through the elements sequentially to build the batch requests. Given the way this method is used, a Flow input will generally require converting a Collection to a Flow instead of just using the Collection directly.
I could see how a Flow input would improve things, but I would have to change the way SubmitBatchRequests is used. I will give it a shot and see if it works out.
Reviewed 5 of 6 files at r3, 1 of 1 files at r4, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 54 at r3 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
I could see how a Flow input would improve things, but I would have to change the way SubmitBatchRequests is used. I will give it a shot and see if it works out.
My only concern is when items contains numerous elements, which is the exact scenario this implementation is designed for. Using Collection means we need to wait until all elements are ready, and it also means higher memory consumption, i.e. a higher chance of OOM.
If you think using Collection has some advantages over using Flow, I am OK with you testing it out to see how to make the trade-off, as you suggested above.
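To make the memory argument concrete, here is a minimal sketch of batching a Flow input without first materializing every element. The extension name batched is hypothetical (it is not part of this PR or of kotlinx.coroutines), and it only illustrates the idea that at most one batch needs to be held in memory at a time.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: groups upstream elements into lists of at most `size` elements.
fun <T> Flow<T>.batched(size: Int): Flow<List<T>> = flow {
  require(size > 0) { "size must be positive" }
  var batch = ArrayList<T>(size)
  collect { item ->
    batch.add(item)
    if (batch.size == size) {
      // Hand the full batch downstream, then start a fresh one.
      emit(batch)
      batch = ArrayList(size)
    }
  }
  // Emit the trailing partial batch, if any.
  if (batch.isNotEmpty()) emit(batch)
}

fun main() = runBlocking {
  println((1..5).asFlow().batched(2).toList()) // [[1, 2], [3, 4], [5]]
}
```

With a Collection input, all elements already exist before the first batch is built; with a Flow input, a batch can be sent as soon as it fills.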
Reviewable status: 0 of 6 files reviewed, 1 unresolved discussion (waiting on @riemanli)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/SubmitBatchRequests.kt
line 54 at r3 (raw file):
Previously, riemanli (Rieman) wrote…
My only concern is when items contains numerous elements, which is the exact scenario this implementation is designed for. Using Collection means we need to wait until all elements are ready, and it also means higher memory consumption, i.e. a higher chance of OOM.
If you think using Collection has some advantages over using Flow, I am OK with you testing it out to see how to make the trade-off, as you suggested above.
I refactored it.
Reviewed 6 of 6 files at r5, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 807 at r5 (raw file):
// Most Measurements are expected to be SUCCEEDED so SUCCEEDED Measurements will be collected
// via a Flow.
I guess this is the trade-off if we are not able to use Flow for failed measurements.
Code quote:
// Most Measurements are expected to be SUCCEEDED so SUCCEEDED Measurements will be collected
// via a Flow.
Reviewed 3 of 6 files at r5, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @riemanli and @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 809 at r5 (raw file):
// via a Flow.
val succeededMeasurements: Flow<Measurement> = flow {
  getCmmsMeasurements(internalMeasurements, principal).collect { measurements ->
why do you need to collect then? Why not just map the flow?
Reviewable status: 5 of 6 files reviewed, 2 unresolved discussions (waiting on @riemanli and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 809 at r5 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
why do you need to collect then? Why not just map the flow?
The alternative is flatMapConcat or map then flattenConcat instead of just collect, but that just moves the flow builder to inside the transform function. map alone will just create a flow that could return empty lists and that makes things complicated to deal with.
Reviewed all commit messages.
Reviewable status: 5 of 6 files reviewed, 1 unresolved discussion (waiting on @riemanli and @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 809 at r5 (raw file):
Previously, tristanvuong2021 (Tristan Vuong) wrote…
The alternative is flatMapConcat or map then flattenConcat instead of just collect, but that just moves the flow builder to inside the transform function. map alone will just create a flow that could return empty lists and that makes things complicated to deal with.
Let me clarify. Why doesn't something like the following work?
val succeededMeasurements: Flow<Measurement> =
  getCmmsMeasurements(internalMeasurements, principal).transform { measurement ->
    @Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") // Protobuf enum fields cannot be null.
    when (measurement.state) {
      Measurement.State.SUCCEEDED -> emit(measurement)
      Measurement.State.CANCELLED,
      Measurement.State.FAILED -> failedMeasurements.add(measurement)
      Measurement.State.COMPUTING,
      Measurement.State.AWAITING_REQUISITION_FULFILLMENT -> {}
      Measurement.State.STATE_UNSPECIFIED ->
        failGrpc(status = Status.FAILED_PRECONDITION, cause = IllegalStateException()) {
          "The CMMS measurement state should've been set."
        }
      Measurement.State.UNRECOGNIZED -> {
        failGrpc(status = Status.FAILED_PRECONDITION, cause = IllegalStateException()) {
          "Unrecognized CMMS measurement state."
        }
      }
    }
  }
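A self-contained version of this transform-based approach is sketched below, using simplified stand-ins for the CMMS types (State, FakeMeasurement, and splitSucceeded are hypothetical and not from the PR). One point it illustrates: because a Flow built with transform is cold, the side list of failed measurements is only filled once the resulting Flow is actually collected.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Simplified, hypothetical stand-ins for the CMMS Measurement types.
enum class State { SUCCEEDED, FAILED, COMPUTING }

data class FakeMeasurement(val name: String, val state: State)

// Emits succeeded measurements downstream and records failed ones in `failed` as a side effect.
fun splitSucceeded(
  upstream: Flow<FakeMeasurement>,
  failed: MutableList<FakeMeasurement>,
): Flow<FakeMeasurement> =
  upstream.transform { measurement ->
    when (measurement.state) {
      State.SUCCEEDED -> emit(measurement)
      State.FAILED -> failed.add(measurement)
      State.COMPUTING -> {}
    }
  }

fun main() = runBlocking {
  val failed = mutableListOf<FakeMeasurement>()
  val upstream =
    flowOf(
      FakeMeasurement("a", State.SUCCEEDED),
      FakeMeasurement("b", State.FAILED),
      FakeMeasurement("c", State.COMPUTING),
    )
  val succeeded = splitSucceeded(upstream, failed)
  println(failed.size) // 0, since nothing has been collected yet
  println(succeeded.toList().map { it.name }) // [a]
  println(failed.map { it.name }) // [b]
}
```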
Reviewable status: 4 of 6 files reviewed, 1 unresolved discussion (waiting on @riemanli and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 809 at r5 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
Let me clarify. Why doesn't something like the following work?
val succeededMeasurements: Flow<Measurement> =
  getCmmsMeasurements(internalMeasurements, principal).transform { measurement ->
    @Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") // Protobuf enum fields cannot be null.
    when (measurement.state) {
      Measurement.State.SUCCEEDED -> emit(measurement)
      Measurement.State.CANCELLED,
      Measurement.State.FAILED -> failedMeasurements.add(measurement)
      Measurement.State.COMPUTING,
      Measurement.State.AWAITING_REQUISITION_FULFILLMENT -> {}
      Measurement.State.STATE_UNSPECIFIED ->
        failGrpc(status = Status.FAILED_PRECONDITION, cause = IllegalStateException()) {
          "The CMMS measurement state should've been set."
        }
      Measurement.State.UNRECOGNIZED -> {
        failGrpc(status = Status.FAILED_PRECONDITION, cause = IllegalStateException()) {
          "Unrecognized CMMS measurement state."
        }
      }
    }
  }
Done.
Reviewed 3 of 6 files at r5, 1 of 1 files at r6, 1 of 1 files at r7, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @tristanvuong2021)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 324 at r7 (raw file):
if (!contains(primitiveReportingSetBasis.externalReportingSetId)) {
  emit(primitiveReportingSetBasis.externalReportingSetId)
  add(primitiveReportingSetBasis.externalReportingSetId)
I'm confused about why you are both emitting and adding here.
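One plausible reading of the emit-plus-add pattern, offered as an assumption since the surrounding code is not shown here, is deduplication: the set remembers which IDs have already been emitted, while the Flow is what callers consume. A minimal sketch with a hypothetical helper name, distinctIds:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits each ID the first time it is seen and skips repeats.
fun distinctIds(ids: Iterable<String>): Flow<String> = flow {
  // The set exists only to track what has already been emitted.
  val seen = mutableSetOf<String>()
  for (id in ids) {
    // MutableSet.add returns false when the element is already present.
    if (seen.add(id)) emit(id)
  }
}

fun main() = runBlocking {
  println(distinctIds(listOf("a", "b", "a", "c", "b")).toList()) // [a, b, c]
}
```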
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 398 at r7 (raw file):
  callBatchCreateMeasurementsRpc,
) { response: BatchCreateMeasurementsResponse ->
  response.measurementsList
I think it's clearer to transform this to a flow here and then merge the flows instead of using flatMapMerge.
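The two shapes being compared might look roughly like this. It is a sketch under assumptions: FakeResponse is a hypothetical stand-in for BatchCreateMeasurementsResponse, and the function names are illustrative. Neither variant guarantees the order of elements across responses, so the usage below sorts before printing.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for BatchCreateMeasurementsResponse.
data class FakeResponse(val measurementsList: List<String>)

// Option A: flatten a Flow of responses with flatMapMerge.
// The opt-in marker for flatMapMerge differs between kotlinx.coroutines versions.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlatMapMerge(responses: Flow<FakeResponse>): Flow<String> =
  responses.flatMapMerge { it.measurementsList.asFlow() }

// Option B: turn each response into its own Flow first, then merge those Flows.
@OptIn(ExperimentalCoroutinesApi::class)
fun viaMerge(responses: List<FakeResponse>): Flow<String> =
  responses.map { it.measurementsList.asFlow() }.merge()

fun main() = runBlocking {
  val responses = listOf(FakeResponse(listOf("m1", "m2")), FakeResponse(listOf("m3")))
  println(viaFlatMapMerge(responses.asFlow()).toList().sorted())
  println(viaMerge(responses).toList().sorted())
}
```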
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt
line 420 at r7 (raw file):
reportingMetricCalculationSpec.metricCalculationSpecReportingMetricsList.flatMap {
  metricCalculationSpecReportingMetrics ->
  metricCalculationSpecReportingMetrics.reportingMetricsList.map {
I prefer .asFlow().map {
Reviewable status: 4 of 6 files reviewed, 3 unresolved discussions (waiting on @riemanli and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 324 at r7 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
I'm confused about why you are both emitting and adding here.
Added comments to clarify.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/MetricsService.kt
line 398 at r7 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
I think it's clearer to transform this to a flow here and then merge the flows instead of using flatMapMerge.
Done.
src/main/kotlin/org/wfanet/measurement/reporting/service/api/v2alpha/ReportsService.kt
line 420 at r7 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
I prefer .asFlow().map {
Done.
Reviewed 2 of 2 files at r8, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @tristanvuong2021)
No description provided.