Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to save memory on aggregations #53793

Merged
merged 8 commits into from
Mar 23, 2020

Conversation

nik9000
Copy link
Member

@nik9000 nik9000 commented Mar 19, 2020

This delays deserializing the aggregation response try until right
before we merge the objects. It isn't super clear how much space this
saves, but:

  • It turns some object from fairly long lives to very short lived
    which is almost certainly a good thing from a JVM perspective.
  • It gives us a convenient place to check how much space the
    aggregation tree uses.
  • It probably does save a fair bit of memory because of the overhead
    the JVM has for many small objects, which the aggregation tree is
    mostly made up of.

This delays deserializing the aggregation response try until *right*
before we merge the objects.
@nik9000
Copy link
Member Author

nik9000 commented Mar 19, 2020

I'm making this a draft PR so the robots can chew on it.

* A holder for {@link Writeable}s that can delays reading the underlying
* {@linkplain Writeable} when it is read from a remote node.
*/
public abstract class DelayableWriteable<T extends Writeable> implements Supplier<T>, Writeable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're only using this for InternalAggregations, but it is a heck of a lot simpler to test if it is generic.

@nik9000
Copy link
Member Author

nik9000 commented Mar 19, 2020

Ok, robots, get to work!

@nik9000
Copy link
Member Author

nik9000 commented Mar 19, 2020

Ok, robots, get to work!

OK! The tests are passing. I think it is time to un-draft this!

@nik9000 nik9000 marked this pull request as ready for review March 19, 2020 15:39
@nik9000 nik9000 requested a review from jimczi March 19, 2020 15:39
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some questions but I like this change a lot. It's simple but it could also improve the memory profile of coordinating node significantly.

Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
aggsBuffer[0] = () -> reducedAggs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we nullify the rest of the array to make the reduced aggs eligible for gc ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the line right above does that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right but we keep the serialized + deserialized form until after the partial reduce. We can try to release the serialized form early with:

 List<InternalAggregations> toReduce = Arrays.stream(aggsBuffer).map(Supplier::get).collect(toList());
 Arrays.fill(aggsBuffer, null);
 InternalAggregaions reducedAggs = InternalAggregations.topLevelReduce(toReduce, aggReduceContextBuilder.forPartialReduction());
 aggsBuffer[0] = () -> reducedAggs;

Or we can nullify the serialized form when the supplier is called like discussed below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! I noticed that right after I sent this. I'm playing with nulling the cell in the array as soon as I call get. That feels a little safer than nulling the bytes.

try (StreamInput in = registry == null ?
serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) {
in.setVersion(remoteVersion);
return reader.read(in);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we nullify the bytes ref before returning the deserialized aggs ? We could also protect against multiple calls by keeping the deserialized aggs internally on the first call ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about race conditions with that. The way it is it is fairly simple the look at and say "there are no race conditions." I think nulifying the other references would be good enough from a GC perspective. Do you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep nullifying the reference should be enough but it would be better if we can nullify after each deserialization. Otherwise you'd need to keep the deserialized aggs and their bytes representation during the entire partial reduce which defeats the purpose of saving memories here ?

@@ -366,7 +371,11 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
aggregations.writeTo(out);
if (out.getVersion().before(Version.V_8_0_0)) {
aggregations.get().writeTo(out);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can maybe get the aggs once if the remote node is in a version before v8 (instead of calling get here and below to get the pipeline aggs) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nik9000 ? should we avoid the double deserialization if we need the pipeline aggs below ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Darn it. I twisted the other side around but missed this comment. Of course!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly these are going to be the "referencing" ones anyway. But I'll turn it around.

@@ -54,7 +55,7 @@
private TotalHits totalHits;
private float maxScore = Float.NaN;
private DocValueFormat[] sortValueFormats;
private InternalAggregations aggregations;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here to explain why we use a delayable writable ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

@nik9000
Copy link
Member Author

nik9000 commented Mar 23, 2020

@jimczi I think this is ready for another round.

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left one comment but the change looks good to me.
One simple follow up could be to add the memory consumed by these aggregations in the request circuit breaker and to release it when we perform a partial/final reduce ?

@@ -366,7 +371,11 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
aggregations.writeTo(out);
if (out.getVersion().before(Version.V_8_0_0)) {
aggregations.get().writeTo(out);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nik9000 ? should we avoid the double deserialization if we need the pipeline aggs below ?

@nik9000
Copy link
Member Author

nik9000 commented Mar 23, 2020

One simple follow up could be to add the memory consumed by these aggregations in the request circuit breaker and to release it when we perform a partial/final reduce ?

Yeah! I'd also love to add it somewhere where I can see it. But that does seem like a great thing.

@nik9000 nik9000 merged commit 1ca52fc into elastic:master Mar 23, 2020
nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Mar 23, 2020
This delays deserializing the aggregation response try until *right*
before we merge the objects.
nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Mar 23, 2020
Update version in wire protocol and disable BWC.
nik9000 added a commit that referenced this pull request Mar 23, 2020
Update version in wire protocol and disable BWC.
nik9000 added a commit that referenced this pull request Mar 23, 2020
This delays deserializing the aggregation response try until *right*
before we merge the objects.
nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Mar 23, 2020
nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Mar 23, 2020
I created this bug today in elastic#53793. When a `DelayableWriteable` that
references an existing object serializes itself it wasn't taking the
version of the node on the other side of the wire into account. This
fixes that.
nik9000 added a commit that referenced this pull request Mar 23, 2020
I created this bug today in #53793. When a `DelayableWriteable` that
references an existing object serializes itself it wasn't taking the
version of the node on the other side of the wire into account. This
fixes that.
nik9000 added a commit that referenced this pull request Mar 23, 2020
* Reenable BWC tests after backport of #53793
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants