Skip to content

Commit

Permalink
Add listReportingSets (#992)
Browse files Browse the repository at this point in the history
Add listReportingSets for v2alpha reporting server.
  • Loading branch information
riemanli authored May 15, 2023
1 parent fb92efa commit 817bf61
Show file tree
Hide file tree
Showing 3 changed files with 474 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ import org.wfanet.measurement.internal.reporting.v2.MetricSpecKt as InternalMetr
import org.wfanet.measurement.internal.reporting.v2.ReportingSet as InternalReportingSet
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamMetricsRequestKt
import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.StreamReportingSetsRequestKt
import org.wfanet.measurement.internal.reporting.v2.TimeInterval as InternalTimeInterval
import org.wfanet.measurement.internal.reporting.v2.streamMetricsRequest
import org.wfanet.measurement.internal.reporting.v2.streamReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.timeInterval as internalTimeInterval
import org.wfanet.measurement.reporting.v2alpha.ListMetricsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageToken
import org.wfanet.measurement.reporting.v2alpha.MetricSpec
import org.wfanet.measurement.reporting.v2alpha.MetricSpecKt
import org.wfanet.measurement.reporting.v2alpha.ReportingSet
Expand Down Expand Up @@ -389,3 +393,17 @@ fun InternalReportingSet.Primitive.toPrimitive(): ReportingSet.Primitive {
}
}
}

/** Converts a [ListReportingSetsPageToken] to an internal [StreamReportingSetsRequest]. */
fun ListReportingSetsPageToken.toStreamReportingSetsRequest(): StreamReportingSetsRequest {
val source = this
return streamReportingSetsRequest {
// get 1 more than the actual page size for deciding whether to set page token
limit = source.pageSize + 1
filter =
StreamReportingSetsRequestKt.filter {
cmmsMeasurementConsumerId = source.cmmsMeasurementConsumerId
externalReportingSetIdAfter = source.lastReportingSet.externalReportingSetId
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ package org.wfanet.measurement.reporting.service.api.v2alpha

import io.grpc.Status
import io.grpc.StatusException
import kotlin.math.min
import kotlinx.coroutines.flow.toList
import org.wfanet.measurement.api.v2alpha.MeasurementConsumerKey
import org.wfanet.measurement.common.base64UrlDecode
import org.wfanet.measurement.common.base64UrlEncode
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.grpc.grpcRequireNotNull
Expand All @@ -29,8 +33,19 @@ import org.wfanet.measurement.internal.reporting.v2.ReportingSetsGrpcKt.Reportin
import org.wfanet.measurement.internal.reporting.v2.batchGetReportingSetsRequest
import org.wfanet.measurement.internal.reporting.v2.reportingSet as internalReportingSet
import org.wfanet.measurement.reporting.v2alpha.CreateReportingSetRequest
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageToken
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsPageTokenKt.previousPageEnd
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsRequest
import org.wfanet.measurement.reporting.v2alpha.ListReportingSetsResponse
import org.wfanet.measurement.reporting.v2alpha.ReportingSet
import org.wfanet.measurement.reporting.v2alpha.ReportingSetsGrpcKt.ReportingSetsCoroutineImplBase
import org.wfanet.measurement.reporting.v2alpha.copy
import org.wfanet.measurement.reporting.v2alpha.listReportingSetsPageToken
import org.wfanet.measurement.reporting.v2alpha.listReportingSetsResponse

private const val MIN_PAGE_SIZE = 1
private const val DEFAULT_PAGE_SIZE = 50
private const val MAX_PAGE_SIZE = 1000

class ReportingSetsService(private val internalReportingSetsStub: ReportingSetsCoroutineStub) :
ReportingSetsCoroutineImplBase() {
Expand Down Expand Up @@ -309,6 +324,84 @@ class ReportingSetsService(private val internalReportingSetsStub: ReportingSetsC
.asRuntimeException()
}
}

override suspend fun listReportingSets(
request: ListReportingSetsRequest
): ListReportingSetsResponse {
val listReportingSetsPageToken = request.toListReportingSetsPageToken()

when (val principal: ReportingPrincipal = principalFromCurrentContext) {
is MeasurementConsumerPrincipal -> {
if (request.parent != principal.resourceKey.toName()) {
failGrpc(Status.PERMISSION_DENIED) {
"Cannot list ReportingSets belonging to other MeasurementConsumers."
}
}
}
}

val results: List<InternalReportingSet> =
internalReportingSetsStub
.streamReportingSets(listReportingSetsPageToken.toStreamReportingSetsRequest())
.toList()

if (results.isEmpty()) {
return ListReportingSetsResponse.getDefaultInstance()
}

return listReportingSetsResponse {
reportingSets +=
results
.subList(0, min(results.size, listReportingSetsPageToken.pageSize))
.map(InternalReportingSet::toReportingSet)

if (results.size > listReportingSetsPageToken.pageSize) {
val pageToken =
listReportingSetsPageToken.copy {
lastReportingSet = previousPageEnd {
cmmsMeasurementConsumerId = results[results.lastIndex - 1].cmmsMeasurementConsumerId
externalReportingSetId = results[results.lastIndex - 1].externalReportingSetId
}
}
nextPageToken = pageToken.toByteString().base64UrlEncode()
}
}
}
}

/** Converts a public [ListReportingSetsRequest] to a [ListReportingSetsPageToken]. */
private fun ListReportingSetsRequest.toListReportingSetsPageToken(): ListReportingSetsPageToken {
grpcRequire(pageSize >= 0) { "Page size cannot be less than 0" }

val source = this
val parentKey: MeasurementConsumerKey =
grpcRequireNotNull(MeasurementConsumerKey.fromName(parent)) {
"Parent is either unspecified or invalid."
}

return if (source.pageToken.isNotBlank()) {
ListReportingSetsPageToken.parseFrom(source.pageToken.base64UrlDecode()).copy {
grpcRequire(this.cmmsMeasurementConsumerId == parentKey.measurementConsumerId) {
"Arguments must be kept the same when using a page token"
}

if (
source.pageSize != 0 && source.pageSize >= MIN_PAGE_SIZE && source.pageSize <= MAX_PAGE_SIZE
) {
pageSize = source.pageSize
}
}
} else {
listReportingSetsPageToken {
pageSize =
when {
source.pageSize < MIN_PAGE_SIZE -> DEFAULT_PAGE_SIZE
source.pageSize > MAX_PAGE_SIZE -> MAX_PAGE_SIZE
else -> source.pageSize
}
this.cmmsMeasurementConsumerId = parentKey.measurementConsumerId
}
}
}

/** Converts a [ReportingSet.SetExpression.Operation] to an [Operator]. */
Expand Down
Loading

0 comments on commit 817bf61

Please sign in to comment.