Skip to content

Commit

Permalink
Introduce round-robining for scan and ft.search commands (#3334)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Apr 7, 2023
1 parent 3b46620 commit 58f5761
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 3 deletions.
43 changes: 43 additions & 0 deletions src/main/java/redis/clients/jedis/ScanRoundRobin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package redis.clients.jedis;

import java.util.function.Function;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.util.JedisRoundRobinBase;

public class ScanRoundRobin extends JedisRoundRobinBase<ScanResult<String>> {

private final int count;
private final Function<String, CommandArguments> args;

public ScanRoundRobin(ConnectionProvider connectionProvider, int batchCount, String match) {
super(connectionProvider, BuilderFactory.SCAN_RESPONSE);
this.count = batchCount;
this.args = (cursor) -> new CommandArguments(Protocol.Command.SCAN).add(cursor)
.add(Keyword.MATCH).add(match).add(Keyword.COUNT).add(count);
}

public ScanRoundRobin(ConnectionProvider connectionProvider, int batchCount, String match, String type) {
super(connectionProvider, BuilderFactory.SCAN_RESPONSE);
this.count = batchCount;
this.args = (cursor) -> new CommandArguments(Protocol.Command.SCAN).add(cursor)
.add(Keyword.MATCH).add(match).add(Keyword.COUNT).add(count).add(Keyword.TYPE).add(type);
}

@Override
protected boolean isIterationCompleted(ScanResult<String> reply) {
return reply.isCompleteIteration();
}

@Override
protected CommandArguments initCommandArguments() {
return args.apply(ScanParams.SCAN_POINTER_START);
}

@Override
protected CommandArguments nextCommandArguments(ScanResult<String> lastReply) {
return args.apply(lastReply.getCursor());
}
}
31 changes: 31 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,14 @@ public ScanResult<String> scan(String cursor, ScanParams params, String type) {
return executeCommand(commandObjects.scan(cursor, params, type));
}

public ScanRoundRobin scan(int batchCount, String match) {
return new ScanRoundRobin(provider, batchCount, match);
}

public ScanRoundRobin scan(int batchCount, String match, String type) {
return new ScanRoundRobin(provider, batchCount, match, type);
}

@Override
public Set<byte[]> keys(byte[] pattern) {
return executeCommand(commandObjects.keys(pattern));
Expand Down Expand Up @@ -3534,11 +3542,34 @@ public SearchResult ftSearch(String indexName, String query, FTSearchParams para
return executeCommand(commandObjects.ftSearch(indexName, query, params));
}

/**
* {@link FTSearchParams#limit(int, int)} will be ignored.
* @param batchCount
* @param indexName
* @param query
* @param params limit will be ignored
* @return search
*/
public FtSearchRoundRobin ftSearch(int batchCount, String indexName, String query, FTSearchParams params) {
return new FtSearchRoundRobin(provider, batchCount, indexName, query, params);
}

@Override
public SearchResult ftSearch(String indexName, Query query) {
return executeCommand(commandObjects.ftSearch(indexName, query));
}

/**
* {@link Query#limit(java.lang.Integer, java.lang.Integer)} will be ignored.
* @param batchCount
* @param indexName
* @param query limit will be ignored
* @return search
*/
public FtSearchRoundRobin ftSearch(int batchCount, String indexName, Query query) {
return new FtSearchRoundRobin(provider, batchCount, indexName, query);
}

@Override
public SearchResult ftSearch(byte[] indexName, Query query) {
return executeCommand(commandObjects.ftSearch(indexName, query));
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/redis/clients/jedis/search/FtSearchRoundRobin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package redis.clients.jedis.search;

import java.util.function.IntFunction;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.search.SearchResult.SearchResultBuilder;
import redis.clients.jedis.util.JedisRoundRobinBase;

public class FtSearchRoundRobin extends JedisRoundRobinBase<SearchResult> {

private int batchStart;
private final int batchSize;
private final IntFunction<CommandArguments> args;

public FtSearchRoundRobin(ConnectionProvider connectionProvider, int batchSize, String indexName, String query, FTSearchParams params) {
super(connectionProvider, new SearchResultBuilder(!params.getNoContent(), params.getWithScores(), false, true));
this.batchSize = batchSize;
this.args = (limitFirst) -> new CommandArguments(SearchProtocol.SearchCommand.SEARCH)
.add(indexName).add(query).addParams(params.limit(limitFirst, this.batchSize));
}

public FtSearchRoundRobin(ConnectionProvider connectionProvider, int batchSize, String indexName, Query query) {
super(connectionProvider, new SearchResultBuilder(!query.getNoContent(), query.getWithScores(), query.getWithPayloads(), true));
this.batchSize = batchSize;
this.args = (limitFirst) -> new CommandArguments(SearchProtocol.SearchCommand.SEARCH)
.add(indexName).addParams(query.limit(limitFirst, this.batchSize));
}

@Override
protected boolean isIterationCompleted(SearchResult reply) {
return batchStart >= reply.getTotalResults() - batchSize;
}

@Override
protected CommandArguments initCommandArguments() {
batchStart = 0;
return args.apply(batchStart);
}

@Override
protected CommandArguments nextCommandArguments(SearchResult lastReply) {
batchStart += batchSize;
return args.apply(batchStart);
}
}
81 changes: 81 additions & 0 deletions src/main/java/redis/clients/jedis/util/JedisRoundRobinBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package redis.clients.jedis.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;

import redis.clients.jedis.Builder;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;
import redis.clients.jedis.providers.ConnectionProvider;

public abstract class JedisRoundRobinBase<R> {

private final Builder<R> builder;

private final Queue<Map.Entry> connections;

private Map.Entry connection;

private R lastReply;

private boolean roundRobinCompleted;
private boolean iterationCompleted;

protected JedisRoundRobinBase(ConnectionProvider connectionProvider, Builder<R> responseBuilder) {
Map connectionMap = connectionProvider.getConnectionMap();
ArrayList<Map.Entry> connectionList = new ArrayList<>(connectionMap.entrySet());
Collections.shuffle(connectionList);
this.connections = new LinkedList<>(connectionList);
this.builder = responseBuilder;
this.iterationCompleted = true;
this.roundRobinCompleted = this.connections.isEmpty();
}

public final boolean isRoundRobinCompleted() {
return roundRobinCompleted;
}

protected abstract boolean isIterationCompleted(R reply);

protected abstract CommandArguments initCommandArguments();

protected abstract CommandArguments nextCommandArguments(R lastReply);

public final R get() {
if (roundRobinCompleted) {
throw new NoSuchElementException();
}

CommandArguments args;
if (iterationCompleted) {
connection = connections.poll();
args = initCommandArguments();
} else {
args = nextCommandArguments(lastReply);
}

Object rawReply;
if (connection.getValue() instanceof Connection) {
rawReply = ((Connection) connection.getValue()).executeCommand(args);
} else if (connection.getValue() instanceof Pool) {
try (Connection c = ((Pool<Connection>) connection.getValue()).getResource()) {
rawReply = c.executeCommand(args);
}
} else {
throw new IllegalArgumentException(connection.getValue().getClass() + "is not supported.");
}

lastReply = builder.build(rawReply);
iterationCompleted = isIterationCompleted(lastReply);
if (iterationCompleted) {
if (connections.isEmpty()) {
roundRobinCompleted = true;
}
}
return lastReply;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Test;

import redis.clients.jedis.BuilderFactory;
Expand All @@ -16,9 +18,11 @@
import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanRoundRobin;
import redis.clients.jedis.args.GeoUnit;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.resps.ScanResult;

public class ClusterValuesCommandsTest extends ClusterJedisCommandsTestBase {

Expand Down Expand Up @@ -113,23 +117,46 @@ public void onUnsubscribe(String channel, int subscribedChannels) {
}

@Test
public void broadcastRawPing() {
public void rawPingBroadcast() {
String reply = cluster.broadcastCommand(
new CommandObject<>(new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING));
assertEquals("PONG", reply);
}

@Test
public void broadcastPing() {
public void pingBroadcast() {
assertEquals("PONG", cluster.ping());
}

@Test
public void broadcastFlushAll() {
public void flushAllBroadcast() {
assertNull(cluster.get("foo"));
assertEquals("OK", cluster.set("foo", "bar"));
assertEquals("bar", cluster.get("foo"));
cluster.flushAll();
assertNull(cluster.get("foo"));
}

@Test
public void scanRoundRobin() {
Set<String> allIn = new HashSet<>(26 * 26);
char[] arr = new char[2];
for (int i = 0; i < 26; i++) {
arr[0] = (char) ('a' + i);
for (int j = 0; j < 26; j++) {
arr[1] = (char) ('a' + j);
String str = new String(arr);
cluster.incr(str);
allIn.add(str);
}
}

Set<String> allScan = new HashSet<>();
ScanRoundRobin scan = cluster.scan(10, "*");
while (!scan.isRoundRobinCompleted()) {
ScanResult<String> batch = scan.get();
allScan.addAll(batch.getResult());
}
assertEquals(allIn, allScan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;
import org.junit.Test;

import redis.clients.jedis.ScanRoundRobin;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.args.ExpiryOption;
import redis.clients.jedis.params.ScanParams;
Expand Down Expand Up @@ -863,4 +864,27 @@ public void copy() {
assertTrue(jedis.copy(bfoo1, bfoo2, true));
assertArrayEquals(bbar1, jedis.get(bfoo2));
}

@Test
public void scanRoundRobin() {
Set<String> allIn = new HashSet<>(26 * 26);
char[] arr = new char[2];
for (int i = 0; i < 26; i++) {
arr[0] = (char) ('a' + i);
for (int j = 0; j < 26; j++) {
arr[1] = (char) ('a' + j);
String str = new String(arr);
jedis.incr(str);
allIn.add(str);
}
}

Set<String> allScan = new HashSet<>();
ScanRoundRobin scan = jedis.scan(10, "*");
while (!scan.isRoundRobinCompleted()) {
ScanResult<String> batch = scan.get();
allScan.addAll(batch.getResult());
}
assertEquals(allIn, allScan);
}
}
26 changes: 26 additions & 0 deletions src/test/java/redis/clients/jedis/modules/search/SearchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.Assert.*;

import java.util.*;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -1208,4 +1209,29 @@ public void searchProfile() {
assertEquals(1L, iteratorsProfile.get("Size"));
assertSame(Double.class, iteratorsProfile.get("Time").getClass());
}

@Test
public void searchRoundRobin() throws Exception {
Schema sc = new Schema().addTextField("first", 1.0).addTextField("last", 1.0).addNumericField("age");
IndexDefinition rule = new IndexDefinition();
assertEquals("OK", client.ftCreate(index, IndexOptions.defaultOptions().setDefinition(rule), sc));

client.hset("profesor:5555", toMap("first", "Albert", "last", "Blue", "age", "55"));
client.hset("student:1111", toMap("first", "Joe", "last", "Dod", "age", "18"));
client.hset("pupil:2222", toMap("first", "Jen", "last", "Rod", "age", "14"));
client.hset("student:3333", toMap("first", "El", "last", "Mark", "age", "17"));
client.hset("pupil:4444", toMap("first", "Pat", "last", "Shu", "age", "21"));
client.hset("student:5555", toMap("first", "Joen", "last", "Ko", "age", "20"));
client.hset("teacher:6666", toMap("first", "Pat", "last", "Rod", "age", "20"));

FtSearchRoundRobin search = client.ftSearch(3, index, new Query());
int total = 0;
while (!search.isRoundRobinCompleted()) {
SearchResult result = search.get();
int count = result.getDocuments().size();
assertThat(count, Matchers.lessThanOrEqualTo(3));
total += count;
}
assertEquals(7, total);
}
}
Loading

0 comments on commit 58f5761

Please sign in to comment.