diff --git a/pom.xml b/pom.xml
index 7e1e639c54..9f79cad458 100644
--- a/pom.xml
+++ b/pom.xml
@@ -328,13 +328,6 @@
test
-
- org.jetbrains.kotlin
- kotlin-stdlib-jdk8
- ${kotlin.version}
- test
-
-
diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc
index b28dc2389a..9967454023 100644
--- a/src/main/asciidoc/new-features.adoc
+++ b/src/main/asciidoc/new-features.adoc
@@ -1,7 +1,13 @@
[[new-features]]
= New & Noteworthy
+[[new-features.6-1-0]]
+== What's new in Lettuce 6.1
+
+* Kotlin Coroutines support for `SCAN`/`HSCAN`/`SSCAN`/`ZSCAN` through `ScanFlow`.
+
[[new-features.6-0-0]]
+
== What's new in Lettuce 6.0
* Support for RESP3 usage with Redis 6 along with RESP2/RESP3 handshake and protocol version discovery.
diff --git a/src/main/kotlin/io/lettuce/core/ScanFlow.kt b/src/main/kotlin/io/lettuce/core/ScanFlow.kt
index 1993525775..ca35482f0d 100644
--- a/src/main/kotlin/io/lettuce/core/ScanFlow.kt
+++ b/src/main/kotlin/io/lettuce/core/ScanFlow.kt
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package io.lettuce.core
import io.lettuce.core.api.coroutines.*
@@ -5,7 +20,16 @@ import io.lettuce.core.cluster.api.coroutines.RedisClusterCoroutinesCommandsImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
+/**
+ * Coroutines adapter for [ScanStream].
+ *
+ * @author Mikhael Sokolov
+ * @author Mark Paluch
+ * @since 6.1
+ */
+@ExperimentalLettuceCoroutinesApi
object ScanFlow {
+
/**
* Sequentially iterate the keys space.
*
@@ -13,7 +37,6 @@ object ScanFlow {
* @param scanArgs scan arguments.
* @return `Flow` flow of keys.
*/
- @JvmOverloads
fun scan(commands: RedisKeyCoroutinesCommands, scanArgs: ScanArgs? = null): Flow {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
@@ -27,7 +50,6 @@ object ScanFlow {
}.asFlow()
}
-
/**
* Sequentially iterate hash fields and associated values.
*
@@ -36,7 +58,6 @@ object ScanFlow {
* @param scanArgs scan arguments.
* @return `Flow>` flow of key-values.
*/
- @JvmOverloads
fun hscan(commands: RedisHashCoroutinesCommands, key: K, scanArgs: ScanArgs? = null): Flow> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
@@ -50,7 +71,6 @@ object ScanFlow {
}.asFlow()
}
-
/**
* Sequentially iterate Set elements.
*
@@ -59,7 +79,6 @@ object ScanFlow {
* @param scanArgs scan arguments.
* @return `Flow` flow of value.
*/
- @JvmOverloads
fun sscan(commands: RedisSetCoroutinesCommands, key: K, scanArgs: ScanArgs? = null): Flow {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
@@ -72,4 +91,25 @@ object ScanFlow {
else -> ScanStream.sscan(ops, key, scanArgs)
}.asFlow()
}
-}
\ No newline at end of file
+
+ /**
+ * Sequentially iterate Sorted Set elements.
+ *
+ * @param commands coroutines commands
+ * @param key the key.
+ * @param scanArgs scan arguments.
+ * @return `Flow` flow of [ScoredValue].
+ */
+ fun zscan(commands: RedisSortedSetCoroutinesCommands, key: K, scanArgs: ScanArgs? = null): Flow> {
+ val ops = when (commands) {
+ is RedisCoroutinesCommandsImpl -> commands.ops
+ is RedisClusterCoroutinesCommandsImpl -> commands.ops
+ is RedisSortedSetCoroutinesCommandsImpl -> commands.ops
+ else -> throw IllegalArgumentException("Cannot access underlying reactive API")
+ }
+ return when (scanArgs) {
+ null -> ScanStream.zscan(ops, key)
+ else -> ScanStream.zscan(ops, key, scanArgs)
+ }.asFlow()
+ }
+}
diff --git a/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt b/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt
index e9f085014f..e0fa64c1bf 100644
--- a/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt
+++ b/src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt
@@ -29,14 +29,18 @@ import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import javax.inject.Inject
-
/**
+ * Integration tests for [ScanFlow].
+ *
* @author Mikhael Sokolov
+ * @author Mark Paluch
*/
@ExtendWith(LettuceExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class ScanFlowIntegrationTests @Inject constructor(private val connection: StatefulRedisConnection) : TestSupport() {
+ val iterations = 300
+
@BeforeEach
fun setUp() {
connection.sync().flushall()
@@ -45,35 +49,47 @@ internal class ScanFlowIntegrationTests @Inject constructor(private val connecti
@Test
fun `should scan iteratively`() = runBlocking {
with(connection.coroutines()) {
- repeat(1000) {
+ repeat(iterations) {
set("key - $it", value)
}
assertThat(ScanFlow.scan(this, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
- assertThat(ScanFlow.scan(this).count()).isEqualTo(1000)
+ assertThat(ScanFlow.scan(this).count()).isEqualTo(iterations)
}
}
@Test
fun `should hscan iteratively`() = runBlocking {
with(connection.coroutines()) {
- repeat(1000) {
+ repeat(iterations) {
hset(key, "field-$it", "value-$it")
}
assertThat(ScanFlow.hscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
- assertThat(ScanFlow.hscan(this, key).count()).isEqualTo(1000)
+ assertThat(ScanFlow.hscan(this, key).count()).isEqualTo(iterations)
}
}
@Test
fun shouldSscanIteratively() = runBlocking {
with(connection.coroutines()) {
- repeat(1000) {
+ repeat(iterations) {
sadd(key, "value-$it")
}
assertThat(ScanFlow.sscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
- assertThat(ScanFlow.sscan(this, key).count()).isEqualTo(1000)
+ assertThat(ScanFlow.sscan(this, key).count()).isEqualTo(iterations)
+ }
+ }
+
+ @Test
+ fun shouldZscanIteratively() = runBlocking {
+ with(connection.coroutines()) {
+ repeat(iterations) {
+ zadd(key, 1001.0, "value-$it")
+ }
+
+ assertThat(ScanFlow.zscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
+ assertThat(ScanFlow.zscan(this, key).count()).isEqualTo(iterations)
}
}
-}
\ No newline at end of file
+}