Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement internal spanner stream exchanges method #975

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.wfanet.measurement.kingdom.deploy.gcloud.spanner

import io.grpc.Status
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.singleOrNull
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.identity.IdGenerator
Expand All @@ -26,8 +28,10 @@ import org.wfanet.measurement.internal.kingdom.CreateExchangeRequest
import org.wfanet.measurement.internal.kingdom.Exchange
import org.wfanet.measurement.internal.kingdom.ExchangesGrpcKt.ExchangesCoroutineImplBase
import org.wfanet.measurement.internal.kingdom.GetExchangeRequest
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.PROVIDER_PARAM
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.providerFilter
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamExchanges
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeReader
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.writers.CreateExchange

Expand Down Expand Up @@ -60,4 +64,10 @@ class SpannerExchangesService(
?.exchange
?: failGrpc(Status.NOT_FOUND) { "Exchange not found" }
}

override fun streamExchanges(request: StreamExchangesRequest): Flow<Exchange> {
return StreamExchanges(request.filter, request.limit).execute(client.singleUse()).map {
it.exchange
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries

import com.google.cloud.spanner.Statement
import org.wfanet.measurement.gcloud.common.toCloudDate
import org.wfanet.measurement.gcloud.spanner.appendClause
import org.wfanet.measurement.gcloud.spanner.bind
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeReader

class StreamExchanges(requestFilter: StreamExchangesRequest.Filter, limit: Int = 0) :
SimpleSpannerQuery<ExchangeReader.Result>() {

override val reader =
ExchangeReader().fillStatementBuilder {
appendWhereClause(requestFilter)
appendClause("ORDER BY Exchanges.Date ASC")
if (limit > 0) {
appendClause("LIMIT @${Params.LIMIT}")
bind(Params.LIMIT to limit.toLong())
}
}

private fun Statement.Builder.appendWhereClause(filter: StreamExchangesRequest.Filter) {
val conjuncts = mutableListOf<String>()

if (filter.hasDateBefore()) {
conjuncts.add("Exchanges.Date < @${Params.EXCHANGED_BEFORE}")
bind(Params.EXCHANGED_BEFORE to filter.dateBefore.toCloudDate())
}

if (conjuncts.isEmpty()) {
return
}

appendClause("WHERE ")
append(conjuncts.joinToString(" AND "))
}

private object Params {
const val LIMIT = "limit"
const val EXCHANGED_BEFORE = "exchangedBefore"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package org.wfanet.measurement.kingdom.service.internal.testing

import com.google.common.truth.extensions.proto.ProtoTruth.assertThat
import com.google.protobuf.ByteString
import com.google.type.date
import kotlin.test.assertFails
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.junit.Rule
Expand All @@ -37,6 +39,10 @@ import org.wfanet.measurement.internal.kingdom.GetExchangeRequest
import org.wfanet.measurement.internal.kingdom.ModelProvider
import org.wfanet.measurement.internal.kingdom.RecurringExchange
import org.wfanet.measurement.internal.kingdom.RecurringExchangesGrpcKt.RecurringExchangesCoroutineImplBase
import org.wfanet.measurement.internal.kingdom.StreamExchangesRequestKt.filter
import org.wfanet.measurement.internal.kingdom.copy
import org.wfanet.measurement.internal.kingdom.createExchangeRequest
import org.wfanet.measurement.internal.kingdom.streamExchangesRequest
import org.wfanet.measurement.kingdom.deploy.common.testing.DuchyIdSetter
import org.wfanet.measurement.kingdom.service.internal.testing.Population.Companion.DUCHIES

Expand Down Expand Up @@ -205,6 +211,95 @@ abstract class ExchangesServiceTest {
assertFails { getExchange() }
}

@Test
fun `streamExchange returns all exchanges`(): Unit = runBlocking {
val createRequest1 = createExchangeRequest { exchange = EXCHANGE }
val createRequest2 = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2021
month = 1
day = 1
}
}
}

val exchange1 = createExchange(createRequest1)
val exchange2 = createExchange(createRequest2)

val response = exchanges.streamExchanges(streamExchangesRequest {}).toList()

assertThat(response).hasSize(2)
assertThat(response).containsExactly(exchange1, exchange2)
}

@Test
fun `streamExchange respects filter before date`(): Unit = runBlocking {
val oldExchangeRequest = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2021
month = 1
day = 1
}
}
}
val newExchangeRequest = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2023
month = 1
day = 1
}
}
}

val oldExchange = createExchange(oldExchangeRequest)
createExchange(newExchangeRequest)

val response =
exchanges
.streamExchanges(
streamExchangesRequest {
filter = filter {
dateBefore = date {
year = 2022
month = 1
day = 1
}
}
}
)
.toList()

assertThat(response).containsExactly(oldExchange)
}

@Test
fun `streamExchange respects limit`(): Unit = runBlocking {
val createRequest1 = createExchangeRequest { exchange = EXCHANGE }
val createRequest2 = createExchangeRequest {
exchange =
EXCHANGE.copy {
date = date {
year = 2021
month = 1
day = 1
}
}
}

createExchange(createRequest1)
createExchange(createRequest2)

val response = exchanges.streamExchanges(streamExchangesRequest { limit = 1 }).toList()

assertThat(response).hasSize(1)
}

private fun createExchange(request: CreateExchangeRequest): Exchange {
return runBlocking { exchanges.createExchange(request) }
}
Expand Down