Add experimental support for first/last for double/float/long #10702 #14462
7a39e3e to 32bca40
Thank you @ankit0811 for reviving this PR. I will review this soon.
@abhishekagarwal87 I missed adding the fold logic in NumericLastVectorAggregator::aggregator
Thanks @ankit0811 for the PR!
Thanks for raising this PR! I am relatively unfamiliar with the complex column's serde and am still going through the implementation. However, I have a few high-level comments, based on my understanding of the StringFirst/Last aggregators:
- In deserializeColumn, we should allocate the first byte for a version number. This would help if we decide to update the strategy down the line and want backward compatibility.
- The current StringFirst/Last comparators use delta encoding and compression on the numeric column to reduce the size. The PR that introduced this change (Improve String Last/First Storage Efficiency #12879) also added a few classes that help with this. I suppose that we would want it at some point. The versioning mentioned above would help in making these changes if we decide that we don't want it now. However, the complexity of changing from one version to another is high and we require backward compatibility, so we should question whether that should be done in the original PR itself (check this comment, lines 54 to 66 in d63eff3):
/**
 * This is a configuration parameter to allow for turning on compression. It is a hack, it would be significantly
 * better if this could be delivered via properties. The number one reason this is a hack is because it reads
 * the System.getProperty which doesn't actually have runtime.properties files put into it, so this setting
 * could be set in runtime.properties and this code wouldn't see it, because that's not how it is wired up.
 *
 * The intent of this parameter is so that Druid 25 can be released using the legacy serialization format. This
 * will allow us to get code released that can *read* both the legacy and the new format. Then, in Druid 26,
 * we can completely eliminate this boolean and start to only *write* the new format, in which case this
 * hack of a configuration property disappears.
 */
private static final boolean COMPRESSION_ENABLED =
    Boolean.parseBoolean(System.getProperty("druid.columns.pairLongString.compressed", "false"));
- Can we refactor the preexisting LongString serde classes to be subclasses of the newly introduced AbstractSerializableLongObjectPairSerde? I see that there are some similarities in the methods. In any case, it would be helpful if all of them are under the same umbrella, and we can further categorize the newly added classes as children of AbstractSerializableLongNumbericPairSerde
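To make the version-byte suggestion concrete, here is a minimal sketch of what a leading version byte could look like. The class name `VersionedColumnBlob`, the constant `VERSION_1`, and the `write`/`read` methods are all hypothetical and are not taken from this PR or from Druid; the point is only that the reader inspects one byte before interpreting the rest of the payload.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not a class from the Druid codebase.
final class VersionedColumnBlob
{
  // Assumed version tag; the PR does not define this constant.
  static final byte VERSION_1 = 0x1;

  // Prepends a one-byte version to the serialized column payload.
  static byte[] write(byte[] payload)
  {
    ByteBuffer buf = ByteBuffer.allocate(Byte.BYTES + payload.length);
    buf.put(VERSION_1);
    buf.put(payload);
    return buf.array();
  }

  // Reads the version byte first, then dispatches on it.
  static byte[] read(byte[] blob)
  {
    byte version = blob[0];
    if (version == VERSION_1) {
      return Arrays.copyOfRange(blob, Byte.BYTES, blob.length);
    }
    throw new IllegalArgumentException("Unsupported serde version: " + version);
  }
}
```

A later format change would then add a second branch in `read` while still accepting blobs written with the older version.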
Thanks for this PR. I am still going over this and will add my comments as I go over the rest.
processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java
{
  double firstValue;

- public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector)
+ public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, ColumnValueSelector valueSelector, boolean needsFoldCheck)
Does double first need a fold check?
I believe so. Let me know if I am mistaken
@somu-imply I am not fully familiar with the foldCheck. Which selector implementations produce results that cannot be optimized? Also, is there any particular reason that double wouldn't require the check, since it seems analogous to what StringFirst et al. are doing?
Still confused as to why doubles wouldn't require a folds check.
It should require a folds check in case of rollups I think.
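For readers new to the fold check discussed in this thread, the sketch below shows the general idea under one assumption: with rollup, the value column may already hold a stored (timestamp, value) pair instead of a raw number, so the aggregator has to compare against the pair's own timestamp. `FoldingDoubleFirst` and `TimeValue` are hypothetical names for illustration and do not mirror the actual Druid classes.

```java
// Hypothetical, simplified "first" aggregator; not the Druid implementation.
final class FoldingDoubleFirst
{
  // Stand-in for a pre-aggregated (timestamp, value) pair from a rolled-up column.
  static final class TimeValue
  {
    final long time;
    final double value;

    TimeValue(long time, double value)
    {
      this.time = time;
      this.value = value;
    }
  }

  private long firstTime = Long.MAX_VALUE;
  private double firstValue;

  // The "fold check": inspect the input and fold a stored pair if that is what we got.
  void aggregate(long rowTime, Object input)
  {
    if (input instanceof TimeValue) {
      TimeValue pair = (TimeValue) input;
      update(pair.time, pair.value);
    } else if (input instanceof Number) {
      update(rowTime, ((Number) input).doubleValue());
    }
  }

  private void update(long time, double value)
  {
    if (time < firstTime) {
      firstTime = time;
      firstValue = value;
    }
  }

  long getFirstTime()
  {
    return firstTime;
  }

  double getFirstValue()
  {
    return firstValue;
  }
}
```

Without the `instanceof TimeValue` branch, a rolled-up row would be judged by the row's time rather than by the timestamp stored inside the pair.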
Would it be best to take this up in a separate PR? Otherwise this PR will be too big to review; it already touches quite a few classes.
Seems reasonable to me, we can mark this as an improvement once we get the functionality in.
daabc4e to e35a906
Thanks for resolving the merge conflicts and the comments. I'll test it from MSQ's perspective and check if it works fine with MSQ queries in a couple of days.
I pulled in the patch and tested the changes on MSQ. The queries that previously failed due to the missing serde now work, which confirms that this indeed unblocks EARLIEST and LATEST on MSQ 🚀
Leaving a final set of review comments from my side. Thanks for being patient and receptive during the review process :)
...rc/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java
  return new byte[]{};
}

ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Float.BYTES);
Similar comment as above: if the value is null, we should avoid allocating space for Float.BYTES. (This should apply to the remaining factories as well.)
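A minimal sketch of the null-aware allocation being asked for, assuming a layout of timestamp, one null-flag byte, then the float only when present. The class `PairLongFloatBytes` and the flag values are hypothetical and chosen for illustration; they are not the constants used in the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not the factory code from the PR.
final class PairLongFloatBytes
{
  // Assumed flag values for this sketch.
  static final byte IS_NULL = 1;
  static final byte NOT_NULL = 0;

  // Layout: [timestamp: 8 bytes][null flag: 1 byte][value: 4 bytes, only if not null]
  static byte[] toBytes(long time, Float value)
  {
    int size = Long.BYTES + Byte.BYTES + (value == null ? 0 : Float.BYTES);
    ByteBuffer buf = ByteBuffer.allocate(size);
    buf.putLong(time);
    if (value == null) {
      buf.put(IS_NULL);
    } else {
      buf.put(NOT_NULL);
      buf.putFloat(value);
    }
    return buf.array();
  }
}
```

With this layout a null value takes 9 bytes instead of 13, which is the saving the comment is after.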
The There is a better way that the stringLast/First aggregators already use, can we please adjust the
Generally speaking, I don't believe that we should merge the code with the
Random drive-by comment, but I don't really think that is the best pattern to use here either though, since the pairs here are fixed width and so a much more appropriate way of compressing them that doesn't involve storing offsets separately could be used...
@imply-cheddar a qq for clarifications since this part of code is new to me
if (foldNeeded) {
  final SerializablePair<Long, Number> inPair = (SerializablePair<Long, Number>) objectsWhichMightBeNumeric[row];
  if (useDefault || inPair != null) {
    if (inPair.lhs != null && inPair.lhs < firstTime) {
Check warning (Code scanning / CodeQL): Dereferenced variable may be null (inPair)
if (foldNeeded) {
  final SerializablePair<Long, Number> inPair = (SerializablePair<Long, Number>) objectsWhichMightBeNumeric[row];
  if (useDefault || inPair != null) {
    if (inPair.lhs != null && inPair.lhs >= lastTime) {
Check warning (Code scanning / CodeQL): Dereferenced variable may be null (inPair)
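The CodeQL warning arises because `useDefault || inPair != null` can be true while `inPair` is null, so the following `inPair.lhs` access can throw. One possible shape for a guard is sketched below; `NullSafeFold`, `Pair`, and `foldFirstTime` are hypothetical names, and whether a null pair should instead be replaced by a default value when `useDefault` is set is a decision this sketch does not attempt to make.

```java
// Hypothetical sketch of a null-safe fold for the "first" case; not the PR's code.
final class NullSafeFold
{
  // Minimal stand-in for SerializablePair<Long, Number>.
  static final class Pair
  {
    final Long lhs;
    final Number rhs;

    Pair(Long lhs, Number rhs)
    {
      this.lhs = lhs;
      this.rhs = rhs;
    }
  }

  // Returns the earliest timestamp seen so far, skipping null pairs and null timestamps.
  static long foldFirstTime(Pair inPair, long firstTime)
  {
    if (inPair != null && inPair.lhs != null && inPair.lhs < firstTime) {
      return inPair.lhs;
    }
    return firstTime;
  }
}
```

Checking `inPair != null` in the same condition that dereferences it keeps the null test and the access together, which is the pattern static analyzers can verify.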
Will we see this in Druid 28? I have been finding myself in need of a doubleLast or longFirst for far too long.
@daniel-imgarena The branch for Druid 28 has been cut, and only release blockers & regressions are allowed at this point. Unfortunately, we won't be seeing this in the 28 release, since it's a new feature.
Leaving a partial review as of now, but there are two main comments, apart from the line items:
- A lot of the abstract methods for serde & aggregators can be merged with the string's methods without any tweaking (I suppose), since they don't rely on the numeric properties of these aggregators. We should do that, otherwise, there will be a lot of code duplication. Also, one would need to keep track of both the classes (abstract ones & the string ones) while fixing bugs or making any improvements.
- Thanks for taking the time to rewrite without using the GenericIndexed. I am curious if there's any benchmarking that you did or if there was a performance/size benefit that you observed from the change.
processing/src/main/java/org/apache/druid/query/aggregation/first/FirstLastUtils.java
}

/**
 * Returns whether an object *might* contain SerializablePairLongString objects.
Outdated javadoc
It is still outdated
...main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java
The String methods could also inherit from this class I suppose.
if (capabilities != null) {
  return new DoubleFirstVectorAggregator(timeSelector, vSelector);
Why is this check required? I don't think we are using the capabilities inside the aggregator
  VectorObjectSelector objectSelector = ExpressionVectorSelectors.castValueSelectorToObject(
      columnSelectorFactory.getReadableVectorInspector(),
      fieldName,
      valueSelector,
      capabilities.toColumnType(),
      ColumnType.DOUBLE
  );
  return new DoubleFirstVectorAggregator(timeSelector, objectSelector);
}
I am not sure if this is correct. If the type is numeric, why do we need an explicit cast to Double? I could be wrong, but this seems kinda suspicious.
This is the same code flow as StringFirstAggregatorFactory.
With the string factory, we'd need to check if it is of a numeric type and cast if so. But if it is a numeric, then it shouldn't be the case, at least for DOUBLE objects. Also, if it is a String aggregator, now we return a DoubleFirstVectorAggregator (if capabilities correspond to String capabilities).
@@ -164,7 +215,7 @@ void updateTimeWithNull(ByteBuffer buf, int position, long time)
   * Abstract function which needs to be overridden by subclasses to set the
   * latest value in the buffer depending on the datatype
   */
- abstract void putValue(ByteBuffer buf, int position, int index);
+ abstract void putValue(ByteBuffer buf, int position, Number number);
Let's revert this change if we can. Reverting will lead to some duplication; however, the change as written leads to boxing of the primitives, which is what the column selectors aim to reduce (if they can). That is why there are separate methods like isNull + getLong, which are preferred over getObject because the former don't auto-box the primitive value into an object.
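As a rough illustration of the boxing concern, the sketch below writes a nullable long into a buffer using only primitive accessors. `UnboxedWrite` and its nested `LongSelector` interface are hypothetical simplifications that follow the isNull + getLong pattern named in the comment; they are not Druid's selector interfaces, and the flag values are assumed.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; mirrors the isNull + getLong pattern without using Druid types.
final class UnboxedWrite
{
  // Simplified selector exposing only primitive accessors.
  interface LongSelector
  {
    boolean isNull();

    long getLong();
  }

  // Writes [null flag: 1 byte][value: 8 bytes] at position without creating a Long object.
  static void putValue(ByteBuffer buf, int position, LongSelector selector)
  {
    if (selector.isNull()) {
      buf.put(position, (byte) 1);
    } else {
      buf.put(position, (byte) 0);
      buf.putLong(position + Byte.BYTES, selector.getLong());
    }
  }
}
```

Passing a `Number` instead would force every row's value through an object allocation (or a cache lookup) before it reaches the buffer, which is the overhead the primitive path avoids.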
   */
- void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
+ void updateTimeWithValue(ByteBuffer buf, int position, long time, Number number)
Autoboxing alert, as mentioned in the other comment. Let's see if we can get away without this change.
It might not be as clean as the current code, however, there shouldn't be a lot of duplication.
- if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lastValue, newValue) < 0) {
-   lastValue = (SerializablePairLongString) selector.getObject();
+ T newValue = (T) selector.getObject();
+ if (Longs.compare(lastValue.lhs, newValue.lhs) <= 0) {
Why have we converted it from '<' to '<='? If unintentional, let's keep it the same way. Although the logic is still sound, it can lead to changes in the results of the queries of the users who have been using the LATEST function.
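A small sketch of why the comparison operator matters: when two rows share the same timestamp, a strict comparison keeps the row seen first, while an inclusive comparison keeps the row seen last. `LastTieBreak` is a hypothetical helper, and it uses `Long.compare` from the standard library in place of Guava's `Longs.compare` from the diff.

```java
// Hypothetical illustration of tie-breaking in a "last" aggregation; not the PR's code.
final class LastTieBreak
{
  // Returns the value kept after scanning rows in arrival order.
  // inclusive = false models "< 0"; inclusive = true models "<= 0".
  static String last(long[] times, String[] values, boolean inclusive)
  {
    long lastTime = Long.MIN_VALUE;
    String lastValue = null;
    for (int i = 0; i < times.length; i++) {
      int cmp = Long.compare(lastTime, times[i]);
      if (inclusive ? cmp <= 0 : cmp < 0) {
        lastTime = times[i];
        lastValue = values[i];
      }
    }
    return lastValue;
  }
}
```

For rows `(10, "a")` followed by `(10, "b")`, the strict form returns `"a"` and the inclusive form returns `"b"`, so existing LATEST results could change on ties.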
public abstract class AbstractSerializablePairLongObjectColumnHeader<T extends SerializablePair<Long, ?>>
{
  private static final int HEADER_SIZE_BYTES = 4;
There's a missing comment here from the original code.
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public abstract class AbstractSerializablePairLongObjectColumnSerializer<T extends SerializablePair<Long, ?>> implements
There are missing Javadocs from the original code. I do think that they should be migrated here.
import java.nio.ByteBuffer;
import java.util.Locale;

public abstract class AbstractSerializablePairLongObjectDeltaEncodedStagedSerde<T extends SerializablePair<Long, ?>> implements
nit: Javadocs missing
.../org/apache/druid/query/aggregation/AbstractSerializablePairLongObjectSimpleStagedSerde.java
    rhsBytes = Float.BYTES;
  }
}
return Long.BYTES + Byte.BYTES + rhsBytes;
This computation is incorrect. When rhsObject is null, we'd only be storing Long.BYTES + Byte.BYTES, so this should be conditional.
- return Long.BYTES + Byte.BYTES + rhsBytes;
+ return Long.BYTES + Byte.BYTES + (rhsObject == null ? 0 : rhsBytes);
rhsBytes is initialized to 0 and is assigned a value only if rhsObject is non-null. So isn't it already conditional?
  }
}

return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Byte.BYTES + rhsBytes;
This should be conditional on whether the rhsObject is null or not.
- return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Byte.BYTES + rhsBytes;
+ return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Byte.BYTES + (rhsObject == null ? 0 : rhsBytes);
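To see how the two forms of this size computation relate, here is a small sketch that assumes, as the author's reply in the neighbouring thread describes, that `rhsBytes` starts at 0 and is only set when the right-hand value is non-null. `DeltaEncodedSize` and `serializedSize` are hypothetical names, and `Float` is used as the value type purely for illustration.

```java
// Hypothetical sketch of the delta-encoded size computation; not the PR's class.
final class DeltaEncodedSize
{
  // Size = timestamp delta (int or long) + one null-flag byte + value bytes if present.
  static int serializedSize(boolean useIntegerDelta, Float rhs)
  {
    int rhsBytes = 0;
    if (rhs != null) {
      rhsBytes = Float.BYTES;
    }
    return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Byte.BYTES + rhsBytes;
  }
}
```

Under that assumption the suggested `(rhsObject == null ? 0 : rhsBytes)` yields the same result as the existing expression, so the suggestion mainly makes the null handling explicit at the return site.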
@LakshSingla thank you so much for reviewing this PR and being patient throughout the process. I have tried to address your comments; let me know if there's anything more left.
  }
};
- selector = new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
+ selector = new VectorObjectSelector()
Curious if this change is required?
Yes. It will be required as the selector can now be of either VectorObject (pair) or VectorValue (numeric).
Thanks for the explanation
Thanks for the patch @ankit0811. I think there's some follow-up that can be done in this area.
Can you please update the description with the release note?
I still believe that we can clean up the code a bit further if we get rid of the SerializablePairLongString abstractions and keep them as generics; however, I don't know what such a change entails. That would allow the reuse of multiple classes. However, that will be taken up in a follow-up PR, so it's a go-ahead from my side.
Since we have identified the caveats, we can merge this and work on it in a follow-up PR.
Fixes #10702
Description
This PR revives #10949, which addresses issue #10702 by adding support for doubleLast, doubleFirst, FloatLast, FloatFirst, longLast and longFirst.
We now support Numeric Last/First data types during ingestion and in native queries (for MSQ as well).
Ingestion
Query
usage:
Release note
Key changed/added classes in this PR
- AbstractSerializableLongObjectPairSerde: abstract class to share serde between double/float/long
- AbstractSerializablePairLongObjectBufferStore.java
- AbstractSerializablePairLongObjectColumnHeader.java
- AbstractSerializablePairLongObjectColumnSerializer.java
- AbstractSerializablePairLongObjectDeltaEncodedStagedSerde.java
- AbstractSerializablePairLongObjectSimpleStagedSerde.java
- SerializablePairLongDoubleComplexMetricSerde: for double
- SerializablePairLongFloatComplexMetricSerde: for float
- SerializablePairLongStringComplexMetricSerde: for long
- GenericFirstAggregateCombiner: first agg combiner to share between double/float/long
- GenericLastAggregateCombiner: last agg combiner to share between double/float/long
- DoubleFirstAggregatorFactory, DoubleLastAggregatorFactory
- FloatFirstAggregatorFactory, FloatLastAggregatorFactory
- LongFirstAggregatorFactory, LongLastAggregatorFactory
Further introduces new ColumnType
Post suggestions, we are no longer using GenericIndexed. Using the pattern defined in SerializablePairLongStringComplexMetricSerde, we observed an improvement of 40% in terms of storage (these numbers are against ingestion of 1 day's worth of wikiticker data).
To-Do
These will be taken up in a follow-up PR
This PR has: