Skip to content

Commit

Permalink
Add reactive scanning #638
Browse files Browse the repository at this point in the history
Lettuce now supports reactive scanning for all connections (Standalone, Master/Slave, Cluster). Progressive scanning respects demand and executes subsequent scan commands if the subscriber has signalled enough demand.

RedisReactiveCommands<String, String> reactive = …;

Flux<String> keys = ScanStream.scan(reactive);

Flux<ScoredValue<String>> scoredValues = ScanStream.zscan(reactive, key, ScanArgs.Builder.limit(200)); // set batch size to 200
  • Loading branch information
mp911de committed Oct 29, 2017
1 parent 691061d commit ee84ba9
Show file tree
Hide file tree
Showing 3 changed files with 393 additions and 0 deletions.
242 changes: 242 additions & 0 deletions src/main/java/io/lettuce/core/ScanStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import io.lettuce.core.api.reactive.RedisHashReactiveCommands;
import io.lettuce.core.api.reactive.RedisKeyReactiveCommands;
import io.lettuce.core.api.reactive.RedisSetReactiveCommands;
import io.lettuce.core.api.reactive.RedisSortedSetReactiveCommands;
import io.lettuce.core.internal.LettuceAssert;

/**
* Scan command support exposed through {@link Flux}.
* <p>
* {@link ScanStream} uses reactive command interfaces to scan over keys ({@code SCAN}), sets ({@code SSCAN}), sorted sets (
* {@code ZSCAN}), and hashes ({@code HSCAN}).
* <p>
* Use {@link ScanArgs#limit(long)} to set the batch size.
* <p>
* Data structure scanning is progressive and stateful and demand-aware. It supports full iterations (until all received cursors
* are exhausted) and premature termination. Subsequent scan commands to fetch the cursor data get only issued if the subscriber
* signals demand.
*
* @author Mark Paluch
* @since 5.1
*/
public abstract class ScanStream {

private ScanStream() {
}

/**
* Sequentially iterate over keys in the keyspace. This method uses {@code SCAN} to perform an iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands) {
return scan(commands, Optional.empty());
}

/**
* Sequentially iterate over keys in the keyspace. This method uses {@code SCAN} to perform an iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param scanArgs the scan arguments, must not be {@literal null}.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands, ScanArgs scanArgs) {

LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");

return scan(commands, Optional.of(scanArgs));
}

private static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands, Optional<ScanArgs> scanArgs) {

LettuceAssert.notNull(commands, "RedisKeyCommands must not be null");

Mono<KeyScanCursor<K>> res = scanArgs.map(commands::scan).orElseGet(commands::scan);

return res.flatMapMany(cursor -> scan(scanArgs, cursor, commands::scan, commands::scan, KeyScanCursor::getKeys));
}

/**
* Sequentially iterate over entries in a hash identified by {@code key}. This method uses {@code HSCAN} to perform an
* iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param key the hash to scan.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> commands, K key) {
return hscan(commands, key, Optional.empty());
}

/**
* Sequentially iterate over entries in a hash identified by {@code key}. This method uses {@code HSCAN} to perform an
* iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param key the hash to scan.
* @param scanArgs the scan arguments, must not be {@literal null}.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> commands, K key, ScanArgs scanArgs) {

LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");

return hscan(commands, key, Optional.of(scanArgs));
}

private static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> commands, K key,
Optional<ScanArgs> scanArgs) {

LettuceAssert.notNull(commands, "RedisHashReactiveCommands must not be null");
LettuceAssert.notNull(key, "Key must not be null");

Mono<MapScanCursor<K, V>> res = scanArgs.map(it -> commands.hscan(key, it)).orElseGet(() -> commands.hscan(key));

return res.flatMapMany(cursor -> scan(scanArgs, cursor, //
c -> commands.hscan(key, c), //
(c, args) -> commands.hscan(key, c, args), //
c -> c.getMap().entrySet())).map(me -> KeyValue.fromNullable(me.getKey(), me.getValue()));
}

/**
* Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an
* iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param key the set to scan.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key) {
return sscan(commands, key, Optional.empty());
}

/**
* Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an
* iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param key the set to scan.
* @param scanArgs the scan arguments, must not be {@literal null}.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key, ScanArgs scanArgs) {

LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");

return sscan(commands, key, Optional.of(scanArgs));
}

private static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key, Optional<ScanArgs> scanArgs) {

LettuceAssert.notNull(commands, "RedisSetReactiveCommands must not be null");
LettuceAssert.notNull(key, "Key must not be null");

Mono<ValueScanCursor<V>> res = scanArgs.map(it -> commands.sscan(key, it)).orElseGet(() -> commands.sscan(key));

return res.flatMapMany(cursor -> scan(scanArgs, cursor, //
c -> commands.sscan(key, c), //
(c, args) -> commands.sscan(key, c, args), //
ValueScanCursor::getValues));
}

/**
* Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an
* iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param key the sorted set to scan.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key) {
return zscan(commands, key, Optional.empty());
}

/**
* Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an
* iterative scan.
*
* @param commands the commands interface, must not be {@literal null}.
* @param key the sorted set to scan.
* @param scanArgs the scan arguments, must not be {@literal null}.
* @param <K> Key type.
* @param <V> Value type.
* @return a new {@link Flux}.
*/
public static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key, ScanArgs scanArgs) {

LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");

return zscan(commands, key, Optional.of(scanArgs));
}

private static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key,
Optional<ScanArgs> scanArgs) {

LettuceAssert.notNull(commands, "RedisSortedSetReactiveCommands must not be null");
LettuceAssert.notNull(key, "Key must not be null");

Mono<ScoredValueScanCursor<V>> res = scanArgs.map(it -> commands.zscan(key, it)).orElseGet(() -> commands.zscan(key));

return res.flatMapMany(cursor -> scan(scanArgs, cursor, //
c -> commands.zscan(key, c), //
(c, args) -> commands.zscan(key, c, args), //
ScoredValueScanCursor::getValues));
}

private static <V, C extends ScanCursor> Publisher<V> scan(Optional<ScanArgs> scanArgs, C cursor,
Function<ScanCursor, Mono<C>> scanFunction, BiFunction<ScanCursor, ScanArgs, Mono<C>> scanWithArgsFunction,
Function<C, Iterable<V>> manyMapper) {

Flux<V> stream = Flux.fromIterable(manyMapper.apply(cursor));

if (!cursor.isFinished()) {

Mono<C> mono = scanArgs.map(args -> scanWithArgsFunction.apply(cursor, args)).orElseGet(
() -> scanFunction.apply(cursor));
return stream.concatWith(mono.flatMapMany(nextCursor -> scan(scanArgs, nextCursor, scanFunction,
scanWithArgsFunction, manyMapper)));
}

return stream;
}
}
88 changes: 88 additions & 0 deletions src/test/java/io/lettuce/core/ScanStreamTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import java.util.List;

import org.junit.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import io.lettuce.core.api.reactive.RedisReactiveCommands;

/**
* @author Mark Paluch
*/
public class ScanStreamTest extends AbstractRedisClientTest {

@Test
public void shouldScanIteratively() {

for (int i = 0; i < 1000; i++) {
redis.set("key-" + i, value);
}
ScanIterator<String> scan = ScanIterator.scan(redis);
List<String> list = Flux.fromIterable(() -> scan).collectList().block();

RedisReactiveCommands<String, String> reactive = redis.getStatefulConnection().reactive();

StepVerifier.create(ScanStream.scan(reactive, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250)
.verifyComplete();
StepVerifier.create(ScanStream.scan(reactive)).expectNextSequence(list).verifyComplete();
}

@Test
public void shouldHscanIteratively() {

for (int i = 0; i < 1000; i++) {
redis.hset(key, "field-" + i, "value-" + i);
}

RedisReactiveCommands<String, String> reactive = redis.getStatefulConnection().reactive();

StepVerifier.create(ScanStream.hscan(reactive, key, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250)
.verifyComplete();
StepVerifier.create(ScanStream.hscan(reactive, key)).expectNextCount(1000).verifyComplete();
}

@Test
public void shouldSscanIteratively() {

for (int i = 0; i < 1000; i++) {
redis.sadd(key, "value-" + i);
}

RedisReactiveCommands<String, String> reactive = redis.getStatefulConnection().reactive();

StepVerifier.create(ScanStream.sscan(reactive, key, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250)
.verifyComplete();
StepVerifier.create(ScanStream.sscan(reactive, key)).expectNextCount(1000).verifyComplete();
}

@Test
public void shouldZscanIteratively() {

for (int i = 0; i < 1000; i++) {
redis.zadd(key, (double) i, "value-" + i);
}

RedisReactiveCommands<String, String> reactive = redis.getStatefulConnection().reactive();

StepVerifier.create(ScanStream.zscan(reactive, key, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250)
.verifyComplete();
StepVerifier.create(ScanStream.zscan(reactive, key)).expectNextCount(1000).verifyComplete();
}
}
63 changes: 63 additions & 0 deletions src/test/java/io/lettuce/core/cluster/ScanStreamTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.cluster;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import reactor.test.StepVerifier;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanStream;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

/**
* @author Mark Paluch
*/
public class ScanStreamTest extends AbstractClusterTest {

private StatefulRedisClusterConnection<String, String> connection;
private RedisAdvancedClusterCommands<String, String> redis;

@Before
public void before() throws Exception {

this.connection = clusterClient.connect();
this.redis = this.connection.sync();
this.redis.flushall();
}

@After
public void tearDown() {
this.connection.close();
}

@Test
public void shouldScanIteratively() {

for (int i = 0; i < 1000; i++) {
redis.set("key-" + i, value);
}

RedisAdvancedClusterReactiveCommands<String, String> reactive = connection.reactive();

StepVerifier.create(ScanStream.scan(reactive, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250)
.verifyComplete();
StepVerifier.create(ScanStream.scan(reactive)).expectNextCount(1000).verifyComplete();
}
}

0 comments on commit ee84ba9

Please sign in to comment.