diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/NetworkLocationCache.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/NetworkLocationCache.java deleted file mode 100644 index f8156bf2ff9ea..0000000000000 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/NetworkLocationCache.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prestosql.execution.scheduler; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import io.airlift.log.Logger; -import io.airlift.units.Duration; -import io.prestosql.spi.HostAddress; - -import java.util.concurrent.ExecutorService; - -import static com.google.common.cache.CacheLoader.asyncReloading; -import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.prestosql.execution.scheduler.NetworkLocation.ROOT_LOCATION; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newCachedThreadPool; -import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.HOURS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; - -public class NetworkLocationCache -{ - private static final Duration NEGATIVE_CACHE_DURATION = new Duration(10, MINUTES); - - private static final Logger log = Logger.get(NetworkLocationCache.class); - - private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("network-location-%s")); - private final NetworkTopology networkTopology; - private final LoadingCache cache; - private final Cache negativeCache; - - public NetworkLocationCache(NetworkTopology networkTopology) - { - this.networkTopology = requireNonNull(networkTopology, "networkTopology is null"); - - this.cache = CacheBuilder.newBuilder() - .expireAfterWrite(1, DAYS) - .refreshAfterWrite(12, HOURS) - .build(asyncReloading(CacheLoader.from(this::locate), executor)); - - this.negativeCache = CacheBuilder.newBuilder() - .expireAfterWrite(NEGATIVE_CACHE_DURATION.toMillis(), MILLISECONDS) - .build(); - } - - public void stop() - { - executor.shutdownNow(); - } - - public NetworkLocation get(HostAddress host) - { - NetworkLocation location = cache.getIfPresent(host); - if ((location == null) && (negativeCache.getIfPresent(host) == null)) { - // Store a value in the cache, so that refresh() is done asynchronously - cache.put(host, ROOT_LOCATION); - cache.refresh(host); - } - // Return the root location for anything we couldn't locate - return location == null ? ROOT_LOCATION : location; - } - - private NetworkLocation locate(HostAddress host) - { - try { - return networkTopology.locate(host); - } - catch (RuntimeException e) { - negativeCache.put(host, true); - log.warn(e, "Unable to determine location of %s. Will attempt again in %s", host, NEGATIVE_CACHE_DURATION); - // no one will see the exception thrown here - throw e; - } - } -} diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/NodeScheduler.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/NodeScheduler.java index 7664e01e1b4ff..2c4b598ff1c17 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/NodeScheduler.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/NodeScheduler.java @@ -33,7 +33,6 @@ import io.prestosql.metadata.Split; import io.prestosql.spi.HostAddress; -import javax.annotation.PreDestroy; import javax.inject.Inject; import java.net.InetAddress; @@ -67,7 +66,7 @@ public class NodeScheduler .expireAfterWrite(30, TimeUnit.SECONDS) .build(); - private final NetworkLocationCache networkLocationCache; + private final NetworkTopology networkTopology; private final List topologicalSplitCounters; private final List networkLocationSegmentNames; private final InternalNodeManager nodeManager; @@ -82,18 +81,9 @@ public class NodeScheduler @Inject public NodeScheduler(NetworkTopology networkTopology, InternalNodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap) { - this(new NetworkLocationCache(networkTopology), networkTopology, nodeManager, config, nodeTaskMap); - } - - public NodeScheduler( - NetworkLocationCache networkLocationCache, - NetworkTopology networkTopology, - InternalNodeManager nodeManager, - NodeSchedulerConfig config, - NodeTaskMap nodeTaskMap) - { - this.networkLocationCache = networkLocationCache; - this.nodeManager = nodeManager; + this.networkTopology = requireNonNull(networkTopology, "networkTopology is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + requireNonNull(config, "config is null"); this.minCandidates = config.getMinCandidates(); this.includeCoordinator = config.isIncludeCoordinator(); this.maxSplitsPerNode = config.getMaxSplitsPerNode(); @@ -116,12 +106,6 @@ public NodeScheduler( topologicalSplitCounters = builder.build(); } - @PreDestroy - public void stop() - { - networkLocationCache.stop(); - } - public Map getTopologicalSplitCounters() { ImmutableMap.Builder counters = ImmutableMap.builder(); @@ -154,7 +138,7 @@ public NodeSelector createNodeSelector(CatalogName catalogName) for (InternalNode node : nodes) { if (useNetworkTopology && (includeCoordinator || !coordinatorNodeIds.contains(node.getNodeIdentifier()))) { - NetworkLocation location = networkLocationCache.get(node.getHostAndPort()); + NetworkLocation location = networkTopology.locate(node.getHostAndPort()); for (int i = 0; i <= location.getSegments().size(); i++) { workersByNetworkPath.put(location.subLocation(0, i), node); } @@ -187,7 +171,7 @@ public NodeSelector createNodeSelector(CatalogName catalogName) maxPendingSplitsPerTask, topologicalSplitCounters, networkLocationSegmentNames, - networkLocationCache); + networkTopology); } else { return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, optimizedLocalScheduling); diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/TopologyAwareNodeSelector.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/TopologyAwareNodeSelector.java index 582bad2ca8cb0..b9c39ac1473d1 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/TopologyAwareNodeSelector.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/TopologyAwareNodeSelector.java @@ -61,7 +61,7 @@ public class TopologyAwareNodeSelector private final int maxPendingSplitsPerTask; private final List topologicalSplitCounters; private final List networkLocationSegmentNames; - private final NetworkLocationCache networkLocationCache; + private final NetworkTopology networkTopology; public TopologyAwareNodeSelector( InternalNodeManager nodeManager, @@ -73,7 +73,7 @@ public TopologyAwareNodeSelector( int maxPendingSplitsPerTask, List topologicalSplitCounters, List networkLocationSegmentNames, - NetworkLocationCache networkLocationCache) + NetworkTopology networkTopology) { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null"); @@ -84,7 +84,7 @@ public TopologyAwareNodeSelector( this.maxPendingSplitsPerTask = maxPendingSplitsPerTask; this.topologicalSplitCounters = requireNonNull(topologicalSplitCounters, "topologicalSplitCounters is null"); this.networkLocationSegmentNames = requireNonNull(networkLocationSegmentNames, "networkLocationSegmentNames is null"); - this.networkLocationCache = requireNonNull(networkLocationCache, "networkLocationCache is null"); + this.networkTopology = requireNonNull(networkTopology, "networkTopology is null"); } @Override @@ -147,7 +147,7 @@ else if (!splitWaitingForAnyNode) { int chosenDepth = 0; Set locations = new HashSet<>(); for (HostAddress host : split.getAddresses()) { - locations.add(networkLocationCache.get(host)); + locations.add(networkTopology.locate(host)); } if (locations.isEmpty()) { // Add the root location diff --git a/presto-main/src/test/java/io/prestosql/execution/TestNodeScheduler.java b/presto-main/src/test/java/io/prestosql/execution/TestNodeScheduler.java index 9cc35ce8cf97d..6fbb401611976 100644 --- a/presto-main/src/test/java/io/prestosql/execution/TestNodeScheduler.java +++ b/presto-main/src/test/java/io/prestosql/execution/TestNodeScheduler.java @@ -26,7 +26,6 @@ import io.prestosql.connector.CatalogName; import io.prestosql.execution.scheduler.LegacyNetworkTopology; import io.prestosql.execution.scheduler.NetworkLocation; -import io.prestosql.execution.scheduler.NetworkLocationCache; import io.prestosql.execution.scheduler.NetworkTopology; import io.prestosql.execution.scheduler.NodeScheduler; import io.prestosql.execution.scheduler.NodeSchedulerConfig; @@ -59,7 +58,6 @@ import java.util.concurrent.ThreadLocalRandom; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.prestosql.execution.scheduler.NetworkLocation.ROOT_LOCATION; import static io.prestosql.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static io.prestosql.testing.assertions.PrestoExceptionAssert.assertPrestoExceptionThrownBy; import static java.lang.String.format; @@ -171,21 +169,7 @@ public void testTopologyAwareScheduling() .setMaxPendingSplitsPerTask(20); TestNetworkTopology topology = new TestNetworkTopology(); - NetworkLocationCache locationCache = new NetworkLocationCache(topology) - { - @Override - public NetworkLocation get(HostAddress host) - { - // Bypass the cache for workers, since we only look them up once and they would all be unresolved otherwise - if (host.getHostText().startsWith("host")) { - return topology.locate(host); - } - else { - return super.get(host); - } - } - }; - NodeScheduler nodeScheduler = new NodeScheduler(locationCache, topology, nodeManager, nodeSchedulerConfig, nodeTaskMap); + NodeScheduler nodeScheduler = new NodeScheduler(topology, nodeManager, nodeSchedulerConfig, nodeTaskMap); NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID); // Fill up the nodes with non-local data @@ -238,17 +222,6 @@ public NetworkLocation get(HostAddress host) Set unassigned = Sets.difference(rackLocalSplits.build(), new HashSet<>(assignments.values())); // Compute the assignments a second time to account for the fact that some splits may not have been assigned due to asynchronous // loading of the NetworkLocationCache - boolean cacheRefreshed = false; - while (!cacheRefreshed) { - cacheRefreshed = true; - if (locationCache.get(dataHost1).equals(ROOT_LOCATION)) { - cacheRefreshed = false; - } - if (locationCache.get(dataHost2).equals(ROOT_LOCATION)) { - cacheRefreshed = false; - } - MILLISECONDS.sleep(10); - } assignments = nodeSelector.computeAssignments(unassigned, ImmutableList.copyOf(taskMap.values())).getAssignments(); for (InternalNode node : assignments.keySet()) { RemoteTask remoteTask = taskMap.get(node);