Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for low-level client round-robin behaviour #31616

Merged
merged 4 commits into from
Jun 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,16 +615,16 @@ private void setHeaders(HttpRequest httpRequest, Collection<Header> requestHeade
*/
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ops, I also renamed this method while I was at it, and made it return Iterable rather than a list. Hopefully that's ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine with me.

return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
}

/**
* Select hosts to try. Package private for testing.
* Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
* if the previous attempt failed and so on. Package private for testing.
*/
static List<Node> selectHosts(NodeTuple<List<Node>> nodeTuple,
Map<HttpHost, DeadHostState> blacklist, AtomicInteger lastNodeIndex,
NodeSelector nodeSelector) throws IOException {
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
/*
* Sort the nodes into living and dead lists.
*/
Expand Down Expand Up @@ -653,24 +653,22 @@ static List<Node> selectHosts(NodeTuple<List<Node>> nodeTuple,
nodeSelector.select(selectedLivingNodes);
if (false == selectedLivingNodes.isEmpty()) {
/*
* Rotate the list so subsequent requests will prefer the
* nodes in a different order.
* Rotate the list using a global counter as the distance so subsequent
* requests will try the nodes in a different order.
*/
Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
return selectedLivingNodes;
}
}

/*
* Last resort: If there are no good nodes to use, either because
* Last resort: there are no good nodes to use, either because
* the selector rejected all the living nodes or because there aren't
* any living ones. Either way, we want to revive a single dead node
* that the NodeSelectors are OK with. We do this by sorting the dead
* nodes by their revival time and passing them through the
* NodeSelector so it can have its say in which nodes are ok and their
* ordering. If the selector is ok with any of the nodes then use just
* the first one in the list because we only want to revive a single
* node.
* that the NodeSelectors are OK with. We do this by passing the dead
* nodes through the NodeSelector so it can have its say in which nodes
* are ok. If the selector is ok with any of the nodes then we will take
* the one in the list that has the lowest revival time and try it.
*/
if (false == deadNodes.isEmpty()) {
final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.AuthCache;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.elasticsearch.client.DeadHostStateTests.ConfigurableTimeSupplier;
import org.elasticsearch.client.RestClient.NodeTuple;
Expand All @@ -35,13 +38,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singletonList;
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -407,8 +411,8 @@ public String toString() {
* blacklist time. It'll revive the node that is closest
* to being revived that the NodeSelector is ok with.
*/
assertEquals(singletonList(n1), RestClient.selectHosts(nodeTuple, blacklist, new AtomicInteger(), NodeSelector.ANY));
assertEquals(singletonList(n2), RestClient.selectHosts(nodeTuple, blacklist, new AtomicInteger(), not1));
assertEquals(singletonList(n1), RestClient.selectNodes(nodeTuple, blacklist, new AtomicInteger(), NodeSelector.ANY));
assertEquals(singletonList(n2), RestClient.selectNodes(nodeTuple, blacklist, new AtomicInteger(), not1));

/*
* Try a NodeSelector that excludes all nodes. This should
Expand Down Expand Up @@ -449,23 +453,23 @@ private void assertSelectLivingHosts(List<Node> expectedNodes, NodeTuple<List<No
Map<HttpHost, DeadHostState> blacklist, NodeSelector nodeSelector) throws IOException {
int iterations = 1000;
AtomicInteger lastNodeIndex = new AtomicInteger(0);
assertEquals(expectedNodes, RestClient.selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
assertEquals(expectedNodes, RestClient.selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
// Calling it again rotates the set of results
for (int i = 1; i < iterations; i++) {
Collections.rotate(expectedNodes, 1);
assertEquals("iteration " + i, expectedNodes,
RestClient.selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
RestClient.selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector));
}
}

/**
* Assert that {@link RestClient#selectHosts} fails on the provided arguments.
* Assert that {@link RestClient#selectNodes} fails on the provided arguments.
* @return the message in the exception thrown by the failure
*/
private String assertSelectAllRejected( NodeTuple<List<Node>> nodeTuple,
private static String assertSelectAllRejected( NodeTuple<List<Node>> nodeTuple,
Map<HttpHost, DeadHostState> blacklist, NodeSelector nodeSelector) {
try {
RestClient.selectHosts(nodeTuple, blacklist, new AtomicInteger(0), nodeSelector);
RestClient.selectNodes(nodeTuple, blacklist, new AtomicInteger(0), nodeSelector);
throw new AssertionError("expected selectHosts to fail");
} catch (IOException e) {
return e.getMessage();
Expand All @@ -478,5 +482,56 @@ private static RestClient createRestClient() {
new Header[] {}, nodes, null, null, null);
}

public void testRoundRobin() throws IOException {
int numNodes = randomIntBetween(2, 10);
AuthCache authCache = new BasicAuthCache();
List<Node> nodes = new ArrayList<>(numNodes);
for (int i = 0; i < numNodes; i++) {
Node node = new Node(new HttpHost("localhost", 9200 + i));
nodes.add(node);
authCache.put(node.getHost(), new BasicScheme());
}
NodeTuple<List<Node>> nodeTuple = new NodeTuple<>(nodes, authCache);

//test the transition from negative to positive values
AtomicInteger lastNodeIndex = new AtomicInteger(-numNodes);
assertNodes(nodeTuple, lastNodeIndex, 50);
assertEquals(-numNodes + 50, lastNodeIndex.get());

//test the highest positive values up to MAX_VALUE
lastNodeIndex.set(Integer.MAX_VALUE - numNodes * 10);
assertNodes(nodeTuple, lastNodeIndex, numNodes * 10);
assertEquals(Integer.MAX_VALUE, lastNodeIndex.get());

//test the transition from MAX_VALUE to MIN_VALUE
//this is the only time where there is most likely going to be a jump from a node
//to another one that's not necessarily the next one.
assertEquals(Integer.MIN_VALUE, lastNodeIndex.incrementAndGet());
assertNodes(nodeTuple, lastNodeIndex, 50);
assertEquals(Integer.MIN_VALUE + 50, lastNodeIndex.get());
}

private static void assertNodes(NodeTuple<List<Node>> nodeTuple, AtomicInteger lastNodeIndex, int runs) throws IOException {
int distance = lastNodeIndex.get() % nodeTuple.nodes.size();
/*
* Collections.rotate is not super intuitive: distance 1 means that the last element will become the first and so on,
* while distance -1 means that the second element will become the first and so on.
*/
int expectedOffset = distance > 0 ? nodeTuple.nodes.size() - distance : Math.abs(distance);
for (int i = 0; i < runs; i++) {
Iterable<Node> selectedNodes = RestClient.selectNodes(nodeTuple, Collections.<HttpHost, DeadHostState>emptyMap(),
lastNodeIndex, NodeSelector.ANY);
List<Node> expectedNodes = nodeTuple.nodes;
int index = 0;
for (Node actualNode : selectedNodes) {
Node expectedNode = expectedNodes.get((index + expectedOffset) % expectedNodes.size());
assertSame(expectedNode, actualNode);
index++;
}
expectedOffset--;
if (expectedOffset < 0) {
expectedOffset += nodeTuple.nodes.size();
}
}
}
}