Skip to content

Commit

Permalink
Improve Performance of Measurement Creation in Create Report (#1310)
Browse files Browse the repository at this point in the history
(cherry picked from commit a29442e)
  • Loading branch information
tristanvuong2021 committed Oct 30, 2023
1 parent b48ecc0 commit 1f6b208
Show file tree
Hide file tree
Showing 8 changed files with 448 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,30 @@ import io.grpc.Status
import org.wfanet.measurement.common.db.r2dbc.DatabaseClient
import org.wfanet.measurement.common.db.r2dbc.postgres.SerializableErrors
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.internal.reporting.BatchCreateMeasurementsRequest
import org.wfanet.measurement.internal.reporting.BatchCreateMeasurementsResponse
import org.wfanet.measurement.internal.reporting.GetMeasurementRequest
import org.wfanet.measurement.internal.reporting.Measurement
import org.wfanet.measurement.internal.reporting.MeasurementsGrpcKt.MeasurementsCoroutineImplBase
import org.wfanet.measurement.internal.reporting.SetMeasurementFailureRequest
import org.wfanet.measurement.internal.reporting.SetMeasurementResultRequest
import org.wfanet.measurement.internal.reporting.batchCreateMeasurementsResponse
import org.wfanet.measurement.reporting.deploy.postgres.readers.MeasurementReader
import org.wfanet.measurement.reporting.deploy.postgres.writers.CreateMeasurement
import org.wfanet.measurement.reporting.deploy.postgres.writers.CreateMeasurements
import org.wfanet.measurement.reporting.deploy.postgres.writers.SetMeasurementFailure
import org.wfanet.measurement.reporting.deploy.postgres.writers.SetMeasurementResult
import org.wfanet.measurement.reporting.service.internal.MeasurementAlreadyExistsException
import org.wfanet.measurement.reporting.service.internal.MeasurementNotFoundException
import org.wfanet.measurement.reporting.service.internal.MeasurementStateInvalidException

class PostgresMeasurementsService(
private val idGenerator: IdGenerator,
private val client: DatabaseClient,
) : MeasurementsCoroutineImplBase() {
override suspend fun createMeasurement(request: Measurement): Measurement {
return try {
CreateMeasurement(request).execute(client, idGenerator)
} catch (e: MeasurementAlreadyExistsException) {
throw e.asStatusRuntimeException(Status.Code.ALREADY_EXISTS, "Measurement already exists.")
override suspend fun batchCreateMeasurements(
request: BatchCreateMeasurementsRequest
): BatchCreateMeasurementsResponse {
return batchCreateMeasurementsResponse {
measurements += CreateMeasurements(request.measurementsList).execute(client, idGenerator)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,40 @@

package org.wfanet.measurement.reporting.deploy.postgres.writers

import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import org.wfanet.measurement.common.db.r2dbc.boundStatement
import org.wfanet.measurement.common.db.r2dbc.postgres.PostgresWriter
import org.wfanet.measurement.internal.reporting.Measurement
import org.wfanet.measurement.internal.reporting.copy
import org.wfanet.measurement.reporting.service.internal.MeasurementAlreadyExistsException

/**
* Inserts a Measurement into the database.
* Inserts Measurements into the database.
*
* Throws the following on [execute]:
* * [MeasurementAlreadyExistsException] Measurement already exists
*/
class CreateMeasurement(private val request: Measurement) : PostgresWriter<Measurement>() {
override suspend fun TransactionScope.runTransaction(): Measurement {
val builder =
boundStatement(
"""
INSERT INTO Measurements (MeasurementConsumerReferenceId, MeasurementReferenceId, State)
VALUES ($1, $2, $3)
"""
) {
bind("$1", request.measurementConsumerReferenceId)
bind("$2", request.measurementReferenceId)
bind("$3", Measurement.State.PENDING_VALUE)
}

class CreateMeasurements(private val measurements: Iterable<Measurement>) :
PostgresWriter<Iterable<Measurement>>() {
override suspend fun TransactionScope.runTransaction(): Iterable<Measurement> {
transactionContext.run {
try {
for (measurement in measurements) {
val builder =
boundStatement(
"""
INSERT INTO Measurements (MeasurementConsumerReferenceId, MeasurementReferenceId, State)
VALUES ($1, $2, $3)
ON CONFLICT DO NOTHING
"""
) {
bind("$1", measurement.measurementConsumerReferenceId)
bind("$2", measurement.measurementReferenceId)
bind("$3", Measurement.State.PENDING_VALUE)
}

executeStatement(builder)
} catch (e: R2dbcDataIntegrityViolationException) {
throw MeasurementAlreadyExistsException()
}
}

return request.copy { state = Measurement.State.PENDING }
return measurements.map { it.copy { state = Measurement.State.PENDING } }
}
}
Loading

0 comments on commit 1f6b208

Please sign in to comment.