Skip to content

Commit

Permalink
Ignore party field in public API GetExchangeRequest.
Browse files Browse the repository at this point in the history
This also exposes the Exchanges and RecurringExchanges services in the public API server. Since these services were never exposed, this is not a breaking change.
  • Loading branch information
SanjayVas committed Jun 1, 2023
1 parent 72e3798 commit 38fb66f
Show file tree
Hide file tree
Showing 18 changed files with 319 additions and 221 deletions.
53 changes: 13 additions & 40 deletions src/main/kotlin/org/wfanet/measurement/api/v2alpha/ExchangeKey.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,26 @@ package org.wfanet.measurement.api.v2alpha
import org.wfanet.measurement.common.ResourceNameParser
import org.wfanet.measurement.common.api.ResourceKey

private val parsers =
listOf(
ResourceNameParser("recurringExchanges/{recurring_exchange}/exchanges/{exchange}"),
ResourceNameParser(
"dataProviders/{data_provider}/recurringExchanges/{recurring_exchange}/exchanges/{exchange}"
),
ResourceNameParser(
"modelProviders/{model_provider}/recurringExchanges/{recurring_exchange}/exchanges/{exchange}"
)
)

/** [ExchangeKey] of an Exchange. */
data class ExchangeKey(
val dataProviderId: String?,
val modelProviderId: String?,
val recurringExchangeId: String,
val exchangeId: String
) : ResourceKey {
init {
require((dataProviderId == null) || (modelProviderId == null))
}

data class ExchangeKey(val recurringExchangeId: String, val exchangeId: String) : ResourceKey {
override fun toName(): String {
return parsers
.first()
.assembleName(
mapOf(
IdVariable.RECURRING_EXCHANGE to recurringExchangeId,
IdVariable.EXCHANGE to exchangeId
)
)
return parser.assembleName(
mapOf(IdVariable.RECURRING_EXCHANGE to recurringExchangeId, IdVariable.EXCHANGE to exchangeId)
)
}

companion object FACTORY : ResourceKey.Factory<ExchangeKey> {
val defaultValue = ExchangeKey(null, null, "", "")
private val parser =
ResourceNameParser("recurringExchanges/{recurring_exchange}/exchanges/{exchange}")

val defaultValue = ExchangeKey("", "")

override fun fromName(resourceName: String): ExchangeKey? {
for (parser in parsers) {
val idVars = parser.parseIdVars(resourceName) ?: continue
return ExchangeKey(
idVars[IdVariable.DATA_PROVIDER],
idVars[IdVariable.MODEL_PROVIDER],
idVars.getValue(IdVariable.RECURRING_EXCHANGE),
idVars.getValue(IdVariable.EXCHANGE)
)
}
return null
val idVars = parser.parseIdVars(resourceName) ?: return null
return ExchangeKey(
idVars.getValue(IdVariable.RECURRING_EXCHANGE),
idVars.getValue(IdVariable.EXCHANGE)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.wfanet.measurement.common.identity.apiIdToExternalId
import org.wfanet.measurement.internal.common.Provider
import org.wfanet.measurement.internal.common.provider

fun ResourceKey.toProvider(): Provider? {
fun ResourceKey.toProvider(): Provider {
return when (this) {
is DataProviderKey ->
provider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ java_library(
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:measurement_consumers_service",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:measurements_service",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:public_keys_service",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:recurring_exchanges_service",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:requisitions_service",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.wfanet.measurement.internal.kingdom.MeasurementConsumersGrpcKt.Measur
import org.wfanet.measurement.internal.kingdom.MeasurementLogEntriesGrpcKt.MeasurementLogEntriesCoroutineStub as InternalMeasurementLogEntriesCoroutineStub
import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub as InternalMeasurementsCoroutineStub
import org.wfanet.measurement.internal.kingdom.PublicKeysGrpcKt.PublicKeysCoroutineStub as InternalPublicKeysCoroutineStub
import org.wfanet.measurement.internal.kingdom.RecurringExchangesGrpcKt.RecurringExchangesCoroutineStub as InternalRecurringExchangesCoroutineStub
import org.wfanet.measurement.internal.kingdom.RequisitionsGrpcKt.RequisitionsCoroutineStub as InternalRequisitionsCoroutineStub
import org.wfanet.measurement.kingdom.deploy.common.service.DataServices
import org.wfanet.measurement.kingdom.deploy.common.service.toList
Expand All @@ -54,13 +55,14 @@ import org.wfanet.measurement.kingdom.service.api.v2alpha.ExchangesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.MeasurementConsumersService
import org.wfanet.measurement.kingdom.service.api.v2alpha.MeasurementsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.PublicKeysService
import org.wfanet.measurement.kingdom.service.api.v2alpha.RecurringExchangesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.RequisitionsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.withAccountAuthenticationServerInterceptor
import org.wfanet.measurement.kingdom.service.api.v2alpha.withApiKeyAuthenticationServerInterceptor
import org.wfanet.measurement.kingdom.service.system.v1alpha.ComputationLogEntriesService as systemComputationLogEntriesService
import org.wfanet.measurement.kingdom.service.system.v1alpha.ComputationParticipantsService as systemComputationParticipantsService
import org.wfanet.measurement.kingdom.service.system.v1alpha.ComputationsService as systemComputationsService
import org.wfanet.measurement.kingdom.service.system.v1alpha.RequisitionsService as systemRequisitionsService
import org.wfanet.measurement.kingdom.service.system.v1alpha.ComputationLogEntriesService as SystemComputationLogEntriesService
import org.wfanet.measurement.kingdom.service.system.v1alpha.ComputationParticipantsService as SystemComputationParticipantsService
import org.wfanet.measurement.kingdom.service.system.v1alpha.ComputationsService as SystemComputationsService
import org.wfanet.measurement.kingdom.service.system.v1alpha.RequisitionsService as SystemRequisitionsService
import org.wfanet.measurement.loadtest.panelmatchresourcesetup.PanelMatchResourceSetup

/** TestRule that starts and stops all Kingdom gRPC services. */
Expand Down Expand Up @@ -110,6 +112,9 @@ class InProcessKingdom(
InternalExchangeStepsCoroutineStub(internalApiChannel)
}
private val internalExchangesClient by lazy { InternalExchangesCoroutineStub(internalApiChannel) }
private val internalRecurringExchangesClient by lazy {
InternalRecurringExchangesCoroutineStub(internalApiChannel)
}

private val internalDataServer =
GrpcTestServerRule(logAllRequests = verboseGrpcLogging) {
Expand All @@ -120,10 +125,10 @@ class InProcessKingdom(
GrpcTestServerRule(logAllRequests = verboseGrpcLogging) {
logger.info("Building Kingdom's system API services")
listOf(
systemComputationsService(internalMeasurementsClient),
systemComputationLogEntriesService(internalMeasurementLogEntriesClient),
systemComputationParticipantsService(internalComputationParticipantsClient),
systemRequisitionsService(internalRequisitionsClient)
SystemComputationsService(internalMeasurementsClient),
SystemComputationLogEntriesService(internalMeasurementLogEntriesClient),
SystemComputationParticipantsService(internalComputationParticipantsClient),
SystemRequisitionsService(internalRequisitionsClient)
)
.forEach { addService(it.withMetadataDuchyIdentities()) }
}
Expand Down Expand Up @@ -160,19 +165,18 @@ class InProcessKingdom(
MeasurementConsumersService(internalMeasurementConsumersClient)
.withMetadataPrincipalIdentities()
.withAccountAuthenticationServerInterceptor(internalAccountsClient, redirectUri)
.withApiKeyAuthenticationServerInterceptor(internalApiKeysClient)
)
.forEach { addService(it) }

listOf(
.withApiKeyAuthenticationServerInterceptor(internalApiKeysClient),
RecurringExchangesService().withMetadataPrincipalIdentities(),
ExchangesService(internalRecurringExchangesClient, internalExchangesClient)
.withMetadataPrincipalIdentities(),
ExchangeStepsService(internalExchangeStepsClient).withMetadataPrincipalIdentities(),
ExchangeStepAttemptsService(
internalExchangeStepAttemptsClient,
internalExchangeStepsClient
),
ExchangeStepsService(internalExchangeStepsClient),
ExchangesService(internalExchangesClient)
internalExchangeStepAttemptsClient,
internalExchangeStepsClient
)
.withMetadataPrincipalIdentities(),
)
.forEach { addService(it.withMetadataPrincipalIdentities()) }
.forEach { addService(it) }
}

/** Provides a gRPC channel to the Kingdom's public API. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ import org.wfanet.measurement.internal.kingdom.EventGroupMetadataDescriptorsGrpc
import org.wfanet.measurement.internal.kingdom.EventGroupsGrpcKt.EventGroupsCoroutineStub as InternalEventGroupsCoroutineStub
import org.wfanet.measurement.internal.kingdom.ExchangeStepAttemptsGrpcKt.ExchangeStepAttemptsCoroutineStub as InternalExchangeStepAttemptsCoroutineStub
import org.wfanet.measurement.internal.kingdom.ExchangeStepsGrpcKt.ExchangeStepsCoroutineStub as InternalExchangeStepsCoroutineStub
import org.wfanet.measurement.internal.kingdom.ExchangesGrpcKt.ExchangesCoroutineStub as InternalExchangesCoroutineStub
import org.wfanet.measurement.internal.kingdom.MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub as InternalMeasurementConsumersCoroutineStub
import org.wfanet.measurement.internal.kingdom.MeasurementsGrpcKt.MeasurementsCoroutineStub as InternalMeasurementsCoroutineStub
import org.wfanet.measurement.internal.kingdom.ModelLinesGrpcKt.ModelLinesCoroutineStub as InternalModelLinesCoroutineStub
import org.wfanet.measurement.internal.kingdom.ModelReleasesGrpcKt.ModelReleasesCoroutineStub as InternalModelReleasesCoroutineStub
import org.wfanet.measurement.internal.kingdom.ModelSuitesGrpcKt.ModelSuitesCoroutineStub as InternalModelSuitesCoroutineStub
import org.wfanet.measurement.internal.kingdom.PublicKeysGrpcKt.PublicKeysCoroutineStub as InternalPublicKeysCoroutineStub
import org.wfanet.measurement.internal.kingdom.RecurringExchangesGrpcKt.RecurringExchangesCoroutineStub as InternalRecurringExchangesCoroutineStub
import org.wfanet.measurement.internal.kingdom.RequisitionsGrpcKt.RequisitionsCoroutineStub as InternalRequisitionsCoroutineStub
import org.wfanet.measurement.kingdom.deploy.common.Llv2ProtocolConfig
import org.wfanet.measurement.kingdom.deploy.common.Llv2ProtocolConfigFlags
Expand All @@ -51,12 +53,14 @@ import org.wfanet.measurement.kingdom.service.api.v2alpha.EventGroupMetadataDesc
import org.wfanet.measurement.kingdom.service.api.v2alpha.EventGroupsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.ExchangeStepAttemptsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.ExchangeStepsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.ExchangesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.MeasurementConsumersService
import org.wfanet.measurement.kingdom.service.api.v2alpha.MeasurementsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.ModelLinesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.ModelReleasesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.ModelSuitesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.PublicKeysService
import org.wfanet.measurement.kingdom.service.api.v2alpha.RecurringExchangesService
import org.wfanet.measurement.kingdom.service.api.v2alpha.RequisitionsService
import org.wfanet.measurement.kingdom.service.api.v2alpha.withAccountAuthenticationServerInterceptor
import org.wfanet.measurement.kingdom.service.api.v2alpha.withApiKeyAuthenticationServerInterceptor
Expand Down Expand Up @@ -128,13 +132,6 @@ private fun run(
)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup)
.withApiKeyAuthenticationServerInterceptor(internalApiKeysCoroutineStub),
ExchangeStepAttemptsService(
InternalExchangeStepAttemptsCoroutineStub(channel),
internalExchangeStepsCoroutineStub
)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ExchangeStepsService(internalExchangeStepsCoroutineStub)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
MeasurementsService(
InternalMeasurementsCoroutineStub(channel),
)
Expand All @@ -153,6 +150,19 @@ private fun run(
RequisitionsService(InternalRequisitionsCoroutineStub(channel))
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup)
.withApiKeyAuthenticationServerInterceptor(internalApiKeysCoroutineStub),
RecurringExchangesService().withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ExchangesService(
InternalRecurringExchangesCoroutineStub(channel),
InternalExchangesCoroutineStub(channel)
)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ExchangeStepsService(internalExchangeStepsCoroutineStub)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ExchangeStepAttemptsService(
InternalExchangeStepAttemptsCoroutineStub(channel),
internalExchangeStepsCoroutineStub
)
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ModelLinesService(InternalModelLinesCoroutineStub(channel))
.withPrincipalsFromX509AuthorityKeyIdentifiers(principalLookup),
ModelSuitesService(InternalModelSuitesCoroutineStub(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import org.wfanet.measurement.internal.kingdom.GetExchangeRequest
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.KingdomInternalException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.PROVIDER_PARAM
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.providerFilter
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamExchanges
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeReader
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.writers.BatchDeleteExchanges
Expand All @@ -58,13 +56,11 @@ class SpannerExchangesService(
"""
WHERE RecurringExchanges.ExternalRecurringExchangeId = @external_recurring_exchange_id
AND Exchanges.Date = @date
AND ${providerFilter(request.provider)}
"""
.trimIndent()
)
bind("external_recurring_exchange_id" to request.externalRecurringExchangeId)
bind("date" to request.date.toCloudDate())
bind(PROVIDER_PARAM to request.provider.externalId)
appendClause("LIMIT 1")
}
.execute(client.singleUse())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class ExchangeReader : SpannerReader<ExchangeReader.Result>() {
SELECT $SELECT_COLUMNS_SQL
FROM Exchanges
JOIN RecurringExchanges USING (RecurringExchangeId)
LEFT JOIN ModelProviders USING (ModelProviderId)
LEFT JOIN DataProviders USING (DataProviderId)
"""
.trimIndent()

Expand Down Expand Up @@ -111,6 +109,6 @@ class ExchangeReader : SpannerReader<ExchangeReader.Result>() {
"RecurringExchanges.RecurringExchangeDetails"
)

val SELECT_COLUMNS_SQL = SELECT_COLUMNS.joinToString(", ")
private val SELECT_COLUMNS_SQL = SELECT_COLUMNS.joinToString(", ")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ kt_jvm_library(
"//src/main/proto/wfa/measurement/internal/common:provider_kt_jvm_proto",
"//src/main/proto/wfa/measurement/internal/kingdom:exchange_kt_jvm_proto",
"//src/main/proto/wfa/measurement/internal/kingdom:exchanges_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/internal/kingdom:recurring_exchanges_service_kt_jvm_grpc_proto",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc",
Expand Down
Loading

0 comments on commit 38fb66f

Please sign in to comment.