Skip to content

Commit

Permalink
Update cross-media-measurement-api for EventGroup pattern.
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjayVas committed Jun 15, 2023
1 parent a9e893a commit 0e514de
Show file tree
Hide file tree
Showing 21 changed files with 733 additions and 544 deletions.
4 changes: 2 additions & 2 deletions build/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def wfa_measurement_system_repositories():
wfa_repo_archive(
name = "wfa_measurement_proto",
repo = "cross-media-measurement-api",
sha256 = "e1738d74028be874e2ea4a3a7c9c2696f5aea60eb82c473771e8962cad838826",
version = "0.34.0",
sha256 = "dd7467771a667c6586f7605b1b09a5e1ffaaa78beee40be55e77c3b7c52fd126",
version = "0.35.0",
)

wfa_repo_archive(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@
package org.wfanet.measurement.api.v2alpha

import org.wfanet.measurement.common.ResourceNameParser
import org.wfanet.measurement.common.api.ChildResourceKey
import org.wfanet.measurement.common.api.ResourceKey

private val parser = ResourceNameParser("dataProviders/{data_provider}/eventGroups/{event_group}")

/** [ResourceKey] of an EventGroup. */
data class EventGroupKey(val dataProviderId: String, val eventGroupId: String) : ResourceKey {
data class EventGroupKey(val dataProviderId: String, val eventGroupId: String) : ChildResourceKey {
override fun toName(): String {
return parser.assembleName(
mapOf(IdVariable.DATA_PROVIDER to dataProviderId, IdVariable.EVENT_GROUP to eventGroupId)
)
}

override val parentKey = DataProviderKey(dataProviderId)

companion object FACTORY : ResourceKey.Factory<EventGroupKey> {
val defaultValue = EventGroupKey("", "")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.api.v2alpha

import org.wfanet.measurement.common.ResourceNameParser
import org.wfanet.measurement.common.api.ChildResourceKey
import org.wfanet.measurement.common.api.ResourceKey

/** [ResourceKey] of an EventGroup with a MeasurementConsumer as the parent. */
data class MeasurementConsumerEventGroupKey(
val measurementConsumerId: String,
val eventGroupId: String
) : ChildResourceKey {
override fun toName(): String {
return parser.assembleName(
mapOf(
IdVariable.MEASUREMENT_CONSUMER to measurementConsumerId,
IdVariable.EVENT_GROUP to eventGroupId
)
)
}

override val parentKey = MeasurementConsumerKey(measurementConsumerId)

companion object FACTORY : ResourceKey.Factory<MeasurementConsumerEventGroupKey> {
private val parser =
ResourceNameParser("measurementConsumers/{measurement_consumer}/eventGroups/{event_group}")

val defaultValue = MeasurementConsumerEventGroupKey("", "")

override fun fromName(resourceName: String): MeasurementConsumerEventGroupKey? {
return parser.parseIdVars(resourceName)?.let {
MeasurementConsumerEventGroupKey(
it.getValue(IdVariable.MEASUREMENT_CONSUMER),
it.getValue(IdVariable.EVENT_GROUP)
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ interface ResourceKey {
const val WILDCARD_ID = "-"
}
}

interface ChildResourceKey : ResourceKey {
val parentKey: ResourceKey
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package org.wfanet.measurement.kingdom.deploy.gcloud.spanner
import io.grpc.Status
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.grpc.grpcRequire
import org.wfanet.measurement.common.identity.ExternalId
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.gcloud.spanner.AsyncDatabaseClient
import org.wfanet.measurement.internal.kingdom.CreateEventGroupRequest
Expand All @@ -32,6 +32,7 @@ import org.wfanet.measurement.internal.kingdom.eventGroup
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.CertificateIsInvalidException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.DataProviderNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupInvalidArgsException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupNotFoundByMeasurementConsumerException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.EventGroupStateIllegalException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.KingdomInternalException
Expand Down Expand Up @@ -114,42 +115,60 @@ class SpannerEventGroupsService(
}

override suspend fun getEventGroup(request: GetEventGroupRequest): EventGroup {
return EventGroupReader()
.readByExternalIds(
client.singleUse(),
request.externalDataProviderId,
request.externalEventGroupId,
)
?.eventGroup
?: failGrpc(Status.NOT_FOUND) { "EventGroup not found" }
grpcRequire(request.externalEventGroupId != 0L) { "external_event_group_id not specified" }
val externalEventGroupId = ExternalId(request.externalEventGroupId)
val reader = EventGroupReader()

@Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") // Protobuf enum fields cannot be null.
return when (request.externalParentIdCase) {
GetEventGroupRequest.ExternalParentIdCase.EXTERNAL_DATA_PROVIDER_ID -> {
val externalDataProviderId = ExternalId(request.externalDataProviderId)
reader.readByDataProvider(client.singleUse(), externalDataProviderId, externalEventGroupId)
?: throw EventGroupNotFoundException(externalDataProviderId, externalEventGroupId)
.asStatusRuntimeException(Status.Code.NOT_FOUND)
}
GetEventGroupRequest.ExternalParentIdCase.EXTERNAL_MEASUREMENT_CONSUMER_ID -> {
val externalMeasurementConsumerId = ExternalId(request.externalMeasurementConsumerId)
reader.readByMeasurementConsumer(
client.singleUse(),
externalMeasurementConsumerId,
externalEventGroupId
)
?: throw EventGroupNotFoundByMeasurementConsumerException(
externalMeasurementConsumerId,
externalEventGroupId
)
.asStatusRuntimeException(Status.Code.NOT_FOUND)
}
GetEventGroupRequest.ExternalParentIdCase.EXTERNALPARENTID_NOT_SET ->
throw Status.INVALID_ARGUMENT.withDescription("external_parent_id not specified")
.asRuntimeException()
}.eventGroup
}

override suspend fun deleteEventGroup(request: DeleteEventGroupRequest): EventGroup {
grpcRequire(request.externalDataProviderId > 0L) { "ExternalDataProviderId unspecified" }
grpcRequire(request.externalEventGroupId > 0L) { "ExternalEventGroupId unspecified" }

val eventGroup = eventGroup {
externalDataProviderId = request.externalDataProviderId
externalEventGroupId = request.externalEventGroupId
}
grpcRequire(request.externalDataProviderId != 0L) { "external_data_provider_id unspecified" }
grpcRequire(request.externalEventGroupId > 0L) { "external_event_group_id unspecified" }

try {
return DeleteEventGroup(eventGroup).execute(client, idGenerator)
return DeleteEventGroup(request).execute(client, idGenerator)
} catch (e: EventGroupNotFoundException) {
throw e.asStatusRuntimeException(Status.Code.NOT_FOUND, "EventGroup not found.")
throw e.asStatusRuntimeException(Status.Code.NOT_FOUND)
} catch (e: EventGroupNotFoundByMeasurementConsumerException) {
throw e.asStatusRuntimeException(Status.Code.NOT_FOUND)
} catch (e: EventGroupStateIllegalException) {
when (e.state) {
throw when (e.state) {
EventGroup.State.DELETED -> {
throw e.asStatusRuntimeException(Status.Code.NOT_FOUND, "EventGroup state is DELETED.")
e.asStatusRuntimeException(Status.Code.NOT_FOUND)
}
EventGroup.State.ACTIVE,
EventGroup.State.STATE_UNSPECIFIED,
EventGroup.State.UNRECOGNIZED -> {
throw e.asStatusRuntimeException(Status.Code.INTERNAL, "Unexpected internal error.")
e.asStatusRuntimeException(Status.Code.INTERNAL)
}
}
} catch (e: KingdomInternalException) {
throw e.asStatusRuntimeException(Status.Code.INTERNAL, "Unexpected internal error.")
throw e.asStatusRuntimeException(Status.Code.INTERNAL)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,21 @@ class EventGroupNotFoundException(
override val context
get() =
mapOf(
"external_data_provider_id" to externalDataProviderId.toString(),
"external_event_group_id" to externalEventGroupId.toString()
"external_data_provider_id" to externalDataProviderId.value.toString(),
"external_event_group_id" to externalEventGroupId.value.toString()
)
}

class EventGroupNotFoundByMeasurementConsumerException(
val externalMeasurementConsumerId: ExternalId,
val externalEventGroupId: ExternalId,
provideDescription: () -> String = { "EventGroup not found" }
) : KingdomInternalException(ErrorCode.EVENT_GROUP_NOT_FOUND, provideDescription) {
override val context
get() =
mapOf(
"external_measurement_consumer_id" to externalMeasurementConsumerId.value.toString(),
"external_event_group_id" to externalEventGroupId.value.toString()
)
}

Expand All @@ -539,7 +552,7 @@ class EventGroupStateIllegalException(
val externalDataProviderId: ExternalId,
val externalEventGroupId: ExternalId,
val state: EventGroup.State,
provideDescription: () -> String = { "EventGroup state illegal" }
provideDescription: () -> String = { "EventGroup state is $state" }
) : KingdomInternalException(ErrorCode.EVENT_GROUP_STATE_ILLEGAL, provideDescription) {
override val context
get() =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,31 @@ class StreamEventGroups(requestFilter: StreamEventGroupsRequest.Filter, limit: I

private fun Statement.Builder.appendWhereClause(filter: StreamEventGroupsRequest.Filter) {
val conjuncts = mutableListOf<String>()
if (filter.externalDataProviderId != 0L) {
conjuncts.add("ExternalDataProviderId = @$EXTERNAL_DATA_PROVIDER_ID")
bind(EXTERNAL_DATA_PROVIDER_ID).to(filter.externalDataProviderId)
}
if (filter.externalMeasurementConsumerId != 0L) {
conjuncts.add("ExternalMeasurementConsumerId = @$EXTERNAL_MEASUREMENT_CONSUMER_ID")
bind(EXTERNAL_MEASUREMENT_CONSUMER_ID).to(filter.externalMeasurementConsumerId)
}
if (filter.externalMeasurementConsumerIdsList.isNotEmpty()) {
conjuncts.add("ExternalMeasurementConsumerId IN UNNEST(@$EXTERNAL_MEASUREMENT_CONSUMER_IDS)")
bind(EXTERNAL_MEASUREMENT_CONSUMER_IDS)
.toInt64Array(filter.externalMeasurementConsumerIdsList.map { it.toLong() })
}
if (filter.externalDataProviderId != 0L) {
conjuncts.add("ExternalDataProviderId = @$EXTERNAL_DATA_PROVIDER_ID")
bind(EXTERNAL_DATA_PROVIDER_ID to filter.externalDataProviderId)
if (filter.externalDataProviderIdsList.isNotEmpty()) {
conjuncts.add("ExternalDataProviderId IN UNNEST(@$EXTERNAL_DATA_PROVIDER_IDS)")
bind(EXTERNAL_DATA_PROVIDER_IDS)
.toInt64Array(filter.externalDataProviderIdsList.map { it.toLong() })
}

if (!filter.showDeleted) {
conjuncts.add("State != @$DELETED_STATE")
bind(DELETED_STATE).toProtoEnum(EventGroup.State.DELETED)
}

if (filter.externalEventGroupIdAfter != 0L && filter.externalDataProviderIdAfter != 0L) {
if (filter.hasAfter()) {
conjuncts.add(
"""
((ExternalDataProviderId > @$EXTERNAL_DATA_PROVIDER_ID_AFTER)
Expand All @@ -55,13 +69,8 @@ class StreamEventGroups(requestFilter: StreamEventGroupsRequest.Filter, limit: I
"""
.trimIndent()
)
bind(EXTERNAL_DATA_PROVIDER_ID_AFTER).to(filter.externalDataProviderIdAfter)
bind(EXTERNAL_EVENT_GROUP_ID_AFTER).to(filter.externalEventGroupIdAfter)
}

if (!filter.showDeleted) {
conjuncts.add("State != @$DELETED_STATE")
bind(DELETED_STATE).toProtoEnum(EventGroup.State.DELETED)
bind(EXTERNAL_DATA_PROVIDER_ID_AFTER).to(filter.after.externalDataProviderId)
bind(EXTERNAL_EVENT_GROUP_ID_AFTER).to(filter.after.externalEventGroupId)
}

if (conjuncts.isEmpty()) {
Expand All @@ -74,8 +83,10 @@ class StreamEventGroups(requestFilter: StreamEventGroupsRequest.Filter, limit: I

companion object {
const val LIMIT = "limit"
const val EXTERNAL_MEASUREMENT_CONSUMER_IDS = "externalMeasurementConsumerIds"
const val EXTERNAL_DATA_PROVIDER_ID = "externalDataProviderId"
const val EXTERNAL_MEASUREMENT_CONSUMER_ID = "externalMeasurementConsumerId"
const val EXTERNAL_MEASUREMENT_CONSUMER_IDS = "externalMeasurementConsumerIds"
const val EXTERNAL_DATA_PROVIDER_IDS = "externalDataProviderIds"
const val EXTERNAL_EVENT_GROUP_ID_AFTER = "externalEventGroupIdAfter"
const val EXTERNAL_DATA_PROVIDER_ID_AFTER = "externalDataProviderIdAfter"
const val DELETED_STATE = "deletedState"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers
import com.google.cloud.spanner.Statement
import com.google.cloud.spanner.Struct
import kotlinx.coroutines.flow.singleOrNull
import org.wfanet.measurement.common.identity.ExternalId
import org.wfanet.measurement.common.identity.InternalId
import org.wfanet.measurement.gcloud.spanner.AsyncDatabaseClient
import org.wfanet.measurement.gcloud.spanner.appendClause
Expand Down Expand Up @@ -62,10 +63,10 @@ class EventGroupReader : BaseSpannerReader<EventGroupReader.Result>() {
.singleOrNull()
}

suspend fun readByExternalIds(
suspend fun readByDataProvider(
readContext: AsyncDatabaseClient.ReadContext,
externalDataProviderId: Long,
externalEventGroupId: Long,
externalDataProviderId: ExternalId,
externalEventGroupId: ExternalId,
): Result? {
return fillStatementBuilder {
appendClause(
Expand All @@ -84,6 +85,28 @@ class EventGroupReader : BaseSpannerReader<EventGroupReader.Result>() {
.singleOrNull()
}

suspend fun readByMeasurementConsumer(
readContext: AsyncDatabaseClient.ReadContext,
externalMeasurementConsumerId: ExternalId,
externalEventGroupId: ExternalId,
): Result? {
return fillStatementBuilder {
appendClause(
"""
WHERE
ExternalMeasurementConsumerId = @${Params.EXTERNAL_MEASUREMENT_CONSUMER_ID}
AND ExternalEventGroupId = @${Params.EXTERNAL_EVENT_GROUP_ID}
"""
.trimIndent()
)
bind(Params.EXTERNAL_MEASUREMENT_CONSUMER_ID to externalMeasurementConsumerId)
bind(Params.EXTERNAL_EVENT_GROUP_ID to externalEventGroupId)
appendClause("LIMIT 1")
}
.execute(readContext)
.singleOrNull()
}

override suspend fun translate(struct: Struct): Result =
Result(
buildEventGroup(struct),
Expand Down Expand Up @@ -136,6 +159,7 @@ class EventGroupReader : BaseSpannerReader<EventGroupReader.Result>() {

private object Params {
const val EXTERNAL_DATA_PROVIDER_ID = "externalDataProviderId"
const val EXTERNAL_MEASUREMENT_CONSUMER_ID = "externalMeasurementConsumerId"
const val EXTERNAL_EVENT_GROUP_ID = "externalEventGroupId"
const val DATA_PROVIDER_ID = "dataProviderId"
const val CREATE_REQUEST_ID = "createRequestId"
Expand Down
Loading

0 comments on commit 0e514de

Please sign in to comment.