Skip to content

Commit

Permalink
TSDB: Add timestamp provider to AggregationExecutionContext (#85850)
Browse files Browse the repository at this point in the history
As LeafWalker already gets the timestamp from doc values, It can add a timestampProvider in the AggregationExecutionContext. This  useful in the time series downsampling, as downsampling
collect can get the timestamp from the AggregationExecutionContext.
  • Loading branch information
weizijun authored Apr 18, 2022
1 parent e4d9ecc commit b55f3fb
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/85850.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85850
summary: "Aggregation Execution Context add timestamp provider"
area: TSDB
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedSupplier;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Used to preserve contextual information during aggregation execution. It can be used by search executors and parent
Expand All @@ -22,19 +21,29 @@
*/
public class AggregationExecutionContext {

private final CheckedSupplier<BytesRef, IOException> tsidProvider;
private final Supplier<BytesRef> tsidProvider;
private final Supplier<Long> timestampProvider;
private final LeafReaderContext leafReaderContext;

public AggregationExecutionContext(LeafReaderContext leafReaderContext, CheckedSupplier<BytesRef, IOException> tsidProvider) {
public AggregationExecutionContext(
LeafReaderContext leafReaderContext,
Supplier<BytesRef> tsidProvider,
Supplier<Long> timestampProvider
) {
this.leafReaderContext = leafReaderContext;
this.tsidProvider = tsidProvider;
this.timestampProvider = timestampProvider;
}

public LeafReaderContext getLeafReaderContext() {
return leafReaderContext;
}

public BytesRef getTsid() throws IOException {
public BytesRef getTsid() {
return tsidProvider != null ? tsidProvider.get() : null;
}

public Long getTimestamp() {
return timestampProvider.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ScoreMode scoreMode() {
// TODO: will remove it in a follow up PR
@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
return getLeafCollector(new AggregationExecutionContext(ctx, null));
return getLeafCollector(new AggregationExecutionContext(ctx, null, null));
}

public abstract LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void search(Query query, BucketCollector bucketCollector) throws IOExcept
// this is needed to trigger actions in some bucketCollectors that bypass the normal iteration logic
// for example, global aggregator triggers a separate iterator that ignores the query but still needs
// to know all leaves
bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null));
bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null, null));
}
}

Expand Down Expand Up @@ -179,7 +179,7 @@ private static class LeafWalker {
long timestamp;

LeafWalker(LeafReaderContext context, Scorer scorer, BucketCollector bucketCollector, LeafReaderContext leaf) throws IOException {
AggregationExecutionContext aggCtx = new AggregationExecutionContext(leaf, scratch::get);
AggregationExecutionContext aggCtx = new AggregationExecutionContext(leaf, scratch::get, () -> timestamp);
this.collector = bucketCollector.getLeafCollector(aggCtx);
liveDocs = context.reader().getLiveDocs();
this.collector.setScorer(scorer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
assertTrue(timestamp.advanceExact(doc));
BytesRef latestTSID = tsid.lookupOrd(tsid.ordValue());
long latestTimestamp = timestamp.longValue();
assertEquals(latestTSID, aggCtx.getTsid());
assertEquals(latestTimestamp, aggCtx.getTimestamp().longValue());

if (currentTSID != null) {
assertTrue(
currentTSID + "->" + latestTSID.utf8ToString(),
Expand Down

0 comments on commit b55f3fb

Please sign in to comment.