Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Post Aggregators for Tuple Sketches #13819

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/development/extensions-core/datasketches-tuple.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,39 @@ Returns a human-readable summary of a given ArrayOfDoublesSketch. This is a stri
"field" : <post aggregator that refers to an ArrayOfDoublesSketch (fieldAccess or another post aggregator)>
}
```


### Constant ArrayOfDoublesSketch

This post aggregator adds a Base64-encoded constant ArrayOfDoublesSketch value that you can use in other post aggregators.
```json
{
"type": "arrayOfDoublesSketchConstant",
"name": DESTINATION_COLUMN_NAME,
"value": CONSTANT_SKETCH_VALUE
}
```

### Base64 output of ArrayOfDoublesSketch

This post aggregator outputs an ArrayOfDoublesSketch as a Base64-encoded string that stores the tuple sketch value. You can use this string as a constant sketch value in other post aggregators.

```json
{
"type": "arrayOfDoublesSketchToBase64String",
"name": DESTINATION_COLUMN_NAME,
"field": <post aggregator that refers to an ArrayOfDoublesSketch (fieldAccess or another post aggregator)>
}
```

### Estimated metrics values for each column of ArrayOfDoublesSketch

This post aggregator returns a list of estimated sums, one for each metric column of a given ArrayOfDoublesSketch. The result is _N_ double values, where _N_ is the number of double values kept in the sketch per key.
anshu-makkar marked this conversation as resolved.
Show resolved Hide resolved

```json
{
"type": "arrayOfDoublesSketchToMetricsSumEstimate",
"name": DESTINATION_COLUMN_NAME,
"field": <post aggregator that refers to an ArrayOfDoublesSketch (fieldAccess or another post aggregator)>
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.druid.query.aggregation.datasketches.tuple;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.cache.CacheKeyBuilder;

import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
anshu-makkar marked this conversation as resolved.
Show resolved Hide resolved
* This post-aggregator converts a given Base64 encoded string to an ArrayOfDoublesSketch.
* The input column contains name of post-aggregator output and base64 encoded input string.
* The output is a deserialized {@link ArrayOfDoublesSketch} .
*/
public class ArrayOfDoublesSketchConstantPostAggregator extends ArrayOfDoublesSketchPostAggregator
anshu-makkar marked this conversation as resolved.
Show resolved Hide resolved
{

private final String value;
private final ArrayOfDoublesSketch sketchValue;

@JsonCreator
public ArrayOfDoublesSketchConstantPostAggregator(@JsonProperty("name") String name, @JsonProperty("value") String value)
{
super(name);
Preconditions.checkArgument(value != null && !value.isEmpty(),
"Constant value cannot be null or empty, expecting base64 encoded sketch string");
this.value = value;
this.sketchValue = ArrayOfDoublesSketchOperations.deserializeFromBase64EncodedStringSafe(value);
}

@Override
public Set<String> getDependentFields()
{
return Collections.emptySet();
}

@Override
public Comparator getComparator()
{
return Comparators.alwaysEqual();
}

@Override
public Object compute(Map<String, Object> combinedAggregators)
{
return sketchValue;
}

@Override
public ArrayOfDoublesSketchConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

public ArrayOfDoublesSketch getSketchValue()
anshu-makkar marked this conversation as resolved.
Show resolved Hide resolved
{
return sketchValue;
}

@Override
public String toString()
{
return "ArrayOfDoublesSketchConstantPostAggregator{name='" + this.getName() + "', value='" + value + "'}";
}

private String getRawSketchValue()
{
return value;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ArrayOfDoublesSketchConstantPostAggregator that = (ArrayOfDoublesSketchConstantPostAggregator) o;
anshu-makkar marked this conversation as resolved.
Show resolved Hide resolved
if (!(Objects.equals(this.getName(), that.getName()) && Objects.equals(this.value, that.value)
&& Objects.equals(this.getSketchValue(), that.getSketchValue()))) {
return false;
}
return true;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), value, sketchValue);
}

@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_CONSTANT_SKETCH_CACHE_TYPE_ID)
.appendString(DigestUtils.sha1Hex(value)).build();
}
}
19 changes: 19 additions & 0 deletions ...ava/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public class ArrayOfDoublesSketchModule implements DruidModule
public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG);
public static final ColumnType MERGE_TYPE = ColumnType.ofComplex(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG);

// JSON type names under which the tuple sketch post aggregators are registered with Jackson.
public static final String ARRAY_OF_DOUBLES_SKETCH_CONSTANT = "arrayOfDoublesSketchConstant";

public static final String ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING = "arrayOfDoublesSketchToBase64String";

// Must match the documented type name "arrayOfDoublesSketchToMetricsSumEstimate" (note the "Sum").
public static final String ARRAY_OF_DOUBLES_SKETCH_METRICS_SUM_ESTIMATE = "arrayOfDoublesSketchToMetricsSumEstimate";
anshu-makkar marked this conversation as resolved.
Show resolved Hide resolved


@Override
public void configure(final Binder binder)
{
Expand Down Expand Up @@ -100,6 +107,18 @@ public List<? extends Module> getJacksonModules()
new NamedType(
ArrayOfDoublesSketchToStringPostAggregator.class,
"arrayOfDoublesSketchToString"
),
new NamedType(
ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator.class,
ARRAY_OF_DOUBLES_SKETCH_METRICS_SUM_ESTIMATE
),
new NamedType(
ArrayOfDoublesSketchConstantPostAggregator.class,
ARRAY_OF_DOUBLES_SKETCH_CONSTANT
),
new NamedType(
ArrayOfDoublesSketchToBase64StringPostAggregator.class,
ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING
)
).addSerializer(ArrayOfDoublesSketch.class, new ArrayOfDoublesSketchJsonSerializer())
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.datasketches.tuple;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;

import java.util.Comparator;
import java.util.Map;

/**
 * Post-aggregator that renders an {@link ArrayOfDoublesSketch} as text.
 *
 * <p>The sketch produced by the wrapped field is serialized with {@code toByteArray()} and the resulting
 * bytes are Base64-encoded into a string. The output is handy for debugging and can be fed back into
 * other operations that accept an encoded sketch.
 */
public class ArrayOfDoublesSketchToBase64StringPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
{

  /**
   * @param name  output name of this post-aggregator
   * @param field post-aggregator whose result is the sketch to encode
   */
  @JsonCreator
  public ArrayOfDoublesSketchToBase64StringPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field
  )
  {
    super(name, field);
  }

  /**
   * @return {@link ColumnType#STRING}, since the result is a Base64 string
   */
  @Override
  public ColumnType getType(ColumnInspector signature)
  {
    return ColumnType.STRING;
  }

  /**
   * Evaluates the wrapped field, treats its result as an {@link ArrayOfDoublesSketch} and encodes the
   * sketch's byte representation.
   *
   * @param combinedAggregators aggregator values passed through to the wrapped field
   *
   * @return the Base64 encoding of the sketch's {@code toByteArray()} output
   */
  @Override
  public String compute(final Map<String, Object> combinedAggregators)
  {
    final Object fieldResult = getField().compute(combinedAggregators);
    final byte[] serialized = ((ArrayOfDoublesSketch) fieldResult).toByteArray();
    return StringUtils.encodeBase64String(serialized);
  }

  /**
   * Ordering is not defined for this post-aggregator's output.
   *
   * @throws IAE always
   */
  @Override
  public Comparator<String> getComparator()
  {
    throw new IAE("Comparing sketch summaries is not supported");
  }

  /**
   * @return a cache key made of this post-aggregator's type id followed by the wrapped field's key
   */
  @Override
  public byte[] getCacheKey()
  {
    final CacheKeyBuilder keyBuilder =
        new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_BASE64_STRING_CACHE_TYPE_ID);
    keyBuilder.appendCacheable(getField());
    return keyBuilder.build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.datasketches.tuple;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketchIterator;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.column.ColumnType;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;

/**
 * Post-aggregator that estimates the per-column sums of the values held in an {@link ArrayOfDoublesSketch}.
 *
 * <p>For every value column the retained entries are summed and the sum is divided by the sketch's theta.
 * The output array has one element per column, i.e. as many elements as the sketch keeps doubles per key.
 */
public class ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
{

  /**
   * @param name  output name of this post-aggregator
   * @param field post-aggregator whose result is the sketch to summarize
   */
  @JsonCreator
  public ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field
  )
  {
    super(name, field);
  }

  /**
   * @return {@link ColumnType#DOUBLE_ARRAY}, since the result is an array of doubles
   */
  @Override
  public ColumnType getType(ColumnInspector signature)
  {
    return ColumnType.DOUBLE_ARRAY;
  }

  /**
   * Evaluates the wrapped field, treats its result as an {@link ArrayOfDoublesSketch} and computes one
   * estimate per value column.
   *
   * @param combinedAggregators aggregator values passed through to the wrapped field
   *
   * @return for each column, the sum of the retained values divided by the sketch's theta
   */
  @Override
  public double[] compute(final Map<String, Object> combinedAggregators)
  {
    final ArrayOfDoublesSketch input = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
    final int numColumns = input.getNumValues();

    // One accumulator per value column.
    final SummaryStatistics[] columnStats = new SummaryStatistics[numColumns];
    for (int col = 0; col < numColumns; col++) {
      columnStats[col] = new SummaryStatistics();
    }

    // Feed every retained row into the accumulators, column by column.
    final ArrayOfDoublesSketchIterator rows = input.iterator();
    while (rows.next()) {
      final double[] row = rows.getValues();
      for (int col = 0; col < row.length; col++) {
        columnStats[col].addValue(row[col]);
      }
    }

    // Scale each column's retained sum by theta to obtain the estimate.
    final double theta = input.getTheta();
    final double[] result = new double[numColumns];
    Arrays.setAll(result, col -> columnStats[col].getSum() / theta);
    return result;
  }

  /**
   * Ordering is not defined for this post-aggregator's output.
   *
   * @throws IAE always
   */
  @Override
  public Comparator<double[]> getComparator()
  {
    throw new IAE("Comparing arrays of estimate values is not supported");
  }

  /**
   * @return a cache key made of this post-aggregator's type id followed by the wrapped field's key
   */
  @Override
  public byte[] getCacheKey()
  {
    final CacheKeyBuilder keyBuilder =
        new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID);
    keyBuilder.appendCacheable(getField());
    return keyBuilder.build();
  }
}
Loading