Skip to content

Commit

Permalink
YARN-10316. FS-CS converter: convert maxAppsDefault, maxRunningApps s…
Browse files Browse the repository at this point in the history
…ettings. Contributed by Peter Bacsko
  • Loading branch information
szilard-nemeth committed Jun 23, 2020
1 parent fa14e4b commit 03f855e
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public class FSConfigToCSConfigConverter {
private boolean preemptionEnabled = false;
private int queueMaxAppsDefault;
private float queueMaxAMShareDefault;
private Map<String, Integer> userMaxApps;
private int userMaxAppsDefault;

private boolean autoCreateChildQueues = false;
private boolean sizeBasedWeight = false;
private boolean userAsDefaultQueue = false;
Expand All @@ -99,6 +102,8 @@ public class FSConfigToCSConfigConverter {
private boolean consoleMode = false;
private boolean convertPlacementRules = false;



public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
ruleHandler, ConversionOptions conversionOptions) {
this.ruleHandler = ruleHandler;
Expand Down Expand Up @@ -242,14 +247,13 @@ void convert(Configuration inputYarnSiteConfig) throws Exception {

AllocationConfiguration allocConf = fs.getAllocationConfiguration();
queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault();
userMaxAppsDefault = allocConf.getUserMaxAppsDefault();
userMaxApps = allocConf.getUserMaxApps();
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();

convertedYarnSiteConfig = new Configuration(false);
capacitySchedulerConfig = new Configuration(false);

checkUserMaxApps(allocConf);
checkUserMaxAppsDefault(allocConf);

convertYarnSiteXml(inputYarnSiteConfig, havePlacementPolicies);
convertCapacitySchedulerXml(fs);

Expand Down Expand Up @@ -287,7 +291,9 @@ private void convertYarnSiteXml(Configuration inputYarnSiteConfig,

private void convertCapacitySchedulerXml(FairScheduler fs) {
FSParentQueue rootQueue = fs.getQueueManager().getRootQueue();
emitDefaultMaxApplications();
emitDefaultQueueMaxParallelApplications();
emitDefaultUserMaxParallelApplications();
emitUserMaxParallelApplications();
emitDefaultMaxAMShare();

FSQueueConverter queueConverter = FSQueueConverterBuilder.create()
Expand Down Expand Up @@ -322,14 +328,30 @@ private void convertCapacitySchedulerXml(FairScheduler fs) {
}
}

private void emitDefaultMaxApplications() {
private void emitDefaultQueueMaxParallelApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
PREFIX + "max-parallel-apps",
String.valueOf(queueMaxAppsDefault));
}
}

private void emitDefaultUserMaxParallelApplications() {
if (userMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "user.max-parallel-apps",
String.valueOf(userMaxAppsDefault));
}
}

private void emitUserMaxParallelApplications() {
userMaxApps
.forEach((user, apps) -> {
capacitySchedulerConfig.setInt(
PREFIX + "user." + user + ".max-parallel-apps", apps);
});
}

private void emitDefaultMaxAMShare() {
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(
Expand Down Expand Up @@ -374,19 +396,6 @@ private void checkReservationSystem(Configuration conf) {
}
}

private void checkUserMaxApps(AllocationConfiguration allocConf) {
if (allocConf.getUserMaxApps() != null
&& allocConf.getUserMaxApps().size() > 0) {
ruleHandler.handleUserMaxApps();
}
}

private void checkUserMaxAppsDefault(AllocationConfiguration allocConf) {
if (allocConf.getUserMaxAppsDefault() > 0) {
ruleHandler.handleUserMaxAppsDefault();
}
}

private boolean isDrfUsed(FairScheduler fs) {
FSQueue rootQueue = fs.getQueueManager().getRootQueue();
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,6 @@ public void handleChildQueueCount(String queue, int count) {
}
}

public void handleUserMaxApps() {
handle(USER_MAX_RUNNING_APPS, "<maxRunningApps>", null);
}

public void handleUserMaxAppsDefault() {
handle(USER_MAX_APPS_DEFAULT, "<userMaxAppsDefault>", null);
}

public void handleDynamicMaxAssign() {
handle(DYNAMIC_MAX_ASSIGN,
FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
*/
public class FSQueueConverter {
public static final float QUEUE_MAX_AM_SHARE_DISABLED = -1.0f;
private static final int MAX_RUNNING_APPS_UNSET = Integer.MIN_VALUE;
private static final int MAX_RUNNING_APPS_UNSET = Integer.MAX_VALUE;
private static final String FAIR_POLICY = "fair";
private static final String FIFO_POLICY = "fifo";

Expand Down Expand Up @@ -79,7 +79,7 @@ public void convertQueueHierarchy(FSQueue queue) {

emitChildQueues(queueName, children);
emitMaxAMShare(queueName, queue);
emitMaxRunningApps(queueName, queue);
emitMaxParallelApps(queueName, queue);
emitMaxAllocations(queueName, queue);
emitPreemptionDisabled(queueName, queue);

Expand Down Expand Up @@ -138,14 +138,14 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {

/**
* &lt;maxRunningApps&gt;
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-applications.
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.max-parallel-apps.
* @param queueName
* @param queue
*/
private void emitMaxRunningApps(String queueName, FSQueue queue) {
private void emitMaxParallelApps(String queueName, FSQueue queue) {
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-applications",
capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps",
String.valueOf(queue.getMaxRunningApps()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,7 @@ private void createConverter() {
.withOutputDirectory(FSConfigConverterTestCommons.OUTPUT_DIR);
}

@Test
public void testDefaultMaxApplications() throws Exception {
converter.convert(config);

Configuration conf = converter.getCapacitySchedulerConfig();
int maxApps =
conf.getInt(
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, -1);

assertEquals("Default max apps", 15, maxApps);
}

@Test
public void testDefaultMaxAMShare() throws Exception {
Expand Down Expand Up @@ -252,57 +242,73 @@ public void testConvertACLs() throws Exception {
}

@Test
public void testDefaultMaxRunningApps() throws Exception {
public void testDefaultQueueMaxParallelApps() throws Exception {
converter.convert(config);

Configuration conf = converter.getCapacitySchedulerConfig();

// default setting
assertEquals("Default max apps", 15,
conf.getInt(PREFIX + "maximum-applications", -1));
assertEquals("Default max parallel apps", 15,
conf.getInt(PREFIX + "max-parallel-apps", -1));
}

@Test
public void testQueueMaxChildCapacityNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("test");
public void testSpecificQueueMaxParallelApps() throws Exception {
converter.convert(config);

Mockito.doThrow(new UnsupportedPropertyException("test"))
.when(ruleHandler).handleMaxChildCapacity();
Configuration conf = converter.getCapacitySchedulerConfig();

converter.convert(config);
assertEquals("root.admins.alice max parallel apps", 2,
conf.getInt(PREFIX + "root.admins.alice.max-parallel-apps", -1));
}

@Test
public void testReservationSystemNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("maxCapacity");
public void testDefaultUserMaxParallelApps() throws Exception {
converter.convert(config);

Mockito.doThrow(new UnsupportedPropertyException("maxCapacity"))
.when(ruleHandler).handleMaxChildCapacity();
config.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
Configuration conf = converter.getCapacitySchedulerConfig();
int userMaxParallelApps =
conf.getInt(
PREFIX + "user.max-parallel-apps", -1);

assertEquals("Default user max parallel apps", 10,
userMaxParallelApps);
}

@Test
public void testSpecificUserMaxParallelApps() throws Exception {
converter.convert(config);

Configuration conf = converter.getCapacitySchedulerConfig();

assertEquals("Max parallel apps for alice", 30,
conf.getInt(PREFIX + "user.alice.max-parallel-apps", -1));
assertNull("Max parallel apps should be undefined for user bob",
conf.get(PREFIX + "user.bob.max-parallel-apps"));
assertNull("Max parallel apps should be undefined for user joe",
conf.get(PREFIX + "user.joe.max-parallel-apps"));
assertNull("Max parallel apps should be undefined for user john",
conf.get(PREFIX + "user.john.max-parallel-apps"));
}

@Test
public void testUserMaxAppsNotSupported() throws Exception {
public void testQueueMaxChildCapacityNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("userMaxApps");
expectedException.expectMessage("test");

Mockito.doThrow(new UnsupportedPropertyException("userMaxApps"))
.when(ruleHandler).handleUserMaxApps();
Mockito.doThrow(new UnsupportedPropertyException("test"))
.when(ruleHandler).handleMaxChildCapacity();

converter.convert(config);
}

@Test
public void testUserMaxAppsDefaultNotSupported() throws Exception {
public void testReservationSystemNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("userMaxAppsDefault");
expectedException.expectMessage("maxCapacity");

Mockito.doThrow(new UnsupportedPropertyException("userMaxAppsDefault"))
.when(ruleHandler).handleUserMaxAppsDefault();
Mockito.doThrow(new UnsupportedPropertyException("maxCapacity"))
.when(ruleHandler).handleMaxChildCapacity();
config.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);

converter.convert(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public void testInitPropertyActionsToWarning() throws IOException {
ruleHandler.handleQueueAutoCreate("test");
ruleHandler.handleReservationSystem();
ruleHandler.handleSpecifiedNotFirstRule();
ruleHandler.handleUserMaxApps();
ruleHandler.handleUserMaxAppsDefault();
}

@Test
Expand Down Expand Up @@ -106,8 +104,6 @@ public void testAllRulesWarning() throws IOException {
ruleHandler.handleQueueAutoCreate("test");
ruleHandler.handleReservationSystem();
ruleHandler.handleSpecifiedNotFirstRule();
ruleHandler.handleUserMaxApps();
ruleHandler.handleUserMaxAppsDefault();
}

@Test
Expand Down Expand Up @@ -140,8 +136,6 @@ public void testAllRulesAbort() throws IOException {
expectAbort(() -> ruleHandler.handleQueueAutoCreate("test"));
expectAbort(() -> ruleHandler.handleReservationSystem());
expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule());
expectAbort(() -> ruleHandler.handleUserMaxApps());
expectAbort(() -> ruleHandler.handleUserMaxAppsDefault());
expectAbort(() -> ruleHandler.handleFairAsDrf("test"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,18 @@ public void testQueueMaxAMShare() {
}

@Test
public void testQueueMaxRunningApps() {
public void testQueueMaxParallelApps() {
converter = builder.build();

converter.convertQueueHierarchy(rootQueue);

assertEquals("root.admins.alice max apps", 2,
csConfig.getInt(PREFIX + "root.admins.alice.maximum-applications",
csConfig.getInt(PREFIX + "root.admins.alice.max-parallel-apps",
-1));

Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.alice"));
assertNoValueForQueues(remaining, ".maximum-applications", csConfig);
assertNoValueForQueues(remaining, ".max-parallel-apps", csConfig);
}

@Test
Expand Down

0 comments on commit 03f855e

Please sign in to comment.