diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java index a06e8894ec779..ba741ccb8df8b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java @@ -51,7 +51,7 @@ public final class TransmissionProcessContext implements PipelineProcessContext private final PipelineLazyInitializer incrementalExecuteEngineLazyInitializer; public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) { - processConfiguration = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig); + processConfiguration = PipelineProcessConfigurationUtils.fillInDefaultValue(originalProcessConfig); PipelineReadConfiguration readConfig = processConfiguration.getRead(); AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter(); readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index ba0246b93f918..3cfdc634b5c61 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -66,7 +66,7 @@ protected AbstractSeparablePipelineJob(final String jobId, final boolean isTrans } private TransmissionProcessContext createTransmissionProcessContext(final String jobId) { - PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue( processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType())); return new TransmissionProcessContext(jobId, processConfig); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/PipelineProcessConfigurationUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/PipelineProcessConfigurationUtils.java index 906dce06aca82..194443f3228ee 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/PipelineProcessConfigurationUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/PipelineProcessConfigurationUtils.java @@ -36,32 +36,23 @@ public final class PipelineProcessConfigurationUtils { private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new YamlPipelineProcessConfigurationSwapper(); /** - * Convert with default value. + * Fill default value for pipeline process configuration. * - * @param originalConfig original process configuration, nullable + * @param originalConfig original process configuration * @return process configuration */ - public static PipelineProcessConfiguration convertWithDefaultValue(final PipelineProcessConfiguration originalConfig) { + public static PipelineProcessConfiguration fillInDefaultValue(final PipelineProcessConfiguration originalConfig) { YamlPipelineProcessConfiguration yamlConfig = null == originalConfig ? new YamlPipelineProcessConfiguration() : SWAPPER.swapToYamlConfiguration(originalConfig); fillInDefaultValue(yamlConfig); return SWAPPER.swapToObject(yamlConfig); } - /** - * Fill in default value. - * - * @param yamlConfig YAML configuration, non-null - */ - public static void fillInDefaultValue(final YamlPipelineProcessConfiguration yamlConfig) { + private static void fillInDefaultValue(final YamlPipelineProcessConfiguration yamlConfig) { if (null == yamlConfig.getRead()) { - yamlConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue()); - } else { - yamlConfig.getRead().fillInNullFieldsWithDefaultValue(); + yamlConfig.setRead(new YamlPipelineReadConfiguration()); } if (null == yamlConfig.getWrite()) { - yamlConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue()); - } else { - yamlConfig.getWrite().fillInNullFieldsWithDefaultValue(); + yamlConfig.setWrite(new YamlPipelineWriteConfiguration()); } if (null == yamlConfig.getStreamChannel()) { YamlAlgorithmConfiguration yamlAlgorithmConfig = new YamlAlgorithmConfiguration(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineProcessConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineProcessConfiguration.java index ebd4041f1a6d9..462094355e0b3 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineProcessConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineProcessConfiguration.java @@ -29,14 +29,14 @@ @Setter public final class YamlPipelineProcessConfiguration implements YamlConfiguration { - private YamlPipelineReadConfiguration read; + private YamlPipelineReadConfiguration read = new YamlPipelineReadConfiguration(); - private YamlPipelineWriteConfiguration write; + private YamlPipelineWriteConfiguration write = new YamlPipelineWriteConfiguration(); private YamlAlgorithmConfiguration streamChannel; @Override public boolean isEmpty() { - return null == read && null == write && null == streamChannel; + return null == streamChannel; } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineReadConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineReadConfiguration.java index 34234061dd1ea..15f9ccf9a7763 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineReadConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineReadConfiguration.java @@ -29,45 +29,11 @@ @Setter public final class YamlPipelineReadConfiguration implements YamlConfiguration { - private static final Integer DEFAULT_WORKER_THREAD = 20; + private int workerThread = 20; - private static final Integer DEFAULT_BATCH_SIZE = 1000; + private int batchSize = 1000; - private static final Integer DEFAULT_SHARDING_SIZE = 10000000; - - private Integer workerThread; - - private Integer batchSize; - - private Integer shardingSize; + private int shardingSize = 10000000; private YamlAlgorithmConfiguration rateLimiter; - - /** - * Build with default value. - * - * @return read configuration - */ - public static YamlPipelineReadConfiguration buildWithDefaultValue() { - YamlPipelineReadConfiguration result = new YamlPipelineReadConfiguration(); - result.workerThread = DEFAULT_WORKER_THREAD; - result.batchSize = DEFAULT_BATCH_SIZE; - result.shardingSize = DEFAULT_SHARDING_SIZE; - return result; - } - - /** - * Fill in null fields with default value. - */ - public void fillInNullFieldsWithDefaultValue() { - if (null == workerThread) { - workerThread = DEFAULT_WORKER_THREAD; - } - if (null == batchSize) { - batchSize = DEFAULT_BATCH_SIZE; - } - if (null == shardingSize) { - shardingSize = DEFAULT_SHARDING_SIZE; - } - } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineWriteConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineWriteConfiguration.java index ea632dc4e35a7..e8258030222a8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineWriteConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/config/YamlPipelineWriteConfiguration.java @@ -29,37 +29,9 @@ @Setter public final class YamlPipelineWriteConfiguration implements YamlConfiguration { - private static final Integer DEFAULT_WORKER_THREAD = 20; + private int workerThread = 20; - private static final Integer DEFAULT_BATCH_SIZE = 1000; - - private Integer workerThread; - - private Integer batchSize; + private int batchSize = 1000; private YamlAlgorithmConfiguration rateLimiter; - - /** - * Build with default value. - * - * @return write configuration - */ - public static YamlPipelineWriteConfiguration buildWithDefaultValue() { - YamlPipelineWriteConfiguration result = new YamlPipelineWriteConfiguration(); - result.workerThread = DEFAULT_WORKER_THREAD; - result.batchSize = DEFAULT_BATCH_SIZE; - return result; - } - - /** - * Fill in null fields with default value. - */ - public void fillInNullFieldsWithDefaultValue() { - if (null == workerThread) { - workerThread = DEFAULT_WORKER_THREAD; - } - if (null == batchSize) { - batchSize = DEFAULT_BATCH_SIZE; - } - } } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/swapper/YamlPipelineProcessConfigurationSwapperTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/swapper/YamlPipelineProcessConfigurationSwapperTest.java index 73c90236f3ef6..6cce0decc15e7 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/swapper/YamlPipelineProcessConfigurationSwapperTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/config/yaml/swapper/YamlPipelineProcessConfigurationSwapperTest.java @@ -54,14 +54,14 @@ void assertSwapToObject() { } private YamlPipelineProcessConfiguration createYamlConfiguration() { - YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue(); + YamlPipelineReadConfiguration yamlReadConfig = new YamlPipelineReadConfiguration(); YamlAlgorithmConfiguration yamlReadRateLimiterConfig = new YamlAlgorithmConfiguration(); yamlReadRateLimiterConfig.setType("INPUT"); yamlReadRateLimiterConfig.setProps(PropertiesBuilder.build(new Property("batch-size", "1000"), new Property("qps", "500"))); yamlReadConfig.setRateLimiter(yamlReadRateLimiterConfig); YamlPipelineProcessConfiguration result = new YamlPipelineProcessConfiguration(); result.setRead(yamlReadConfig); - YamlPipelineWriteConfiguration yamlWriteConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue(); + YamlPipelineWriteConfiguration yamlWriteConfig = new YamlPipelineWriteConfiguration(); YamlAlgorithmConfiguration yamlWriteRateLimiterConfig = new YamlAlgorithmConfiguration(); yamlWriteRateLimiterConfig.setType("OUTPUT"); yamlWriteRateLimiterConfig.setProps(PropertiesBuilder.build(new Property("batch-size", "1000"), new Property("tps", "2000"))); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/query/ShowTransmissionRuleQueryResult.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/query/ShowTransmissionRuleQueryResult.java index 8f71215c9f0e5..de382539d1a0f 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/query/ShowTransmissionRuleQueryResult.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/query/ShowTransmissionRuleQueryResult.java @@ -46,7 +46,7 @@ public final class ShowTransmissionRuleQueryResult { * @return query result row */ public Collection getRows() { - PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), jobType)); + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), jobType)); return Collections.singleton(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index 0827512185658..9964e219b6c23 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -91,7 +91,7 @@ public CDCJob(final PipelineSink sink) { @Override protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) { Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); - PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue( processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING")); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfiguration()); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java index 3d750ebd0c2f2..db2ffde2735e5 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java @@ -46,7 +46,7 @@ protected ExecuteEngine doInitialize() { @Override public PipelineProcessConfiguration getProcessConfiguration() { - return PipelineProcessConfigurationUtils.convertWithDefaultValue(null); + return PipelineProcessConfigurationUtils.fillInDefaultValue(null); } /** diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 6a0fea10d152b..7b88ec54a54c2 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -109,7 +109,7 @@ protected void runBlocking() { PipelineJobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId); try { - PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue( processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()), jobType.getType())); PipelineDataConsistencyChecker checker = jobType.buildDataConsistencyChecker( parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext()); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java index 565bcd4d507ba..f5af74fd5c71f 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java @@ -42,11 +42,10 @@ static void beforeClass() { @Test void assertLoadAndPersist() { YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration(); - YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue(); - yamlReadConfig.fillInNullFieldsWithDefaultValue(); + YamlPipelineReadConfiguration yamlReadConfig = new YamlPipelineReadConfiguration(); yamlReadConfig.setShardingSize(10); yamlProcessConfig.setRead(yamlReadConfig); - YamlPipelineWriteConfiguration yamlWriteConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue(); + YamlPipelineWriteConfiguration yamlWriteConfig = new YamlPipelineWriteConfiguration(); yamlProcessConfig.setWrite(yamlWriteConfig); YamlAlgorithmConfiguration yamlStreamChannel = new YamlAlgorithmConfiguration(); yamlStreamChannel.setType("MEMORY"); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java index 552e95d367668..20fb9217c84ae 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java @@ -200,11 +200,11 @@ public static MigrationJobItemContext mockMigrationJobItemContext(final Migratio } private static PipelineProcessConfiguration mockPipelineProcessConfiguration() { - YamlPipelineReadConfiguration yamlReadConfig = YamlPipelineReadConfiguration.buildWithDefaultValue(); + YamlPipelineReadConfiguration yamlReadConfig = new YamlPipelineReadConfiguration(); yamlReadConfig.setShardingSize(10); YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration(); yamlProcessConfig.setRead(yamlReadConfig); - PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig); + PipelineProcessConfigurationUtils.fillInDefaultValue(new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig)); return new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index aa1b74d796cb9..c638c6b71d2ee 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -195,7 +195,7 @@ void assertDataConsistencyCheck() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); initTableData(jobConfig); jobManager.start(jobConfig); - PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue( new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobType.getType())); TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); Map checkResultMap = jobType.buildDataConsistencyChecker(