Skip to content

Commit

Permalink
Update node scheduler config properties to reflect actual usage
Browse files Browse the repository at this point in the history
Rename node-scheduler.network-topology to node-scheduler.policy and
change values from legacy and flat to uniform and topology.
  • Loading branch information
dain committed Sep 9, 2019
1 parent 83b4e4c commit 482ea5e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 37 deletions.
13 changes: 7 additions & 6 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -470,17 +470,18 @@ Node Scheduler Properties
across all worker nodes. Setting it too high may increase query
latency and increase CPU usage on the coordinator.

``node-scheduler.network-topology``
``node-scheduler.policy``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Allowed values:** ``legacy``, ``flat``
* **Default value:** ``legacy``
* **Allowed values:** ``uniform``, ``topology``
* **Default value:** ``uniform``

Sets the network topology to use when scheduling splits. ``legacy`` will ignore
the topology when scheduling splits. ``flat`` will try to schedule splits on the host
Sets the node scheduler policy to use when scheduling splits. ``uniform`` will attempt
to schedule splits on the host where the data is located, while maintaining a uniform
distribution across all hosts. ``topology`` will try to schedule splits on the host
where the data is located by reserving 50% of the work queue for local splits.
It is recommended to use ``flat`` for clusters where distributed storage runs on
It is recommended to use ``uniform`` for clusters where distributed storage runs on
the same nodes as Presto workers.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,52 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import static java.util.Locale.ENGLISH;

@DefunctConfig({"node-scheduler.location-aware-scheduling-enabled", "node-scheduler.multiple-tasks-per-node-enabled"})
public class NodeSchedulerConfig
{
public static final class NetworkTopologyType
public enum NodeSchedulerPolicy
{
public static final String LEGACY = "legacy";
public static final String FLAT = "flat";
public static final String BENCHMARK = "benchmark";
UNIFORM, TOPOLOGY
}

private int minCandidates = 10;
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerTask = 10;
private String networkTopology = NetworkTopologyType.LEGACY;
private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
private boolean optimizedLocalScheduling = true;

@NotNull
public String getNetworkTopology()
public NodeSchedulerPolicy getNodeSchedulerPolicy()
{
return networkTopology;
return nodeSchedulerPolicy;
}

@Config("node-scheduler.network-topology")
public NodeSchedulerConfig setNetworkTopology(String networkTopology)
@LegacyConfig("node-scheduler.network-topology")
@Config("node-scheduler.policy")
public NodeSchedulerConfig setNodeSchedulerPolicy(String nodeSchedulerPolicy)
{
this.networkTopology = networkTopology;
this.nodeSchedulerPolicy = toNodeSchedulerPolicy(nodeSchedulerPolicy);
return this;
}

/**
 * Translates a configured policy name into a {@link NodeSchedulerPolicy}.
 * <p>
 * Matching is case-insensitive: the name is lower-cased using the ENGLISH
 * locale before comparison. The older names {@code legacy} and {@code flat}
 * are still accepted and map to {@code UNIFORM} and {@code TOPOLOGY}
 * respectively.
 *
 * @param nodeSchedulerPolicy the policy name as written in the configuration
 * @return the matching policy
 * @throws IllegalArgumentException if the name is not one of the accepted values
 */
private static NodeSchedulerPolicy toNodeSchedulerPolicy(String nodeSchedulerPolicy)
{
    String normalized = nodeSchedulerPolicy.toLowerCase(ENGLISH);

    // "legacy" is the backward-compatible alias of "uniform"
    if (normalized.equals("uniform") || normalized.equals("legacy")) {
        return NodeSchedulerPolicy.UNIFORM;
    }

    // "flat" is the backward-compatible alias of "topology"
    if (normalized.equals("topology") || normalized.equals("flat")) {
        return NodeSchedulerPolicy.TOPOLOGY;
    }

    // Report the value exactly as supplied, not the lower-cased form
    throw new IllegalArgumentException("Unknown node scheduler policy: " + nodeSchedulerPolicy);
}

@Min(1)
public int getMinCandidates()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.FLAT;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.LEGACY;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.TOPOLOGY;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
Expand Down Expand Up @@ -248,13 +248,14 @@ protected void setup(Binder binder)
// TODO: move to CoordinatorModule when NodeScheduler is moved
install(installModuleIf(
NodeSchedulerConfig.class,
config -> LEGACY.equalsIgnoreCase(config.getNetworkTopology()),
config -> UNIFORM == config.getNodeSchedulerPolicy(),
moduleBinder -> moduleBinder.bind(NodeSelectorFactory.class).to(SimpleNodeSelectorFactory.class).in(Scopes.SINGLETON)));
install(installModuleIf(
NodeSchedulerConfig.class,
config -> FLAT.equalsIgnoreCase(config.getNetworkTopology()),
config -> TOPOLOGY == config.getNodeSchedulerPolicy(),
moduleBinder -> {
moduleBinder.bind(NetworkTopology.class).to(FlatNetworkTopology.class).in(Scopes.SINGLETON);
moduleBinder.bind(TopologyAwareNodeSelectorFactory.class).in(Scopes.SINGLETON);
moduleBinder.bind(NodeSelectorFactory.class).to(TopologyAwareNodeSelectorFactory.class).in(Scopes.SINGLETON);
binder.bind(NodeSchedulerExporter.class).in(Scopes.SINGLETON);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@
import java.util.concurrent.TimeUnit;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.BENCHMARK;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.FLAT;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.LEGACY;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;

Expand Down Expand Up @@ -129,10 +126,10 @@ public Object benchmark(BenchmarkData data)
@State(Scope.Thread)
public static class BenchmarkData
{
@Param({LEGACY,
BENCHMARK,
FLAT})
private String topologyName = LEGACY;
@Param({"uniform",
"benchmark",
"topology"})
private String policy = "uniform";

private FinalizerService finalizerService = new FinalizerService();
private NodeSelector nodeSelector;
Expand Down Expand Up @@ -188,19 +185,19 @@ private NodeSchedulerConfig getNodeSchedulerConfig()
return new NodeSchedulerConfig()
.setMaxSplitsPerNode(MAX_SPLITS_PER_NODE)
.setIncludeCoordinator(false)
.setNetworkTopology(topologyName)
.setNodeSchedulerPolicy(policy)
.setMaxPendingSplitsPerTask(MAX_PENDING_SPLITS_PER_TASK_PER_NODE);
}

private NodeSelectorFactory getNodeSelectorFactory(InMemoryNodeManager nodeManager, NodeTaskMap nodeTaskMap)
{
NodeSchedulerConfig nodeSchedulerConfig = getNodeSchedulerConfig();
switch (topologyName) {
case LEGACY:
switch (policy) {
case "uniform":
return new SimpleNodeSelectorFactory(nodeManager, nodeSchedulerConfig, nodeTaskMap);
case FLAT:
case "topology":
return new TopologyAwareNodeSelectorFactory(new FlatNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
case BENCHMARK:
case "benchmark":
return new TopologyAwareNodeSelectorFactory(new BenchmarkNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
default:
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void testTopologyAwareScheduling()
NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
.setMaxSplitsPerNode(25)
.setIncludeCoordinator(false)
.setNetworkTopology("test")
.setMaxPendingSplitsPerTask(20);

TestNetworkTopology topology = new TestNetworkTopology();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.LEGACY;
import static io.prestosql.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;

public class TestNodeSchedulerConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(NodeSchedulerConfig.class)
.setNetworkTopology(LEGACY)
.setNodeSchedulerPolicy(UNIFORM.name())
.setMinCandidates(10)
.setMaxSplitsPerNode(100)
.setMaxPendingSplitsPerTask(10)
Expand All @@ -42,7 +42,7 @@ public void testDefaults()
public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("node-scheduler.network-topology", "flat")
.put("node-scheduler.policy", "topology")
.put("node-scheduler.min-candidates", "11")
.put("node-scheduler.include-coordinator", "false")
.put("node-scheduler.max-pending-splits-per-task", "11")
Expand All @@ -51,7 +51,7 @@ public void testExplicitPropertyMappings()
.build();

NodeSchedulerConfig expected = new NodeSchedulerConfig()
.setNetworkTopology("flat")
.setNodeSchedulerPolicy("topology")
.setIncludeCoordinator(false)
.setMaxSplitsPerNode(101)
.setMaxPendingSplitsPerTask(11)
Expand Down

0 comments on commit 482ea5e

Please sign in to comment.