Skip to content

Commit

Permalink
Implement internal spanner stream exchanges method
Browse files Browse the repository at this point in the history
  • Loading branch information
jcorilla committed May 1, 2023
1 parent a9e9490 commit c7cdd72
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.wfanet.measurement.kingdom.deploy.gcloud.spanner

import io.grpc.Status
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.singleOrNull
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.identity.IdGenerator
Expand All @@ -26,8 +28,10 @@ import org.wfanet.measurement.internal.kingdom.CreateExchangeRequest
import org.wfanet.measurement.internal.kingdom.Exchange
import org.wfanet.measurement.internal.kingdom.ExchangesGrpcKt.ExchangesCoroutineImplBase
import org.wfanet.measurement.internal.kingdom.GetExchangeRequest
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.PROVIDER_PARAM
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.providerFilter
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamExchanges
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeReader
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.writers.CreateExchange

Expand Down Expand Up @@ -60,4 +64,10 @@ class SpannerExchangesService(
?.exchange
?: failGrpc(Status.NOT_FOUND) { "Exchange not found" }
}

override fun streamExchanges(request: StreamExchangesRequest): Flow<Exchange> {
return StreamExchanges(request.filter, request.limit).execute(client.singleUse()).map {
it.exchange
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries

import com.google.cloud.spanner.Statement
import org.wfanet.measurement.gcloud.common.toCloudDate
import org.wfanet.measurement.gcloud.spanner.appendClause
import org.wfanet.measurement.gcloud.spanner.bind
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeReader

class StreamExchanges(requestFilter: StreamExchangesRequest.Filter, limit: Int = 0) :
SimpleSpannerQuery<ExchangeReader.Result>() {

override val reader =
ExchangeReader().fillStatementBuilder {
appendWhereClause(requestFilter)
appendClause("ORDER BY Exchanges.Date ASC")
if (limit > 0) {
appendClause("LIMIT @${Params.LIMIT}")
bind(Params.LIMIT to limit.toLong())
}
}

private fun Statement.Builder.appendWhereClause(filter: StreamExchangesRequest.Filter) {
val conjuncts = mutableListOf<String>()

if (filter.hasExchangedBefore()) {
conjuncts.add("Exchanges.Date < @${Params.EXCHANGED_BEFORE}")
bind(Params.EXCHANGED_BEFORE to filter.exchangedBefore.toCloudDate())
}

if (conjuncts.isEmpty()) {
return
}

appendClause("WHERE ")
append(conjuncts.joinToString(" AND "))
}

private object Params {
const val LIMIT = "limit"
const val EXCHANGED_BEFORE = "exchangedBefore"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package org.wfanet.measurement.kingdom.service.internal.testing

import com.google.common.truth.extensions.proto.ProtoTruth.assertThat
import com.google.protobuf.ByteString
import com.google.type.date
import kotlin.test.assertFails
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.junit.Rule
Expand All @@ -37,6 +39,10 @@ import org.wfanet.measurement.internal.kingdom.GetExchangeRequest
import org.wfanet.measurement.internal.kingdom.ModelProvider
import org.wfanet.measurement.internal.kingdom.RecurringExchange
import org.wfanet.measurement.internal.kingdom.RecurringExchangesGrpcKt.RecurringExchangesCoroutineImplBase
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequestKt.filter
import org.wfanet.measurement.internal.kingdom.copy
import org.wfanet.measurement.internal.kingdom.createExchangeRequest
import org.wfanet.measurement.internal.kingdom.streamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.common.testing.DuchyIdSetter
import org.wfanet.measurement.kingdom.service.internal.testing.Population.Companion.DUCHIES

Expand Down Expand Up @@ -205,6 +211,95 @@ abstract class ExchangesServiceTest {
assertFails { getExchange() }
}

@Test
fun `streamExchange returns all exchanges`(): Unit = runBlocking {
val createRequest1 = createExchangeRequest { exchange = EXCHANGE }
val createRequest2 = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2021
month = 1
day = 1
}
}
}

val exchange1 = createExchange(createRequest1)
val exchange2 = createExchange(createRequest2)

val response = exchanges.streamExchanges(streamExchangesRequest {}).toList()

assertThat(response).hasSize(2)
assertThat(response).containsExactly(exchange1, exchange2)
}

@Test
fun `streamExchange respects exchanged before date`(): Unit = runBlocking {
val oldExchangeRequest = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2021
month = 1
day = 1
}
}
}
val newExchangeRequest = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2023
month = 1
day = 1
}
}
}

val oldExchange = createExchange(oldExchangeRequest)
createExchange(newExchangeRequest)

val response =
exchanges
.streamExchanges(
streamExchangesRequest {
filter = filter {
exchangedBefore = date {
year = 2022
month = 1
day = 1
}
}
}
)
.toList()

assertThat(response).containsExactly(oldExchange)
}

@Test
fun `streamExchange respects limit`(): Unit = runBlocking {
val createRequest1 = CreateExchangeRequest.newBuilder().apply { exchange = EXCHANGE }.build()
val createRequest2 = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2021
month = 1
day = 1
}
}
}

createExchange(createRequest1)
createExchange(createRequest2)

val response = exchanges.streamExchanges(streamExchangesRequest { limit = 1 }).toList()

assertThat(response).hasSize(1)
}

private fun createExchange(request: CreateExchangeRequest): Exchange {
return runBlocking { exchanges.createExchange(request) }
}
Expand Down

0 comments on commit c7cdd72

Please sign in to comment.