From 1220007bbc2ce5dd229d5ea2ae2f38dc192b54bb Mon Sep 17 00:00:00 2001 From: sokomishalov Date: Wed, 30 Sep 2020 19:15:15 +0300 Subject: [PATCH] ScanStream bridge to the coroutines API #1435 --- pom.xml | 15 ++++ src/main/kotlin/io/lettuce/core/ScanFlow.kt | 75 ++++++++++++++++++ .../BaseRedisCoroutinesCommandsImpl.kt | 2 +- .../coroutines/RedisCoroutinesCommandsImpl.kt | 2 +- .../RedisGeoCoroutinesCommandsImpl.kt | 2 +- .../RedisHLLCoroutinesCommandsImpl.kt | 2 +- .../RedisHashCoroutinesCommandsImpl.kt | 2 +- .../RedisKeyCoroutinesCommandsImpl.kt | 2 +- .../RedisListCoroutinesCommandsImpl.kt | 2 +- .../RedisScriptingCoroutinesCommandsImpl.kt | 2 +- .../RedisServerCoroutinesCommandsImpl.kt | 2 +- .../RedisSetCoroutinesCommandsImpl.kt | 2 +- .../RedisSortedSetCoroutinesCommandsImpl.kt | 2 +- .../RedisStreamCoroutinesCommandsImpl.kt | 2 +- .../RedisStringCoroutinesCommandsImpl.kt | 2 +- ...edisTransactionalCoroutinesCommandsImpl.kt | 2 +- .../RedisClusterCoroutinesCommandsImpl.kt | 2 +- .../RedisSentinelCoroutinesCommandsImpl.kt | 2 +- .../lettuce/core/ScanFlowIntegrationTests.kt | 79 +++++++++++++++++++ 19 files changed, 185 insertions(+), 16 deletions(-) create mode 100644 src/main/kotlin/io/lettuce/core/ScanFlow.kt create mode 100644 src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt diff --git a/pom.xml b/pom.xml index a9861e0277..e013ea9d84 100644 --- a/pom.xml +++ b/pom.xml @@ -329,6 +329,13 @@ test + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${kotlin.version} + test + + @@ -385,6 +392,13 @@ test + + org.jetbrains.kotlin + kotlin-test-junit5 + ${kotlin.version} + test + + @@ -631,6 +645,7 @@ org.jetbrains.kotlin kotlin-maven-plugin + 1.8 -Xopt-in=kotlin.RequiresOptIn -Xopt-in=io.lettuce.core.ExperimentalLettuceCoroutinesApi diff --git a/src/main/kotlin/io/lettuce/core/ScanFlow.kt b/src/main/kotlin/io/lettuce/core/ScanFlow.kt new file mode 100644 index 0000000000..1993525775 --- /dev/null +++ b/src/main/kotlin/io/lettuce/core/ScanFlow.kt @@ -0,0 +1,75 @@ +package io.lettuce.core + +import io.lettuce.core.api.coroutines.* +import io.lettuce.core.cluster.api.coroutines.RedisClusterCoroutinesCommandsImpl +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.asFlow + +object ScanFlow { + /** + * Sequentially iterate the keys space. + * + * @param commands coroutines commands + * @param scanArgs scan arguments. + * @return `Flow` flow of keys. + */ + @JvmOverloads + fun scan(commands: RedisKeyCoroutinesCommands, scanArgs: ScanArgs? = null): Flow { + val ops = when (commands) { + is RedisCoroutinesCommandsImpl -> commands.ops + is RedisClusterCoroutinesCommandsImpl -> commands.ops + is RedisKeyCoroutinesCommandsImpl -> commands.ops + else -> throw IllegalArgumentException("Cannot access underlying reactive API") + } + return when (scanArgs) { + null -> ScanStream.scan(ops) + else -> ScanStream.scan(ops, scanArgs) + }.asFlow() + } + + + /** + * Sequentially iterate hash fields and associated values. + * + * @param commands coroutines commands + * @param key the key. + * @param scanArgs scan arguments. + * @return `Flow>` flow of key-values. + */ + @JvmOverloads + fun hscan(commands: RedisHashCoroutinesCommands, key: K, scanArgs: ScanArgs? = null): Flow> { + val ops = when (commands) { + is RedisCoroutinesCommandsImpl -> commands.ops + is RedisClusterCoroutinesCommandsImpl -> commands.ops + is RedisHashCoroutinesCommandsImpl -> commands.ops + else -> throw IllegalArgumentException("Cannot access underlying reactive API") + } + return when (scanArgs) { + null -> ScanStream.hscan(ops, key) + else -> ScanStream.hscan(ops, key, scanArgs) + }.asFlow() + } + + + /** + * Sequentially iterate Set elements. + * + * @param commands coroutines commands + * @param key the key. + * @param scanArgs scan arguments. + * @return `Flow` flow of value. + */ + @JvmOverloads + fun sscan(commands: RedisSetCoroutinesCommands, key: K, scanArgs: ScanArgs? = null): Flow { + val ops = when (commands) { + is RedisCoroutinesCommandsImpl -> commands.ops + is RedisClusterCoroutinesCommandsImpl -> commands.ops + is RedisSetCoroutinesCommandsImpl -> commands.ops + else -> throw IllegalArgumentException("Cannot access underlying reactive API") + } + return when (scanArgs) { + null -> ScanStream.sscan(ops, key) + else -> ScanStream.sscan(ops, key, scanArgs) + }.asFlow() + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index 3f5a11cf09..6c3b15f6e8 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -36,7 +36,7 @@ import kotlinx.coroutines.reactive.awaitSingle * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class BaseRedisCoroutinesCommandsImpl(private val ops: BaseRedisReactiveCommands) : BaseRedisCoroutinesCommands { +internal class BaseRedisCoroutinesCommandsImpl(internal val ops: BaseRedisReactiveCommands) : BaseRedisCoroutinesCommands { override suspend fun publish(channel: K, message: V): Long? = ops.publish(channel, message).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisCoroutinesCommandsImpl.kt index a3c88a05a5..966af05a9e 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisCoroutinesCommandsImpl.kt @@ -33,7 +33,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull */ @ExperimentalLettuceCoroutinesApi open class RedisCoroutinesCommandsImpl( - private val ops: RedisReactiveCommands + internal val ops: RedisReactiveCommands ) : RedisCoroutinesCommands, RedisClusterCoroutinesCommands, BaseRedisCoroutinesCommands by BaseRedisCoroutinesCommandsImpl(ops), RedisGeoCoroutinesCommands by RedisGeoCoroutinesCommandsImpl(ops), diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisGeoCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisGeoCoroutinesCommandsImpl.kt index 5993ec5faf..4a3d450495 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisGeoCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisGeoCoroutinesCommandsImpl.kt @@ -31,7 +31,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisGeoCoroutinesCommandsImpl(private val ops: RedisGeoReactiveCommands) : RedisGeoCoroutinesCommands { +internal class RedisGeoCoroutinesCommandsImpl(internal val ops: RedisGeoReactiveCommands) : RedisGeoCoroutinesCommands { override suspend fun geoadd(key: K, longitude: Double, latitude: Double, member: V): Long? = ops.geoadd(key, longitude, latitude, member).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHLLCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHLLCoroutinesCommandsImpl.kt index 80de897c76..8fe2b4ad3f 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHLLCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHLLCoroutinesCommandsImpl.kt @@ -30,7 +30,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisHLLCoroutinesCommandsImpl(private val ops: RedisHLLReactiveCommands) : RedisHLLCoroutinesCommands { +internal class RedisHLLCoroutinesCommandsImpl(internal val ops: RedisHLLReactiveCommands) : RedisHLLCoroutinesCommands { override suspend fun pfadd(key: K, vararg values: V): Long? = ops.pfadd(key, *values).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHashCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHashCoroutinesCommandsImpl.kt index 8e9798cc23..20c2caa638 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHashCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisHashCoroutinesCommandsImpl.kt @@ -32,7 +32,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisHashCoroutinesCommandsImpl(private val ops: RedisHashReactiveCommands) : RedisHashCoroutinesCommands { +internal class RedisHashCoroutinesCommandsImpl(internal val ops: RedisHashReactiveCommands) : RedisHashCoroutinesCommands { override suspend fun hdel(key: K, vararg fields: K): Long? = ops.hdel(key, *fields).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisKeyCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisKeyCoroutinesCommandsImpl.kt index 3b4f040a30..442694368d 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisKeyCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisKeyCoroutinesCommandsImpl.kt @@ -32,7 +32,7 @@ import java.util.* * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisKeyCoroutinesCommandsImpl(private val ops: RedisKeyReactiveCommands) : RedisKeyCoroutinesCommands { +internal class RedisKeyCoroutinesCommandsImpl(internal val ops: RedisKeyReactiveCommands) : RedisKeyCoroutinesCommands { override suspend fun del(vararg keys: K): Long? = ops.del(*keys).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisListCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisListCoroutinesCommandsImpl.kt index 618d77751f..28a36369ae 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisListCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisListCoroutinesCommandsImpl.kt @@ -36,7 +36,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation */ @ExperimentalLettuceCoroutinesApi -internal class RedisListCoroutinesCommandsImpl(private val ops: RedisListReactiveCommands) : RedisListCoroutinesCommands { +internal class RedisListCoroutinesCommandsImpl(internal val ops: RedisListReactiveCommands) : RedisListCoroutinesCommands { override suspend fun blpop(timeout: Long, vararg keys: K): KeyValue? = ops.blpop(timeout, *keys).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisScriptingCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisScriptingCoroutinesCommandsImpl.kt index 8ed8edd4a3..0e21297693 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisScriptingCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisScriptingCoroutinesCommandsImpl.kt @@ -36,7 +36,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation */ @ExperimentalLettuceCoroutinesApi -internal class RedisScriptingCoroutinesCommandsImpl(private val ops: RedisScriptingReactiveCommands) : RedisScriptingCoroutinesCommands { +internal class RedisScriptingCoroutinesCommandsImpl(internal val ops: RedisScriptingReactiveCommands) : RedisScriptingCoroutinesCommands { override suspend fun eval(script: String, type: ScriptOutputType, vararg keys: K): T? = ops.eval(script, type, *keys).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt index aa642db3ef..2903c94e24 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisServerCoroutinesCommandsImpl.kt @@ -39,7 +39,7 @@ import java.util.* * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation */ @ExperimentalLettuceCoroutinesApi -internal class RedisServerCoroutinesCommandsImpl(private val ops: RedisServerReactiveCommands) : RedisServerCoroutinesCommands { +internal class RedisServerCoroutinesCommandsImpl(internal val ops: RedisServerReactiveCommands) : RedisServerCoroutinesCommands { override suspend fun bgrewriteaof(): String? = ops.bgrewriteaof().awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSetCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSetCoroutinesCommandsImpl.kt index b98c5b4891..fdca45ae45 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSetCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSetCoroutinesCommandsImpl.kt @@ -38,7 +38,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation */ @ExperimentalLettuceCoroutinesApi -internal class RedisSetCoroutinesCommandsImpl(private val ops: RedisSetReactiveCommands) : RedisSetCoroutinesCommands { +internal class RedisSetCoroutinesCommandsImpl(internal val ops: RedisSetReactiveCommands) : RedisSetCoroutinesCommands { override suspend fun sadd(key: K, vararg members: V): Long? = ops.sadd(key, *members).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSortedSetCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSortedSetCoroutinesCommandsImpl.kt index 9c8e11fbc4..9ec45fd693 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSortedSetCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisSortedSetCoroutinesCommandsImpl.kt @@ -34,7 +34,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesReactiveImplementation */ @ExperimentalLettuceCoroutinesApi -internal class RedisSortedSetCoroutinesCommandsImpl(private val ops: RedisSortedSetReactiveCommands) : RedisSortedSetCoroutinesCommands { +internal class RedisSortedSetCoroutinesCommandsImpl(internal val ops: RedisSortedSetReactiveCommands) : RedisSortedSetCoroutinesCommands { override suspend fun bzpopmin(timeout: Long, vararg keys: K): KeyValue>? = ops.bzpopmin(timeout, *keys).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt index a8ca3fb4f6..1ea1e05c49 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStreamCoroutinesCommandsImpl.kt @@ -35,7 +35,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @since 5.1 */ @ExperimentalLettuceCoroutinesApi -internal class RedisStreamCoroutinesCommandsImpl(private val ops: RedisStreamReactiveCommands) : RedisStreamCoroutinesCommands { +internal class RedisStreamCoroutinesCommandsImpl(internal val ops: RedisStreamReactiveCommands) : RedisStreamCoroutinesCommands { override suspend fun xack(key: K, group: K, vararg messageIds: String): Long? = ops.xack(key, group, *messageIds).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStringCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStringCoroutinesCommandsImpl.kt index 60890c5999..85ed20f536 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStringCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisStringCoroutinesCommandsImpl.kt @@ -33,7 +33,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisStringCoroutinesCommandsImpl(private val ops: RedisStringReactiveCommands) : RedisStringCoroutinesCommands { +internal class RedisStringCoroutinesCommandsImpl(internal val ops: RedisStringReactiveCommands) : RedisStringCoroutinesCommands { override suspend fun append(key: K, value: V): Long? = ops.append(key, value).awaitFirstOrNull() diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisTransactionalCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisTransactionalCoroutinesCommandsImpl.kt index a06f7380b8..9711db9451 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/RedisTransactionalCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/RedisTransactionalCoroutinesCommandsImpl.kt @@ -31,7 +31,7 @@ import kotlinx.coroutines.reactive.awaitLast * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisTransactionalCoroutinesCommandsImpl(private val ops: RedisTransactionalReactiveCommands) : RedisTransactionalCoroutinesCommands { +internal class RedisTransactionalCoroutinesCommandsImpl(internal val ops: RedisTransactionalReactiveCommands) : RedisTransactionalCoroutinesCommands { override suspend fun discard(): String = ops.discard().awaitLast() diff --git a/src/main/kotlin/io/lettuce/core/cluster/api/coroutines/RedisClusterCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/cluster/api/coroutines/RedisClusterCoroutinesCommandsImpl.kt index 59ca8aa813..a7164131f0 100644 --- a/src/main/kotlin/io/lettuce/core/cluster/api/coroutines/RedisClusterCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/cluster/api/coroutines/RedisClusterCoroutinesCommandsImpl.kt @@ -33,7 +33,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull */ @ExperimentalLettuceCoroutinesApi internal class RedisClusterCoroutinesCommandsImpl( - private val ops: RedisClusterReactiveCommands + internal val ops: RedisClusterReactiveCommands ) : RedisClusterCoroutinesCommands, BaseRedisCoroutinesCommands by BaseRedisCoroutinesCommandsImpl(ops), RedisGeoCoroutinesCommands by RedisGeoCoroutinesCommandsImpl(ops), diff --git a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt index eb70d2ccbe..18ee67e951 100644 --- a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt @@ -34,7 +34,7 @@ import java.net.SocketAddress * @since 6.0 */ @ExperimentalLettuceCoroutinesApi -internal class RedisSentinelCoroutinesCommandsImpl(private val ops: RedisSentinelReactiveCommands) : RedisSentinelCoroutinesCommands { +internal class RedisSentinelCoroutinesCommandsImpl(internal val ops: RedisSentinelReactiveCommands) : RedisSentinelCoroutinesCommands { override suspend fun getMasterAddrByName(key: K): SocketAddress = ops.getMasterAddrByName(key).awaitLast() diff --git a/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt b/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt new file mode 100644 index 0000000000..e9f085014f --- /dev/null +++ b/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core + +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.api.coroutines +import io.lettuce.test.LettuceExtension +import kotlinx.coroutines.flow.count +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.extension.ExtendWith +import javax.inject.Inject + + +/** + * @author Mikhael Sokolov + */ +@ExtendWith(LettuceExtension::class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +internal class ScanFlowIntegrationTests @Inject constructor(private val connection: StatefulRedisConnection) : TestSupport() { + + @BeforeEach + fun setUp() { + connection.sync().flushall() + } + + @Test + fun `should scan iteratively`() = runBlocking { + with(connection.coroutines()) { + repeat(1000) { + set("key - $it", value) + } + assertThat(ScanFlow.scan(this, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250) + assertThat(ScanFlow.scan(this).count()).isEqualTo(1000) + } + } + + @Test + fun `should hscan iteratively`() = runBlocking { + with(connection.coroutines()) { + repeat(1000) { + hset(key, "field-$it", "value-$it") + } + + assertThat(ScanFlow.hscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250) + assertThat(ScanFlow.hscan(this, key).count()).isEqualTo(1000) + } + } + + @Test + fun shouldSscanIteratively() = runBlocking { + with(connection.coroutines()) { + repeat(1000) { + sadd(key, "value-$it") + } + + assertThat(ScanFlow.sscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250) + assertThat(ScanFlow.sscan(this, key).count()).isEqualTo(1000) + } + } +} \ No newline at end of file