diff --git a/src/main/java/io/lettuce/core/ScanStream.java b/src/main/java/io/lettuce/core/ScanStream.java new file mode 100644 index 0000000000..3e617cb88a --- /dev/null +++ b/src/main/java/io/lettuce/core/ScanStream.java @@ -0,0 +1,242 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import io.lettuce.core.api.reactive.RedisHashReactiveCommands; +import io.lettuce.core.api.reactive.RedisKeyReactiveCommands; +import io.lettuce.core.api.reactive.RedisSetReactiveCommands; +import io.lettuce.core.api.reactive.RedisSortedSetReactiveCommands; +import io.lettuce.core.internal.LettuceAssert; + +/** + * Scan command support exposed through {@link Flux}. + *

+ * {@link ScanStream} uses reactive command interfaces to scan over keys ({@code SCAN}), sets ({@code SSCAN}), sorted sets ( + * {@code ZSCAN}), and hashes ({@code HSCAN}). + *

+ * Use {@link ScanArgs#limit(long)} to set the batch size. + *

+ * Data structure scanning is progressive and stateful and demand-aware. It supports full iterations (until all received cursors + * are exhausted) and premature termination. Subsequent scan commands to fetch the cursor data get only issued if the subscriber + * signals demand. + * + * @author Mark Paluch + * @since 5.1 + */ +public abstract class ScanStream { + + private ScanStream() { + } + + /** + * Sequentially iterate over keys in the keyspace. This method uses {@code SCAN} to perform an iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux scan(RedisKeyReactiveCommands commands) { + return scan(commands, Optional.empty()); + } + + /** + * Sequentially iterate over keys in the keyspace. This method uses {@code SCAN} to perform an iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param scanArgs the scan arguments, must not be {@literal null}. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux scan(RedisKeyReactiveCommands commands, ScanArgs scanArgs) { + + LettuceAssert.notNull(scanArgs, "ScanArgs must not be null"); + + return scan(commands, Optional.of(scanArgs)); + } + + private static Flux scan(RedisKeyReactiveCommands commands, Optional scanArgs) { + + LettuceAssert.notNull(commands, "RedisKeyCommands must not be null"); + + Mono> res = scanArgs.map(commands::scan).orElseGet(commands::scan); + + return res.flatMapMany(cursor -> scan(scanArgs, cursor, commands::scan, commands::scan, KeyScanCursor::getKeys)); + } + + /** + * Sequentially iterate over entries in a hash identified by {@code key}. This method uses {@code HSCAN} to perform an + * iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param key the hash to scan. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux> hscan(RedisHashReactiveCommands commands, K key) { + return hscan(commands, key, Optional.empty()); + } + + /** + * Sequentially iterate over entries in a hash identified by {@code key}. This method uses {@code HSCAN} to perform an + * iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param key the hash to scan. + * @param scanArgs the scan arguments, must not be {@literal null}. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux> hscan(RedisHashReactiveCommands commands, K key, ScanArgs scanArgs) { + + LettuceAssert.notNull(scanArgs, "ScanArgs must not be null"); + + return hscan(commands, key, Optional.of(scanArgs)); + } + + private static Flux> hscan(RedisHashReactiveCommands commands, K key, + Optional scanArgs) { + + LettuceAssert.notNull(commands, "RedisHashReactiveCommands must not be null"); + LettuceAssert.notNull(key, "Key must not be null"); + + Mono> res = scanArgs.map(it -> commands.hscan(key, it)).orElseGet(() -> commands.hscan(key)); + + return res.flatMapMany(cursor -> scan(scanArgs, cursor, // + c -> commands.hscan(key, c), // + (c, args) -> commands.hscan(key, c, args), // + c -> c.getMap().entrySet())).map(me -> KeyValue.fromNullable(me.getKey(), me.getValue())); + } + + /** + * Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an + * iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param key the set to scan. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux sscan(RedisSetReactiveCommands commands, K key) { + return sscan(commands, key, Optional.empty()); + } + + /** + * Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an + * iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param key the set to scan. + * @param scanArgs the scan arguments, must not be {@literal null}. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux sscan(RedisSetReactiveCommands commands, K key, ScanArgs scanArgs) { + + LettuceAssert.notNull(scanArgs, "ScanArgs must not be null"); + + return sscan(commands, key, Optional.of(scanArgs)); + } + + private static Flux sscan(RedisSetReactiveCommands commands, K key, Optional scanArgs) { + + LettuceAssert.notNull(commands, "RedisSetReactiveCommands must not be null"); + LettuceAssert.notNull(key, "Key must not be null"); + + Mono> res = scanArgs.map(it -> commands.sscan(key, it)).orElseGet(() -> commands.sscan(key)); + + return res.flatMapMany(cursor -> scan(scanArgs, cursor, // + c -> commands.sscan(key, c), // + (c, args) -> commands.sscan(key, c, args), // + ValueScanCursor::getValues)); + } + + /** + * Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an + * iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param key the sorted set to scan. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux> zscan(RedisSortedSetReactiveCommands commands, K key) { + return zscan(commands, key, Optional.empty()); + } + + /** + * Sequentially iterate over elements in a set identified by {@code key}. This method uses {@code SSCAN} to perform an + * iterative scan. + * + * @param commands the commands interface, must not be {@literal null}. + * @param key the sorted set to scan. + * @param scanArgs the scan arguments, must not be {@literal null}. + * @param Key type. + * @param Value type. + * @return a new {@link Flux}. + */ + public static Flux> zscan(RedisSortedSetReactiveCommands commands, K key, ScanArgs scanArgs) { + + LettuceAssert.notNull(scanArgs, "ScanArgs must not be null"); + + return zscan(commands, key, Optional.of(scanArgs)); + } + + private static Flux> zscan(RedisSortedSetReactiveCommands commands, K key, + Optional scanArgs) { + + LettuceAssert.notNull(commands, "RedisSortedSetReactiveCommands must not be null"); + LettuceAssert.notNull(key, "Key must not be null"); + + Mono> res = scanArgs.map(it -> commands.zscan(key, it)).orElseGet(() -> commands.zscan(key)); + + return res.flatMapMany(cursor -> scan(scanArgs, cursor, // + c -> commands.zscan(key, c), // + (c, args) -> commands.zscan(key, c, args), // + ScoredValueScanCursor::getValues)); + } + + private static Publisher scan(Optional scanArgs, C cursor, + Function> scanFunction, BiFunction> scanWithArgsFunction, + Function> manyMapper) { + + Flux stream = Flux.fromIterable(manyMapper.apply(cursor)); + + if (!cursor.isFinished()) { + + Mono mono = scanArgs.map(args -> scanWithArgsFunction.apply(cursor, args)).orElseGet( + () -> scanFunction.apply(cursor)); + return stream.concatWith(mono.flatMapMany(nextCursor -> scan(scanArgs, nextCursor, scanFunction, + scanWithArgsFunction, manyMapper))); + } + + return stream; + } +} diff --git a/src/test/java/io/lettuce/core/ScanStreamTest.java b/src/test/java/io/lettuce/core/ScanStreamTest.java new file mode 100644 index 0000000000..bc561097c4 --- /dev/null +++ b/src/test/java/io/lettuce/core/ScanStreamTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import java.util.List; + +import org.junit.Test; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; +import io.lettuce.core.api.reactive.RedisReactiveCommands; + +/** + * @author Mark Paluch + */ +public class ScanStreamTest extends AbstractRedisClientTest { + + @Test + public void shouldScanIteratively() { + + for (int i = 0; i < 1000; i++) { + redis.set("key-" + i, value); + } + ScanIterator scan = ScanIterator.scan(redis); + List list = Flux.fromIterable(() -> scan).collectList().block(); + + RedisReactiveCommands reactive = redis.getStatefulConnection().reactive(); + + StepVerifier.create(ScanStream.scan(reactive, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250) + .verifyComplete(); + StepVerifier.create(ScanStream.scan(reactive)).expectNextSequence(list).verifyComplete(); + } + + @Test + public void shouldHscanIteratively() { + + for (int i = 0; i < 1000; i++) { + redis.hset(key, "field-" + i, "value-" + i); + } + + RedisReactiveCommands reactive = redis.getStatefulConnection().reactive(); + + StepVerifier.create(ScanStream.hscan(reactive, key, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250) + .verifyComplete(); + StepVerifier.create(ScanStream.hscan(reactive, key)).expectNextCount(1000).verifyComplete(); + } + + @Test + public void shouldSscanIteratively() { + + for (int i = 0; i < 1000; i++) { + redis.sadd(key, "value-" + i); + } + + RedisReactiveCommands reactive = redis.getStatefulConnection().reactive(); + + StepVerifier.create(ScanStream.sscan(reactive, key, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250) + .verifyComplete(); + StepVerifier.create(ScanStream.sscan(reactive, key)).expectNextCount(1000).verifyComplete(); + } + + @Test + public void shouldZscanIteratively() { + + for (int i = 0; i < 1000; i++) { + redis.zadd(key, (double) i, "value-" + i); + } + + RedisReactiveCommands reactive = redis.getStatefulConnection().reactive(); + + StepVerifier.create(ScanStream.zscan(reactive, key, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250) + .verifyComplete(); + StepVerifier.create(ScanStream.zscan(reactive, key)).expectNextCount(1000).verifyComplete(); + } +} diff --git a/src/test/java/io/lettuce/core/cluster/ScanStreamTest.java b/src/test/java/io/lettuce/core/cluster/ScanStreamTest.java new file mode 100644 index 0000000000..dba1f81aa2 --- /dev/null +++ b/src/test/java/io/lettuce/core/cluster/ScanStreamTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.cluster; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import reactor.test.StepVerifier; +import io.lettuce.core.ScanArgs; +import io.lettuce.core.ScanStream; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; + +/** + * @author Mark Paluch + */ +public class ScanStreamTest extends AbstractClusterTest { + + private StatefulRedisClusterConnection connection; + private RedisAdvancedClusterCommands redis; + + @Before + public void before() throws Exception { + + this.connection = clusterClient.connect(); + this.redis = this.connection.sync(); + this.redis.flushall(); + } + + @After + public void tearDown() { + this.connection.close(); + } + + @Test + public void shouldScanIteratively() { + + for (int i = 0; i < 1000; i++) { + redis.set("key-" + i, value); + } + + RedisAdvancedClusterReactiveCommands reactive = connection.reactive(); + + StepVerifier.create(ScanStream.scan(reactive, ScanArgs.Builder.limit(200)).take(250)).expectNextCount(250) + .verifyComplete(); + StepVerifier.create(ScanStream.scan(reactive)).expectNextCount(1000).verifyComplete(); + } +}