Skip to content

Commit

Permalink
[QUICK FIX] A RediSearch command in cluster mode must be sent to a ma…
Browse files Browse the repository at this point in the history
…ster node (#2968)

As a quick fix, sending the commad to a consistent hashslot. In current code, this will choose the respective master of the hashslot.

We should change it to a random master node or at least random hashslot.
  • Loading branch information
sazzad16 authored Apr 7, 2022
1 parent b5e0787 commit ec3582a
Showing 1 changed file with 114 additions and 98 deletions.
212 changes: 114 additions & 98 deletions src/main/java/redis/clients/jedis/ClusterCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
import static redis.clients.jedis.Protocol.Command.SCAN;
import static redis.clients.jedis.Protocol.Keyword.TYPE;

import java.util.List;
import java.util.Map;
import java.util.Set;

import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Query;
import redis.clients.jedis.search.Schema;
import redis.clients.jedis.search.SearchResult;
import redis.clients.jedis.search.aggr.AggregationBuilder;
import redis.clients.jedis.search.aggr.AggregationResult;
import redis.clients.jedis.util.JedisClusterHashTag;

public class ClusterCommandObjects extends CommandObjects {
Expand Down Expand Up @@ -97,114 +105,122 @@ public final CommandObject<ScanResult<byte[]>> scan(byte[] cursor, ScanParams pa
public final CommandObject<Long> waitReplicas(int replicas, long timeout) {
throw new UnsupportedOperationException(CLUSTER_UNSUPPORTED_MESSAGE);
}
//

// RediSearch commands
// TODO: Send RediSearch command to random 'master' node or random hashslot.
// boolean searchLite = false;
//
// private <T> CommandObject<T> processSearchCommand(String indexName, CommandObject<T> command) {

private <T> CommandObject<T> processSearchCommand(String indexName, CommandObject<T> command) {
// if (searchLite) command.getArguments().processKey(indexName);
// return command;
// }
//
// @Override
// public final CommandObject<String> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) {
// return processSearchCommand(indexName, super.ftCreate(indexName, indexOptions, schema));
// }
//
// @Override
// public final CommandObject<String> ftAlter(String indexName, Schema schema) {
// return processSearchCommand(indexName, super.ftAlter(indexName, schema));
// }
//
// @Override
// public final CommandObject<SearchResult> ftSearch(String indexName, Query query) {
// return processSearchCommand(indexName, super.ftSearch(indexName, query));
// }
//
// @Override
// public final CommandObject<SearchResult> ftSearch(byte[] indexName, Query query) {
// CommandObject<SearchResult> command = super.ftSearch(indexName, query);
command.getArguments().processKey(indexName);
return command;
}

@Override
public final CommandObject<String> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) {
return processSearchCommand(indexName, super.ftCreate(indexName, indexOptions, schema));
}

@Override
public final CommandObject<String> ftAlter(String indexName, Schema schema) {
return processSearchCommand(indexName, super.ftAlter(indexName, schema));
}

@Override
public final CommandObject<SearchResult> ftSearch(String indexName, Query query) {
return processSearchCommand(indexName, super.ftSearch(indexName, query));
}

@Override
public final CommandObject<SearchResult> ftSearch(byte[] indexName, Query query) {
CommandObject<SearchResult> command = super.ftSearch(indexName, query);
// if (searchLite) command.getArguments().processKey(indexName);
// return command;
// }
//
// @Override
// public CommandObject<String> ftExplain(String indexName, Query query) {
// return processSearchCommand(indexName, super.ftExplain(indexName, query));
// }
//
// @Override
// public CommandObject<List<String>> ftExplainCLI(String indexName, Query query) {
// return processSearchCommand(indexName, super.ftExplainCLI(indexName, query));
// }
//
// @Override
// public CommandObject<AggregationResult> ftAggregate(String indexName, AggregationBuilder aggr) {
// return processSearchCommand(indexName, super.ftAggregate(indexName, aggr));
// }
//
// @Override
// public CommandObject<AggregationResult> ftCursorRead(String indexName, long cursorId, int count) {
// return processSearchCommand(indexName, super.ftCursorRead(indexName, cursorId, count));
// }
//
// @Override
// public CommandObject<String> ftCursorDel(String indexName, long cursorId) {
// return processSearchCommand(indexName, super.ftCursorDel(indexName, cursorId));
// }
//
// @Override
// public CommandObject<String> ftDropIndex(String indexName) {
// return processSearchCommand(indexName, super.ftDropIndex(indexName));
// }
//
// @Override
// public CommandObject<String> ftDropIndexDD(String indexName) {
// return processSearchCommand(indexName, super.ftDropIndexDD(indexName));
// }
//
// @Override
// public CommandObject<String> ftSynUpdate(String indexName, String synonymGroupId, String... terms) {
// return processSearchCommand(indexName, super.ftSynUpdate(indexName, synonymGroupId, terms));
// }
//
// @Override
// public CommandObject<Map<String, List<String>>> ftSynDump(String indexName) {
// return processSearchCommand(indexName, super.ftSynDump(indexName));
// }
//
// @Override
// public CommandObject<Map<String, Object>> ftInfo(String indexName) {
// return processSearchCommand(indexName, super.ftInfo(indexName));
// }
//
// @Override
// public CommandObject<String> ftAliasAdd(String aliasName, String indexName) {
command.getArguments().processKey(indexName);
return command;
}

@Override
public CommandObject<String> ftExplain(String indexName, Query query) {
return processSearchCommand(indexName, super.ftExplain(indexName, query));
}

@Override
public CommandObject<List<String>> ftExplainCLI(String indexName, Query query) {
return processSearchCommand(indexName, super.ftExplainCLI(indexName, query));
}

@Override
public CommandObject<AggregationResult> ftAggregate(String indexName, AggregationBuilder aggr) {
return processSearchCommand(indexName, super.ftAggregate(indexName, aggr));
}

@Override
public CommandObject<AggregationResult> ftCursorRead(String indexName, long cursorId, int count) {
return processSearchCommand(indexName, super.ftCursorRead(indexName, cursorId, count));
}

@Override
public CommandObject<String> ftCursorDel(String indexName, long cursorId) {
return processSearchCommand(indexName, super.ftCursorDel(indexName, cursorId));
}

@Override
public CommandObject<String> ftDropIndex(String indexName) {
return processSearchCommand(indexName, super.ftDropIndex(indexName));
}

@Override
public CommandObject<String> ftDropIndexDD(String indexName) {
return processSearchCommand(indexName, super.ftDropIndexDD(indexName));
}

@Override
public CommandObject<String> ftSynUpdate(String indexName, String synonymGroupId, String... terms) {
return processSearchCommand(indexName, super.ftSynUpdate(indexName, synonymGroupId, terms));
}

@Override
public CommandObject<Map<String, List<String>>> ftSynDump(String indexName) {
return processSearchCommand(indexName, super.ftSynDump(indexName));
}

@Override
public CommandObject<Map<String, Object>> ftInfo(String indexName) {
return processSearchCommand(indexName, super.ftInfo(indexName));
}

@Override
public CommandObject<String> ftAliasAdd(String aliasName, String indexName) {
// CommandObject<String> command = super.ftAliasAdd(aliasName, indexName);
// if (searchLite) command.getArguments().processKey(aliasName).processKey(indexName);
// return command;
// }
//
// @Override
// public CommandObject<String> ftAliasUpdate(String aliasName, String indexName) {
return processSearchCommand(indexName, super.ftAliasAdd(aliasName, indexName));
}

@Override
public CommandObject<String> ftAliasUpdate(String aliasName, String indexName) {
// CommandObject<String> command = super.ftAliasUpdate(aliasName, indexName);
// if (searchLite) command.getArguments().processKey(aliasName).processKey(indexName);
// return command;
// }
//
// @Override
// public CommandObject<String> ftAliasDel(String aliasName) {
return processSearchCommand(indexName, super.ftAliasUpdate(aliasName, indexName));
}

@Override
public CommandObject<String> ftAliasDel(String aliasName) {
// CommandObject<String> command = super.ftAliasDel(aliasName);
// if (searchLite) command.getArguments().processKey(aliasName);
// return command;
// }
//
// @Override
// public CommandObject<Map<String, String>> ftConfigGet(String indexName, String option) {
// return processSearchCommand(indexName, super.ftConfigGet(indexName, option));
// }
//
// @Override
// public CommandObject<String> ftConfigSet(String indexName, String option, String value) {
// return processSearchCommand(indexName, super.ftConfigSet(indexName, option, value));
// }
return processSearchCommand(aliasName, super.ftAliasDel(aliasName));
}

@Override
public CommandObject<Map<String, String>> ftConfigGet(String indexName, String option) {
return processSearchCommand(indexName, super.ftConfigGet(indexName, option));
}

@Override
public CommandObject<String> ftConfigSet(String indexName, String option, String value) {
return processSearchCommand(indexName, super.ftConfigSet(indexName, option, value));
}
// RediSearch commands
}

0 comments on commit ec3582a

Please sign in to comment.