Skip to content

Commit

Permalink
Refactor AbstractInseparablePipelineJob (#32748)
Browse files Browse the repository at this point in the history
* Refactor AbstractInseparablePipelineJob

* Refactor AbstractInseparablePipelineJob
  • Loading branch information
terrymanu authored Aug 31, 2024
1 parent c40a9c5 commit 4442e3d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
Expand All @@ -30,8 +31,11 @@
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
Expand All @@ -55,6 +59,19 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig

private final PipelineJobRunnerManager jobRunnerManager;

private final TransmissionProcessContext jobProcessContext;

protected AbstractInseparablePipelineJob(final String jobId, final PipelineJobRunnerManager jobRunnerManager) {
this.jobRunnerManager = jobRunnerManager;
jobProcessContext = createTransmissionProcessContext(jobId);
}

private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
return new TransmissionProcessContext(jobId, processConfig);
}

@SuppressWarnings("unchecked")
@Override
public final void execute(final ShardingContext shardingContext) {
Expand All @@ -71,7 +88,7 @@ public final void execute(final ShardingContext shardingContext) {
return;
}
P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress);
I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
continue;
}
Expand All @@ -88,7 +105,7 @@ public final void execute(final ShardingContext shardingContext) {
executeIncrementalTasks(jobItemContexts, jobItemManager);
}

protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress);
protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);

protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur

private final TransmissionProcessContext jobProcessContext;

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

protected AbstractSeparablePipelineJob(final String jobId) {
this(jobId, true);
}
Expand All @@ -67,7 +65,7 @@ protected AbstractSeparablePipelineJob(final String jobId, final boolean isTrans

private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
return new TransmissionProcessContext(jobId, processConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
Expand All @@ -72,23 +70,19 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfigura

private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

private final CDCJobPreparer jobPreparer = new CDCJobPreparer();

@Getter
private final PipelineSink sink;

public CDCJob(final PipelineSink sink) {
super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
public CDCJob(final String jobId, final PipelineSink sink) {
super(jobId, new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
this.sink = sink;
}

@Override
protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress jobItemProgress) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig,
final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobC
* @param sink sink
*/
public void start(final String jobId, final PipelineSink sink) {
CDCJob job = new CDCJob(sink);
CDCJob job = new CDCJob(jobId, sink);
PipelineJobRegistry.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
Expand Down

0 comments on commit 4442e3d

Please sign in to comment.