
complex typed expressions #11853

Merged: 12 commits into apache:master from the complex-expressions branch on Nov 8, 2021

Conversation

@clintropolis (Member) commented Oct 28, 2021

Description

This PR adds support for Druid "complex" types in the native expression processing system, made possible by the type system enhancements in #11713. This means that all Druid data can now be used within expressions, provided expressions are added to handle these types.

ObjectBinding, the non-vectorized expression input data provider, now implements ColumnInspector so that it can retain type information when available, and a new constant expression, ComplexExpr, has been added which accepts the ExpressionType alongside the value to represent these values provided by the binding.

Several generic nullable-value binary serde methods have been moved out of ExprEval and into Types, to make them more generally available for writing nullable values that follow the | null (byte) | value (byte[]) | pattern, which all of the ExprEval types now use. I've adjusted the binary formats slightly to be more consistent, so there are some minor changes to the expression buffer aggregator, but this should cause no compatibility issues because this format is never written to segments and is contained within the processing of a single node.
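
For illustration, here is a minimal sketch of that layout for a single nullable long, using absolute ByteBuffer reads and writes so the buffer position, limit, and mark are untouched. The method names and flag values here are illustrative, not the actual Types signatures:

    import java.nio.ByteBuffer;

    public class NullableLayoutSketch
    {
      // Assumed flag values; Druid defines these in NullHandling.
      static final byte IS_NULL_BYTE = 0x01;
      static final byte IS_NOT_NULL_BYTE = 0x00;

      // Writes | null (byte) | value (long) | at offset; returns bytes written.
      static int writeNullableLong(ByteBuffer buffer, int offset, Long value)
      {
        if (value == null) {
          buffer.put(offset, IS_NULL_BYTE);
          return 1;
        }
        buffer.put(offset, IS_NOT_NULL_BYTE);
        buffer.putLong(offset + 1, value);
        return 1 + Long.BYTES;
      }

      // Reads a value written by writeNullableLong.
      static Long readNullableLong(ByteBuffer buffer, int offset)
      {
        if (buffer.get(offset) == IS_NULL_BYTE) {
          return null;
        }
        return buffer.getLong(offset + 1);
      }
    }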

A base interface has been extracted from ObjectStrategy in druid-processing. It is called ObjectByteStrategy (because naming is hard) and lives in druid-core, providing conversion between objects and a binary format for complex types. A registry mapping type names to ObjectByteStrategy implementations has been added, and registering a ComplexMetricSerde in ComplexMetrics will automatically register its ObjectStrategy in the lower-level ObjectByteStrategy registry. This would be less messy if druid-core and druid-processing were merged, since the ComplexMetrics registry could then be used directly for binary serialization of expressions, but they are not yet.
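
For example, the existing registration entry point looks roughly like this (a sketch; HyperUniquesSerde is Druid's built-in HLL serde, and per the description this call now also populates the lower-level ObjectByteStrategy registry):

    // Registering the complex type's serde; with this PR, this also registers
    // its ObjectStrategy for expression binary serialization.
    ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());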

To showcase the new complex expressions, I have added four new 'hyperUnique' functions for Druid's built-in HyperLogLogCollector, and three new bloom filter expressions to the druid-bloom-filter extension (a combined usage example follows the list):

  • hyper_unique() - creates a Druid built-in HyperLogLogCollector
  • hyper_unique_add(expr1, expr2) - adds expr1 to the hyper-log-log collector expr2
  • hyper_unique_estimate(expr) - gets a double estimate for the hyper-log-log collector expr
  • hyper_unique_round_estimate(expr) - gets an estimate rounded to a long value for the hyper-log-log collector expr
  • bloom_filter(expr) - creates a bloom filter with expected capacity expr
  • bloom_filter_test(expr1, expr2) - checks whether expr1 is contained in the bloom filter expr2
  • bloom_filter_add(expr1, expr2) - adds expr1 to the bloom filter expr2
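
For instance, a hypothetical expression that creates a collector, adds a column value, and extracts a rounded estimate (the "user" column also appears in the aggregator example below):

    hyper_unique_round_estimate(hyper_unique_add("user", hyper_unique()))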

To allow complex values to be defined as literals, I've also added complex_decode_base64(expr1, expr2), where expr1 must be a string literal containing a valid complex type name, and expr2 must be a base64-encoded string that is a serialized value of that type (or null if the row is null).
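
For example, a sketch with a truncated placeholder payload (not a real serialized collector):

    complex_decode_base64('hyperUnique', 'AQAA...')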

I have not documented any of these yet, because I'm still considering how to position them, and several parts of the expression system, such as the native expression aggregator, are still missing documentation for the same reason. I have also not wired these up to SQL functions yet, for similar reasons.

With these expressions it is possible, for example, to re-create the native bloom filter aggregator using the expression aggregator instead:

    {
      "type": "expression",
      "name": "bloom_expression",
      "fields": ["user"],
      "initialValue": "bloom_filter(10000)",
      "fold": "bloom_filter_add(user, __acc)",
      "maxSizeBytes": 8096
    }

but I think this is just scratching the surface of what this change will make possible.


Arrays

Because of the multi-value string transformation magic that automatically translates expressions into map for selectors and into fold for the expression aggregator, it was necessary to support arrays of complex types. I have reworked the array code to collapse LongArrayExpr, DoubleArrayExpr, and StringArrayExpr into a single consolidated ArrayExpr, and likewise collapsed the array ExprEval implementations into a single ArrayExprEval. This significantly simplifies the array handling code, opens the door to arrays of complex types, and, interestingly, also to nested array types!

I have gated nested arrays behind a new feature flag, set in runtime properties with druid.expressions.allowNestedArrays, which defaults to disabled, as shown below. I think it would be better to hold off on opening this up until we support proper grouping on arrays and drop a lot of the automatic STRING coercion that currently happens.
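
To opt in (assuming the standard runtime.properties format):

    # enable nested array types in expressions (default: false)
    druid.expressions.allowNestedArrays=true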

I have added some tests with the functionality enabled, though, since it is mostly at the selector layer and above that we don't fully handle array types.

The parser doesn't directly understand nested array literals, so the array function must be used to construct nested arrays; e.g., [['a', 'b', 'c'],['d', 'e']] will not parse correctly, but array(['a', 'b', 'c'],['d', 'e']) will.

Empty arrays can be defined directly as literals, since I have added parser support for the full ExpressionType string representation; e.g., ARRAY<ARRAY<LONG>>[] is a literal for an empty nested array of longs. This syntax also works for non-nested arrays and complex arrays: ARRAY<COMPLEX<hyperUnique>>[], and so on.
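
Putting the two rules together (the trailing annotations are descriptions, not expression syntax):

    array(['a', 'b', 'c'], ['d', 'e'])    nested array built with the array function
    ARRAY<ARRAY<LONG>>[]                  empty nested array of long
    ARRAY<COMPLEX<hyperUnique>>[]         empty array of a complex type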

Type stuffs

Along the way I've further improved the quality of the RowSignature available when processing queries, mostly for IncrementalIndex column selector factories, which previously created a RowBasedColumnSelectorFactory with an empty RowSignature and now are created with the latest RowSignature available in the form of a supplier. RowBasedColumnSelectorFactory accepts this supplier instead of a direct RowSignature, since the schema might change during the lifetime of an incremental index.

I have also enriched the non-vectorized expression type information by changing Expr.ObjectBinding to also implement InputBindingInspector, which requires it to define a getType method. I have transitioned most uses of ExprEval.bestEffortOf to ExprEval.ofType, which falls back to best-effort inference if the type is null.
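
A minimal sketch of the resulting pattern (the binding variable and column name here are hypothetical):

    // With type-aware bindings, evaluation can use the declared type
    // instead of guessing from the value's Java class.
    ExpressionType type = bindings.getType("x");   // may be null if unknown
    ExprEval<?> eval = ExprEval.ofType(type, bindings.get("x"));
    // ofType falls back to best-effort inference when type is null.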

Future work

Implementing additional expressions for other complex type extensions, such as data sketches.


Key changed/added classes in this PR
  • Expr
  • ConstantExpr
  • ArrayExpr
  • ExprEval
  • Types
  • ObjectStrategy
  • IncrementalIndex
  • RowBasedColumnSelectorFactory

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@paul-rogers (Contributor) left a comment:

Partial review, random comments thus far.

}
}
- return ofStringArray(stringArray);
+ return ofStringArray(Types.readNullableStringArray(buffer, offset));
@paul-rogers (Contributor) commented Oct 29, 2021:

Nice simplification. (The code below this line was removed, leaving only this line.)

* Clear and set the 'null' byte of a nullable value to {@link NullHandling#IS_NULL_BYTE} to a {@link ByteBuffer} at
* the supplied position. This method does not change the buffer position, limit, or mark.
*
* Nullable types are stored with a leading byte to indicate if the value is null, followed by teh value bytes
Contributor:

teh -> the

* same type as the existing value in the map for said key, an {@link ISE} is thrown.
*
* @param strategy The {@link ObjectByteStrategy} object to be associated with the 'type' in the map.
*/
Contributor:

If we like our Javadoc to be Javadoc-like, might want to add <p> to mark paragraphs. Else, it all gets run together.

@clintropolis (Member, Author):

hmm, I don't think we really make an attempt to make it very friendly to read outside of the source code - this is maybe worth doing, but I think I'll leave it for now to be consistent

Contributor:

We don't publish javadoc today, and thus prefer a format that is easily readable in IDE.

*
* @return number of bytes written (1 if null, or 5 + size of Long[] if not)
*/
public static int writeNullableLongArray(ByteBuffer buffer, int offset, @Nullable Long[] array, int maxSizeBytes)
Contributor:

How do we ensure that the buffer has capacity? In the prior types, lengths are known, so code could check if there is capacity. For the array (and complex type), how do we prevent buffer exhaustion given we don't (easily) know the required capacity ahead of time?

@clintropolis (Member, Author):

added a note to the javadoc mentioning that the contract of these methods is that at least max size bytes must be available in the buffer, which is acceptable for current callers, who know their max limit

*
* @return number of bytes written (1 if null, or 5 + size of Double[] if not)
*/
public static int writeNullableDoubleArray(ByteBuffer buffer, int offset, @Nullable Double[] array, int maxSizeBytes)
Contributor:

Parachuting into the middle of the implementation. How does the max bytes relate to buffer capacity? And, since the array length is a good predictor of space used, should we enforce an array length (which the user might understand) vs. a byte length (which seems arbitrary to the user since it is unrelated to array length or buffer size)?

@clintropolis (Member, Author):

it is easier to be consistent across all types by working in bytes, because variably sized types like strings and string arrays become hard to reason about otherwise. The main reason here is that the expression aggregator has a single max-size-bytes parameter (since that aggregator can potentially aggregate any type)

// | null (byte) | array length (int) | array bytes |
if (array == null) {
return writeNull(buffer, offset);
} else {
Contributor:

Nit: if the then clause returns, the else clause can be omitted and code outdented one level to indicate that this is the "main event" assuming you get in the non-null door?

@clintropolis (Member, Author):

👍 fixed

buffer.put(offset, NullHandling.IS_NOT_NULL_BYTE);
buffer.putInt(offset + 1, array.length);
for (Double element : array) {
if (element != null) {
Contributor:

Nit: since we handle both null and non-null cases, the logic is a bit easier to read if we have the if be element == null. Thus, "if null do this else do that" rather than "if not null do this else do that".

*
* layout: | null (byte) | size (int) | {| null (byte) | long |, | null (byte) |, ... |null (byte) | long |} |
*
* This method does not change the buffer position, limit, or mark.
Contributor:

Why? The byte buffer has a nice mechanism to keep track of the write position. These functions recreate that by taking a write position, computing the amount of data written, returning that value, and asking the caller to compute the new write offset. These functions write variable amounts of data, so there is only one right place to write the next value: after the current one.

Maybe explain why we need to recreate the byte buffer write pointer?

@clintropolis (Member, Author):

Ah, these methods are not really well suited for writing a sequence of values, even though the array readers/writers use them that way. These methods are currently built with the buffer aggregators in mind, where they do not own the buffer they operate on; instead they are used to read and write values at specific positions within a shared buffer that are associated with some key. These methods are also called at high volume, reading and writing values at different offsets in the buffer potentially every row, so the overhead of duplicating buffers with clean positions and limits seems non-optimal.

I did change stuff around a bit to lean more into temporarily setting and resetting the position for some of the variably sized things and using that to compute sizes.

stringArray[i] = StringUtils.fromUtf8(stringElementBytes);
offset += Integer.BYTES + stringElementBytes.length;
}
offset++;
Contributor:

Computing the new position is error-prone. Wouldn't it be easier to save the original position and compute the offset as the final position minus the start position? The code would be simpler and we'd compute sizes once rather than twice.

@@ -143,10 +151,10 @@ public void testLongArrayEvalTooBig()
expectedException.expectMessage(StringUtils.format(
"Unable to serialize [%s], size [%s] is larger than max [%s]",
ExpressionType.LONG_ARRAY,
NullHandling.sqlCompatible() ? 33 : 30,
14,
Contributor:

Is this the length? If so, we've more than halved the number of values which can be stored. Is this a good thing? Or, did we force-set the limit somewhere?

@lgtm-com (bot) commented Nov 2, 2021:

This pull request introduces 1 alert and fixes 2 when merging 2022546 into 52539de - view on LGTM.com

new alerts:

  • 1 for Missing format argument

fixed alerts:

  • 2 for Dereferenced variable may be null

@lgtm-com (bot) commented Nov 2, 2021:

This pull request fixes 2 alerts when merging d2b460a into 52539de - view on LGTM.com

fixed alerts:

  • 2 for Dereferenced variable may be null

@lgtm-com (bot) commented Nov 2, 2021:

This pull request fixes 2 alerts when merging 65f3bd3 into a22687e - view on LGTM.com

fixed alerts:

  • 2 for Dereferenced variable may be null

@lgtm-com (bot) commented Nov 2, 2021:

This pull request fixes 2 alerts when merging 0fe88f3 into a22687e - view on LGTM.com

fixed alerts:

  • 2 for Dereferenced variable may be null

@lgtm-com (bot) commented Nov 2, 2021:

This pull request fixes 2 alerts when merging bc48d62 into 652e149 - view on LGTM.com

fixed alerts:

  • 2 for Dereferenced variable may be null

Comment on lines +405 to +407
if (coerced == null) {
return bestEffortOf(null);
}
Contributor:

This seems not possible since coerced can be null only when val is null?

@clintropolis (Member, Author):

yeah, because it is called inside of the instanceof check, I think you're right

- CartesianMapLambdaBinding lambdaBinding = new CartesianMapLambdaBinding(product, lambdaExpr, bindings);
- return applyMap(lambdaExpr, lambdaBinding);
+ CartesianMapLambdaBinding lambdaBinding = new CartesianMapLambdaBinding(elementType, product, lambdaExpr, bindings);
+ ExpressionType lambdaType = lambdaExpr.getOutputType(lambdaBinding);
Contributor:

Do you need a null check for lambdaType?

@clintropolis (Member, Author):

no, ExpressionType.asArrayType checks for nulls and applyMap will use the type of the first element as the array type if the hint is null

T fromByteBuffer(ByteBuffer buffer, int numBytes);

@Nullable
byte[] toBytes(@Nullable T val);
Contributor:

I know this interface was just split from ObjectStrategy, but could it be better as void putToBuffer(ByteBuffer buffer, @Nullable T val)? That seems more consistent with fromByteBuffer and useful for avoiding materializing a byte array when possible. This doesn't have to be done in this PR, just thinking out loud.

@clintropolis (Member, Author):

I'm currently thinking that this is a temporary state, what I really want is something like this:

core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java:

package org.apache.druid.segment.column;

import org.apache.druid.common.config.NullHandling;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Comparator;

public interface TypeStrategy<T> extends Comparator<T>
{
  TypeSignature<?> getType();

  int estimateSizeBytes(@Nullable T value);

  T read(ByteBuffer buffer);

  void write(ByteBuffer buffer, T value);

  @Nullable
  default T readNullable(ByteBuffer buffer)
  {
    if ((buffer.get() & NullHandling.IS_NULL_BYTE) == NullHandling.IS_NULL_BYTE) {
      return null;
    }
    return read(buffer);
  }

  @Nullable
  default T readNullable(ByteBuffer buffer, int offset)
  {
    if (TypeStrategies.isNullableNull(buffer, offset)) {
      return null;
    }
    return read(buffer, offset + TypeStrategies.VALUE_OFFSET);
  }

  default T read(ByteBuffer buffer, int offset)
  {
    final int oldPosition = buffer.position();
    buffer.position(offset);
    T value = read(buffer);
    buffer.position(oldPosition);
    return value;
  }

  default int write(ByteBuffer buffer, int offset, T value)
  {
    final int oldPosition = buffer.position();
    buffer.position(offset);
    write(buffer, value);
    final int size = buffer.position() - offset;
    buffer.position(oldPosition);
    return size;
  }

  default int writeNullable(ByteBuffer buffer, int offset, @Nullable T value)
  {
    if (value == null) {
      return TypeStrategies.writeNull(buffer, offset);
    }
    buffer.put(offset, NullHandling.IS_NOT_NULL_BYTE);
    return write(buffer, offset + TypeStrategies.VALUE_OFFSET, value);
  }
}

which I've started to sketch out locally. I imagine that TypeSignature will have a getStrategy method that returns one of these, which means we have an easy way to get binary value serialization and a comparator for any given type. I will likely move ObjectByteStrategy back into ObjectStrategy, and instead make ComplexMetricsSerde define a getTypeStrategy which defaults to wrapping the ObjectStrategy, so that all of them get a free implementation out of the box, but can implement an optimized version in the event they are backed directly by the memory location.

Contributor:

If you do that, please name it ObjectRowStrategy to indicate that its primary usage should be when building row-oriented data sets. One of the fundamental problems with the ObjectStrategy that exists here is that the interface forces you to serialize the object in a row-oriented format. This is bad when serializing columns, as it doesn't allow you to take advantage of commonalities between values in the same column to achieve smaller sizes, and that is fundamentally why the ObjectStrategy method of serializing and deserializing is deprecated. This interface definitely makes sense for result sets, where the data is primarily row-oriented, but we should do everything possible to push people away from using it for column persistence.

@@ -1129,6 +1032,10 @@ public String asString()
public boolean isNumericNull()
{
if (isScalar()) {
if (arrayType.getElementType().is(ExprType.STRING)) {
Number n = computeNumber((String) getScalarValue());
Contributor:

Should we cache n so that we don't have to compute it again for the same row?

@clintropolis (Member, Author):

ah yeah, I guess I lost the caching that the separate string array impl had. It is probably worth considering adding it back, though many of the expression selectors cache their result based on the current offset so that they don't need to recompute the expression when getting the same row.

Contributor:

This seems a bit different from the use case you described, though. The caller of this method will likely call another method to get the actual value after the null check using this method. It would be nice if the getValue method (like asLong or asDouble) could just return the cached value computed in this method instead of recomputing it.

Comment on lines +1158 to +1163
if (v == null) {
return null;
}
Long lv = GuavaUtils.tryParseLong((String) v);
if (lv == null) {
Double d = Doubles.tryParse((String) v);
Contributor:

Is this different than computeNumber?

InputBindings.forFunction(
name -> {
// Sanity check. Bindings should not be used for a constant expression.
throw new UnsupportedOperationException();
Contributor:

Suggested change:
- throw new UnsupportedOperationException();
+ throw new ISE("Bindings should not be used for constant expressions");

@clintropolis (Member, Author):

I think this should never ever happen and we could probably just use InputBindings.nilBindings(), but I didn't really want to change the logic so I kept it. I can change the exception message to be better though 👍

@@ -452,6 +452,6 @@ private Number getCurrentValueAsNumber()
  @Override
  public ColumnCapabilities getColumnCapabilities(String columnName)
  {
-   return getColumnCapabilities(rowSignature, columnName);
+   return getColumnCapabilities(rowSignatureSupplier.get(), columnName);
Contributor:

nit: can cache ColumnCapabilities and return the same object if rowSignature hasn't changed.

* based algorithm.
*/
@VisibleForTesting
public static void estimateAndCheckMaxBytes(ExprEval eval, int maxSizeBytes)
Contributor:

nit: I feel there is perhaps a better place for this method, since it's nice to have size estimation methods and serialization methods in the same class so that we won't forget one when we modify the other. But maybe it's fine here since it's used only in this class.

Contributor:

Maybe package-private is better.

@clintropolis (Member, Author):

if I do the TypeStrategy thing mentioned in the other comment, then size estimation will be pushed there, which seems like a more appropriate place. I put it here for now because this is the only caller.

@jihoonson (Contributor) left a comment:

None of my comments are blockers, I will leave it to you whether you address them in this PR or in a follow-up. Thanks @clintropolis!

@clintropolis (Member, Author):

Thanks for the review @jihoonson! Since it is all the same to you, and this PR has gotten a lot bigger than I initially set out to make, I'll fix up the array expression stuff and propose the TypeStrategy thing in separate follow-up PRs.

@clintropolis merged commit 7237dc8 into apache:master on Nov 8, 2021 and deleted the complex-expressions branch.
@clintropolis mentioned this pull request on Nov 8, 2021.
@abhishekagarwal87 abhishekagarwal87 added this to the 0.23.0 milestone May 11, 2022