diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 6c798ce032..e561a9e13e 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -156,6 +156,12 @@ public class AmoroManagementConf { .defaultValue(60000L) .withDescription("Interval for refreshing table metadata."); + public static final ConfigOption REFRESH_MAX_PENDING_PARTITIONS = + ConfigOptions.key("refresh-tables.max-pending-partition-count") + .intType() + .defaultValue(100) + .withDescription("Filters will not be used beyond that number of partitions"); + public static final ConfigOption BLOCKER_TIMEOUT = ConfigOptions.key("blocker.timeout") .longType() diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java index cadf5ae2ad..90a890753b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java @@ -50,6 +50,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class OptimizingEvaluator { @@ -58,14 +59,17 @@ public class OptimizingEvaluator { protected final MixedTable mixedTable; protected final TableRuntime tableRuntime; protected final TableSnapshot currentSnapshot; + protected final int maxPendingPartitions; protected boolean isInitialized = false; protected Map partitionPlanMap = Maps.newHashMap(); - public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) { + public OptimizingEvaluator( + TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) { this.tableRuntime = tableRuntime; this.mixedTable = table; this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime); + this.maxPendingPartitions = maxPendingPartitions; } public TableRuntime getTableRuntime() { @@ -129,7 +133,10 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { mixedTable.id(), count, System.currentTimeMillis() - startTime); - partitionPlanMap.values().removeIf(plan -> !plan.isNecessary()); + partitionPlanMap = partitionPlanMap.entrySet().stream() + .filter(entry -> entry.getValue().isNecessary()) + .limit(maxPendingPartitions) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } private Map partitionProperties(Pair partition) { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java index 66f311f2fd..1f77e7abff 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java @@ -64,7 +64,7 @@ public OptimizingPlanner( MixedTable table, double availableCore, long maxInputSizePerThread) { - super(tableRuntime, table); + super(tableRuntime, table, Integer.MAX_VALUE); this.partitionFilter = tableRuntime.getPendingInput() == null ? Expressions.alwaysTrue() diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java index 0af3998fe6..affd72c312 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java @@ -75,7 +75,8 @@ public void setup(TableManager tableManager, Configurations conf) { new TableRuntimeRefreshExecutor( tableManager, conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT), - conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL)); + conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL), + conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS)); if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) { this.tagsAutoCreatingExecutor = new TagsAutoCreatingExecutor( diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java index 7837537ede..dc4ef14b36 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java @@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor { // 1 minutes private final long interval; + private final int maxPendingPartitions; - public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, long interval) { + public TableRuntimeRefreshExecutor( + TableManager tableRuntimes, int poolSize, long interval, int maxPendingPartitions) { super(tableRuntimes, poolSize); this.interval = interval; + this.maxPendingPartitions = maxPendingPartitions; } @Override @@ -48,7 +51,8 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) { if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) { - OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, table); + OptimizingEvaluator evaluator = + new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions); if (evaluator.isNecessary()) { OptimizingEvaluator.PendingInput pendingInput = evaluator.getPendingInput(); logger.debug( diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 01631f073c..3451cc441e 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -411,7 +411,7 @@ protected void reboot() throws InterruptedException { private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor { public TableRuntimeRefresher() { - super(tableService(), 1, Integer.MAX_VALUE); + super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE); } void refreshPending() { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java index 07c77179b2..87dddd5f42 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java @@ -110,7 +110,7 @@ public void testFragmentFiles() { } protected OptimizingEvaluator buildOptimizingEvaluator() { - return new OptimizingEvaluator(getTableRuntime(), getMixedTable()); + return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100); } protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) { diff --git a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml index cdea700842..d5d50c6f35 100644 --- a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml +++ b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml @@ -42,6 +42,7 @@ ams: refresh-tables: thread-count: 10 interval: 60000 # 1min + max-pending-partition-count: 100 # default 100 self-optimizing: commit-thread-count: 10