Skip to content

Commit

Permalink
Refactor YamlPipelineReadConfiguration and YamlPipelineWriteConfigura…
Browse files Browse the repository at this point in the history
…tion (#32727)

* Refactor YamlPipelineReadConfiguration and YamlPipelineWriteConfiguration

* Refactor YamlPipelineReadConfiguration and YamlPipelineWriteConfiguration
  • Loading branch information
terrymanu authored Aug 30, 2024
1 parent c128b16 commit a97f022
Show file tree
Hide file tree
Showing 14 changed files with 27 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class TransmissionProcessContext implements PipelineProcessContext
private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;

public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
processConfiguration = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
processConfiguration = PipelineProcessConfigurationUtils.fillInDefaultValue(originalProcessConfig);
PipelineReadConfiguration readConfig = processConfiguration.getRead();
AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected AbstractSeparablePipelineJob(final String jobId, final boolean isTrans
}

private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
return new TransmissionProcessContext(jobId, processConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,23 @@ public final class PipelineProcessConfigurationUtils {
private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new YamlPipelineProcessConfigurationSwapper();

/**
* Convert with default value.
* Fill default value for pipeline process configuration.
*
* @param originalConfig original process configuration, nullable
* @param originalConfig original process configuration
* @return process configuration
*/
public static PipelineProcessConfiguration convertWithDefaultValue(final PipelineProcessConfiguration originalConfig) {
public static PipelineProcessConfiguration fillInDefaultValue(final PipelineProcessConfiguration originalConfig) {
YamlPipelineProcessConfiguration yamlConfig = null == originalConfig ? new YamlPipelineProcessConfiguration() : SWAPPER.swapToYamlConfiguration(originalConfig);
fillInDefaultValue(yamlConfig);
return SWAPPER.swapToObject(yamlConfig);
}

/**
* Fill in default value.
*
* @param yamlConfig YAML configuration, non-null
*/
public static void fillInDefaultValue(final YamlPipelineProcessConfiguration yamlConfig) {
private static void fillInDefaultValue(final YamlPipelineProcessConfiguration yamlConfig) {
if (null == yamlConfig.getRead()) {
yamlConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
} else {
yamlConfig.getRead().fillInNullFieldsWithDefaultValue();
yamlConfig.setRead(new YamlPipelineReadConfiguration());
}
if (null == yamlConfig.getWrite()) {
yamlConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
} else {
yamlConfig.getWrite().fillInNullFieldsWithDefaultValue();
yamlConfig.setWrite(new YamlPipelineWriteConfiguration());
}
if (null == yamlConfig.getStreamChannel()) {
YamlAlgorithmConfiguration yamlAlgorithmConfig = new YamlAlgorithmConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
@Setter
public final class YamlPipelineProcessConfiguration implements YamlConfiguration {

private YamlPipelineReadConfiguration read;
private YamlPipelineReadConfiguration read = new YamlPipelineReadConfiguration();

private YamlPipelineWriteConfiguration write;
private YamlPipelineWriteConfiguration write = new YamlPipelineWriteConfiguration();

private YamlAlgorithmConfiguration streamChannel;

@Override
public boolean isEmpty() {
return null == read && null == write && null == streamChannel;
return null == streamChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,45 +29,11 @@
@Setter
public final class YamlPipelineReadConfiguration implements YamlConfiguration {

private static final Integer DEFAULT_WORKER_THREAD = 20;
private int workerThread = 20;

private static final Integer DEFAULT_BATCH_SIZE = 1000;
private int batchSize = 1000;

private static final Integer DEFAULT_SHARDING_SIZE = 10000000;

private Integer workerThread;

private Integer batchSize;

private Integer shardingSize;
private int shardingSize = 10000000;

private YamlAlgorithmConfiguration rateLimiter;

/**
* Build with default value.
*
* @return read configuration
*/
public static YamlPipelineReadConfiguration buildWithDefaultValue() {
YamlPipelineReadConfiguration result = new YamlPipelineReadConfiguration();
result.workerThread = DEFAULT_WORKER_THREAD;
result.batchSize = DEFAULT_BATCH_SIZE;
result.shardingSize = DEFAULT_SHARDING_SIZE;
return result;
}

/**
* Fill in null fields with default value.
*/
public void fillInNullFieldsWithDefaultValue() {
if (null == workerThread) {
workerThread = DEFAULT_WORKER_THREAD;
}
if (null == batchSize) {
batchSize = DEFAULT_BATCH_SIZE;
}
if (null == shardingSize) {
shardingSize = DEFAULT_SHARDING_SIZE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,9 @@
@Setter
public final class YamlPipelineWriteConfiguration implements YamlConfiguration {

private static final Integer DEFAULT_WORKER_THREAD = 20;
private int workerThread = 20;

private static final Integer DEFAULT_BATCH_SIZE = 1000;

private Integer workerThread;

private Integer batchSize;
private int batchSize = 1000;

private YamlAlgorithmConfiguration rateLimiter;

/**
* Build with default value.
*
* @return write configuration
*/
public static YamlPipelineWriteConfiguration buildWithDefaultValue() {
YamlPipelineWriteConfiguration result = new YamlPipelineWriteConfiguration();
result.workerThread = DEFAULT_WORKER_THREAD;
result.batchSize = DEFAULT_BATCH_SIZE;
return result;
}

/**
* Fill in null fields with default value.
*/
public void fillInNullFieldsWithDefaultValue() {
if (null == workerThread) {
workerThread = DEFAULT_WORKER_THREAD;
}
if (null == batchSize) {
batchSize = DEFAULT_BATCH_SIZE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ void assertSwapToObject() {
}

private YamlPipelineProcessConfiguration createYamlConfiguration() {
YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
YamlPipelineReadConfiguration yamlReadConfig = new YamlPipelineReadConfiguration();
YamlAlgorithmConfiguration yamlReadRateLimiterConfig = new YamlAlgorithmConfiguration();
yamlReadRateLimiterConfig.setType("INPUT");
yamlReadRateLimiterConfig.setProps(PropertiesBuilder.build(new Property("batch-size", "1000"), new Property("qps", "500")));
yamlReadConfig.setRateLimiter(yamlReadRateLimiterConfig);
YamlPipelineProcessConfiguration result = new YamlPipelineProcessConfiguration();
result.setRead(yamlReadConfig);
YamlPipelineWriteConfiguration yamlWriteConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
YamlPipelineWriteConfiguration yamlWriteConfig = new YamlPipelineWriteConfiguration();
YamlAlgorithmConfiguration yamlWriteRateLimiterConfig = new YamlAlgorithmConfiguration();
yamlWriteRateLimiterConfig.setType("OUTPUT");
yamlWriteRateLimiterConfig.setProps(PropertiesBuilder.build(new Property("batch-size", "1000"), new Property("tps", "2000")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class ShowTransmissionRuleQueryResult {
* @return query result row
*/
public Collection<LocalDataQueryResultRow> getRows() {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), jobType));
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), jobType));
return Collections.singleton(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public CDCJob(final PipelineSink sink) {
@Override
protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected ExecuteEngine doInitialize() {

@Override
public PipelineProcessConfiguration getProcessConfiguration() {
return PipelineProcessConfigurationUtils.convertWithDefaultValue(null);
return PipelineProcessConfigurationUtils.fillInDefaultValue(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected void runBlocking() {
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId);
try {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()), jobType.getType()));
PipelineDataConsistencyChecker checker = jobType.buildDataConsistencyChecker(
parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ static void beforeClass() {
@Test
void assertLoadAndPersist() {
YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration();
YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
yamlReadConfig.fillInNullFieldsWithDefaultValue();
YamlPipelineReadConfiguration yamlReadConfig = new YamlPipelineReadConfiguration();
yamlReadConfig.setShardingSize(10);
yamlProcessConfig.setRead(yamlReadConfig);
YamlPipelineWriteConfiguration yamlWriteConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
YamlPipelineWriteConfiguration yamlWriteConfig = new YamlPipelineWriteConfiguration();
yamlProcessConfig.setWrite(yamlWriteConfig);
YamlAlgorithmConfiguration yamlStreamChannel = new YamlAlgorithmConfiguration();
yamlStreamChannel.setType("MEMORY");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ public static MigrationJobItemContext mockMigrationJobItemContext(final Migratio
}

private static PipelineProcessConfiguration mockPipelineProcessConfiguration() {
YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
YamlPipelineReadConfiguration yamlReadConfig = new YamlPipelineReadConfiguration();
yamlReadConfig.setShardingSize(10);
YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration();
yamlProcessConfig.setRead(yamlReadConfig);
PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
PipelineProcessConfigurationUtils.fillInDefaultValue(new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig));
return new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void assertDataConsistencyCheck() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
jobManager.start(jobConfig);
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobType.getType()));
TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobType.buildDataConsistencyChecker(
Expand Down

0 comments on commit a97f022

Please sign in to comment.