Skip to content

Commit

Permalink
Remove unnecessary NetworkLocationCache
Browse files Browse the repository at this point in the history
The built-in NetworkTopology implementations do are fixed and do not need caching, and
external implementations can easily add caching.
  • Loading branch information
dain committed Sep 9, 2019
1 parent bfe7306 commit a5f8a4c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 144 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.prestosql.metadata.Split;
import io.prestosql.spi.HostAddress;

import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.net.InetAddress;
Expand Down Expand Up @@ -67,7 +66,7 @@ public class NodeScheduler
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();

private final NetworkLocationCache networkLocationCache;
private final NetworkTopology networkTopology;
private final List<CounterStat> topologicalSplitCounters;
private final List<String> networkLocationSegmentNames;
private final InternalNodeManager nodeManager;
Expand All @@ -82,18 +81,9 @@ public class NodeScheduler
@Inject
public NodeScheduler(NetworkTopology networkTopology, InternalNodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap)
{
this(new NetworkLocationCache(networkTopology), networkTopology, nodeManager, config, nodeTaskMap);
}

public NodeScheduler(
NetworkLocationCache networkLocationCache,
NetworkTopology networkTopology,
InternalNodeManager nodeManager,
NodeSchedulerConfig config,
NodeTaskMap nodeTaskMap)
{
this.networkLocationCache = networkLocationCache;
this.nodeManager = nodeManager;
this.networkTopology = requireNonNull(networkTopology, "networkTopology is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
requireNonNull(config, "config is null");
this.minCandidates = config.getMinCandidates();
this.includeCoordinator = config.isIncludeCoordinator();
this.maxSplitsPerNode = config.getMaxSplitsPerNode();
Expand All @@ -116,12 +106,6 @@ public NodeScheduler(
topologicalSplitCounters = builder.build();
}

@PreDestroy
public void stop()
{
networkLocationCache.stop();
}

public Map<String, CounterStat> getTopologicalSplitCounters()
{
ImmutableMap.Builder<String, CounterStat> counters = ImmutableMap.builder();
Expand Down Expand Up @@ -154,7 +138,7 @@ public NodeSelector createNodeSelector(CatalogName catalogName)

for (InternalNode node : nodes) {
if (useNetworkTopology && (includeCoordinator || !coordinatorNodeIds.contains(node.getNodeIdentifier()))) {
NetworkLocation location = networkLocationCache.get(node.getHostAndPort());
NetworkLocation location = networkTopology.locate(node.getHostAndPort());
for (int i = 0; i <= location.getSegments().size(); i++) {
workersByNetworkPath.put(location.subLocation(0, i), node);
}
Expand Down Expand Up @@ -187,7 +171,7 @@ public NodeSelector createNodeSelector(CatalogName catalogName)
maxPendingSplitsPerTask,
topologicalSplitCounters,
networkLocationSegmentNames,
networkLocationCache);
networkTopology);
}
else {
return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, optimizedLocalScheduling);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class TopologyAwareNodeSelector
private final int maxPendingSplitsPerTask;
private final List<CounterStat> topologicalSplitCounters;
private final List<String> networkLocationSegmentNames;
private final NetworkLocationCache networkLocationCache;
private final NetworkTopology networkTopology;

public TopologyAwareNodeSelector(
InternalNodeManager nodeManager,
Expand All @@ -73,7 +73,7 @@ public TopologyAwareNodeSelector(
int maxPendingSplitsPerTask,
List<CounterStat> topologicalSplitCounters,
List<String> networkLocationSegmentNames,
NetworkLocationCache networkLocationCache)
NetworkTopology networkTopology)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
Expand All @@ -84,7 +84,7 @@ public TopologyAwareNodeSelector(
this.maxPendingSplitsPerTask = maxPendingSplitsPerTask;
this.topologicalSplitCounters = requireNonNull(topologicalSplitCounters, "topologicalSplitCounters is null");
this.networkLocationSegmentNames = requireNonNull(networkLocationSegmentNames, "networkLocationSegmentNames is null");
this.networkLocationCache = requireNonNull(networkLocationCache, "networkLocationCache is null");
this.networkTopology = requireNonNull(networkTopology, "networkTopology is null");
}

@Override
Expand Down Expand Up @@ -147,7 +147,7 @@ else if (!splitWaitingForAnyNode) {
int chosenDepth = 0;
Set<NetworkLocation> locations = new HashSet<>();
for (HostAddress host : split.getAddresses()) {
locations.add(networkLocationCache.get(host));
locations.add(networkTopology.locate(host));
}
if (locations.isEmpty()) {
// Add the root location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.prestosql.connector.CatalogName;
import io.prestosql.execution.scheduler.LegacyNetworkTopology;
import io.prestosql.execution.scheduler.NetworkLocation;
import io.prestosql.execution.scheduler.NetworkLocationCache;
import io.prestosql.execution.scheduler.NetworkTopology;
import io.prestosql.execution.scheduler.NodeScheduler;
import io.prestosql.execution.scheduler.NodeSchedulerConfig;
Expand Down Expand Up @@ -59,7 +58,6 @@
import java.util.concurrent.ThreadLocalRandom;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.prestosql.execution.scheduler.NetworkLocation.ROOT_LOCATION;
import static io.prestosql.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static io.prestosql.testing.assertions.PrestoExceptionAssert.assertPrestoExceptionThrownBy;
import static java.lang.String.format;
Expand Down Expand Up @@ -171,21 +169,7 @@ public void testTopologyAwareScheduling()
.setMaxPendingSplitsPerTask(20);

TestNetworkTopology topology = new TestNetworkTopology();
NetworkLocationCache locationCache = new NetworkLocationCache(topology)
{
@Override
public NetworkLocation get(HostAddress host)
{
// Bypass the cache for workers, since we only look them up once and they would all be unresolved otherwise
if (host.getHostText().startsWith("host")) {
return topology.locate(host);
}
else {
return super.get(host);
}
}
};
NodeScheduler nodeScheduler = new NodeScheduler(locationCache, topology, nodeManager, nodeSchedulerConfig, nodeTaskMap);
NodeScheduler nodeScheduler = new NodeScheduler(topology, nodeManager, nodeSchedulerConfig, nodeTaskMap);
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID);

// Fill up the nodes with non-local data
Expand Down Expand Up @@ -238,17 +222,6 @@ public NetworkLocation get(HostAddress host)
Set<Split> unassigned = Sets.difference(rackLocalSplits.build(), new HashSet<>(assignments.values()));
// Compute the assignments a second time to account for the fact that some splits may not have been assigned due to asynchronous
// loading of the NetworkLocationCache
boolean cacheRefreshed = false;
while (!cacheRefreshed) {
cacheRefreshed = true;
if (locationCache.get(dataHost1).equals(ROOT_LOCATION)) {
cacheRefreshed = false;
}
if (locationCache.get(dataHost2).equals(ROOT_LOCATION)) {
cacheRefreshed = false;
}
MILLISECONDS.sleep(10);
}
assignments = nodeSelector.computeAssignments(unassigned, ImmutableList.copyOf(taskMap.values())).getAssignments();
for (InternalNode node : assignments.keySet()) {
RemoteTask remoteTask = taskMap.get(node);
Expand Down

0 comments on commit a5f8a4c

Please sign in to comment.