From 2ee3b96625984fbd6aa60734666fb24f65d7a67a Mon Sep 17 00:00:00 2001 From: Vivek Iyer Vaidyanathan Iyer Date: Sun, 3 Nov 2024 19:26:08 -0800 Subject: [PATCH] Address review comments --- .../pinot/core/common/MinionConstants.java | 9 +- ...hSegmentMinionClusterIntegrationTest.java} | 84 ++++++++++++++----- .../RefreshSegmentTaskExecutor.java} | 16 ++-- .../RefreshSegmentTaskGenerator.java} | 53 ++++++------ .../SegmentRefreshTaskExecutorFactory.java | 6 +- ...entRefreshTaskProgressObserverFactory.java | 4 +- 6 files changed, 112 insertions(+), 60 deletions(-) rename pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/{SegmentRefreshMinionClusterIntegrationTest.java => RefreshSegmentMinionClusterIntegrationTest.java} (85%) rename pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/{segmentrefresh/SegmentRefreshTaskExecutor.java => refreshsegment/RefreshSegmentTaskExecutor.java} (91%) rename pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/{segmentrefresh/SegmentRefreshTaskGenerator.java => refreshsegment/RefreshSegmentTaskGenerator.java} (82%) rename pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/{segmentrefresh => refreshsegment}/SegmentRefreshTaskExecutorFactory.java (90%) rename pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/{segmentrefresh => refreshsegment}/SegmentRefreshTaskProgressObserverFactory.java (90%) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 6d75cc90cf8..27209b6f343 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -143,8 +143,6 @@ public 
static class RealtimeToOfflineSegmentsTask extends MergeTask { // Generate segment and push to controller based on batch ingestion configs public static class SegmentGenerationAndPushTask { public static final String TASK_TYPE = "SegmentGenerationAndPushTask"; - public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE = - "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance"; } /** @@ -163,8 +161,11 @@ public static class SegmentGenerationAndPushTask { * 3. Compute price is paid on all servers hosting the segment.q * 4. Increases server startup time as more and more segments require reload. */ - public static class SegmentRefreshTask { - public static final String TASK_TYPE = "SegmentRefreshTask"; + public static class RefreshSegmentTask { + public static final String TASK_TYPE = "RefreshSegmentTask"; + + // Maximum number of tasks to create per table per run. + public static final int MAX_NUM_TASKS_PER_TABLE = 20; } public static class UpsertCompactionTask { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentRefreshMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java similarity index 85% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentRefreshMinionClusterIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java index 269e968e4f2..0620f45c623 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentRefreshMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.spi.config.table.FieldConfig; import 
org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; @@ -56,7 +57,7 @@ import static org.testng.Assert.*; -public class SegmentRefreshMinionClusterIntegrationTest extends BaseClusterIntegrationTest { +public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterIntegrationTest { protected PinotHelixTaskResourceManager _helixTaskResourceManager; protected PinotTaskManager _taskManager; protected PinotHelixResourceManager _pinotHelixResourceManager; @@ -78,7 +79,7 @@ public void setUp() throws Exception { Schema schema = createSchema(); addSchema(schema); TableConfig tableConfig = createOfflineTableConfig(); - tableConfig.setTaskConfig(getSegmentRefreshTaskConfig()); + tableConfig.setTaskConfig(getRefreshSegmentTaskConfig()); addTableConfig(tableConfig); // Unpack the Avro files @@ -108,17 +109,17 @@ public void testFirstSegmentRefresh() { // This will create the inverted index as we disable inverted index creation during segment push. 
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() - .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE))); + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); waitForTaskToComplete(); // Check that metadata contains expected values Map segmentRefreshTime = new HashMap<>(); - String refreshKey = MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata Map customMap = metadata.getCustomMap(); @@ -128,12 +129,12 @@ public void testFirstSegmentRefresh() { // This should be no-op as nothing changes. 
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata Map customMap = metadata.getCustomMap(); assertTrue( - customMap.containsKey(MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); + customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)), "Refresh Time doesn't match"); } @@ -153,12 +154,12 @@ public void testValidDatatypeChange() throws Exception { addSchema(schema); assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() - .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE))); + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); waitForTaskToComplete(); waitForServerSegmentDownload(aVoid -> { @@ -232,12 +233,12 @@ public void testIndexChanges() throws Exception { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() - 
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE))); + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); waitForTaskToComplete(); waitForServerSegmentDownload(aVoid -> { @@ -323,16 +324,16 @@ public void checkColumnAddition() throws Exception { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); assertTrue(_helixTaskResourceManager.getTaskQueues() - .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE))); + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); // Will not schedule task if there's incomplete task assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) - .get(MinionConstants.SegmentRefreshTask.TASK_TYPE)); + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); waitForTaskToComplete(); // Check that metadata contains processed times. 
- String refreshKey = MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata Map customMap = metadata.getCustomMap(); @@ -386,10 +387,51 @@ public void checkColumnAddition() throws Exception { assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID)); } + + @Test(priority = 5) + public void checkRefreshNotNecessary() throws Exception { + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + TableConfig tableConfig = getOfflineTableConfig(); + tableConfig.setQuotaConfig(new QuotaConfig(null, "10")); + + updateTableConfig(tableConfig); + + assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + assertTrue(_helixTaskResourceManager.getTaskQueues() + .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE))); + // Will not schedule task if there's incomplete task + assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + waitForTaskToComplete(); + + // Check that metadata contains expected values + Map segmentRefreshTime = new HashMap<>(); + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map customMap = metadata.getCustomMap(); + assertTrue(customMap.containsKey(refreshKey)); + segmentRefreshTime.put(metadata.getSegmentName(), Long.parseLong(customMap.get(refreshKey))); + } + + // This should be no-op as nothing changes. 
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null) + .get(MinionConstants.RefreshSegmentTask.TASK_TYPE)); + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + // Get the value in segment metadata + Map customMap = metadata.getCustomMap(); + assertTrue( + customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); + assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)), + "Refresh Time doesn't match"); + } + } + protected void waitForTaskToComplete() { TestUtils.waitForCondition(input -> { // Check task state - for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.SegmentRefreshTask.TASK_TYPE) + for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.RefreshSegmentTask.TASK_TYPE) .values()) { if (taskState != TaskState.COMPLETED) { return false; @@ -406,9 +448,9 @@ protected void waitForServerSegmentDownload(Function conditionFun }, 60_000L, "Failed to meet condition"); } - private TableTaskConfig getSegmentRefreshTaskConfig() { + private TableTaskConfig getRefreshSegmentTaskConfig() { Map tableTaskConfigs = new HashMap<>(); return new TableTaskConfig( - Collections.singletonMap(MinionConstants.SegmentRefreshTask.TASK_TYPE, tableTaskConfigs)); + Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, tableTaskConfigs)); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java similarity index 91% rename from 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskExecutor.java rename to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java index b10d9ac0ab6..a2dae7d351f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.minion.tasks.segmentrefresh; +package org.apache.pinot.plugin.minion.tasks.refreshsegment; import java.io.File; import java.util.Collections; @@ -46,13 +46,13 @@ import org.slf4j.LoggerFactory; -public class SegmentRefreshTaskExecutor extends BaseSingleSegmentConversionExecutor { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentRefreshTaskGenerator.class); +public class RefreshSegmentTaskExecutor extends BaseSingleSegmentConversionExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskExecutor.class); private long _taskStartTime; /** - * The code here currently covers segmentRefresh for the following cases: + * The code here currently covers segment refresh for the following cases: * 1. Process newly added columns. * 2. Addition/removal of indexes. * 3. 
Compatible datatype change for existing columns @@ -129,7 +129,10 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File if (!needPreprocess && refreshColumnSet.isEmpty()) { LOGGER.info("Skipping segment={}, table={} as it is up-to-date with new table/schema", segmentName, tableNameWithType); + // We just need to update the ZK metadata with the last refresh time to avoid getting picked up again. As the CRC + // check will match, this will only end up being a ZK update. return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType) + .setFile(indexDir) .setSegmentName(segmentName) .build(); } @@ -159,6 +162,9 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig, SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) { + // Inverted index creation is disabled by default during segment generation typically to reduce segment push times + // from external sources like HDFS. Also, if no inverted index is created here, the segment will always be + // flagged as needReload, causing the segment refresh to take place. 
tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true); SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); config.setOutDir(workingDir.getPath()); @@ -194,7 +200,7 @@ private static void closeSegmentDirectoryQuietly(SegmentDirectory segmentDirecto protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) { return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, - Collections.singletonMap(MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, + Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, String.valueOf(_taskStartTime))); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java similarity index 82% rename from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskGenerator.java rename to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java index 2c0e73e9b0b..6e28dcca307 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.plugin.minion.tasks.segmentrefresh; +package org.apache.pinot.plugin.minion.tasks.refreshsegment; import com.google.common.base.Preconditions; import java.util.ArrayList; @@ -30,7 +30,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; import org.apache.pinot.core.common.MinionConstants; -import org.apache.pinot.core.common.MinionConstants.SegmentRefreshTask; +import org.apache.pinot.core.common.MinionConstants.RefreshSegmentTask; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.annotations.minion.TaskGenerator; import org.apache.pinot.spi.config.table.TableConfig; @@ -43,23 +43,25 @@ @TaskGenerator -public class SegmentRefreshTaskGenerator extends BaseTaskGenerator { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentRefreshTaskGenerator.class); +public class RefreshSegmentTaskGenerator extends BaseTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskGenerator.class); @Override public String getTaskType() { - return MinionConstants.SegmentRefreshTask.TASK_TYPE; + return RefreshSegmentTask.TASK_TYPE; } @Override public List generateTasks(List tableConfigs) { - String taskType = MinionConstants.SegmentRefreshTask.TASK_TYPE; + String taskType = RefreshSegmentTask.TASK_TYPE; List pinotTaskConfigs = new ArrayList<>(); + PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); + int tableNumTasks = 0; for (TableConfig tableConfig : tableConfigs) { String tableNameWithType = tableConfig.getTableName(); - LOGGER.info("Start generating SegmentRefresh tasks for table: {}", tableNameWithType); + LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType); // Get the task configs for the table. 
This is used to restrict the maximum number of allowed tasks per table at // any given point. @@ -69,28 +71,36 @@ public List generateTasks(List tableConfigs) { LOGGER.warn("Failed to find task config for table: {}", tableNameWithType); continue; } - taskConfigs = tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentRefreshTask.TASK_TYPE); + taskConfigs = tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE); Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType); - int tableMaxNumTasks = Integer.MAX_VALUE; + int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); if (tableMaxNumTasksConfig != null) { try { tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig); } catch (Exception e) { - tableMaxNumTasks = Integer.MAX_VALUE; + tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType); } } + // Get info about table and schema. + Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType); + Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName()); + // Get the running segments for a table. Set runningSegments = - TaskGeneratorUtils.getRunningSegments(MinionConstants.SegmentRefreshTask.TASK_TYPE, _clusterInfoAccessor); + TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor); // Make a single ZK call to get the segments. List allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); for (SegmentZKMetadata segmentZKMetadata : allSegments) { - String segmentName = segmentZKMetadata.getSegmentName(); + // Skip if we have reached the maximum number of permissible tasks per iteration. 
+ if (tableNumTasks >= tableMaxNumTasks) { + break; + } // Skip consuming segments. if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) { @@ -102,13 +112,10 @@ public List generateTasks(List tableConfigs) { continue; } - // Skip if we have reached the maximum number of permissible tasks per iteration. - if (tableNumTasks >= tableMaxNumTasks) { - break; - } + String segmentName = segmentZKMetadata.getSegmentName(); // Skip if the segment is already up-to-date and doesn't have to be refreshed. - if (!shouldRefreshSegment(segmentZKMetadata, tableConfig)) { + if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) { continue; } @@ -135,22 +142,18 @@ public List generateTasks(List tableConfigs) { * is because inverted index created is disabled by default during segment generation. This can be added as an * additional check in the future, if required. */ - private boolean shouldRefreshSegment(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig) { + private boolean shouldRefreshSegment(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, Stat tableStat, + Stat schemaStat) { String tableNameWithType = tableConfig.getTableName(); - PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); - String timestampKey = SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; + String timestampKey = RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; long lastProcessedTime = 0L; if (segmentZKMetadata.getCustomMap() != null && segmentZKMetadata.getCustomMap().containsKey(timestampKey)) { lastProcessedTime = Long.parseLong(segmentZKMetadata.getCustomMap().get(timestampKey)); } - Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType); - Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); - Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName()); - if 
(tableStat == null || schemaStat == null) { - LOGGER.warn("Table or schema stat is null for table: {}, schema: {}", tableNameWithType, schema.getSchemaName()); + LOGGER.warn("Table or schema stat is null for table: {}", tableNameWithType); return false; } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskExecutorFactory.java similarity index 90% rename from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskExecutorFactory.java rename to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskExecutorFactory.java index 12834a6a83f..2f8f93be658 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskExecutorFactory.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskExecutorFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.plugin.minion.tasks.segmentrefresh; +package org.apache.pinot.plugin.minion.tasks.refreshsegment; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.minion.MinionConf; @@ -39,11 +39,11 @@ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minio @Override public String getTaskType() { - return MinionConstants.SegmentRefreshTask.TASK_TYPE; + return MinionConstants.RefreshSegmentTask.TASK_TYPE; } @Override public PinotTaskExecutor create() { - return new SegmentRefreshTaskExecutor(); + return new RefreshSegmentTaskExecutor(); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskProgressObserverFactory.java similarity index 90% rename from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskProgressObserverFactory.java rename to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskProgressObserverFactory.java index 316bf07940e..8387c2d71fa 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentrefresh/SegmentRefreshTaskProgressObserverFactory.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskProgressObserverFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.plugin.minion.tasks.segmentrefresh; +package org.apache.pinot.plugin.minion.tasks.refreshsegment; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory; @@ -28,6 +28,6 @@ public class SegmentRefreshTaskProgressObserverFactory extends BaseMinionProgres @Override public String getTaskType() { - return MinionConstants.SegmentRefreshTask.TASK_TYPE; + return MinionConstants.RefreshSegmentTask.TASK_TYPE; } }