For #223.
haocao committed Feb 3, 2017
1 parent 79f4ca5 commit b744595
Showing 19 changed files with 418 additions and 116 deletions.
DaemonTaskScheduler.java
@@ -22,6 +22,7 @@
import com.dangdang.ddframe.job.exception.JobSystemException;
import com.dangdang.ddframe.job.executor.JobExecutorFactory;
import com.dangdang.ddframe.job.executor.JobFacade;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.mesos.ExecutorDriver;
@@ -159,9 +160,20 @@ public static final class DaemonJob implements Job {

@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("BEGIN").build());
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
int jobEventSamplingCount = shardingContexts.getJobEventSamplingCount();
int currentJobEventSamplingCount = shardingContexts.getCurrentJobEventSamplingCount();
if (jobEventSamplingCount > 0 && ++currentJobEventSamplingCount < jobEventSamplingCount) {
shardingContexts.setCurrentJobEventSamplingCount(currentJobEventSamplingCount);
jobFacade.getShardingContexts().setAllowSendJobEvent(false);
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
} else {
jobFacade.getShardingContexts().setAllowSendJobEvent(true);
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("BEGIN").build());
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
shardingContexts.setCurrentJobEventSamplingCount(0);
}
}
}
}
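
The change to DaemonJob.execute above adds job event sampling for daemon jobs: when jobEventSamplingCount is greater than zero, only every Nth execution publishes job events and sends the BEGIN/COMPLETE status updates; the executions in between merely advance currentJobEventSamplingCount and run with allowSendJobEvent disabled. Below is a minimal standalone sketch of that counter logic, using hypothetical class and method names that are not part of this commit:

/**
 * Minimal sketch of the sampling decision in DaemonJob.execute (hypothetical names, not part of the commit).
 */
final class EventSamplingCounter {
    
    private final int jobEventSamplingCount;
    
    private int currentJobEventSamplingCount;
    
    EventSamplingCounter(final int jobEventSamplingCount) {
        this.jobEventSamplingCount = jobEventSamplingCount;
    }
    
    /**
     * @return true if this execution should publish job events and send status updates.
     */
    boolean shouldSendJobEvent() {
        if (jobEventSamplingCount > 0 && ++currentJobEventSamplingCount < jobEventSamplingCount) {
            // Sampled out: the incremented counter is kept and this execution stays silent.
            return false;
        }
        // Sampled in (or sampling disabled): reset the counter and publish events.
        currentJobEventSamplingCount = 0;
        return true;
    }
}

With a sampling count of 2, the first call returns false and the second returns true, which mirrors the two executions driven by assertJobRunWithEventSampling in the new DaemonTaskSchedulerTest below.
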
AllCloudExecutorTests.java
@@ -25,9 +25,11 @@

@RunWith(Suite.class)
@SuiteClasses({
CloudJobFacadeTest.class,
CloudJobFacadeTest.class,
DaemonTaskSchedulerTest.class,
JobConfigurationContextTest.class,
TaskExecutorTest.class
TaskExecutorTest.class,
TaskExecutorThreadTest.class
})
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AllCloudExecutorTests {
CloudJobFacadeTest.java
@@ -20,7 +20,11 @@
import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.JobType;
import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.context.ExecutionType;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobExecutionEvent.ExecutionSource;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import com.dangdang.ddframe.job.exception.JobExecutionEnvironmentException;
import com.dangdang.ddframe.job.executor.JobFacade;
import com.dangdang.ddframe.job.executor.ShardingContexts;
@@ -152,7 +156,13 @@ public void assertAfterJobExecuted() {

@Test
public void assertPostJobExecutionEvent() {
jobFacade.postJobExecutionEvent(null);
verify(eventBus).post(null);
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent("fake_task_id", "test_job", ExecutionSource.NORMAL_TRIGGER, 0);
jobFacade.postJobExecutionEvent(jobExecutionEvent);
verify(eventBus).post(jobExecutionEvent);
}

@Test
public void assertPostJobStatusTraceEvent() {
jobFacade.postJobStatusTraceEvent(String.format("%s@-@0@-@%s@-@fake_slave_id@-@0", "test_job", ExecutionType.READY), State.TASK_RUNNING, "message is empty.");
}
}
DaemonTaskSchedulerTest.java (new file)
@@ -0,0 +1,96 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor;

import com.dangdang.ddframe.job.cloud.executor.fixture.TestScriptJobConfiguration;
import com.dangdang.ddframe.job.context.ExecutionType;
import com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor;
import com.dangdang.ddframe.job.executor.JobFacade;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.quartz.JobExecutionContext;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public final class DaemonTaskSchedulerTest {

@Mock
private JobFacade jobFacade;

@Mock
private ExecutorDriver executorDriver;

@Mock
private JobExecutionContext jobExecutionContext;

@Mock
private AbstractElasticJobExecutor jobExecutor;

@Mock
private ShardingContexts shardingContexts;

private TaskID taskId = TaskID.newBuilder().setValue(String.format("%s@-@0@-@%s@-@fake_slave_id@-@0", "test_job", ExecutionType.READY)).build();

private DaemonTaskScheduler.DaemonJob daemonJob;

@Before
public void setUp() throws NoSuchFieldException {
daemonJob = new DaemonTaskScheduler.DaemonJob();
daemonJob.setElasticJob(null);
daemonJob.setJobFacade(jobFacade);
daemonJob.setExecutorDriver(executorDriver);
daemonJob.setTaskId(taskId);
}

@Test
public void assertJobRun() throws Exception {
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh"));
daemonJob.execute(jobExecutionContext);
verify(shardingContexts).setAllowSendJobEvent(true);
verify(executorDriver).sendStatusUpdate(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).setMessage("BEGIN").build());
verify(executorDriver).sendStatusUpdate(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
verify(shardingContexts).setCurrentJobEventSamplingCount(0);
}

@Test
public void assertJobRunWithEventSampling() throws Exception {
when(shardingContexts.getJobEventSamplingCount()).thenReturn(2);
when(jobFacade.getShardingContexts()).thenReturn(shardingContexts);
when(jobFacade.loadJobRootConfiguration(true)).thenReturn(new TestScriptJobConfiguration("test.sh"));
daemonJob.execute(jobExecutionContext);
verify(shardingContexts).setCurrentJobEventSamplingCount(1);
verify(shardingContexts).setAllowSendJobEvent(false);
when(shardingContexts.getCurrentJobEventSamplingCount()).thenReturn(1);
daemonJob.execute(jobExecutionContext);
verify(shardingContexts).setAllowSendJobEvent(true);
verify(executorDriver).sendStatusUpdate(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).setMessage("BEGIN").build());
verify(executorDriver).sendStatusUpdate(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
verify(shardingContexts).setCurrentJobEventSamplingCount(0);
}
}
TaskExecutorTest.java
@@ -17,9 +17,6 @@

package com.dangdang.ddframe.job.cloud.executor;

import com.dangdang.ddframe.job.api.JobType;
import com.dangdang.ddframe.job.cloud.executor.fixture.TestJob;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.mesos.ExecutorDriver;
@@ -34,22 +31,22 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.unitils.util.ReflectionUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)

public final class TaskExecutorTest {

@Mock
private ExecutorDriver executorDriver;

@Mock
private ExecutorService executorService;

private ExecutorInfo executorInfo;

private SlaveInfo slaveInfo = SlaveInfo.getDefaultInstance();
@@ -60,8 +57,8 @@ public final class TaskExecutorTest {

@Before
public void setUp() throws NoSuchFieldException {
executorDriver = mock(ExecutorDriver.class);
taskExecutor = new TaskExecutor();
ReflectionUtils.setFieldValue(taskExecutor, "executorService", executorService);
executorInfo = ExecutorInfo.getDefaultInstance();
}

@@ -72,24 +69,6 @@ public void assertKillTask() {
verify(executorDriver).sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskID).setState(Protos.TaskState.TASK_KILLED).build());
}

@Test
public void assertLaunchTaskWithDaemonTaskAndJavaSimpleJob() {
TaskInfo taskInfo = buildTransientTaskInfo();
taskExecutor.launchTask(executorDriver, taskInfo);
}

@Test
public void assertLaunchTaskWithTransientTaskAndSpringSimpleJob() {
TaskInfo taskInfo = buildDaemonTaskInfo();
taskExecutor.launchTask(executorDriver, taskInfo);
}

@Test
public void assertLaunchTaskWithTransientTaskAndJavaScriptJob() {
TaskInfo taskInfo = buildScriptDaemonTaskInfo();
taskExecutor.launchTask(executorDriver, taskInfo);
}

@Test
public void assertRegisteredWithoutData() {
// CHECKSTYLE:OFF
@@ -109,6 +88,12 @@ public void assertRegisteredWithData() {
taskExecutor.registered(executorDriver, executorInfo, frameworkInfo, slaveInfo);
}

@Test
public void assertLaunchTask() {
taskExecutor.launchTask(executorDriver, TaskInfo.newBuilder().setName("test_job")
.setTaskId(TaskID.newBuilder().setValue("fake_task_id")).setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-S0")).build());
}

@Test
public void assertReregistered() {
taskExecutor.reregistered(executorDriver, slaveInfo);
@@ -133,56 +118,4 @@ public void assertShutdown() {
public void assertError() {
taskExecutor.error(executorDriver, "");
}

private TaskInfo buildTransientTaskInfo() {
return buildTaskInfo(buildSpringJobConfigurationContextMap()).build();
}

private TaskInfo buildDaemonTaskInfo() {
return buildTaskInfo(buildBaseJobConfigurationContextMapWithJobClassAndCron(TestJob.class.getCanonicalName(), "ignoredCron")).build();
}

private TaskInfo buildScriptDaemonTaskInfo() {
return buildTaskInfo(buildBaseJobConfigurationContextMap(TestJob.class.getCanonicalName(), "ignoredCron", JobType.SCRIPT)).build();
}

private TaskInfo.Builder buildTaskInfo(final Map<String, String> jobConfigurationContext) {
return TaskInfo.newBuilder().setData(ByteString.copyFrom(serialize(jobConfigurationContext)))
.setName("test_job").setTaskId(Protos.TaskID.newBuilder().setValue("task_id")).setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-S0"));
}

private byte[] serialize(final Map<String, String> jobConfigurationContext) {
// CHECKSTYLE:OFF
LinkedHashMap<String, Object> result = new LinkedHashMap<>(2, 1);
// CHECKSTYLE:ON
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.singletonMap(1, "a"));
result.put("shardingContext", shardingContexts);
result.put("jobConfigContext", jobConfigurationContext);
return SerializationUtils.serialize(result);
}

private Map<String, String> buildSpringJobConfigurationContextMap() {
Map<String, String> context = buildBaseJobConfigurationContextMapWithJobClass(TestJob.class.getCanonicalName());
context.put("beanName", "testJob");
context.put("applicationContext", "applicationContext.xml");
return context;
}

private Map<String, String> buildBaseJobConfigurationContextMapWithJobClass(final String jobClass) {
return buildBaseJobConfigurationContextMapWithJobClassAndCron(jobClass, "0/1 * * * * ?");
}

private Map<String, String> buildBaseJobConfigurationContextMapWithJobClassAndCron(final String jobClass, final String cron) {
return buildBaseJobConfigurationContextMap(jobClass, cron, JobType.SIMPLE);
}

private Map<String, String> buildBaseJobConfigurationContextMap(final String jobClass, final String cron, final JobType jobType) {
Map<String, String> result = new HashMap<>();
result.put("jobName", "test_job");
result.put("cron", cron);
result.put("jobClass", jobClass);
result.put("jobType", jobType.name());
result.put("scriptCommandLine", "echo \"\"");
return result;
}
}
