Skip to content

Commit

Permalink
Merge pull request #355 from Esjob-Cloud-DevOps/elastic-job-scheduler…
Browse files Browse the repository at this point in the history
…/322

Fixed #322, Account for the resources of app while launching task
  • Loading branch information
terrymanu authored Jun 6, 2017
2 parents e7ab8b1 + 3251537 commit ee35e44
Show file tree
Hide file tree
Showing 10 changed files with 409 additions and 8 deletions.
1 change: 1 addition & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

1. [ISSUE #341](https://github.com/dangdangdotcom/elastic-job/issues/341) elastic-job-cloud界面script作业配置缺少执行脚本
1. [ISSUE #343](https://github.com/dangdangdotcom/elastic-job/issues/343) elastic-job-cloud中Script类型作业执行脚本不正确
1. [ISSUE #322](https://github.com/dangdangdotcom/elastic-job/issues/322) elastic-job-cloud-scheduler调度任务评估资源时考虑对executor的资源使用情况

## 2.1.3

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.scheduler.mesos;

import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
import com.dangdang.ddframe.job.context.TaskContext;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskTrackerState;
import com.netflix.fenzo.VirtualMachineCurrentState;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.UniformInterfaceException;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jettison.json.JSONException;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hard constraint that takes an app's executor resources into account when fitting a task onto a slave.
 *
 * <p>When a slave is evaluated for a task, it matters whether the executor of the task's app is already
 * running on that slave. If it is not, the executor's own CPU and memory (taken from the app configuration)
 * are added to the task's demand before comparing against the resources currently available on the slave.</p>
 *
 * <p>The set of running executors is a per-scheduling-round snapshot: callers are expected to invoke
 * {@link #loadAppRunningState()} before scheduling and {@link #clearAppRunningState()} afterwards.</p>
 *
 * @author gaohongtao
 */
@Slf4j
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class AppConstraintEvaluator implements ConstraintEvaluator {

    private static AppConstraintEvaluator instance;

    /** Executor ids reported as running by the last {@link #loadAppRunningState()} call. */
    private final Set<String> runningApps = new HashSet<>();

    private final FacadeService facadeService;

    /**
     * Create the singleton evaluator. A later call replaces the previously created instance.
     *
     * @param facadeService facade used to query executor state and job/app configurations
     */
    public static void init(final FacadeService facadeService) {
        instance = new AppConstraintEvaluator(facadeService);
    }

    /**
     * Get the singleton evaluator.
     *
     * @return the evaluator created by {@link #init(FacadeService)}
     * @throws NullPointerException if {@link #init(FacadeService)} has not been called yet
     */
    static AppConstraintEvaluator getInstance() {
        return Preconditions.checkNotNull(instance, "AppConstraintEvaluator is not initialized, call init(FacadeService) first.");
    }

    /**
     * Snapshot the ids of all executors currently reported by the facade service.
     *
     * <p>If the executor information cannot be loaded, the snapshot is emptied, which makes every app look
     * as if it had no running executor. The failure is logged rather than propagated so that scheduling
     * can still proceed.</p>
     */
    void loadAppRunningState() {
        try {
            for (MesosStateService.ExecutorStateInfo each : facadeService.loadExecutorInfo()) {
                runningApps.add(each.getId());
            }
        } catch (final JSONException | UniformInterfaceException | ClientHandlerException e) {
            // Previously swallowed silently; surface it because it changes how resources are estimated.
            log.warn("Load running executor state failed, all apps are treated as not running", e);
            clearAppRunningState();
        }
    }

    /**
     * Drop the snapshot of running executors.
     */
    void clearAppRunningState() {
        runningApps.clear();
    }

    @Override
    public String getName() {
        return "App-Fitness-Calculator";
    }

    /**
     * Check whether the slave can hold the task together with every executor that would have to be started.
     *
     * <p>The demand is the sum of the CPU and memory of the requested task and of all tasks already assigned
     * to the slave in this round, plus, once per app, the executor resources of every app among those tasks
     * whose executor is not in the running snapshot.</p>
     *
     * @param taskRequest task being placed
     * @param targetVM slave being considered
     * @param taskTrackerState not used by this evaluator
     * @return a successful result if the app's executor already runs on the slave, if a required job or app
     *     configuration is missing (the constraint is then disabled), or if the demand fits into the
     *     currently available resources; a failed result naming the exhausted resource otherwise
     */
    @Override
    public Result evaluate(final TaskRequest taskRequest, final VirtualMachineCurrentState targetVM, final TaskTrackerState taskTrackerState) {
        double assigningCpus = 0.0d;
        double assigningMemoryMB = 0.0d;
        // NOTE(review): throws NoSuchElementException if the VM has no current offers - confirm Fenzo never evaluates such a VM.
        final String slaveId = targetVM.getAllCurrentOffers().iterator().next().getSlaveId().getValue();
        try {
            if (isAppRunningOnSlave(taskRequest.getId(), slaveId)) {
                return new Result(true, "");
            }
            Set<String> calculatedApps = new HashSet<>();
            List<TaskRequest> taskRequests = new ArrayList<>(targetVM.getTasksCurrentlyAssigned().size() + 1);
            taskRequests.add(taskRequest);
            for (TaskAssignmentResult each : targetVM.getTasksCurrentlyAssigned()) {
                taskRequests.add(each.getRequest());
            }
            for (TaskRequest each : taskRequests) {
                assigningCpus += each.getCPUs();
                assigningMemoryMB += each.getMemory();
                if (isAppRunningOnSlave(each.getId(), slaveId)) {
                    continue;
                }
                CloudAppConfiguration assigningAppConfig = getAppConfiguration(each.getId());
                // Several tasks of the same app share one executor, so count its resources only once.
                if (!calculatedApps.add(assigningAppConfig.getAppName())) {
                    continue;
                }
                assigningCpus += assigningAppConfig.getCpuCount();
                assigningMemoryMB += assigningAppConfig.getMemoryMB();
            }
        } catch (final LackConfigException e) {
            log.warn("Lack config, disable {}", getName(), e);
            return new Result(true, "");
        }
        final double availableCpus = targetVM.getCurrAvailableResources().cpuCores();
        final double availableMemoryMB = targetVM.getCurrAvailableResources().memoryMB();
        if (assigningCpus > availableCpus) {
            log.debug("Failure {} {} cpus:{}/{}", taskRequest.getId(), slaveId, assigningCpus, availableCpus);
            return new Result(false, String.format("cpu:%s/%s", assigningCpus, availableCpus));
        }
        if (assigningMemoryMB > availableMemoryMB) {
            log.debug("Failure {} {} mem:{}/{}", taskRequest.getId(), slaveId, assigningMemoryMB, availableMemoryMB);
            return new Result(false, String.format("mem:%s/%s", assigningMemoryMB, availableMemoryMB));
        }
        log.debug("Success {} {} cpus:{}/{} mem:{}/{}", taskRequest.getId(), slaveId, assigningCpus, availableCpus, assigningMemoryMB, availableMemoryMB);
        return new Result(true, String.format("cpus:%s/%s mem:%s/%s", assigningCpus, availableCpus, assigningMemoryMB, availableMemoryMB));
    }

    /**
     * Tell whether the executor of the task's app is in the running snapshot for the given slave.
     */
    private boolean isAppRunningOnSlave(final String taskId, final String slaveId) throws LackConfigException {
        TaskContext taskContext = TaskContext.from(taskId);
        taskContext.setSlaveId(slaveId);
        return runningApps.contains(taskContext.getExecutorId(getJobConfiguration(taskContext).getAppName()));
    }

    /**
     * Resolve the app configuration of the job the task belongs to.
     */
    private CloudAppConfiguration getAppConfiguration(final String taskId) throws LackConfigException {
        CloudJobConfiguration jobConfig = getJobConfiguration(TaskContext.from(taskId));
        Optional<CloudAppConfiguration> appConfigOptional = facadeService.loadAppConfig(jobConfig.getAppName());
        if (!appConfigOptional.isPresent()) {
            throw new LackConfigException("APP", jobConfig.getAppName());
        }
        return appConfigOptional.get();
    }

    /**
     * Resolve the job configuration named in the task's meta info.
     */
    private CloudJobConfiguration getJobConfiguration(final TaskContext taskContext) throws LackConfigException {
        Optional<CloudJobConfiguration> jobConfigOptional = facadeService.load(taskContext.getMetaInfo().getJobName());
        if (!jobConfigOptional.isPresent()) {
            throw new LackConfigException("JOB", taskContext.getMetaInfo().getJobName());
        }
        return jobConfigOptional.get();
    }

    /**
     * Signals that a job or app configuration needed for the evaluation could not be found.
     * Declared static so that it does not keep a hidden reference to the enclosing evaluator.
     */
    private static final class LackConfigException extends Exception {
        LackConfigException(final String scope, final String configName) {
            super(String.format("Lack %s's config %s", scope, configName));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jettison.json.JSONException;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -68,6 +69,8 @@ public final class FacadeService {

private final DisableJobService disableJobService;

private final MesosStateService mesosStateService;

public FacadeService(final CoordinatorRegistryCenter regCenter) {
appConfigService = new CloudAppConfigurationService(regCenter);
jobConfigService = new CloudJobConfigurationService(regCenter);
Expand All @@ -76,6 +79,7 @@ public FacadeService(final CoordinatorRegistryCenter regCenter) {
failoverService = new FailoverService(regCenter);
disableAppService = new DisableAppService(regCenter);
disableJobService = new DisableJobService(regCenter);
mesosStateService = new MesosStateService(regCenter);
}

/**
Expand Down Expand Up @@ -311,6 +315,15 @@ public void disableJob(final String jobName) {
disableJobService.add(jobName);
}

/**
* Get the information of all running executors.
*
* @return collection of executor information
* @throws JSONException propagated from {@code MesosStateService#executors()}
*/
public Collection<MesosStateService.ExecutorStateInfo> loadExecutorInfo() throws JSONException {
return mesosStateService.executors();
}

/**
* 停止门面服务.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Map<String, Double> getScalarRequests() {

@Override
public List<? extends ConstraintEvaluator> getHardConstraints() {
return null;
return Collections.singletonList(AppConstraintEvaluator.getInstance());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.protobuf.ByteString;
import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VMAssignmentResult;
import com.netflix.fenzo.VirtualMachineLease;
Expand Down Expand Up @@ -92,6 +93,7 @@ protected Scheduler scheduler() {
/**
* Log the service start and create the {@code AppConstraintEvaluator} singleton from this service's facade,
* so that later {@code AppConstraintEvaluator.getInstance()} calls find an initialized instance.
*/
@Override
protected void startUp() throws Exception {
log.info("Elastic Job: Start {}", serviceName());
AppConstraintEvaluator.init(facadeService);
}

@Override
Expand All @@ -103,8 +105,11 @@ protected void shutDown() throws Exception {
protected void runOneIteration() throws Exception {
try {
LaunchingTasks launchingTasks = new LaunchingTasks(facadeService.getEligibleJobContext());
List<VirtualMachineLease> virtualMachineLeases = LeasesQueue.getInstance().drainTo();
Collection<VMAssignmentResult> vmAssignmentResults = taskScheduler.scheduleOnce(launchingTasks.getPendingTasks(), virtualMachineLeases).getResultMap().values();
List<TaskRequest> taskRequests = launchingTasks.getPendingTasks();
if (!taskRequests.isEmpty()) {
AppConstraintEvaluator.getInstance().loadAppRunningState();
}
Collection<VMAssignmentResult> vmAssignmentResults = taskScheduler.scheduleOnce(taskRequests, LeasesQueue.getInstance().drainTo()).getResultMap().values();
List<TaskContext> taskContextsList = new LinkedList<>();
Map<List<Protos.OfferID>, List<Protos.TaskInfo>> offerIdTaskInfoMap = new HashMap<>();
for (VMAssignmentResult each: vmAssignmentResults) {
Expand All @@ -128,6 +133,8 @@ protected void runOneIteration() throws Exception {
} catch (Throwable throwable) {
//CHECKSTYLE:ON
log.error("Launch task error", throwable);
} finally {
AppConstraintEvaluator.getInstance().clearAppRunningState();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public static CloudJobConfiguration createCloudJobConfiguration(final String job
1.0d, 128.0d, CloudJobExecutionType.TRANSIENT);
}

/**
 * Build a transient cloud job configuration for the given job that belongs to the given app.
 *
 * @param jobName name of the job
 * @param appName name of the app the job is attached to
 * @return cloud job configuration wrapping a simple job backed by {@code TestSimpleJob}
 */
public static CloudJobConfiguration createCloudJobConfiguration(final String jobName, final String appName) {
    JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobName, "0/30 * * * * ?", 10)
            .failover(true)
            .misfire(true)
            .build();
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, TestSimpleJob.class.getCanonicalName());
    return new CloudJobConfiguration(appName, simpleJobConfig, 1.0d, 128.0d, CloudJobExecutionType.TRANSIENT);
}

public static CloudJobConfiguration createOtherCloudJobConfiguration(final String jobName) {
return new CloudJobConfiguration("test_app",
new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobName, "0/30 * * * * ?", 3).failover(false).misfire(true).build(), TestSimpleJob.class.getCanonicalName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
LaunchingTasksTest.class,
FrameworkIDServiceTest.class,
MesosStateServiceTest.class,
ReconcileServiceTest.class
ReconcileServiceTest.class,
AppConstraintEvaluatorTest.class
})
public final class AllMesosTests {
}
Loading

0 comments on commit ee35e44

Please sign in to comment.