
Core: Optimize computing user-facing state in data task #8346

Merged: 1 commit merged into apache:master on Aug 24, 2023

Conversation

@aokolnychyi (Contributor, author) commented Aug 17, 2023:

This PR optimizes computing the user-facing state in data tasks to reduce garbage and improve performance.

It is related to the work in #8336.

The github-actions bot added the core label on Aug 17, 2023.
@@ -45,31 +50,67 @@ protected FileScanTask self() {

  @Override
  protected FileScanTask newSplitTask(FileScanTask parentTask, long offset, long length) {
-    return new SplitScanTask(offset, length, parentTask);
+    return new SplitScanTask(offset, length, deletesSizeBytes(), parentTask);
@aokolnychyi (author):
This ensures the size of deletes is only computed once for all split tasks generated from the same file task.


@Override
public int filesCount() {
  return 1 + deletes.length;
}
@aokolnychyi (author):
Override this to avoid materializing deletesAsList just to compute the files count.

Contributor:

Nit: if we add more methods to the parent class, how can we make sure the new methods are also overridden here? Otherwise, they would probably accidentally materialize deletesAsList.

I don't think the above question is a blocker, but it would be great if we had some way/tests to detect that.

@aokolnychyi (author):

I would consider that separately from this PR. It would be unfortunate to materialize the list, but it would not be the end of the world.

Contributor:

> It would be unfortunate to materialize the list but it would not be the end of the world.

Of course. The current approach works for me.

Member:

Why not initialize the deletes list in the constructor? Then there would be no need for these overrides.

That would mean the BaseFileScanTask constructor needs to make a copy, but that's probably a good thing, since it would make the class more immutable. It will have to make the copy anyway, since List<DeleteFile> deletes() is the only way to get the deletes information out of the class, so it will be called sooner or later.
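
A minimal sketch of what this suggestion could look like (illustrative only; it is not the approach the PR takes), assuming the Guava ImmutableList already used elsewhere in the class:

// Copy the deletes into an immutable list once at construction time;
// deletes() then returns the cached list with no per-call copy.
private final List<DeleteFile> deletesAsList;

public BaseFileScanTask(DeleteFile[] deletes /* other args omitted */) {
  this.deletesAsList = ImmutableList.copyOf(deletes); // single defensive copy
}

@Override
public List<DeleteFile> deletes() {
  return deletesAsList; // no lazy overrides needed
}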


@Override
public int filesCount() {
  return fileScanTask.filesCount();
}
@aokolnychyi (author):

Delegate to the parent task so that the split task also avoids materializing the list of deletes just to compute the files count.


@advancedxy (Contributor) left a comment:

Thanks for pinging me.

Another meta question: I see the lazy-cache pattern in a lot of places in the code base. Is there any simple construct that behaves like Scala's lazy val xxx = ...? I did some quick research and didn't find one. It would be nice if we could add such a building block to a utility class (not in this PR's scope).
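
A minimal sketch of what such a building block could look like (hypothetical; no such utility exists in Iceberg as of this PR), mirroring the racy-but-benign volatile caching style used elsewhere in the code base:

import java.util.function.Supplier;

// Roughly equivalent to Scala's `lazy val`. The null check is racy, but
// benign for pure suppliers: concurrent callers may compute the value
// twice, yet each always observes a fully constructed result.
public final class Lazy<T> {
  private final Supplier<T> supplier;
  private volatile T value = null;

  public Lazy(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  public T get() {
    if (value == null) {
      this.value = supplier.get();
    }
    return value;
  }
}

Usage would then be something like private final Lazy<List<DeleteFile>> deletesAsList = new Lazy<>(() -> ImmutableList.copyOf(deletes));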

@@ -28,6 +28,10 @@ public class BaseFileScanTask extends BaseContentScanTask<FileScanTask, DataFile
      implements FileScanTask {
    private final DeleteFile[] deletes;

+   // lazy variables
+   private transient volatile List<DeleteFile> deletesAsList = null;
+   private transient volatile Long deletesSizeBytes = null;
Contributor:

Same as the long vs. Long discussion in BaseContentScanTask; I would prefer long instead.

Java's Long adds at least 16/24 bytes of object header overhead compared to long. There is also boxing/unboxing overhead when returning it as long.

@aokolnychyi (author):

I thought about that but was not sure about readability. I switched.

@aokolnychyi (author) commented Aug 19, 2023:

Using primitive values initialized to a custom value is not safe with custom serialization. Some Flink tests started to fail because variables were initialized to 0 after deserialization. Switched back to Long. Yes, we would box, but I am not sure computing this value in the constructor would be a good idea for BaseFileScanTask, which is constructed by planFiles and may not be used for split planning.
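
To illustrate the failure mode (a hypothetical minimal class, not Iceberg code): Java deserialization does not run field initializers, so a transient primitive comes back as 0 and silently defeats any non-zero sentinel:

import java.io.Serializable;

class CachedSize implements Serializable {
  private transient long cached = -1L; // -1 is meant to mean "not computed"

  long value() {
    if (cached == -1L) {
      this.cached = expensiveCompute();
    }
    // After deserialization, cached == 0 rather than -1, so the branch
    // above is skipped and a bogus 0 is returned instead of the real value.
    return cached;
  }

  private long expensiveCompute() {
    return 42L;
  }
}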

Contributor:

Thanks for the detailed explanation.

On second thought, how about declaring it as a normal transient long, such as:

private transient volatile long deletesSizeBytes = 0;

private long deletesSizeBytes() {
  if (deletesSizeBytes == 0) { // deletesSizeBytes might not be initialized yet
    long size = 0L;
    for (DeleteFile deleteFile : deletes) {
      size += deleteFile.fileSizeInBytes();
    }
    this.deletesSizeBytes = size;
  }

  return deletesSizeBytes;
}

We just need to pay for one small additional check in the no-delete-files case: iterating over an empty array.

@aokolnychyi (author):

This should probably work and would avoid boxing and the extra serialization overhead. I switched, but I use an extra check on the size of the deletes array; I think that's more obvious to the reader.
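
A sketch of the resulting pattern (field and method names follow this thread; the exact final code may differ). Here 0 doubles as the "not computed" marker, which is also the post-deserialization default, and the array-length guard makes the no-deletes case explicit:

private transient volatile long deletesSizeBytes = 0L;

private long deletesSizeBytes() {
  // 0 means "not computed yet" and is also what Java deserialization
  // leaves in a transient long, so the value is safely recomputed after
  // a round trip through a serializer.
  if (deletesSizeBytes == 0L && deletes.length > 0) {
    long size = 0L;
    for (DeleteFile deleteFile : deletes) {
      size += deleteFile.fileSizeInBytes();
    }
    this.deletesSizeBytes = size;
  }

  return deletesSizeBytes;
}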

  }

  @Override
  public Schema schema() {
    return super.schema();
  }

  private long deletesSizeBytes() {
    if (deletesSizeBytes == null) {
Contributor:

I took another look at this: how about we calculate the deletesSizeBytes field in the constructor directly?

Then there's no need to special-case a -1 or Long.MIN_VALUE any more...

@aokolnychyi (author) commented Aug 18, 2023:

That would mean potentially serializing an extra field (while sending tasks to executors). Given that there can be many millions of such objects and fields, I'd probably not do it.

Contributor:

8 bytes (size of long) * 1,000,000 (1 million) = ~8 MB. I wouldn't care too much about this, especially since the tasks are serialized to multiple executors in multiple rounds (in the Spark query engine).

However, it does add unnecessary overhead for a ScanTask without delete files, so a transient long and lazy calculation would be nice.

@aokolnychyi (author):

It is not the size but rather the need to serialize extra values. If there are 1M files and each of them has 4 row groups, that is 4M values to serialize on the driver. If we read 10M files, that's around 40M extra values.

The new approach should avoid both the serialization and the boxing overhead.

Contributor:

Just checked the Spark code (Flink probably couldn't handle that scale); it seems I was misunderstanding how tasks are serialized. I used to think only the specific task was serialized and sent to the executor for one partition.

It seems the RDD is serialized as a whole task binary for each task and sent to the executor. That definitely adds a lot of overhead, even for one extra field.


public BaseScanTaskGroup(StructLike groupingKey, Collection<T> tasks) {
  Preconditions.checkNotNull(tasks, "tasks cannot be null");
  this.groupingKey = groupingKey;
  this.tasks = tasks.toArray();
  this.taskCollection = Collections.unmodifiableCollection(tasks);
}
@aokolnychyi (author) commented Aug 18, 2023:

There is no need to create a list of tasks unless this task group has been serialized. We pass a collection here that gets immediately converted to an array for serialization purposes. This task group is then still accessed on the driver via the public tasks() method, which would create another collection when we could have used the one that was passed to the constructor. That is not worth it if we have millions of files.
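
A sketch of the accessor this implies (assuming the transient taskCollection field from the snippet above, plus Guava's Lists and java.util.Collections; the final code may differ): on the driver the constructor-supplied collection is served directly, and the list is rebuilt from the array only after serialization has nulled the transient field out:

@Override
@SuppressWarnings("unchecked")
public Collection<T> tasks() {
  if (taskCollection == null) { // only null after deserialization
    List<T> list = Lists.newArrayListWithCapacity(tasks.length);
    for (Object task : tasks) {
      list.add((T) task);
    }
    this.taskCollection = Collections.unmodifiableCollection(list);
  }

  return taskCollection;
}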

}

@Override
public long estimatedRowsCount() {
@aokolnychyi (author) commented Aug 18, 2023:

Caching isn't my primary goal. When profiling distributed planning, I noticed we generate tons of garbage while planning task groups, and it sometimes takes up to 2/3 of the planning time just to plan groups for full table scans with millions of files. My primary motivation is to iterate over the array of tasks, instead of using the parent implementation with LongStream (which is slow and generates many unnecessary objects) or using an iterator-based approach (which still has unnecessary overhead). For scans with 10+ million files, this overhead adds up, especially when we are running low on memory.

Internally, we did have a cache of task groups that were reused in multiple Spark scans. These metrics are used for reporting stats to engines, so while caching isn't the primary goal, it seems simple enough to do and may be helpful if we also decide to cache task groups in the future.
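
A sketch of the loop-based approach described above (assuming the Object[] tasks field from the constructor snippet; the final code may differ slightly):

@Override
public long estimatedRowsCount() {
  // Plain loop over the backing array: no LongStream pipeline objects,
  // no boxing, no per-call iterator allocation.
  long estimatedRowsCount = 0L;
  for (Object task : tasks) {
    estimatedRowsCount += ((ScanTask) task).estimatedRowsCount();
  }
  return estimatedRowsCount;
}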

@aokolnychyi (author):

I decided to drop the caching step for now. We can add it later.

Member:

Why not just calculate these values in the constructor? I feel like we pass over this array a bunch of times, and we could just figure out these values at the beginning.

@aokolnychyi (author):

If we compute it in the constructor, it means we have to store it in variables and then serialize them. We discussed this a bit above. Also, it may not always be needed. Given our current use cases, we don't benefit from caching, so I decided to just optimize the computation itself for now.

@Override
public long sizeBytes() {
  if (sizeBytes == Long.MIN_VALUE) {
    this.sizeBytes = FileScanTask.super.sizeBytes();
@aokolnychyi (author):

This would only be called after serialization. We init sizeBytes in the constructor otherwise.

aokolnychyi changed the title from "Core: Lazily cache user-facing state in BaseFileScanTask" to "Core: Optimize computing user-facing state in data task" on Aug 19, 2023.
aokolnychyi force-pushed the improve-file-scan-task branch 2 times, most recently from afd584a to 5f007b6, on August 19, 2023 at 01:03.
@advancedxy (Contributor) left a comment:

LGTM



@@ -123,7 +170,19 @@ public boolean canMerge(ScanTask other) {

  @Override
  public SplitScanTask merge(ScanTask other) {
    SplitScanTask that = (SplitScanTask) other;
-    return new SplitScanTask(offset, len + that.length(), fileScanTask);
+    return new SplitScanTask(offset, len + that.length(), fileScanTask, deletesSizeBytes);
@jerqi (Contributor) commented Aug 22, 2023:

Should we use the method deletesSizeBytes() instead of the variable deletesSizeBytes? Should this place be consistent with the other places? We use the method deletesSizeBytes() at BaseFileScanTask.java line 50.

@aokolnychyi (author):

We use the variable on purpose to avoid triggering the computation: only use it if it has already been computed.

@aokolnychyi (author):

I should have added a comment, will do tomorrow.

Contributor:

SplitScanTask one = xxxx;
SplitScanTask two = xxx;
SplitScanTask three = one.merge(two); // now the variable deletesSizeBytes is zero
three.deletesSizeBytes(); // we will compute deletesSizeBytes
one.deletesSizeBytes();   // we will compute deletesSizeBytes again

It will recompute deletesSizeBytes repeatedly in this situation. Could you correct me if I am wrong?

@aokolnychyi (author) commented Aug 22, 2023:

When a SplitScanTask is created from a BaseFileScanTask during planning, we use deletesSizeBytes() to trigger the computation once per BaseFileScanTask instead of once per SplitScanTask (the number of split tasks usually matches the number of row groups). This works because each SplitScanTask has the same set of deletes as the parent task it was created from. After planning, adjacent SplitScanTasks in the same bin are merged together. While merging, we use the deletesSizeBytes variable (not the method), which should already be populated if the split tasks were created from a parent task. If the variable is not populated, it means the tasks were created via a separate process (like parsing). In that case, we don't know whether it is beneficial to compute the size of deletes, so we skip computing it unless requested.
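
The two call sites described above, side by side (taken from the diffs in this PR, with comments added here for illustration):

// In newSplitTask: call the method, so the parent computes the deletes
// size once and shares it with every split created from this task.
return new SplitScanTask(offset, length, parentTask, deletesSizeBytes());

// In merge: read the field, forwarding whatever is already cached
// (possibly unset) without forcing a computation for tasks that were
// created by other means, such as parsing.
return new SplitScanTask(offset, len + that.length(), fileScanTask, deletesSizeBytes);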

@aokolnychyi (author):

@jerqi, the use case above won't happen in practice as TableScanUtil would only use the merged task.

Contributor:

> When a SplitScanTask is created from a BaseFileScanTask during planning, we use deletesSizeBytes() to trigger the computation once per BaseFileScanTask instead of once per SplitScanTask [...]

Thanks for your explanation, I get your point. Makes sense.

@jerqi (Contributor) commented Aug 22, 2023:

> @jerqi, the use case above won't happen in practice as TableScanUtil would only use the merged task.

Thanks, I got it. I'm just a little worried that people will misuse them in the future. It's OK for me.

@aokolnychyi (author):

Here are some numbers with #8336 and this change.

After the changes:

Benchmark                                                                   Mode  Cnt           Score            Error   Units
TaskGroupPlanningBenchmark.planTaskGroups                                     ss    5           1.989 ±          0.097    s/op
TaskGroupPlanningBenchmark.planTaskGroups:·async                              ss                  NaN                      ---
TaskGroupPlanningBenchmark.planTaskGroups:·gc.alloc.rate                      ss    5         569.803 ±         30.506  MB/sec
TaskGroupPlanningBenchmark.planTaskGroups:·gc.alloc.rate.norm                 ss    5  1499818088.000 ±    3617083.698    B/op
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Eden_Space             ss    5         603.209 ±       3180.736  MB/sec
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Eden_Space.norm        ss    5  1596666675.200 ± 8422157507.465    B/op
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Survivor_Space         ss    5          25.930 ±        149.334  MB/sec
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Survivor_Space.norm    ss    5    68311168.000 ±  391184976.025    B/op
TaskGroupPlanningBenchmark.planTaskGroups:·gc.count                           ss    5           2.000                   counts
TaskGroupPlanningBenchmark.planTaskGroups:·gc.time                            ss    5          63.000                       ms

Before the changes:

Benchmark                                                                   Mode  Cnt            Score            Error   Units
TaskGroupPlanningBenchmark.planTaskGroups                                     ss    5            6.659 ±          0.214    s/op
TaskGroupPlanningBenchmark.planTaskGroups:·async                              ss                   NaN                      ---
TaskGroupPlanningBenchmark.planTaskGroups:·gc.alloc.rate                      ss    5         2529.137 ±         70.078  MB/sec
TaskGroupPlanningBenchmark.planTaskGroups:·gc.alloc.rate.norm                 ss    5  19052489617.600 ±    3591565.223    B/op
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Eden_Space             ss    5         2559.777 ±       1165.034  MB/sec
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Eden_Space.norm        ss    5  19285409792.000 ± 8829460580.255    B/op
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Survivor_Space         ss    5           38.440 ±         29.535  MB/sec
TaskGroupPlanningBenchmark.planTaskGroups:·gc.churn.PS_Survivor_Space.norm    ss    5    289638648.000 ±  225080948.753    B/op
TaskGroupPlanningBenchmark.planTaskGroups:·gc.count                           ss    5           22.000                   counts
TaskGroupPlanningBenchmark.planTaskGroups:·gc.time                            ss    5          908.000                       ms

Apart from the time, there is a 10x+ reduction in allocation rate, and only 2 garbage collections vs. 22, which would make a big difference if we are running low on memory.

@@ -45,31 +47,66 @@ protected FileScanTask self() {

  @Override
  protected FileScanTask newSplitTask(FileScanTask parentTask, long offset, long length) {
-    return new SplitScanTask(offset, length, parentTask);
+    return new SplitScanTask(offset, length, parentTask, deletesSizeBytes());
Contributor:

Why do we use the method deletesSizeBytes() here? I can't see how it differs from line 178.

@aokolnychyi (author):

I gave a bit of an explanation above; let me know if that makes sense.

Contributor:

Makes sense. I got it.

@RussellSpitzer (Member) left a comment:

This looks fine to me in general, but do we have any tests making sure these methods work correctly? I may be wrong here, but I feel like we are changing some of these behaviors, and I'd normally expect that to break some tests.

public int filesCount() {
  int filesCount = 0;
  for (FileScanTask task : tasks) {
    filesCount += task.filesCount();
  }
  return filesCount;
}
Member:

This is counting delete files that may be read for multiple data files multiple times; is that OK?

Member:

Same issue for bytes too, right?

@aokolnychyi (author):

Yeah, that's the same behavior we have today. For example, two data tasks that reference the same delete file each count it once, so the group-level count includes it twice. I agree it is questionable, but it is also useful for estimating the actual bytes read for delete files and how things overlap.

@aokolnychyi (author):

The behavior should be exactly as before. There were some tests for planning as well as for Spark and Flink serialization. I'll find and post them tomorrow.

@advancedxy (Contributor) left a comment:

lgtm

@aokolnychyi (author):

There are tests in TestDataTableScan, TestBatchScans, and TestTableScanUtil, plus each engine has serialization tests. I feel like they cover most of what we need.

aokolnychyi merged commit 181d3e2 into apache:master on Aug 24, 2023.
42 checks passed
@aokolnychyi (author):

Thanks for reviewing, @advancedxy @jerqi @RussellSpitzer!

  }

  @Override
  public List<DeleteFile> deletes() {
    return ImmutableList.copyOf(deletes);
  }
Member:

Just noticed that in the Iceberg version used in Trino, deletes() does a copy on every invocation. Thank you for fixing this!


