Skip to content

Commit

Permalink
Fixed #352 : Running cloud job locally
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohongtao committed Jun 8, 2017
1 parent 5ccfa21 commit ee079ca
Show file tree
Hide file tree
Showing 13 changed files with 477 additions and 3 deletions.
2 changes: 2 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### 功能提升

1. [ISSUE #352](https://github.com/dangdangdotcom/elastic-job/issues/352) Cloud作业本地运行

### 缺陷修正

1. [ISSUE #322](https://github.com/dangdangdotcom/elastic-job/issues/322) elastic-job-cloud-scheduler调度任务评估资源时考虑对executor的资源使用情况
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* 本地云作业配置.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
@AllArgsConstructor
@Getter
public final class LocalCloudJobConfiguration implements JobRootConfiguration {

private final JobTypeConfiguration typeConfig;

private final LocalCloudJobExecutionType executionType;

private final int shardingItem;

private String beanName;

private String applicationContext;

/**
* 获取作业名称.
*
* @return 作业名称
*/
public String getJobName() {
return typeConfig.getCoreConfig().getJobName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

/**
* 本地作业执行类型.
*
* @author gaohongtao
*/
public enum LocalCloudJobExecutionType {

DAEMON, TRANSIENT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.exception.JobConfigurationException;
import com.dangdang.ddframe.job.exception.JobSystemException;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.util.config.ShardingItemParameters;
import com.dangdang.ddframe.job.util.json.GsonFactory;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.dangdang.ddframe.job.api.JobType.DATAFLOW;
import static com.dangdang.ddframe.job.api.JobType.SIMPLE;

/**
* 本地作业执行器.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
public final class LocalTaskExecutor {

private final LocalCloudJobConfiguration localCloudJobConfiguration;

/**
* 本地执行作业.
*/
public void execute() {
if (SIMPLE == localCloudJobConfiguration.getTypeConfig().getJobType()) {
getJobInstance(SimpleJob.class).execute(getShardingContext());
} else if (DATAFLOW == localCloudJobConfiguration.getTypeConfig().getJobType()) {
processDataflow();
} else {
processScript();
}
}

private <T extends ElasticJob> T getJobInstance(final Class<T> clazz) {
if (Strings.isNullOrEmpty(localCloudJobConfiguration.getApplicationContext())) {
String jobClass = localCloudJobConfiguration.getTypeConfig().getJobClass();
try {
return clazz.cast(Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
}
} else {
return clazz.cast(new ClassPathXmlApplicationContext(localCloudJobConfiguration.getApplicationContext()).getBean(localCloudJobConfiguration.getBeanName()));
}
}

private ShardingContext getShardingContext() {
JobCoreConfiguration coreConfig = localCloudJobConfiguration.getTypeConfig().getCoreConfig();
String shardingItem = new ShardingItemParameters(coreConfig.getShardingItemParameters()).getMap().get(localCloudJobConfiguration.getShardingItem());
Map<Integer, String> shardingItemMap = new HashMap<>(1);
if (!Strings.isNullOrEmpty(shardingItem)) {
shardingItemMap.put(localCloudJobConfiguration.getShardingItem(), shardingItem);
}
return new ShardingContext(new ShardingContexts("foo", localCloudJobConfiguration.getJobName(), coreConfig
.getShardingTotalCount(), coreConfig.getJobParameter(), shardingItemMap), localCloudJobConfiguration.getShardingItem());
}

@SuppressWarnings("unchecked")
private void processDataflow() {
final ShardingContext shardingContext = getShardingContext();
DataflowJob<Object> dataflowJob = getJobInstance(DataflowJob.class);
List<Object> data = dataflowJob.fetchData(shardingContext);
if (null != data && !data.isEmpty()) {
dataflowJob.processData(shardingContext, data);
}
}

private void processScript() {
final String scriptCommandLine = ((ScriptJobConfiguration) localCloudJobConfiguration.getTypeConfig()).getScriptCommandLine();
if (Strings.isNullOrEmpty(scriptCommandLine)) {
throw new JobConfigurationException("Cannot find script command line for job '%s', job is not executed.", localCloudJobConfiguration.getJobName());
}
CommandLine commandLine = CommandLine.parse(scriptCommandLine);
commandLine.addArgument(GsonFactory.getGson().toJson(getShardingContext()), false);
try {
new DefaultExecutor().execute(commandLine);
} catch (final IOException ex) {
throw new JobConfigurationException("Execute script failure.", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.dangdang.ddframe.job.cloud.executor;

import com.dangdang.ddframe.job.cloud.executor.local.AllLocalExecutorTests;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.junit.runner.RunWith;
Expand All @@ -29,7 +30,8 @@
DaemonTaskSchedulerTest.class,
JobConfigurationContextTest.class,
TaskExecutorTest.class,
TaskExecutorThreadTest.class
TaskExecutorThreadTest.class,
AllLocalExecutorTests.class
})
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AllCloudExecutorTests {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

@RunWith(Suite.class)
@Suite.SuiteClasses(LocalTaskExecutorTest.class)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AllLocalExecutorTests {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import com.dangdang.ddframe.job.cloud.executor.local.fixture.TestDataflowJob;
import com.dangdang.ddframe.job.cloud.executor.local.fixture.TestSimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.exception.JobConfigurationException;
import com.dangdang.ddframe.job.exception.JobSystemException;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;

import static com.dangdang.ddframe.job.cloud.executor.local.LocalCloudJobExecutionType.DAEMON;
import static com.dangdang.ddframe.job.cloud.executor.local.LocalCloudJobExecutionType.TRANSIENT;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public class LocalTaskExecutorTest {

@Before
public void setUp() throws Exception {
TestSimpleJob.setShardingContext(null);
TestDataflowJob.setInput(null);
TestDataflowJob.setOutput(null);
}

@Test
public void assertSimpleJob() throws Exception {
new LocalTaskExecutor(new LocalCloudJobConfiguration(new SimpleJobConfiguration(JobCoreConfiguration
.newBuilder(TestSimpleJob.class.getSimpleName(), "*/2 * * * * ?", 3).build(), TestSimpleJob.class.getName()), TRANSIENT, 1)).execute();
assertThat(TestSimpleJob.getShardingContext().getJobName(), is(TestSimpleJob.class.getSimpleName()));
assertThat(TestSimpleJob.getShardingContext().getShardingItem(), is(1));
assertThat(TestSimpleJob.getShardingContext().getShardingTotalCount(), is(3));
assertNull(TestSimpleJob.getShardingContext().getShardingParameter());
assertThat(TestSimpleJob.getShardingContext().getJobParameter(), is(""));
}

@Test(expected = JobSystemException.class)
public void assertNotExistsJobClass() throws Exception {
new LocalTaskExecutor(new LocalCloudJobConfiguration(new SimpleJobConfiguration(JobCoreConfiguration
.newBuilder("not exist", "*/2 * * * * ?", 3).build(), "not exist"), TRANSIENT, 1)).execute();
}

@Test
public void assertSpringSimpleJob() throws Exception {
new LocalTaskExecutor(new LocalCloudJobConfiguration(new SimpleJobConfiguration(JobCoreConfiguration
.newBuilder(TestSimpleJob.class.getSimpleName(), "*/2 * * * * ?", 3)
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("dbName=dangdang").build(), TestSimpleJob.class
.getName()), DAEMON, 1, "testSimpleJob", "applicationContext.xml")).execute();
assertThat(TestSimpleJob.getShardingContext().getJobName(), is(TestSimpleJob.class.getSimpleName()));
assertThat(TestSimpleJob.getShardingContext().getShardingTotalCount(), is(3));
assertThat(TestSimpleJob.getShardingContext().getJobParameter(), is("dbName=dangdang"));
assertThat(TestSimpleJob.getShardingParameters().size(), is(1));
assertThat(TestSimpleJob.getShardingParameters().iterator().next(), is("Shanghai"));
}

@Test
public void assertDataflow() throws Exception {
TestDataflowJob.setInput(Arrays.asList("1", "2", "3"));
new LocalTaskExecutor(new LocalCloudJobConfiguration(new DataflowJobConfiguration(JobCoreConfiguration
.newBuilder(TestDataflowJob.class.getSimpleName(), "*/2 * * * * ?", 10).build(), TestDataflowJob.class.getName(), false), TRANSIENT, 5)).execute();
assertFalse(TestDataflowJob.getOutput().isEmpty());
for (String each : TestDataflowJob.getOutput()) {
assertTrue(each.endsWith("-d"));
}
}

@Test(expected = JobConfigurationException.class)
public void assertScriptEmpty() throws Exception {
new LocalTaskExecutor(new LocalCloudJobConfiguration(new ScriptJobConfiguration(JobCoreConfiguration
.newBuilder(TestDataflowJob.class.getSimpleName(), "*/2 * * * * ?", 10).build(), ""), TRANSIENT, 5)).execute();
}

@Test(expected = JobConfigurationException.class)
public void assertScriptNotExists() throws Exception {
new LocalTaskExecutor(new LocalCloudJobConfiguration(new ScriptJobConfiguration(JobCoreConfiguration
.newBuilder(TestDataflowJob.class.getSimpleName(), "*/2 * * * * ?", 10).build(), "not_exists_file param1"), TRANSIENT, 5)).execute();
}
}
Loading

0 comments on commit ee079ca

Please sign in to comment.