Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minion Task to support automatic Segment Refresh #14300

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

vvivekiyer
Copy link
Contributor

@vvivekiyer vvivekiyer commented Oct 24, 2024

Currently, when new columns are added or indexes are added/removed, the segment reloads happen on the server. There are a number of issues with this approach:

  1. Increased startup times for Pinot Server hosts. Servers have to reload segments (generating indexes, columns) at server startup when they are replaced/swapped/migrated.
  2. The server reload compute cost is paid on each server when indexes/colums are added. This leads to over-provisioning of servers to account for this compute cost.
  3. Reload on servers when queries are being processed affects latencies.
  4. Takes a long time to reload all segments (default value of 1 segment at a time). Increasing the concurrency affects query latencies.
  5. The segment on the deepstore never contains the new indexes/columns. So the segment in deepstore is at divergence from the server (making it not ideal for disaster recovery).

This PR creates a minion task to automatically refresh segments when there are index/column updates to table config/schema. It can support automatic refresh for the following operations:

  1. Adding/Removing indexes
  2. Adding columns
  3. Changing compatible datatypes.
  4. Converting segment versions

Followup Work:

  1. When there are table config/schema updates, we can validate if the datatype changes for columns are compatible. We can allow compatible updates.
  2. Schedule the SegmentRefresh tasks when there are tableconfig/schema updates rather than waiting for the next iteration of periodic job.

Notes on Implementation

The premise used to solve this was: Keep the deepstore segment in sync with table Config (this will automatically make sure that the servers have the updated segments). Please see #9360. Keeping deep store in sync becomes crucial for: Reducing server startup times when servers are replaced/migrated.

Task Generation Flow:

  1. When there is any table config/schema update, checks if the segment was processed at least once after the table config update. If yes, no task is generated.
  2. If no, creates a minion task.

Task Execution Flow:

  1. Loads the segment.
  2. Checks if the segment needs to be reconstructed depending on table configs.
  3. If (2) is a NO, updates the last processed time and uploads the segment. The upload is purely a ZK metadata update as the CRCs will match
  4. If (2) is a yes, a new segment is built with the updated tableConfig/Schema.

Relying on a server API call to indicate whether a segment needs to be refreshed was not preferred because:

  • Servers might indicate that a segment doesn’t need refresh (using a mechanism like Support API for checking if segments need to be reloaded for a table #12117) just because they were restarted. This will still leave the segments on deepstore outdated.

  • Server Preprocess currently supports very limited operations. As we add more capability like datatype changes/compression changes, relying on server Preprocess will give the wrong signal just because serverPreprocess doesn’t support the operation.

  • Using server APIs to get all segment Metadata to the controller for all tables every time the periodic task runs can be overkill.

Cons of this approach is that there will be minion tasks created for all segments for each table config update.

To overcome this problem, we can use a server side API that will return the list of segments to be refreshed. It is being developed in #14450. We can incorporate these changes in the Task Generation Flow once it is merged. (cc: @swaminathanmanish)

@ankitsultana
Copy link
Contributor

@vvivekiyer : the idea is quite interesting and the value add I see here is:

  • We can increase concurrency for segment refresh thereby reducing the total time to reload all segments
  • Deepstore link can be updated with the new segment
  • Possible perf improvements due to less work done in servers, but I guess we need to test this out.

This is particularly exacerbated for Upsert tables

But for Upserts I think one of the biggest cost is recomputing the validDocId map, so for Upsert tables we won't see any specific benefits right? (outside of the ones which are applicable for Realtime tables too).

@codecov-commenter
Copy link

codecov-commenter commented Oct 24, 2024

Codecov Report

Attention: Patch coverage is 3.65854% with 158 lines in your changes missing coverage. Please review.

Project coverage is 63.72%. Comparing base (59551e4) to head (9a7814c).
Report is 1335 commits behind head on master.

Files with missing lines Patch % Lines
...sks/refreshsegment/RefreshSegmentTaskExecutor.java 0.00% 82 Missing ⚠️
...ks/refreshsegment/RefreshSegmentTaskGenerator.java 0.00% 62 Missing ⚠️
...reshsegment/SegmentRefreshTaskExecutorFactory.java 0.00% 5 Missing ⚠️
.../org/apache/pinot/core/common/MinionConstants.java 0.00% 3 Missing ⚠️
...ntroller/helix/core/PinotHelixResourceManager.java 75.00% 1 Missing and 1 partial ⚠️
...ent/SegmentRefreshTaskProgressObserverFactory.java 0.00% 2 Missing ⚠️
...oller/api/resources/PinotTableRestletResource.java 0.00% 1 Missing ⚠️
...inot/spi/config/table/TableStatsHumanReadable.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14300      +/-   ##
============================================
+ Coverage     61.75%   63.72%   +1.97%     
- Complexity      207     1566    +1359     
============================================
  Files          2436     2667     +231     
  Lines        133233   146351   +13118     
  Branches      20636    22414    +1778     
============================================
+ Hits          82274    93264   +10990     
- Misses        44911    46198    +1287     
- Partials       6048     6889     +841     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.70% <3.65%> (+1.99%) ⬆️
java-21 63.61% <3.65%> (+1.98%) ⬆️
skip-bytebuffers-false 63.72% <3.65%> (+1.97%) ⬆️
skip-bytebuffers-true 34.07% <3.65%> (+6.34%) ⬆️
temurin 63.72% <3.65%> (+1.97%) ⬆️
unittests 63.72% <3.65%> (+1.97%) ⬆️
unittests1 55.52% <0.00%> (+8.63%) ⬆️
unittests2 34.08% <3.65%> (+6.35%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@tibrewalpratik17
Copy link
Contributor

We can increase concurrency for segment refresh thereby reducing the total time to reload all segments

Are you suggesting an increase in concurrency at the minion level or the server level? At the server level, it seems we would still issue a SegmentRefreshTask, which means the default concurrency would remain at 1. We can investigate performance improvements that might allow us to adjust the concurrency configuration.

Overall, this appears to be a valuable feature to reduce index build time and associated costs for servers! However, we need to consider the trade-off between SegmentRefresh and SegmentReload costs. Ultimately, we would still issue a SegmentRefresh call to the servers, if I understand correctly. For upsert tables with snapshot enabled, we risk losing validDocIDSnapshot during downloads from deep store since deep store lacks snapshot copies. This could potentially increase refresh times for these tables, as we wouldn't be able to utilize the preload feature.

@vvivekiyer vvivekiyer marked this pull request as ready for review October 25, 2024 19:06
@Jackie-Jiang Jackie-Jiang added release-notes Referenced by PRs that need attention when compiling the next release notes minion Configuration Config changes (addition/deletion/change in behavior) labels Oct 28, 2024
@vvivekiyer
Copy link
Contributor Author

But for Upserts I think one of the biggest cost is recomputing the validDocId map, so for Upsert tables we won't see any specific benefits right? (outside of the ones which are applicable for Realtime tables too).
AND
For upsert tables with snapshot enabled, we risk losing validDocIDSnapshot during downloads from deep store since deep store lacks snapshot copies

Yes, that's right. Exploring possibilities here - if we couple segment refresh minion task to also do other things (like upsert compaction, etc), will that help?

@tibrewalpratik17
Copy link
Contributor

tibrewalpratik17 commented Oct 29, 2024

if we couple segment refresh minion task to also do other things (like upsert compaction, etc), will that help?

The benefits of including compaction in this task will vary from use-case to use-case depending on the number of invalid docIDs.
cc @klsince too on ideas for upsert

// We set _taskStartTime before fetching the tableConfig. Task Generation relies on tableConfig/Schema updates
// happening after the last processed time. So we explicity use the timestamp before fetching tableConfig as the
// processedTime.
_taskStartTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking for table divergence in the task generator, between segment metadata & table config, will avoid this state/make the detection stateless.
getSegmentsMetadata in TableMetadataReader, gives the segment metadata and we can compare the metadata with table config to determine which ones need Refresh, in the generator itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are pros and cons to either approach:

  1. With the current way - we pay the computation cost on the Minions (and the controllers/helix).
  2. If we use getSegmentsMetadata we will pay computation cost on the Pinot Servers.

I resolved to (1) to avoid overloading the servers. Do you see any compelling reason for going with approach (2) and issuing a call to Pinot Servers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 2), the cost should be minimal since the API will be called once every scheduled trigger. However we are shipping the metadata payload to the controller which can add to the controller heap (if you have 1000's of segments). This is one downside. The controller/task generator uses the metadata to check if a segment has diverged from table config.

The reason I suggested that is we will know exactly whether subtasks need to be triggered or not, instead of triggering subtasks for every table update even if its unrelated (if I understood the trigger logic).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this. The compute/memory on minion is cheaper and I'm inclined to doing it this way. Moreover, doing it at minion gives us the flexibility of also support compression type changes on segments (where we have to initialize the forward index reader to get the compression info)

Copy link
Contributor

@swaminathanmanish swaminathanmanish Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concerns are, we are unnecessarily doing the following when refresh is not required (partition changes, transformer/filter changes etc.)

  1. Creating a large number of subtasks
  2. Going through BaseSingleSegmentConversionExecutor which modifies Zk metadata and even uploads the segment. Although looking at the code, it seems that we dont do either when segment file is not present in conversion result. What is expected from the executor when a segment need not be refreshed? Do we have to update Zk so that its not picked again.

@vrajat is working on the server side API to detect diverged segments. This will reduce the payload by not shipping the segment metadata/but only sending the segment names that require refresh. The call to the server will only happen during task scheduled trigger (order of minutes/hours) which in my opinion should not put load on the server. Until then we can use getSegmentMetadata call on the server?

#14450

We cannot get this on the server?
Moreover, doing it at minion gives us the flexibility of also support compression type changes on segments (where we have to initialize the forward index reader to get the compression info)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see recently we added an API on server / controller to see if reload is required or not: #12117

Copy link
Contributor Author

@vvivekiyer vvivekiyer Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the pass through comment. I've updated the description with the detailed reasoning for not using server APIs to detect this. Please take a look at the "Notes on Implementation" section. @swaminathanmanish I've also pinged on slack to quickly align on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced with @swaminathanmanish offline. We'll go with these changes now to avoid the controller heap blowup by transporting all segment metadata. We can use the API in #14450 once it is merged.

@@ -112,7 +112,7 @@
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this change (TableStats -> TableStatsHumanReadable) to a separate PR for cleaner rollbacks / cherry-picks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to do it in this PR because I'm making use of TableStats and there's a dependency.

// tableMTime > segmentZKMetadata.getCreationTime() || schemaMTime > segmentZKMetadata.getCreationTime();

boolean segmentProcessedBeforeUpdate = tableMTime > lastProcessedTime || schemaMTime > lastProcessedTime;
return segmentProcessedBeforeUpdate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also add a crc check to figure out if we need to trigger a refresh or not. This way we also ensure deepstore copy gets updated with latest indexes / schemas.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't get this. Are you suggesting we need to check the crc in the ZK metadata against the crc in deepstore? They are bound to always be the same right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Necessarily not! I have seen this a lot of times in Upsert-compaction task as well: #13491.
This is some race-condition which we should solve fundamentally but I think for now we can let this task refresh the segment in deepstore anyways.

import static org.testng.Assert.*;


public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test where refresh is not expected to happen and we validate that task is not running ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test.

@swaminathanmanish
Copy link
Contributor

Thanks for capturing our discussion in the description.
Could you also create a github issue for the follow up task to track?

To overcome this problem, we can use a server side API that will return the list of segments to be refreshed. It is being developed in https://github.com/apache/pinot/issues/14450. We can incorporate these changes in the Task Generation Flow once it is merged



@TaskExecutorFactory
public class SegmentRefreshTaskExecutorFactory implements PinotTaskExecutorFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you rename this as well to RefreshSegment...?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually prefer "SegmentRefreshTask" over "RefreshSegmentTask". Whichever name you pick, please make sure it's used everywhere.



@EventObserverFactory
public class SegmentRefreshTaskProgressObserverFactory extends BaseMinionProgressObserverFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you rename this as well to RefreshSegment...?

// We just need to update the ZK metadata with the last refresh time to avoid getting picked up again. As the CRC
// check will match, this will only end up being a ZK update.
return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType)
.setFile(indexDir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you verify that the upload path is lightweight and updates only Zk path? (i.e checks only when there's crc mismatch)

Copy link
Contributor

@sajjad-moradi sajjad-moradi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than a few minor comments, LGTM.

Comment on lines +102 to +105
if (fieldSpecInSchema.isVirtualColumn()) {
continue;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do virtual columns show up in the schema?

SegmentConversionResult segmentConversionResult) {
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(_taskStartTime)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put human readable time string? Something like 2024-11-09T03:21:59.989Z makes debugging much easier.



@TaskExecutorFactory
public class SegmentRefreshTaskExecutorFactory implements PinotTaskExecutorFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually prefer "SegmentRefreshTask" over "RefreshSegmentTask". Whichever name you pick, please make sure it's used everywhere.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Configuration Config changes (addition/deletion/change in behavior) feature minion release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants