Skip to content

Commit

Permalink
Polishing #1435
Browse files Browse the repository at this point in the history
Update what's new section. Remove JvmOverloads annotation since ScanFlow isn't intended to be used from Java or other JVM languages. Add support and test for zscan. Reduce iterations to 300. Remove superfluous kotlin-stdlib-jdk8 dependency.

Add since tag and license header.

Original pull request: #1438.
  • Loading branch information
mp911de committed Nov 2, 2020
1 parent 9297d0c commit feea1e6
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 21 deletions.
7 changes: 0 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>

<!-- CDI -->

<dependency>
Expand Down
6 changes: 6 additions & 0 deletions src/main/asciidoc/new-features.adoc
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
[[new-features]]
= New & Noteworthy

[[new-features.6-1-0]]
== What's new in Lettuce 6.1

* Kotlin Coroutines support for `SCAN`/`HSCAN`/`SSCAN`/`ZSCAN` through `ScanFlow`.

[[new-features.6-0-0]]

== What's new in Lettuce 6.0

* Support for RESP3 usage with Redis 6 along with RESP2/RESP3 handshake and protocol version discovery.
Expand Down
52 changes: 46 additions & 6 deletions src/main/kotlin/io/lettuce/core/ScanFlow.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core

import io.lettuce.core.api.coroutines.*
import io.lettuce.core.cluster.api.coroutines.RedisClusterCoroutinesCommandsImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow

/**
* Coroutines adapter for [ScanStream].
*
* @author Mikhael Sokolov
* @author Mark Paluch
* @since 6.1
*/
@ExperimentalLettuceCoroutinesApi
object ScanFlow {

/**
* Sequentially iterate the keys space.
*
* @param commands coroutines commands
* @param scanArgs scan arguments.
* @return `Flow<K>` flow of keys.
*/
@JvmOverloads
fun <K : Any, V : Any> scan(commands: RedisKeyCoroutinesCommands<K, V>, scanArgs: ScanArgs? = null): Flow<K> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
Expand All @@ -27,7 +50,6 @@ object ScanFlow {
}.asFlow()
}


/**
* Sequentially iterate hash fields and associated values.
*
Expand All @@ -36,7 +58,6 @@ object ScanFlow {
* @param scanArgs scan arguments.
* @return `Flow<KeyValue<K, V>>` flow of key-values.
*/
@JvmOverloads
fun <K : Any, V : Any> hscan(commands: RedisHashCoroutinesCommands<K, V>, key: K, scanArgs: ScanArgs? = null): Flow<KeyValue<K, V>> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
Expand All @@ -50,7 +71,6 @@ object ScanFlow {
}.asFlow()
}


/**
* Sequentially iterate Set elements.
*
Expand All @@ -59,7 +79,6 @@ object ScanFlow {
* @param scanArgs scan arguments.
* @return `Flow<V>` flow of value.
*/
@JvmOverloads
fun <K : Any, V : Any> sscan(commands: RedisSetCoroutinesCommands<K, V>, key: K, scanArgs: ScanArgs? = null): Flow<V> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
Expand All @@ -72,4 +91,25 @@ object ScanFlow {
else -> ScanStream.sscan(ops, key, scanArgs)
}.asFlow()
}
}

/**
* Sequentially iterate Sorted Set elements.
*
* @param commands coroutines commands
* @param key the key.
* @param scanArgs scan arguments.
* @return `Flow<V>` flow of [ScoredValue].
*/
fun <K : Any, V : Any> zscan(commands: RedisSortedSetCoroutinesCommands<K, V>, key: K, scanArgs: ScanArgs? = null): Flow<ScoredValue<V>> {
val ops = when (commands) {
is RedisCoroutinesCommandsImpl -> commands.ops
is RedisClusterCoroutinesCommandsImpl -> commands.ops
is RedisSortedSetCoroutinesCommandsImpl -> commands.ops
else -> throw IllegalArgumentException("Cannot access underlying reactive API")
}
return when (scanArgs) {
null -> ScanStream.zscan(ops, key)
else -> ScanStream.zscan(ops, key, scanArgs)
}.asFlow()
}
}
32 changes: 24 additions & 8 deletions src/test/kotlin/io/lettuce/core/ScanFlowIntegrationTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import javax.inject.Inject


/**
* Integration tests for [ScanFlow].
*
* @author Mikhael Sokolov
* @author Mark Paluch
*/
@ExtendWith(LettuceExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class ScanFlowIntegrationTests @Inject constructor(private val connection: StatefulRedisConnection<String, String>) : TestSupport() {

val iterations = 300

@BeforeEach
fun setUp() {
connection.sync().flushall()
Expand All @@ -45,35 +49,47 @@ internal class ScanFlowIntegrationTests @Inject constructor(private val connecti
@Test
fun `should scan iteratively`() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(1000) {
repeat(iterations) {
set("key - $it", value)
}
assertThat(ScanFlow.scan(this, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.scan(this).count()).isEqualTo(1000)
assertThat(ScanFlow.scan(this).count()).isEqualTo(iterations)
}
}

@Test
fun `should hscan iteratively`() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(1000) {
repeat(iterations) {
hset(key, "field-$it", "value-$it")
}

assertThat(ScanFlow.hscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.hscan(this, key).count()).isEqualTo(1000)
assertThat(ScanFlow.hscan(this, key).count()).isEqualTo(iterations)
}
}

@Test
fun shouldSscanIteratively() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(1000) {
repeat(iterations) {
sadd(key, "value-$it")
}

assertThat(ScanFlow.sscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.sscan(this, key).count()).isEqualTo(1000)
assertThat(ScanFlow.sscan(this, key).count()).isEqualTo(iterations)
}
}

@Test
fun shouldZscanIteratively() = runBlocking<Unit> {
with(connection.coroutines()) {
repeat(iterations) {
zadd(key, 1001.0, "value-$it")
}

assertThat(ScanFlow.zscan(this, key, ScanArgs.Builder.limit(200)).take(250).toList()).hasSize(250)
assertThat(ScanFlow.zscan(this, key).count()).isEqualTo(iterations)
}
}
}
}

0 comments on commit feea1e6

Please sign in to comment.