
Remove supersetSize and subsetSize from InternalSignificantTerms.Bucket (elastic#117574)

Those fields are only used to update the score and are not serialized in the bucket, so they can be removed.
iverase committed Dec 3, 2024
1 parent c1b9842 commit 3f9ef6f
Showing 14 changed files with 127 additions and 262 deletions.
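
The heart of the change is easiest to see outside the diff: subsetSize and supersetSize stop being per-bucket fields and become parameters of updateScore. Below is a minimal, self-contained sketch with simplified stand-in types; it shows the shape of the refactor, not the actual InternalSignificantTerms classes that appear in the hunks further down.

// Illustrative sketch only; not part of the commit. Names mirror the diff, but the
// types are simplified stand-ins for the real Elasticsearch classes.
abstract class SignificantBucketSketch {

    // Stand-in for the real SignificanceHeuristic interface.
    interface SignificanceHeuristic {
        double getScore(long subsetDf, long subsetSize, long supersetDf, long supersetSize);
    }

    long subsetDf;    // frequency of the term in the foreground ("subset") docs
    long supersetDf;  // frequency of the term in the background ("superset") docs
    double score;

    // Before this commit the bucket also carried subsetSize/supersetSize fields and
    // updateScore(heuristic) read them implicitly. Now the sizes are owned by the
    // aggregation (or resolved per owning bucket ordinal) and passed in explicitly,
    // so buckets no longer store or copy them.
    void updateScore(SignificanceHeuristic heuristic, long subsetSize, long supersetSize) {
        score = heuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
    }
}
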
@@ -495,7 +495,7 @@ public void testScriptScore() throws ExecutionException, InterruptedException, I
for (SignificantTerms.Bucket bucket : sigTerms.getBuckets()) {
assertThat(
bucket.getSignificanceScore(),
is((double) bucket.getSubsetDf() + bucket.getSubsetSize() + bucket.getSupersetDf() + bucket.getSupersetSize())
is((double) bucket.getSubsetDf() + sigTerms.getSubsetSize() + bucket.getSupersetDf() + sigTerms.getSupersetSize())
);
}
}
@@ -985,7 +985,7 @@ SignificantStringTerms.Bucket[] buildBuckets(int size) {

@Override
SignificantStringTerms.Bucket buildEmptyTemporaryBucket() {
return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, null, format, 0);
}

private long subsetSize(long owningBucketOrd) {
@@ -994,22 +994,19 @@ private long subsetSize(long owningBucketOrd) {
}

@Override
BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd)
throws IOException {
BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) {
long subsetSize = subsetSize(owningBucketOrd);
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
spare.supersetSize = supersetSize;
/*
* During shard-local down-selection we use subset/superset stats
* that are for this shard only. Back at the central reducer these
* properties will be updated with global stats.
*/
spare.updateScore(significanceHeuristic);
spare.updateScore(significanceHeuristic, subsetSize, supersetSize);
};
}

@@ -59,7 +59,7 @@ protected InternalMappedSignificantTerms(StreamInput in, Bucket.Reader<B> bucket
subsetSize = in.readVLong();
supersetSize = in.readVLong();
significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class);
buckets = in.readCollectionAsList(stream -> bucketReader.read(stream, subsetSize, supersetSize, format));
buckets = in.readCollectionAsList(stream -> bucketReader.read(stream, format));
}

@Override
@@ -91,12 +91,12 @@ public B getBucketByKey(String term) {
}

@Override
protected long getSubsetSize() {
public long getSubsetSize() {
return subsetSize;
}

@Override
protected long getSupersetSize() {
public long getSupersetSize() {
return supersetSize;
}

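
The constructor above shows that the two sizes were already serialized once per aggregation rather than per bucket, which is why the per-bucket copies could be dropped; the bucket reader now only needs the stream and format, and the getters become public so callers read the sizes from the aggregation (as the test in the first hunk does). A rough sketch of that read path, using hypothetical stand-ins for the stream and reader types rather than the real Elasticsearch APIs:

import java.util.List;
import java.util.function.Function;

// Illustrative sketch only; hypothetical stand-ins, not the real stream or reader APIs.
interface SketchStreamInput {
    long readVLong();
    <T> List<T> readCollectionAsList(Function<SketchStreamInput, T> reader);
}

final class MappedSignificantTermsSketch<B> {
    final long subsetSize;     // foreground set size, read once per aggregation
    final long supersetSize;   // background set size, read once per aggregation
    final List<B> buckets;     // buckets are read without per-bucket size copies

    MappedSignificantTermsSketch(SketchStreamInput in, Function<SketchStreamInput, B> bucketReader) {
        subsetSize = in.readVLong();
        supersetSize = in.readVLong();
        buckets = in.readCollectionAsList(bucketReader);
    }

    public long getSubsetSize() {
        return subsetSize;
    }

    public long getSupersetSize() {
        return supersetSize;
    }
}
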
@@ -53,13 +53,11 @@ public abstract static class Bucket<B extends Bucket<B>> extends InternalMultiBu
*/
@FunctionalInterface
public interface Reader<B extends Bucket<B>> {
B read(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) throws IOException;
B read(StreamInput in, DocValueFormat format) throws IOException;
}

long subsetDf;
long subsetSize;
long supersetDf;
long supersetSize;
/**
* Ordinal of the bucket while it is being built. Not used after it is
* returned from {@link Aggregator#buildAggregations(org.elasticsearch.common.util.LongArray)} and not
@@ -70,16 +68,7 @@ public interface Reader<B extends Bucket<B>> {
protected InternalAggregations aggregations;
final transient DocValueFormat format;

protected Bucket(
long subsetDf,
long subsetSize,
long supersetDf,
long supersetSize,
InternalAggregations aggregations,
DocValueFormat format
) {
this.subsetSize = subsetSize;
this.supersetSize = supersetSize;
protected Bucket(long subsetDf, long supersetDf, InternalAggregations aggregations, DocValueFormat format) {
this.subsetDf = subsetDf;
this.supersetDf = supersetDf;
this.aggregations = aggregations;
@@ -89,9 +78,7 @@ protected Bucket(
/**
* Read from a stream.
*/
protected Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) {
this.subsetSize = subsetSize;
this.supersetSize = supersetSize;
protected Bucket(StreamInput in, DocValueFormat format) {
this.format = format;
}

@@ -105,20 +92,10 @@ public long getSupersetDf() {
return supersetDf;
}

@Override
public long getSupersetSize() {
return supersetSize;
}

@Override
public long getSubsetSize() {
return subsetSize;
}

// TODO we should refactor to remove this, since buckets should be immutable after they are generated.
// This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without
// the score
void updateScore(SignificanceHeuristic significanceHeuristic) {
void updateScore(SignificanceHeuristic significanceHeuristic, long subsetSize, long supersetSize) {
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
}

@@ -262,13 +239,11 @@ public InternalAggregation get() {
buckets.forEach(entry -> {
final B b = createBucket(
entry.value.subsetDf[0],
globalSubsetSize,
entry.value.supersetDf[0],
globalSupersetSize,
entry.value.reducer.getAggregations(),
entry.value.reducer.getProto()
);
b.updateScore(heuristic);
b.updateScore(heuristic, globalSubsetSize, globalSupersetSize);
if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
final B removed = ordered.insertWithOverflow(b);
if (removed == null) {
@@ -317,9 +292,7 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
.map(
b -> createBucket(
samplingContext.scaleUp(b.subsetDf),
subsetSize,
samplingContext.scaleUp(b.supersetDf),
supersetSize,
InternalAggregations.finalizeSampling(b.aggregations, samplingContext),
b
)
@@ -328,14 +301,7 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
);
}

abstract B createBucket(
long subsetDf,
long subsetSize,
long supersetDf,
long supersetSize,
InternalAggregations aggregations,
B prototype
);
abstract B createBucket(long subsetDf, long supersetDf, InternalAggregations aggregations, B prototype);

protected abstract A create(long subsetSize, long supersetSize, List<B> buckets);

@@ -344,10 +310,6 @@ abstract B createBucket(
*/
protected abstract B[] createBucketsArray(int size);

protected abstract long getSubsetSize();

protected abstract long getSupersetSize();

protected abstract SignificanceHeuristic getSignificanceHeuristic();

@Override
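
To make the reduce-side hunk above easier to follow: the reducer now builds each merged bucket without size fields and then scores it with the globally accumulated sizes. A condensed, hypothetical rendering of that loop; the bounded priority queue, sampling, and circuit-breaker accounting are deliberately left out.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; condensed from the reduce logic above, with the
// priority queue and memory accounting omitted.
class ReduceScoringSketch {

    static final class Bucket {
        long subsetDf;
        long supersetDf;
        double score;
    }

    interface Heuristic {
        double getScore(long subsetDf, long subsetSize, long supersetDf, long supersetSize);
    }

    static List<Bucket> scoreMergedBuckets(
        List<Bucket> merged,
        Heuristic heuristic,
        long globalSubsetSize,    // summed across all shards during reduce
        long globalSupersetSize,  // summed across all shards during reduce
        long minDocCount,
        boolean finalReduce
    ) {
        List<Bucket> kept = new ArrayList<>();
        for (Bucket b : merged) {
            // Scores are computed with the global sizes passed in explicitly,
            // instead of first copying them onto every bucket as before.
            b.score = heuristic.getScore(b.subsetDf, globalSubsetSize, b.supersetDf, globalSupersetSize);
            if ((b.score > 0 && b.subsetDf >= minDocCount) || finalReduce == false) {
                kept.add(b);
            }
        }
        return kept;
    }
}
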
@@ -47,7 +47,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;

@@ -296,7 +295,7 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro
try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
B spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningOrd);
BucketUpdater<B> bucketUpdater = bucketUpdater(owningOrd);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
@@ -305,9 +304,9 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro
}
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = emptyBucketBuilder.get();
spare = buildEmptyBucket();
}
updateBucket(spare, ordsEnum, docCount);
bucketUpdater.updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}

@@ -348,9 +347,9 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro
abstract void collectZeroDocEntriesIfNeeded(long owningBucketOrd, boolean excludeDeletedDocs) throws IOException;

/**
* Build an empty temporary bucket.
* Build an empty bucket.
*/
abstract Supplier<B> emptyBucketBuilder(long owningBucketOrd);
abstract B buildEmptyBucket();

/**
* Build a {@link PriorityQueue} to sort the buckets. After we've
@@ -362,7 +361,7 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro
* Update fields in {@code spare} to reflect information collected for
* this bucket ordinal.
*/
abstract void updateBucket(B spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException;
abstract BucketUpdater<B> bucketUpdater(long owningBucketOrd);

/**
* Build an array to hold the "top" buckets for each ordinal.
@@ -399,6 +398,10 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro
abstract R buildEmptyResult();
}

interface BucketUpdater<B extends InternalMultiBucketAggregation.InternalBucket> {
void updateBucket(B spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException;
}

/**
* Builds results for the standard {@code terms} aggregation.
*/
@@ -490,8 +493,8 @@ private void collectZeroDocEntries(BinaryDocValues values, Bits liveDocs, int ma
}

@Override
Supplier<StringTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) {
return () -> new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format);
StringTerms.Bucket buildEmptyBucket() {
return new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format);
}

@Override
@@ -500,10 +503,12 @@ ObjectArrayPriorityQueue<StringTerms.Bucket> buildPriorityQueue(int size) {
}

@Override
void updateBucket(StringTerms.Bucket spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount) throws IOException {
ordsEnum.readValue(spare.termBytes);
spare.docCount = docCount;
spare.bucketOrd = ordsEnum.ord();
BucketUpdater<StringTerms.Bucket> bucketUpdater(long owningBucketOrd) {
return (spare, ordsEnum, docCount) -> {
ordsEnum.readValue(spare.termBytes);
spare.docCount = docCount;
spare.bucketOrd = ordsEnum.ord();
};
}

@Override
@@ -615,9 +620,8 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
void collectZeroDocEntriesIfNeeded(long owningBucketOrd, boolean excludeDeletedDocs) throws IOException {}

@Override
Supplier<SignificantStringTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) {
long subsetSize = subsetSizes.get(owningBucketOrd);
return () -> new SignificantStringTerms.Bucket(new BytesRef(), 0, subsetSize, 0, 0, null, format, 0);
SignificantStringTerms.Bucket buildEmptyBucket() {
return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, null, format, 0);
}

@Override
@@ -626,20 +630,20 @@ ObjectArrayPriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int s
}

@Override
void updateBucket(SignificantStringTerms.Bucket spare, BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum, long docCount)
throws IOException {

ordsEnum.readValue(spare.termBytes);
spare.bucketOrd = ordsEnum.ord();
spare.subsetDf = docCount;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
spare.supersetSize = supersetSize;
/*
* During shard-local down-selection we use subset/superset stats
* that are for this shard only. Back at the central reducer these
* properties will be updated with global stats.
*/
spare.updateScore(significanceHeuristic);
BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd) {
long subsetSize = subsetSizes.get(owningBucketOrd);
return (spare, ordsEnum, docCount) -> {
ordsEnum.readValue(spare.termBytes);
spare.bucketOrd = ordsEnum.ord();
spare.subsetDf = docCount;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
/*
* During shard-local down-selection we use subset/superset stats
* that are for this shard only. Back at the central reducer these
* properties will be updated with global stats.
*/
spare.updateScore(significanceHeuristic, subsetSize, supersetSize);
};
}

@Override
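
The aggregator refactor above replaces the emptyBucketBuilder/updateBucket pair with buildEmptyBucket plus a bucketUpdater(owningBucketOrd) factory, so per-ordinal state such as the shard-local subset size is looked up once and captured by the returned lambda. A toy, self-contained version of that pattern; all types are simplified stand-ins, and the background-frequency lookup is faked.

import java.io.IOException;

// Illustrative sketch only; simplified stand-ins for the aggregator types above.
class BucketUpdaterPatternSketch {

    static final class Bucket {
        long subsetDf;
        long supersetDf;
        double score;
    }

    @FunctionalInterface
    interface BucketUpdater {
        void updateBucket(Bucket spare, long bucketOrd, long docCount) throws IOException;
    }

    // Stand-in for SignificanceHeuristic.getScore.
    interface Scorer {
        double score(long subsetDf, long subsetSize, long supersetDf, long supersetSize);
    }

    // The old abstract updateBucket(spare, ordsEnum, docCount) re-resolved per-ordinal
    // state on every call; the new factory resolves it once per owning bucket ordinal
    // and returns the per-bucket lambda used inside the collection loop.
    static BucketUpdater bucketUpdater(long owningBucketOrd, long[] subsetSizes, long supersetSize, Scorer scorer) {
        long subsetSize = subsetSizes[(int) owningBucketOrd]; // looked up once, captured below
        return (spare, bucketOrd, docCount) -> {
            spare.subsetDf = docCount;
            spare.supersetDf = 0; // the real code reads this from the background frequencies
            // Shard-local scoring; the central reducer re-scores with global sizes later.
            spare.score = scorer.score(spare.subsetDf, subsetSize, spare.supersetDf, supersetSize);
        };
    }
}
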
