-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support to first/last aggregators for numeric types during ingestion #10949
Changes from 12 commits
cd96cdc
a2b8435
fe84826
8bf9e00
32439bc
48e196f
cfc6814
14ccec1
998567f
a1ffc8e
cffff80
f96aa6c
72a7a0d
37dccf9
c6529b5
0ac6358
7785507
901097b
6e94ce8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,116 @@ | ||||||
/* | ||||||
* Licensed to the Apache Software Foundation (ASF) under one | ||||||
* or more contributor license agreements. See the NOTICE file | ||||||
* distributed with this work for additional information | ||||||
* regarding copyright ownership. The ASF licenses this file | ||||||
* to you under the Apache License, Version 2.0 (the | ||||||
* "License"); you may not use this file except in compliance | ||||||
* with the License. You may obtain a copy of the License at | ||||||
* | ||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||
* | ||||||
* Unless required by applicable law or agreed to in writing, | ||||||
* software distributed under the License is distributed on an | ||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
* KIND, either express or implied. See the License for the | ||||||
* specific language governing permissions and limitations | ||||||
* under the License. | ||||||
*/ | ||||||
|
||||||
package org.apache.druid.query.aggregation; | ||||||
|
||||||
import com.google.common.primitives.Longs; | ||||||
import org.apache.druid.collections.SerializablePair; | ||||||
import org.apache.druid.data.input.InputRow; | ||||||
import org.apache.druid.segment.GenericColumnSerializer; | ||||||
import org.apache.druid.segment.column.ColumnBuilder; | ||||||
import org.apache.druid.segment.data.GenericIndexed; | ||||||
import org.apache.druid.segment.data.ObjectStrategy; | ||||||
import org.apache.druid.segment.serde.ComplexColumnPartSupplier; | ||||||
import org.apache.druid.segment.serde.ComplexMetricExtractor; | ||||||
import org.apache.druid.segment.serde.ComplexMetricSerde; | ||||||
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; | ||||||
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; | ||||||
|
||||||
import javax.annotation.Nullable; | ||||||
import java.nio.ByteBuffer; | ||||||
|
||||||
/** | ||||||
* Serializes Pair<Long, ?> objects for the doubleFirst/floatFirst/longFirst and doubleLast/floatLast/longLast aggregators | ||||||
*/ | ||||||
public abstract class AbstractSerializablePairSerde<T extends SerializablePair<Long, ?>> extends ComplexMetricSerde | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: change class name to
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||
{ | ||||||
private final Class<T> pairClassObject; | ||||||
|
||||||
// Creates a serde bound to one concrete pair class. | ||||||
// pairClassObject: the class object for T; it is stored and later returned by the | ||||||
// extractor's extractedClass() and the object strategy's getClazz(). | ||||||
public AbstractSerializablePairSerde(Class<T> pairClassObject) | ||||||
{ | ||||||
this.pairClassObject = pairClassObject; | ||||||
} | ||||||
|
||||||
// Returns a new extractor that reads this metric's value from an input row. | ||||||
@Override | ||||||
public ComplexMetricExtractor getExtractor() | ||||||
{ | ||||||
return new ComplexMetricExtractor() | ||||||
{ | ||||||
// Reports the pair class this serde was constructed with. | ||||||
@Override | ||||||
public Class<T> extractedClass() | ||||||
{ | ||||||
return pairClassObject; | ||||||
} | ||||||
|
||||||
// Returns the row's raw value for metricName as-is; no conversion or type check happens here. | ||||||
@Override | ||||||
public Object extractValue(InputRow inputRow, String metricName) | ||||||
{ | ||||||
return inputRow.getRaw(metricName); | ||||||
} | ||||||
}; | ||||||
} | ||||||
|
||||||
// Reads the serialized column from buffer using this serde's object strategy and registers it | ||||||
// on columnBuilder as a complex column supplier under getTypeName(). | ||||||
@Override | ||||||
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) | ||||||
{ | ||||||
// Parameterized rather than the raw GenericIndexed type so the element type is compiler-checked. | ||||||
final GenericIndexed<T> column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); | ||||||
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column)); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public ObjectStrategy<T> getObjectStrategy() | ||||||
{ | ||||||
return new ObjectStrategy<T>() | ||||||
{ | ||||||
@Override | ||||||
public int compare(@Nullable T o1, @Nullable T o2) | ||||||
{ | ||||||
return Longs.compare(o1.lhs, o2.lhs); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not correctly handle if either o1 or o2 is null. See Would it be possible to update the integration tests that were added to surface this error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will check if UT is able to check this error first because IT cases share some data sources with other cases. |
||||||
} | ||||||
|
||||||
// Reports the pair class this serde was constructed with. | ||||||
@Override | ||||||
public Class<T> getClazz() | ||||||
{ | ||||||
return pairClassObject; | ||||||
} | ||||||
|
||||||
// Decodes one pair from buffer by delegating to toPairObject. | ||||||
// numBytes is not used here; only the buffer is passed on. | ||||||
@Override | ||||||
public T fromByteBuffer(ByteBuffer buffer, int numBytes) | ||||||
{ | ||||||
return toPairObject(buffer); | ||||||
} | ||||||
|
||||||
// Encodes val by delegating to pairToBytes; val is passed through without any check. | ||||||
// NOTE(review): no null guard is visible here - confirm how a null val should be handled. | ||||||
@Override | ||||||
public byte[] toBytes(T val) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||
{ | ||||||
return pairToBytes(val); | ||||||
} | ||||||
}; | ||||||
} | ||||||
|
||||||
// Creates the column serializer for the named column, backed by segmentWriteOutMedium | ||||||
// and a fresh object strategy from getObjectStrategy(). | ||||||
@Override | ||||||
public GenericColumnSerializer<T> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) | ||||||
{ | ||||||
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy()); | ||||||
} | ||||||
|
||||||
// Decodes one pair from buffer; called by the object strategy's fromByteBuffer. | ||||||
// NOTE(review): the expected byte layout and buffer-position handling are not visible here - confirm in subclasses. | ||||||
protected abstract T toPairObject(ByteBuffer buffer); | ||||||
|
||||||
// Encodes val into a byte array; called by the object strategy's toBytes with the value it receives. | ||||||
protected abstract byte[] pairToBytes(T val); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. javadocs please There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the new commit, null check is added to the caller of this method, so it's not necessary to declare this parameter as |
||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.query.aggregation; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import org.apache.druid.collections.SerializablePair; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
// A SerializablePair holding a Long lhs and a Double rhs; rhs is annotated @Nullable. | ||
public class SerializablePairLongDouble extends SerializablePair<Long, Double> | ||
{ | ||
// JSON creator: binds the "lhs" and "rhs" properties and passes both to the superclass unchanged. | ||
@JsonCreator | ||
public SerializablePairLongDouble(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Double rhs) | ||
{ | ||
super(lhs, rhs); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you describe why you chose not to make
SerializablePairLongStringSerde
extend this class as well? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's reasonable to refactor
SerializablePairLongStringSerde
to be a subclass of AbstractSerializablePairSerde, but I think that change is not closely related to this PR, and this PR is already fairly large. A follow-up PR can do that refactoring once this PR is merged.