Skip to content

Commit

Permalink
Add test for DiscoveryNodeManager listener
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jan 23, 2019
1 parent 86d780a commit b950d0f
Showing 1 changed file with 94 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
import io.airlift.discovery.client.testing.StaticServiceSelector;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
Expand All @@ -32,13 +32,17 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.annotation.concurrent.GuardedBy;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import static io.airlift.discovery.client.ServiceDescriptor.serviceDescriptor;
import static io.airlift.discovery.client.ServiceSelectorConfig.DEFAULT_POOL;
import static io.airlift.http.client.HttpStatus.OK;
import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static io.prestosql.spi.NodeState.ACTIVE;
Expand All @@ -52,10 +56,11 @@ public class TestDiscoveryNodeManager
private final NodeInfo nodeInfo = new NodeInfo("test");
private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig();
private NodeVersion expectedVersion;
private List<PrestoNode> activeNodes;
private List<PrestoNode> inactiveNodes;
private Set<Node> activeNodes;
private Set<Node> inactiveNodes;
private PrestoNode coordinator;
private ServiceSelector selector;
private PrestoNode currentNode;
private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector();
private HttpClient testHttpClient;

@BeforeMethod
Expand All @@ -64,27 +69,19 @@ public void setup()
testHttpClient = new TestingHttpClient(input -> new TestingResponse(OK, ArrayListMultimap.create(), ACTIVE.name().getBytes()));

expectedVersion = new NodeVersion("1");
coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, false);
activeNodes = ImmutableList.of(
new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false),
coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, true);
currentNode = new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false);

activeNodes = ImmutableSet.of(
currentNode,
new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false),
new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.3"), expectedVersion, false),
coordinator);
inactiveNodes = ImmutableList.of(
inactiveNodes = ImmutableSet.of(
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false),
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"), false));

List<ServiceDescriptor> descriptors = new ArrayList<>();
for (PrestoNode node : Iterables.concat(activeNodes, inactiveNodes)) {
descriptors.add(serviceDescriptor("presto")
.setNodeId(node.getNodeIdentifier())
.addProperty("http", node.getHttpUri().toString())
.addProperty("node_version", node.getNodeVersion().toString())
.addProperty("coordinator", String.valueOf(node.equals(coordinator)))
.build());
}

selector = new StaticServiceSelector(descriptors);
selector.announceNodes(activeNodes, inactiveNodes);
}

@Test
Expand Down Expand Up @@ -124,15 +121,13 @@ public void testGetAllNodes()
@Test
public void testGetCurrentNode()
{
Node expected = activeNodes.get(0);

NodeInfo nodeInfo = new NodeInfo(new NodeConfig()
.setEnvironment("test")
.setNodeId(expected.getNodeIdentifier()));
.setNodeId(currentNode.getNodeIdentifier()));

DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
try {
assertEquals(manager.getCurrentNode(), expected);
assertEquals(manager.getCurrentNode(), currentNode);
}
finally {
manager.stop();
Expand All @@ -157,4 +152,79 @@ public void testGetCurrentNodeRequired()
{
new DiscoveryNodeManager(selector, new NodeInfo("test"), new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
}

@Test(timeOut = 60000)
public void testNodeChangeListener()
throws Exception
{
DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig);
try {
manager.startPollingNodeStates();

BlockingQueue<AllNodes> notifications = new ArrayBlockingQueue<>(100);
manager.addNodeChangeListener(notifications::add);
AllNodes allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), activeNodes);
assertEquals(allNodes.getInactiveNodes(), inactiveNodes);

selector.announceNodes(ImmutableSet.of(currentNode), ImmutableSet.of(coordinator));
allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), ImmutableSet.of(currentNode, coordinator));
assertEquals(allNodes.getActiveCoordinators(), ImmutableSet.of(coordinator));

selector.announceNodes(activeNodes, inactiveNodes);
allNodes = notifications.take();
assertEquals(allNodes.getActiveNodes(), activeNodes);
assertEquals(allNodes.getInactiveNodes(), inactiveNodes);
}
finally {
manager.stop();
}
}

public static class PrestoNodeServiceSelector
implements ServiceSelector
{
@GuardedBy("this")
private List<ServiceDescriptor> descriptors = ImmutableList.of();

private synchronized void announceNodes(Set<Node> activeNodes, Set<Node> inactiveNodes)
{
ImmutableList.Builder<ServiceDescriptor> descriptors = ImmutableList.builder();
for (Node node : Iterables.concat(activeNodes, inactiveNodes)) {
descriptors.add(serviceDescriptor("presto")
.setNodeId(node.getNodeIdentifier())
.addProperty("http", node.getHttpUri().toString())
.addProperty("node_version", ((PrestoNode) node).getNodeVersion().toString())
.addProperty("coordinator", String.valueOf(node.isCoordinator()))
.build());
}

this.descriptors = descriptors.build();
}

@Override
public String getType()
{
return "presto";
}

@Override
public String getPool()
{
return DEFAULT_POOL;
}

@Override
public synchronized List<ServiceDescriptor> selectAllServices()
{
return descriptors;
}

@Override
public ListenableFuture<List<ServiceDescriptor>> refresh()
{
throw new UnsupportedOperationException();
}
}
}

0 comments on commit b950d0f

Please sign in to comment.