Address review comments
vvivekiyer committed Nov 15, 2024
1 parent f485324 commit 9a7814c
Showing 6 changed files with 113 additions and 60 deletions.
@@ -143,8 +143,6 @@ public static class RealtimeToOfflineSegmentsTask extends MergeTask {
// Generate segment and push to controller based on batch ingestion configs
public static class SegmentGenerationAndPushTask {
public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
"SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
}

/**
@@ -163,8 +161,11 @@ public static class SegmentGenerationAndPushTask {
 * 3. Compute cost is paid on all servers hosting the segment.
* 4. Increases server startup time as more and more segments require reload.
*/
public static class SegmentRefreshTask {
public static final String TASK_TYPE = "SegmentRefreshTask";
public static class RefreshSegmentTask {
public static final String TASK_TYPE = "RefreshSegmentTask";

// Maximum number of tasks to create per table per run.
public static final int MAX_NUM_TASKS_PER_TABLE = 20;
}

public static class UpsertCompactionTask {
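The new MAX_NUM_TASKS_PER_TABLE constant is consumed by the task generator, which is one of the six changed files but is not expanded in this view. Below is a minimal sketch of how such a per-table, per-run cap is typically applied when the generator builds its task list; the class and method names are illustrative rather than the committed code, and the import paths are assumed.

import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;

public class RefreshSegmentTaskCapSketch {
  // Keeps at most MAX_NUM_TASKS_PER_TABLE task configs per table for a single scheduling run.
  static List<PinotTaskConfig> capTasksForTable(List<PinotTaskConfig> candidateTasks) {
    List<PinotTaskConfig> selectedTasks = new ArrayList<>();
    for (PinotTaskConfig candidateTask : candidateTasks) {
      if (selectedTasks.size() >= MinionConstants.RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE) {
        break;
      }
      selectedTasks.add(candidateTask);
    }
    return selectedTasks;
  }
}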
@@ -37,6 +37,7 @@
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -56,7 +57,7 @@
import static org.testng.Assert.*;


public class SegmentRefreshMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotTaskManager _taskManager;
protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -78,7 +79,7 @@ public void setUp() throws Exception {
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createOfflineTableConfig();
tableConfig.setTaskConfig(getSegmentRefreshTaskConfig());
tableConfig.setTaskConfig(getRefreshSegmentTaskConfig());
addTableConfig(tableConfig);

// Unpack the Avro files
@@ -108,17 +109,17 @@ public void testFirstSegmentRefresh() {
// This will create the inverted index as we disable inverted index creation during segment push.
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE)));
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule a task if there's an incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
waitForTaskToComplete();

// Check that metadata contains expected values
Map<String, Long> segmentRefreshTime = new HashMap<>();
String refreshKey = MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
@@ -128,12 +129,12 @@ public void testFirstSegmentRefresh() {

// This should be a no-op as nothing changes.
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(
customMap.containsKey(MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)),
"Refresh Time doesn't match");
}
@@ -153,12 +154,12 @@ public void testValidDatatypeChange() throws Exception {
addSchema(schema);

assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE)));
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule a task if there's an incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
waitForTaskToComplete();

waitForServerSegmentDownload(aVoid -> {
@@ -232,12 +233,12 @@ public void testIndexChanges() throws Exception {

String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE)));
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule a task if there's an incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
waitForTaskToComplete();

waitForServerSegmentDownload(aVoid -> {
@@ -323,16 +324,16 @@ public void checkColumnAddition() throws Exception {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());

assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.SegmentRefreshTask.TASK_TYPE)));
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule a task if there's an incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.SegmentRefreshTask.TASK_TYPE));
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
waitForTaskToComplete();

// Check that metadata contains processed times.
String refreshKey = MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
@@ -386,10 +387,51 @@ public void checkColumnAddition() throws Exception {
assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID));
}


@Test(priority = 5)
public void checkRefreshNotNecessary() throws Exception {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQuotaConfig(new QuotaConfig(null, "10"));

updateTableConfig(tableConfig);

assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule a task if there's an incomplete task
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
waitForTaskToComplete();

// Check that metadata contains expected values
Map<String, Long> segmentRefreshTime = new HashMap<>();
String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(customMap.containsKey(refreshKey));
segmentRefreshTime.put(metadata.getSegmentName(), Long.parseLong(customMap.get(refreshKey)));
}

// This should be a no-op as nothing changes.
assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(
customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)),
"Refresh Time doesn't match");
}
}

protected void waitForTaskToComplete() {
TestUtils.waitForCondition(input -> {
// Check task state
for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.SegmentRefreshTask.TASK_TYPE)
for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.RefreshSegmentTask.TASK_TYPE)
.values()) {
if (taskState != TaskState.COMPLETED) {
return false;
@@ -406,9 +448,9 @@ protected void waitForServerSegmentDownload(Function<Void, Boolean> conditionFun
}, 60_000L, "Failed to meet condition");
}

private TableTaskConfig getSegmentRefreshTaskConfig() {
private TableTaskConfig getRefreshSegmentTaskConfig() {
Map<String, String> tableTaskConfigs = new HashMap<>();
return new TableTaskConfig(
Collections.singletonMap(MinionConstants.SegmentRefreshTask.TASK_TYPE, tableTaskConfigs));
Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, tableTaskConfigs));
}
}
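For a standalone usage example, a table opts into the renamed task through its task config. The sketch below mirrors the getRefreshSegmentTaskConfig() helper above; the surrounding class name and the import paths are assumptions added for illustration, not part of this commit.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;

public class EnableRefreshSegmentTaskSketch {
  // Attaches an (empty) RefreshSegmentTask configuration to the table so the controller can schedule it.
  static void enableRefreshSegmentTask(TableConfig tableConfig) {
    Map<String, String> refreshSegmentTaskConfigs = new HashMap<>();
    tableConfig.setTaskConfig(new TableTaskConfig(
        Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE, refreshSegmentTaskConfigs)));
  }
}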
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.segmentrefresh;
package org.apache.pinot.plugin.minion.tasks.refreshsegment;

import java.io.File;
import java.util.Collections;
@@ -46,13 +46,13 @@
import org.slf4j.LoggerFactory;


public class SegmentRefreshTaskExecutor extends BaseSingleSegmentConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentRefreshTaskGenerator.class);
public class RefreshSegmentTaskExecutor extends BaseSingleSegmentConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshSegmentTaskGenerator.class);

private long _taskStartTime;

/**
* The code here currently covers segmentRefresh for the following cases:
* The code here currently covers segment refresh for the following cases:
* 1. Process newly added columns.
* 2. Addition/removal of indexes.
* 3. Compatible datatype change for existing columns
@@ -90,6 +90,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
SegmentDirectory segmentDirectory =
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(indexDir.toURI(), segmentLoaderContext);

// TODO: Instead of relying on needPreprocess(), process segment metadata file to determine if refresh is needed.
// The BaseDefaultColumnHandler invoked by needPreprocess() does not detect changes to existing columns, such as
// datatype changes, conversion from dimension to metric, etc.
boolean needPreprocess = ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema);
@@ -129,7 +130,10 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
if (!needPreprocess && refreshColumnSet.isEmpty()) {
LOGGER.info("Skipping segment={}, table={} as it is up-to-date with new table/schema", segmentName,
tableNameWithType);
// We just need to update the ZK metadata with the last refresh time to avoid getting picked up again. As the CRC
// check will match, this will only end up being a ZK update.
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType)
.setFile(indexDir)
.setSegmentName(segmentName)
.build();
}
@@ -159,6 +163,9 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File

private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
SegmentMetadataImpl segmentMetadata, String segmentName, Schema schema) {
// Inverted index creation is disabled by default during segment generation, typically to reduce segment push times
// from external sources like HDFS. If the inverted index is not created here, the segment will always be flagged as
// needReload, causing the refresh task to pick it up on every run.
tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true);
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(workingDir.getPath());
@@ -194,7 +201,7 @@ private static void closeSegmentDirectoryQuietly(SegmentDirectory segmentDirecto
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections.singletonMap(MinionConstants.SegmentRefreshTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(_taskStartTime)));
}
}
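The modifier above writes the task start time into the segment's ZK metadata custom map, and the integration test reads the same key back. A minimal sketch of that lookup follows, assuming the SegmentZKMetadata accessors shown in the test; the concrete value of TASK_TIME_SUFFIX is defined in MinionConstants and not shown in this diff, and the import paths are assumed.

import java.util.Map;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.core.common.MinionConstants;

public class RefreshTimeLookupSketch {
  // Returns the recorded RefreshSegmentTask run time for a segment, or -1 if the segment was never refreshed.
  static long getLastRefreshTimeMs(SegmentZKMetadata segmentZKMetadata) {
    String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
    Map<String, String> customMap = segmentZKMetadata.getCustomMap();
    if (customMap == null || !customMap.containsKey(refreshKey)) {
      return -1L;
    }
    return Long.parseLong(customMap.get(refreshKey));
  }
}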
(The remaining changed files in this commit are not expanded in this view.)
