diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 9210af5e96..f21b3d7e26 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -6,6 +6,7 @@
1. [ISSUE #341](https://github.com/dangdangdotcom/elastic-job/issues/341) elastic-job-cloud界面script作业配置缺少执行脚本
1. [ISSUE #343](https://github.com/dangdangdotcom/elastic-job/issues/343) elastic-job-cloud中Script类型作业执行脚本不正确
+1. [ISSUE #322](https://github.com/dangdangdotcom/elastic-job/issues/322) elastic-job-cloud-scheduler调度任务评估资源时考虑对executor的资源使用情况
## 2.1.3
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AppConstraintEvaluator.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AppConstraintEvaluator.java
new file mode 100644
index 0000000000..55f0ceb6b9
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AppConstraintEvaluator.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskTrackerState;
+import com.netflix.fenzo.VirtualMachineCurrentState;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.codehaus.jettison.json.JSONException;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * App目标slave适配度限制器.
+ * 选择slave时需要考虑其上是否运行有App的executor,如果没有运行executor需要将其资源消耗考虑进适配计算算法中.
+ *
+ * @author gaohongtao
+ */
+@Slf4j
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class AppConstraintEvaluator implements ConstraintEvaluator {
+
+ private static AppConstraintEvaluator instance;
+
+ private final Set runningApps = new HashSet<>();
+
+ private final FacadeService facadeService;
+
+ public static void init(final FacadeService facadeService) {
+ instance = new AppConstraintEvaluator(facadeService);
+ }
+
+ static AppConstraintEvaluator getInstance() {
+ Preconditions.checkNotNull(instance);
+ return instance;
+ }
+
+ void loadAppRunningState() {
+ try {
+ for (MesosStateService.ExecutorStateInfo each : facadeService.loadExecutorInfo()) {
+ runningApps.add(each.getId());
+ }
+ } catch (final JSONException | UniformInterfaceException | ClientHandlerException e) {
+ clearAppRunningState();
+ }
+ }
+
+ void clearAppRunningState() {
+ runningApps.clear();
+ }
+
+ @Override
+ public String getName() {
+ return "App-Fitness-Calculator";
+ }
+
+ @Override
+ public Result evaluate(final TaskRequest taskRequest, final VirtualMachineCurrentState targetVM, final TaskTrackerState taskTrackerState) {
+ double assigningCpus = 0.0d;
+ double assigningMemoryMB = 0.0d;
+ final String slaveId = targetVM.getAllCurrentOffers().iterator().next().getSlaveId().getValue();
+ try {
+ if (isAppRunningOnSlave(taskRequest.getId(), slaveId)) {
+ return new Result(true, "");
+ }
+ Set calculatedApps = new HashSet<>();
+ List taskRequests = new ArrayList<>(targetVM.getTasksCurrentlyAssigned().size() + 1);
+ taskRequests.add(taskRequest);
+ for (TaskAssignmentResult each : targetVM.getTasksCurrentlyAssigned()) {
+ taskRequests.add(each.getRequest());
+ }
+ for (TaskRequest each : taskRequests) {
+ assigningCpus += each.getCPUs();
+ assigningMemoryMB += each.getMemory();
+ if (isAppRunningOnSlave(each.getId(), slaveId)) {
+ continue;
+ }
+ CloudAppConfiguration assigningAppConfig = getAppConfiguration(each.getId());
+ if (!calculatedApps.add(assigningAppConfig.getAppName())) {
+ continue;
+ }
+ assigningCpus += assigningAppConfig.getCpuCount();
+ assigningMemoryMB += assigningAppConfig.getMemoryMB();
+ }
+ } catch (final LackConfigException e) {
+ log.warn("Lack config, disable {}", getName(), e);
+ return new Result(true, "");
+ }
+ if (assigningCpus > targetVM.getCurrAvailableResources().cpuCores()) {
+ log.debug("Failure {} {} cpus:{}/{}", taskRequest.getId(), slaveId, assigningCpus, targetVM.getCurrAvailableResources().cpuCores());
+ return new Result(false, String.format("cpu:%s/%s", assigningCpus, targetVM.getCurrAvailableResources().cpuCores()));
+ }
+ if (assigningMemoryMB > targetVM.getCurrAvailableResources().memoryMB()) {
+ log.debug("Failure {} {} mem:{}/{}", taskRequest.getId(), slaveId, assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB());
+ return new Result(false, String.format("mem:%s/%s", assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB()));
+ }
+ log.debug("Success {} {} cpus:{}/{} mem:{}/{}", taskRequest.getId(), slaveId, assigningCpus, targetVM.getCurrAvailableResources()
+ .cpuCores(), assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB());
+ return new Result(true, String.format("cpus:%s/%s mem:%s/%s", assigningCpus, targetVM.getCurrAvailableResources()
+ .cpuCores(), assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB()));
+ }
+
+ private boolean isAppRunningOnSlave(final String taskId, final String slaveId) throws LackConfigException {
+ TaskContext taskContext = TaskContext.from(taskId);
+ taskContext.setSlaveId(slaveId);
+ return runningApps.contains(taskContext.getExecutorId(getJobConfiguration(taskContext).getAppName()));
+ }
+
+ private CloudAppConfiguration getAppConfiguration(final String taskId) throws LackConfigException {
+ CloudJobConfiguration jobConfig = getJobConfiguration(TaskContext.from(taskId));
+ Optional appConfigOptional = facadeService.loadAppConfig(jobConfig.getAppName());
+ if (!appConfigOptional.isPresent()) {
+ throw new LackConfigException("APP", jobConfig.getAppName());
+ }
+ return appConfigOptional.get();
+ }
+
+ private CloudJobConfiguration getJobConfiguration(final TaskContext taskContext) throws LackConfigException {
+ Optional jobConfigOptional = facadeService.load(taskContext.getMetaInfo().getJobName());
+ if (!jobConfigOptional.isPresent()) {
+ throw new LackConfigException("JOB", taskContext.getMetaInfo().getJobName());
+ }
+ return jobConfigOptional.get();
+ }
+
+ private class LackConfigException extends Exception {
+ LackConfigException(final String scope, final String configName) {
+ super(String.format("Lack %s's config %s", scope, configName));
+ }
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java
index cc1e8de94a..527c58e1a9 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java
@@ -37,6 +37,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
+import org.codehaus.jettison.json.JSONException;
import java.util.ArrayList;
import java.util.Collection;
@@ -68,6 +69,8 @@ public final class FacadeService {
private final DisableJobService disableJobService;
+ private final MesosStateService mesosStateService;
+
public FacadeService(final CoordinatorRegistryCenter regCenter) {
appConfigService = new CloudAppConfigurationService(regCenter);
jobConfigService = new CloudJobConfigurationService(regCenter);
@@ -76,6 +79,7 @@ public FacadeService(final CoordinatorRegistryCenter regCenter) {
failoverService = new FailoverService(regCenter);
disableAppService = new DisableAppService(regCenter);
disableJobService = new DisableJobService(regCenter);
+ mesosStateService = new MesosStateService(regCenter);
}
/**
@@ -311,6 +315,15 @@ public void disableJob(final String jobName) {
disableJobService.add(jobName);
}
+ /**
+ * 获取所有正在运行的Executor的信息.
+ *
+ * @return Executor信息集合
+ */
+ public Collection loadExecutorInfo() throws JSONException {
+ return mesosStateService.executors();
+ }
+
/**
* 停止门面服务.
*/
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java
index 1490f782a0..d552061273 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java
@@ -82,7 +82,7 @@ public Map getScalarRequests() {
@Override
public List extends ConstraintEvaluator> getHardConstraints() {
- return null;
+ return Collections.singletonList(AppConstraintEvaluator.getInstance());
}
@Override
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java
index a1b531c753..ea8656691a 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java
@@ -38,6 +38,7 @@
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.protobuf.ByteString;
import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VMAssignmentResult;
import com.netflix.fenzo.VirtualMachineLease;
@@ -92,6 +93,7 @@ protected Scheduler scheduler() {
@Override
protected void startUp() throws Exception {
log.info("Elastic Job: Start {}", serviceName());
+ AppConstraintEvaluator.init(facadeService);
}
@Override
@@ -103,8 +105,11 @@ protected void shutDown() throws Exception {
protected void runOneIteration() throws Exception {
try {
LaunchingTasks launchingTasks = new LaunchingTasks(facadeService.getEligibleJobContext());
- List virtualMachineLeases = LeasesQueue.getInstance().drainTo();
- Collection vmAssignmentResults = taskScheduler.scheduleOnce(launchingTasks.getPendingTasks(), virtualMachineLeases).getResultMap().values();
+ List taskRequests = launchingTasks.getPendingTasks();
+ if (!taskRequests.isEmpty()) {
+ AppConstraintEvaluator.getInstance().loadAppRunningState();
+ }
+ Collection vmAssignmentResults = taskScheduler.scheduleOnce(taskRequests, LeasesQueue.getInstance().drainTo()).getResultMap().values();
List taskContextsList = new LinkedList<>();
Map, List> offerIdTaskInfoMap = new HashMap<>();
for (VMAssignmentResult each: vmAssignmentResults) {
@@ -128,6 +133,8 @@ protected void runOneIteration() throws Exception {
} catch (Throwable throwable) {
//CHECKSTYLE:ON
log.error("Launch task error", throwable);
+ } finally {
+ AppConstraintEvaluator.getInstance().clearAppRunningState();
}
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java
index 0340d1626a..90301a60d7 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/fixture/CloudJobConfigurationBuilder.java
@@ -54,6 +54,12 @@ public static CloudJobConfiguration createCloudJobConfiguration(final String job
1.0d, 128.0d, CloudJobExecutionType.TRANSIENT);
}
+ public static CloudJobConfiguration createCloudJobConfiguration(final String jobName, final String appName) {
+ return new CloudJobConfiguration(appName,
+ new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobName, "0/30 * * * * ?", 10).failover(true).misfire(true).build(), TestSimpleJob.class.getCanonicalName()),
+ 1.0d, 128.0d, CloudJobExecutionType.TRANSIENT);
+ }
+
public static CloudJobConfiguration createOtherCloudJobConfiguration(final String jobName) {
return new CloudJobConfiguration("test_app",
new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobName, "0/30 * * * * ?", 3).failover(false).misfire(true).build(), TestSimpleJob.class.getCanonicalName()),
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AllMesosTests.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AllMesosTests.java
index 667f72658b..68992605cd 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AllMesosTests.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AllMesosTests.java
@@ -34,7 +34,8 @@
LaunchingTasksTest.class,
FrameworkIDServiceTest.class,
MesosStateServiceTest.class,
- ReconcileServiceTest.class
+ ReconcileServiceTest.class,
+ AppConstraintEvaluatorTest.class
})
public final class AllMesosTests {
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AppConstraintEvaluatorTest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AppConstraintEvaluatorTest.java
new file mode 100644
index 0000000000..914db1d581
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/AppConstraintEvaluatorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.fixture.CloudAppConfigurationBuilder;
+import com.dangdang.ddframe.job.cloud.scheduler.fixture.CloudJobConfigurationBuilder;
+import com.dangdang.ddframe.job.context.ExecutionType;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.SchedulingResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VMAssignmentResult;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import com.netflix.fenzo.plugins.VMLeaseObject;
+import org.apache.mesos.Protos;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AppConstraintEvaluatorTest {
+
+ private static final double SUFFICIENT_CPU = 1.0 * 13;
+
+ private static final double INSUFFICIENT_CPU = 1.0 * 11;
+
+ private static final double SUFFICIENT_MEM = 128.0 * 13;
+
+ private static final double INSUFFICIENT_MEM = 128.0 * 11;
+
+ private static FacadeService facadeService;
+
+ private TaskScheduler taskScheduler;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ facadeService = mock(FacadeService.class);
+ AppConstraintEvaluator.init(facadeService);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ taskScheduler = new TaskScheduler.Builder()
+ .withLeaseOfferExpirySecs(1000000000L).withLeaseRejectAction(new Action1() {
+ @Override
+ public void call(final VirtualMachineLease virtualMachineLease) {
+
+ }
+ }).build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ AppConstraintEvaluator.getInstance().clearAppRunningState();
+ }
+
+ @Test
+ public void assertFirstLaunch() throws Exception {
+ SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, SUFFICIENT_CPU, SUFFICIENT_MEM), getLease(1, SUFFICIENT_CPU, SUFFICIENT_MEM)));
+ assertThat(result.getResultMap().size(), is(2));
+ assertThat(result.getFailures().size(), is(0));
+ assertThat(getAssignedTaskNumber(result), is(20));
+ }
+
+ @Test
+ public void assertFirstLaunchLackCpu() throws Exception {
+ SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, INSUFFICIENT_CPU, SUFFICIENT_MEM), getLease(1, INSUFFICIENT_CPU, SUFFICIENT_MEM)));
+ assertThat(result.getResultMap().size(), is(2));
+ assertThat(getAssignedTaskNumber(result), is(18));
+ }
+
+ @Test
+ public void assertFirstLaunchLackMem() throws Exception {
+ SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, SUFFICIENT_CPU, INSUFFICIENT_MEM), getLease(1, SUFFICIENT_CPU, INSUFFICIENT_MEM)));
+ assertThat(result.getResultMap().size(), is(2));
+ assertThat(getAssignedTaskNumber(result), is(18));
+ }
+
+ @Test
+ public void assertExistExecutorOnS0() throws Exception {
+ when(facadeService.loadExecutorInfo()).thenReturn(ImmutableList.of(new MesosStateService.ExecutorStateInfo("foo-app@-@S0", "S0")));
+ AppConstraintEvaluator.getInstance().loadAppRunningState();
+ SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, INSUFFICIENT_CPU, INSUFFICIENT_MEM), getLease(1, INSUFFICIENT_CPU, INSUFFICIENT_MEM)));
+ assertThat(result.getResultMap().size(), is(2));
+ assertTrue(getAssignedTaskNumber(result) > 18);
+ }
+
+ @Test
+ public void assertGetExecutorError() throws Exception {
+ when(facadeService.loadExecutorInfo()).thenThrow(JSONException.class);
+ AppConstraintEvaluator.getInstance().loadAppRunningState();
+ SchedulingResult result = taskScheduler.scheduleOnce(getTasks(), Arrays.asList(getLease(0, INSUFFICIENT_CPU, INSUFFICIENT_MEM), getLease(1, INSUFFICIENT_CPU, INSUFFICIENT_MEM)));
+ assertThat(result.getResultMap().size(), is(2));
+ assertThat(getAssignedTaskNumber(result), is(18));
+ }
+
+ @Test
+ public void assertLackJobConfig() throws Exception {
+ when(facadeService.load("test")).thenReturn(Optional.absent());
+ SchedulingResult result = taskScheduler.scheduleOnce(Collections.singletonList(getTask("test")), Collections.singletonList(getLease(0, 1.5, 192)));
+ assertThat(result.getResultMap().size(), is(1));
+ assertThat(getAssignedTaskNumber(result), is(1));
+ }
+
+ @Test
+ public void assertLackAppConfig() throws Exception {
+ when(facadeService.load("test")).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration("test")));
+ when(facadeService.loadAppConfig("test_app")).thenReturn(Optional.absent());
+ SchedulingResult result = taskScheduler.scheduleOnce(Collections.singletonList(getTask("test")), Collections.singletonList(getLease(0, 1.5, 192)));
+ assertThat(result.getResultMap().size(), is(1));
+ assertThat(getAssignedTaskNumber(result), is(1));
+ }
+
+ private VirtualMachineLease getLease(final int index, final double cpus, final double mem) {
+ return new VMLeaseObject(Protos.Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("offer" + index))
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("S" + index))
+ .setHostname("slave" + index)
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("f1"))
+ .addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus)))
+ .addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(mem)))
+ .build());
+ }
+
+ private List getTasks() {
+ List result = new ArrayList<>(20);
+ for (int i = 0; i < 20; i++) {
+ String jobName;
+ String appName;
+ if (i % 2 == 0) {
+ jobName = String.format("foo-%d", i);
+ appName = "foo-app";
+ } else {
+ jobName = String.format("bar-%d", i);
+ appName = "bar-app";
+ }
+ result.add(getTask(jobName));
+ when(facadeService.load(jobName)).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration(jobName, appName)));
+
+ }
+ when(facadeService.loadAppConfig("foo-app")).thenReturn(Optional.of(CloudAppConfigurationBuilder.createCloudAppConfiguration("foo-app")));
+ when(facadeService.loadAppConfig("bar-app")).thenReturn(Optional.of(CloudAppConfigurationBuilder.createCloudAppConfiguration("bar-app")));
+ return result;
+ }
+
+ private TaskRequest getTask(final String jobName) {
+ TaskRequest result = mock(TaskRequest.class);
+ when(result.getCPUs()).thenReturn(1.0d);
+ when(result.getMemory()).thenReturn(128.0d);
+ when(result.getHardConstraints()).thenAnswer(new Answer>() {
+ @Override
+ public List extends ConstraintEvaluator> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+ return ImmutableList.of(AppConstraintEvaluator.getInstance());
+ }
+ });
+ when(result.getId()).thenReturn(new TaskContext(jobName, Collections.singletonList(0), ExecutionType.READY).getId());
+ return result;
+ }
+
+ private int getAssignedTaskNumber(final SchedulingResult schedulingResult) {
+ int result = 0;
+ for (VMAssignmentResult each : schedulingResult.getResultMap().values()) {
+ result += each.getTasksAssigned().size();
+ }
+ return result;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java
index 4460d59f33..35e44ccded 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java
@@ -82,6 +82,9 @@ public final class FacadeServiceTest {
@Mock
private DisableJobService disableJobService;
+ @Mock
+ private MesosStateService mesosStateService;
+
private FacadeService facadeService;
@Before
@@ -94,6 +97,7 @@ public void setUp() throws NoSuchFieldException {
ReflectionUtils.setFieldValue(facadeService, "failoverService", failoverService);
ReflectionUtils.setFieldValue(facadeService, "disableAppService", disableAppService);
ReflectionUtils.setFieldValue(facadeService, "disableJobService", disableJobService);
+ ReflectionUtils.setFieldValue(facadeService, "mesosStateService", mesosStateService);
}
@Test
@@ -309,4 +313,10 @@ public void assertDisableJob() {
facadeService.disableJob("test_job");
verify(disableJobService).add("test_job");
}
+
+ @Test
+ public void assertLoadExecutor() throws Exception {
+ facadeService.loadExecutorInfo();
+ verify(mesosStateService).executors();
+ }
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequestTest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequestTest.java
index fcbd37e3ef..cea52a2957 100644
--- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequestTest.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequestTest.java
@@ -23,7 +23,6 @@
import com.netflix.fenzo.TaskRequest;
import org.junit.Test;
-import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.core.Is.is;
@@ -33,8 +32,9 @@
public final class JobTaskRequestTest {
- private JobTaskRequest jobTaskRequest =
- new JobTaskRequest(new TaskContext("test_job", Arrays.asList(0), ExecutionType.READY, "unassigned-slave"), CloudJobConfigurationBuilder.createCloudJobConfiguration("test_job"));
+ private final JobTaskRequest jobTaskRequest =
+ new JobTaskRequest(new TaskContext("test_job", Collections.singletonList(0), ExecutionType
+ .READY, "unassigned-slave"), CloudJobConfigurationBuilder.createCloudJobConfiguration("test_job"));
@Test
public void assertGetId() {
@@ -78,7 +78,8 @@ public void assertGetScalarRequests() {
@Test
public void assertGetHardConstraints() {
- assertNull(jobTaskRequest.getHardConstraints());
+ AppConstraintEvaluator.init(null);
+ assertThat(jobTaskRequest.getHardConstraints().size(), is(1));
}
@Test