Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add postgres computation readers #1104

Merged
merged 10 commits into from
Jul 19, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ kt_jvm_library(
name = "services",
srcs = glob(["*Service.kt"]),
visibility = [
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/postgres:__pkg__",
"//src/main/kotlin/org/wfanet/measurement/duchy/common/deploy/postgres:__pkg__",
"//src/test/kotlin/org/wfanet/measurement/duchy/deploy/postgres:__pkg__",
],
deps = [
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/postgres/readers",
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/postgres/writers",
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/readers",
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/writers",
"//src/main/kotlin/org/wfanet/measurement/system/v1alpha:resource_key",
"//src/main/proto/wfa/measurement/internal/duchy:continuation_tokens_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/system/v1alpha:computation_log_entries_service_kt_jvm_grpc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.wfanet.measurement.duchy.deploy.postgres
package org.wfanet.measurement.duchy.deploy.common.postgres

import org.wfanet.measurement.common.db.r2dbc.DatabaseClient
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.duchy.deploy.postgres.writers.InsertComputationStat
import org.wfanet.measurement.duchy.deploy.common.postgres.writers.InsertComputationStat
import org.wfanet.measurement.internal.duchy.ComputationStatsGrpcKt.ComputationStatsCoroutineImplBase
import org.wfanet.measurement.internal.duchy.CreateComputationStatRequest
import org.wfanet.measurement.internal.duchy.CreateComputationStatResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.wfanet.measurement.duchy.deploy.postgres
package org.wfanet.measurement.duchy.deploy.common.postgres

import io.grpc.Status
import java.time.Clock
Expand All @@ -28,9 +28,9 @@ import org.wfanet.measurement.common.toDuration
import org.wfanet.measurement.duchy.db.computation.ComputationProtocolStageDetailsHelper
import org.wfanet.measurement.duchy.db.computation.ComputationProtocolStagesEnumHelper
import org.wfanet.measurement.duchy.db.computation.ComputationTypeEnumHelper
import org.wfanet.measurement.duchy.deploy.postgres.readers.ComputationReader
import org.wfanet.measurement.duchy.deploy.postgres.writers.ClaimWork
import org.wfanet.measurement.duchy.deploy.postgres.writers.CreateComputation
import org.wfanet.measurement.duchy.deploy.common.postgres.readers.ComputationReader
import org.wfanet.measurement.duchy.deploy.common.postgres.writers.ClaimWork
import org.wfanet.measurement.duchy.deploy.common.postgres.writers.CreateComputation
import org.wfanet.measurement.duchy.name
import org.wfanet.measurement.duchy.number
import org.wfanet.measurement.duchy.service.internal.ComputationDetailsNotFoundException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.wfanet.measurement.duchy.deploy.postgres
package org.wfanet.measurement.duchy.deploy.common.postgres

import io.grpc.Status
import org.wfanet.measurement.common.db.r2dbc.DatabaseClient
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.duchy.deploy.postgres.readers.ContinuationTokenReader
import org.wfanet.measurement.duchy.deploy.postgres.writers.SetContinuationToken
import org.wfanet.measurement.duchy.deploy.common.postgres.readers.ContinuationTokenReader
import org.wfanet.measurement.duchy.deploy.common.postgres.writers.SetContinuationToken
import org.wfanet.measurement.duchy.service.internal.ContinuationTokenInvalidException
import org.wfanet.measurement.duchy.service.internal.ContinuationTokenMalformedException
import org.wfanet.measurement.internal.duchy.ContinuationTokensGrpcKt.ContinuationTokensCoroutineImplBase
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library")

package(default_visibility = [
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/postgres:__subpackages__",
"//src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres:__subpackages__",
])

kt_jvm_library(
Expand All @@ -18,6 +18,7 @@ kt_jvm_library(
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/db/r2dbc",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/identity",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2023 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.wfanet.measurement.duchy.deploy.common.postgres.readers

import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.toList
import org.wfanet.measurement.common.db.r2dbc.ReadContext
import org.wfanet.measurement.common.db.r2dbc.ResultRow
import org.wfanet.measurement.common.db.r2dbc.boundStatement
import org.wfanet.measurement.internal.duchy.ComputationBlobDependency
import org.wfanet.measurement.internal.duchy.ComputationStageBlobMetadata
import org.wfanet.measurement.internal.duchy.computationStageBlobMetadata

/** Performs read operations on ComputationBlobReferences tables */
class ComputationBlobReferenceReader {

/**
* Gets the [ComputationBlobDependency] of a computation.
*
* @param localId local identifier of the computation
* @param stage stage enum of the computation
* @param blobId local identifier of the blob
* @return [ComputationBlobDependency] if the blob exists, or null.
*/
suspend fun readBlobDependency(
readContext: ReadContext,
localId: Long,
stage: Long,
blobId: Long,
): ComputationBlobDependency? {
val sql =
boundStatement(
"""
SELECT DependencyType
FROM ComputationBlobReferences
WHERE
ComputationId = $1
AND
ComputationStage = $2
AND
BlobId = $3
"""
.trimIndent()
) {
bind("$1", localId)
bind("$2", stage)
bind("$3", blobId)
}

return readContext
.executeQuery(sql)
.consume { row -> row.getProtoEnum("DependencyType", ComputationBlobDependency::forNumber) }
.firstOrNull()
}

/**
* Gets a map of blobId to pathToBlob of a computation based on localComputationId.
*
* @param localId local identifier of the computation
* @param stage stage enum of the computation
* @param dependencyType enum value of the dependency type
* @return [Map<[Long], [String]?>] for all blobIds of a computation
*/
suspend fun blobIdToPathMapByDepType(
readContext: ReadContext,
localId: Long,
stage: Long,
dependencyType: Long
): Map<Long, String?> {
val sql =
boundStatement(
"""
SELECT BlobId, PathToBlob
FROM ComputationBlobReferences
WHERE
ComputationId = $1
AND
ComputationStage = $2
AND
DependencyType = $3
"""
.trimIndent()
) {
bind("$1", localId)
bind("$2", stage)
bind("$3", dependencyType)
}

return readContext
.executeQuery(sql)
.consume { it.get<Long>("BlobId") to it.get<String?>("PathToBlob") }
.toList()
.toMap()
}

/**
* Gets a list of computationBlobKeys by localComputationId
*
* @param readContext The transaction context for reading from the Postgres database.
* @param localComputationId A local identifier for a computation
* @return A list of computation blob keys
*/
suspend fun readComputationBlobKeys(
readContext: ReadContext,
localComputationId: Long
): List<String> {
val statement =
boundStatement(
"""
SELECT PathToBlob
FROM ComputationBlobReferences
WHERE ComputationId = $1 AND PathToBlob IS NOT NULL
"""
.trimIndent()
) {
bind("$1", localComputationId)
}

return readContext
.executeQuery(statement)
.consume { row -> row.get<String>("PathToBlob") }
.toList()
}

/**
* Gets a list of [ComputationStageBlobMetadata] by localComputationId
*
* @param readContext The transaction context for reading from the Postgres database.
* @param localComputationId A local identifier for a computation
* @param computationStage stage enum of the computation
* @return A list of [ComputationStageBlobMetadata]
*/
suspend fun readBlobMetadata(
readContext: ReadContext,
localComputationId: Long,
computationStage: Long
): List<ComputationStageBlobMetadata> {
val statement =
boundStatement(
"""
SELECT BlobId, PathToBlob, DependencyType
FROM ComputationBlobReferences
WHERE
ComputationId = $1
AND
ComputationStage = $2
"""
.trimIndent()
) {
bind("$1", localComputationId)
bind("$2", computationStage)
}

return readContext.executeQuery(statement).consume(::buildBlobMetadata).toList()
}

private fun buildBlobMetadata(row: ResultRow): ComputationStageBlobMetadata {
return computationStageBlobMetadata {
blobId = row["BlobId"]
row.get<String?>("PathToBlob")?.let { path = it }
dependencyType = row.getProtoEnum("DependencyType", ComputationBlobDependency::forNumber)
}
}
}
Loading