Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minion Task to support automatic Segment Refresh #14300

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this change (TableStats -> TableStatsHumanReadable) to a separate PR for cleaner rollbacks / cherry-picks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to do it in this PR because I'm making use of TableStats and there's a dependency.

import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -875,13 +875,13 @@ public String getTableStats(
if ((tableTypeStr == null || TableType.OFFLINE.name().equalsIgnoreCase(tableTypeStr))
&& _pinotHelixResourceManager.hasOfflineTable(tableName)) {
String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
TableStats tableStats = _pinotHelixResourceManager.getTableStats(tableNameWithType);
TableStatsHumanReadable tableStats = _pinotHelixResourceManager.getTableStatsHumanReadable(tableNameWithType);
ret.set(TableType.OFFLINE.name(), JsonUtils.objectToJsonNode(tableStats));
}
if ((tableTypeStr == null || TableType.REALTIME.name().equalsIgnoreCase(tableTypeStr))
&& _pinotHelixResourceManager.hasRealtimeTable(tableName)) {
String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
TableStats tableStats = _pinotHelixResourceManager.getTableStats(tableNameWithType);
TableStatsHumanReadable tableStats = _pinotHelixResourceManager.getTableStatsHumanReadable(tableNameWithType);
ret.set(TableType.REALTIME.name(), JsonUtils.objectToJsonNode(tableStats));
}
return ret.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
Expand Down Expand Up @@ -4188,12 +4188,22 @@ public Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
return onlineSegments;
}

public TableStats getTableStats(String tableNameWithType) {
/**
 * Builds human-readable stats for the given table. Currently this is only the table's creation
 * time, derived from the ctime of the table's resource-config znode in the property store and
 * formatted via {@code SIMPLE_DATE_FORMAT}.
 *
 * @param tableNameWithType table name with type suffix (e.g. {@code "myTable_OFFLINE"})
 * @return the human-readable table stats
 * @throws IllegalStateException if the ZK stat for the table's resource config cannot be read
 */
public TableStatsHumanReadable getTableStatsHumanReadable(String tableNameWithType) {
  String resourceConfigPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
  Stat znodeStat = _propertyStore.getStat(resourceConfigPath, AccessOption.PERSISTENT);
  Preconditions.checkState(znodeStat != null, "Failed to read ZK stats for table: %s", tableNameWithType);
  return new TableStatsHumanReadable(SIMPLE_DATE_FORMAT.format(Instant.ofEpochMilli(znodeStat.getCtime())));
}

/**
 * Fetches the raw ZK {@link Stat} of the table's resource-config znode from the property store.
 * NOTE(review): presumably returns {@code null} when the znode does not exist — confirm callers
 * handle that case.
 *
 * @param tableNameWithType table name with type suffix
 * @return the znode stat for the table's resource config
 */
public Stat getTableStat(String tableNameWithType) {
  return _propertyStore.getStat(
      ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType),
      AccessOption.PERSISTENT);
}

/**
 * Fetches the raw ZK {@link Stat} of the schema's znode from the property store.
 * NOTE(review): presumably returns {@code null} when the znode does not exist — confirm callers
 * handle that case.
 *
 * @param schemaName name of the schema
 * @return the znode stat for the schema
 */
public Stat getSchemaStat(String schemaName) {
  return _propertyStore.getStat(
      ZKMetadataProvider.constructPropertyStorePathForSchema(schemaName),
      AccessOption.PERSISTENT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,29 @@ public static class RealtimeToOfflineSegmentsTask extends MergeTask {
// Generate segment and push to controller based on batch ingestion configs
public static class SegmentGenerationAndPushTask {
// Task type identifier registered with the minion framework.
public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
// Config key (per the key suffix, the number of these tasks a single minion instance may run
// concurrently).
public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
"SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
}

/**
 * Minion task to refresh segments when there are changes to tableConfigs and Schema. This task currently supports the
 * following functionality:
 * 1. Adding/Removing/Updating indexes.
 * 2. Adding new columns (also supports transform configs for new columns).
 * 3. Converting segment versions.
 * 4. Compatible datatype changes to columns (Note that the minion task will fail if the data in the column is not
 * compatible with target datatype)
 *
 * This is an alternative to performing reload of existing segments on Servers. The reload on servers is sub-optimal
 * for many reasons:
 * 1. Requires an explicit reload call when index configurations change.
 * 2. Is very slow. Happens one (or few - configurable) segment at a time to avoid query impact.
 * 3. Compute cost is paid on all servers hosting the segment.
 * 4. Increases server startup time as more and more segments require reload.
 */
public static class RefreshSegmentTask {
// Task type identifier registered with the minion framework.
public static final String TASK_TYPE = "RefreshSegmentTask";

// Maximum number of tasks to create per table per run.
public static final int MAX_NUM_TASKS_PER_TABLE = 20;
}

public static class UpsertCompactionTask {
Expand Down
Loading
Loading