Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add postgres computations service #1136

Merged
merged 14 commits into from
Aug 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ kt_jvm_library(
"//src/main/proto/wfa/measurement/internal/duchy:computations_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/internal/duchy/protocol:liquid_legions_v2_kt_jvm_proto",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.wfanet.measurement.duchy.db.computation

import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.internal.duchy.ComputationStage
import org.wfanet.measurement.internal.duchy.ComputationToken
import org.wfanet.measurement.internal.duchy.ComputationTypeEnum.ComputationType

/** Information about a computation needed to edit a computation. */
data class ComputationEditToken<ProtocolT, StageT>(
/** The identifier for the computation used locally. */
val localId: Long,
/** The protocol used for the computation. */
val protocol: ProtocolT,
/** The stage of the computation when the token was created. */
val stage: StageT,
/** The number of the current attempt of this stage for this computation. */
val attempt: Int,
/**
* The version number of the last known edit to the computation. The version is a monotonically
* increasing number used as a guardrail to protect against concurrent edits to the same
* computation.
*/
val editVersion: Long,
/** The global identifier for the computation. */
val globalId: String,
)

fun ComputationToken.toDatabaseEditToken():
ComputationEditToken<ComputationType, ComputationStage> {
val protocol = computationStage.toComputationType()
if (protocol == ComputationType.UNRECOGNIZED) {
failGrpc { "Computation type for $this is unknown" }
}
return ComputationEditToken(
localId = localComputationId,
protocol = protocol,
stage = computationStage,
attempt = attempt,
editVersion = version,
globalId = globalComputationId
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.wfanet.measurement.duchy.db.computation

import org.wfanet.measurement.common.numberAsLong
import org.wfanet.measurement.internal.duchy.ComputationStage
import org.wfanet.measurement.internal.duchy.ComputationTypeEnum.ComputationType

/** Helper class for working with [ComputationType] protocols. */
Expand All @@ -28,3 +29,10 @@ object ComputationTypes : ComputationTypeEnumHelper<ComputationType> {
return ComputationType.forNumber(value.toInt()) ?: ComputationType.UNRECOGNIZED
}
}

fun ComputationStage.toComputationType() =
when (stageCase) {
ComputationStage.StageCase.LIQUID_LEGIONS_SKETCH_AGGREGATION_V2 ->
ComputationType.LIQUID_LEGIONS_SKETCH_AGGREGATION_V2
else -> ComputationType.UNRECOGNIZED
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,6 @@ interface ComputationsDatabaseTransactor<ProtocolT, StageT, StageDetailsT, Compu
attempt: Long,
metric: ComputationStatMetric
)

/** Information about a computation needed to edit a computation. */
data class ComputationEditToken<ProtocolT, StageT>(
/** The identifier for the computation used locally. */
val localId: Long,
/** The protocol used for the computation. */
val protocol: ProtocolT,
/** The stage of the computation when the token was created. */
val stage: StageT,
/** The number of the current attempt of this stage for this computation. */
val attempt: Int,
/**
* The version number of the last known edit to the computation. The version is a monotonically
* increasing number used as a guardrail to protect against concurrent edits to the same
* computation.
*/
val editVersion: Long
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import kotlin.experimental.ExperimentalTypeInference
import org.wfanet.measurement.common.toJson
import org.wfanet.measurement.duchy.db.computation.AfterTransition
import org.wfanet.measurement.duchy.db.computation.BlobRef
import org.wfanet.measurement.duchy.db.computation.ComputationEditToken
import org.wfanet.measurement.duchy.db.computation.ComputationProtocolStages
import org.wfanet.measurement.duchy.db.computation.ComputationProtocolStagesEnumHelper
import org.wfanet.measurement.duchy.db.computation.ComputationStatMetric
import org.wfanet.measurement.duchy.db.computation.ComputationsDatabase
import org.wfanet.measurement.duchy.db.computation.ComputationsDatabaseTransactor.ComputationEditToken
import org.wfanet.measurement.duchy.db.computation.EndComputationReason
import org.wfanet.measurement.duchy.db.computation.toCompletedReason
import org.wfanet.measurement.duchy.service.internal.computations.newEmptyOutputBlobMetadata
Expand Down Expand Up @@ -355,7 +355,8 @@ private constructor(
},
stage = it.computationStage,
attempt = it.attempt,
editVersion = it.version
editVersion = it.version,
globalId = it.globalComputationId,
)
}
.firstOrNull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ kt_jvm_library(
srcs = glob(["*Service.kt"]),
visibility = [
"//src/main/kotlin/org/wfanet/measurement/duchy/common/deploy/postgres:__pkg__",
"//src/test/kotlin/org/wfanet/measurement/duchy/deploy/postgres:__pkg__",
"//src/test/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres:__pkg__",
],
deps = [
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/readers",
Expand Down
Loading