Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a Coroutine variant of ScanStream/ScanIterator #1438

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>

<!-- CDI -->

<dependency>
Expand Down Expand Up @@ -385,6 +392,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit5</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>

<!-- Logging -->

<dependency>
Expand Down Expand Up @@ -631,6 +645,7 @@
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<configuration>
<jvmTarget>1.8</jvmTarget>
<args>
<arg>-Xopt-in=kotlin.RequiresOptIn</arg>
<arg>-Xopt-in=io.lettuce.core.ExperimentalLettuceCoroutinesApi</arg>
Expand Down
75 changes: 75 additions & 0 deletions src/main/kotlin/io/lettuce/core/ScanFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.lettuce.core

import io.lettuce.core.api.coroutines.*
import io.lettuce.core.cluster.api.coroutines.RedisClusterCoroutinesCommandsImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow

object ScanFlow {
/**
* Sequentially iterate the keys space.
*
* @param commands coroutines commands
* @param scanArgs scan arguments.
* @return `Flow<K>` flow of keys.
*/
@JvmOverloads
fun <K : Any, V : Any> scan(commands: RedisKeyCoroutinesCommands<K, V>, scanArgs: ScanArgs? = null): Flow<K> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
is RedisClusterCoroutinesCommandsImpl -> commands.ops
is RedisKeyCoroutinesCommandsImpl -> commands.ops
else -> throw IllegalArgumentException("Cannot access underlying reactive API")
}
return when (scanArgs) {
null -> ScanStream.scan(ops)
else -> ScanStream.scan(ops, scanArgs)
}.asFlow()
}


/**
* Sequentially iterate hash fields and associated values.
*
* @param commands coroutines commands
* @param key the key.
* @param scanArgs scan arguments.
* @return `Flow<KeyValue<K, V>>` flow of key-values.
*/
@JvmOverloads
fun <K : Any, V : Any> hscan(commands: RedisHashCoroutinesCommands<K, V>, key: K, scanArgs: ScanArgs? = null): Flow<KeyValue<K, V>> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
is RedisClusterCoroutinesCommandsImpl -> commands.ops
is RedisHashCoroutinesCommandsImpl -> commands.ops
else -> throw IllegalArgumentException("Cannot access underlying reactive API")
}
return when (scanArgs) {
null -> ScanStream.hscan(ops, key)
else -> ScanStream.hscan(ops, key, scanArgs)
}.asFlow()
}


/**
* Sequentially iterate Set elements.
*
* @param commands coroutines commands
* @param key the key.
* @param scanArgs scan arguments.
* @return `Flow<V>` flow of value.
*/
@JvmOverloads
fun <K : Any, V : Any> sscan(commands: RedisSetCoroutinesCommands<K, V>, key: K, scanArgs: ScanArgs? = null): Flow<V> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
is RedisClusterCoroutinesCommandsImpl -> commands.ops
is RedisSetCoroutinesCommandsImpl -> commands.ops
else -> throw IllegalArgumentException("Cannot access underlying reactive API")
}
return when (scanArgs) {
null -> ScanStream.sscan(ops, key)
else -> ScanStream.sscan(ops, key, scanArgs)
}.asFlow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import kotlinx.coroutines.reactive.awaitSingle
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: BaseRedisReactiveCommands<K, V>) : BaseRedisCoroutinesCommands<K, V> {
internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: BaseRedisReactiveCommands<K, V>) : BaseRedisCoroutinesCommands<K, V> {

override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
*/
@ExperimentalLettuceCoroutinesApi
open class RedisCoroutinesCommandsImpl<K : Any, V : Any>(
private val ops: RedisReactiveCommands<K, V>
internal val ops: RedisReactiveCommands<K, V>
) : RedisCoroutinesCommands<K, V>, RedisClusterCoroutinesCommands<K, V>,
BaseRedisCoroutinesCommands<K, V> by BaseRedisCoroutinesCommandsImpl(ops),
RedisGeoCoroutinesCommands<K, V> by RedisGeoCoroutinesCommandsImpl(ops),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisGeoCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisGeoReactiveCommands<K, V>) : RedisGeoCoroutinesCommands<K, V> {
internal class RedisGeoCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisGeoReactiveCommands<K, V>) : RedisGeoCoroutinesCommands<K, V> {

override suspend fun geoadd(key: K, longitude: Double, latitude: Double, member: V): Long? = ops.geoadd(key, longitude, latitude, member).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisHLLCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisHLLReactiveCommands<K, V>) : RedisHLLCoroutinesCommands<K, V> {
internal class RedisHLLCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisHLLReactiveCommands<K, V>) : RedisHLLCoroutinesCommands<K, V> {

override suspend fun pfadd(key: K, vararg values: V): Long? = ops.pfadd(key, *values).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisHashCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisHashReactiveCommands<K, V>) : RedisHashCoroutinesCommands<K, V> {
internal class RedisHashCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisHashReactiveCommands<K, V>) : RedisHashCoroutinesCommands<K, V> {

override suspend fun hdel(key: K, vararg fields: K): Long? = ops.hdel(key, *fields).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.*
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisKeyCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisKeyReactiveCommands<K, V>) : RedisKeyCoroutinesCommands<K, V> {
internal class RedisKeyCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisKeyReactiveCommands<K, V>) : RedisKeyCoroutinesCommands<K, V> {

override suspend fun del(vararg keys: K): Long? = ops.del(*keys).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisListCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisListReactiveCommands<K, V>) : RedisListCoroutinesCommands<K, V> {
internal class RedisListCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisListReactiveCommands<K, V>) : RedisListCoroutinesCommands<K, V> {

override suspend fun blpop(timeout: Long, vararg keys: K): KeyValue<K, V>? = ops.blpop(timeout, *keys).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisScriptingCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisScriptingReactiveCommands<K, V>) : RedisScriptingCoroutinesCommands<K, V> {
internal class RedisScriptingCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisScriptingReactiveCommands<K, V>) : RedisScriptingCoroutinesCommands<K, V> {

override suspend fun <T> eval(script: String, type: ScriptOutputType, vararg keys: K): T? = ops.eval<T>(script, type, *keys).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import java.util.*
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisServerCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisServerReactiveCommands<K, V>) : RedisServerCoroutinesCommands<K, V> {
internal class RedisServerCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisServerReactiveCommands<K, V>) : RedisServerCoroutinesCommands<K, V> {

override suspend fun bgrewriteaof(): String? = ops.bgrewriteaof().awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisSetCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisSetReactiveCommands<K, V>) : RedisSetCoroutinesCommands<K, V> {
internal class RedisSetCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisSetReactiveCommands<K, V>) : RedisSetCoroutinesCommands<K, V> {

override suspend fun sadd(key: K, vararg members: V): Long? = ops.sadd(key, *members).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisSortedSetCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisSortedSetReactiveCommands<K, V>) : RedisSortedSetCoroutinesCommands<K, V> {
internal class RedisSortedSetCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisSortedSetReactiveCommands<K, V>) : RedisSortedSetCoroutinesCommands<K, V> {

override suspend fun bzpopmin(timeout: Long, vararg keys: K): KeyValue<K, ScoredValue<V>>? = ops.bzpopmin(timeout, *keys).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @since 5.1
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisStreamCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisStreamReactiveCommands<K, V>) : RedisStreamCoroutinesCommands<K, V> {
internal class RedisStreamCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisStreamReactiveCommands<K, V>) : RedisStreamCoroutinesCommands<K, V> {

override suspend fun xack(key: K, group: K, vararg messageIds: String): Long? = ops.xack(key, group, *messageIds).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisStringCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisStringReactiveCommands<K, V>) : RedisStringCoroutinesCommands<K, V> {
internal class RedisStringCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisStringReactiveCommands<K, V>) : RedisStringCoroutinesCommands<K, V> {

override suspend fun append(key: K, value: V): Long? = ops.append(key, value).awaitFirstOrNull()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kotlinx.coroutines.reactive.awaitLast
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisTransactionalCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisTransactionalReactiveCommands<K, V>) : RedisTransactionalCoroutinesCommands<K, V> {
internal class RedisTransactionalCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisTransactionalReactiveCommands<K, V>) : RedisTransactionalCoroutinesCommands<K, V> {

override suspend fun discard(): String = ops.discard().awaitLast()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisClusterCoroutinesCommandsImpl<K : Any, V : Any>(
private val ops: RedisClusterReactiveCommands<K, V>
internal val ops: RedisClusterReactiveCommands<K, V>
) : RedisClusterCoroutinesCommands<K, V>,
BaseRedisCoroutinesCommands<K, V> by BaseRedisCoroutinesCommandsImpl(ops),
RedisGeoCoroutinesCommands<K, V> by RedisGeoCoroutinesCommandsImpl(ops),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import java.net.SocketAddress
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
internal class RedisSentinelCoroutinesCommandsImpl<K : Any, V : Any>(private val ops: RedisSentinelReactiveCommands<K, V>) : RedisSentinelCoroutinesCommands<K, V> {
internal class RedisSentinelCoroutinesCommandsImpl<K : Any, V : Any>(internal val ops: RedisSentinelReactiveCommands<K, V>) : RedisSentinelCoroutinesCommands<K, V> {

override suspend fun getMasterAddrByName(key: K): SocketAddress = ops.getMasterAddrByName(key).awaitLast()

Expand Down
79 changes: 79 additions & 0 deletions src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core
sokomishalov marked this conversation as resolved.
Show resolved Hide resolved

import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.coroutines
import io.lettuce.test.LettuceExtension
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import javax.inject.Inject


/**
* @author Mikhael Sokolov
*/
@ExtendWith(LettuceExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class ScanFlowIntegrationTests @Inject constructor(private val connection: StatefulRedisConnection<String, String>) : TestSupport() {

@BeforeEach
fun setUp() {
connection.sync().flushall()
}

@Test
fun `should scan iteratively`() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(1000) {
set("key - $it", value)
}
assertThat(ScanFlow.scan(this, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.scan(this).count()).isEqualTo(1000)
}
}

@Test
fun `should hscan iteratively`() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(1000) {
hset(key, "field-$it", "value-$it")
}

assertThat(ScanFlow.hscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.hscan(this, key).count()).isEqualTo(1000)
}
}

@Test
fun shouldSscanIteratively() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(1000) {
sadd(key, "value-$it")
}

assertThat(ScanFlow.sscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.sscan(this, key).count()).isEqualTo(1000)
}
}
}