Skip to content

Commit

Permalink
MSQ writes out string arrays instead of MVDs by default (#15093)
Browse files Browse the repository at this point in the history
MSQ uses the string dimension schema for ARRAY<STRING> typed columns, which creates MVDs (multi-value dimensions) instead of the string arrays that are required. As a result, anyone ingesting columns of type ARRAY<STRING> from an external data source or from another datasource would get STRING columns in the newly generated segments.

This patch changes the following:

- Use auto dimension schema to ingest the ARRAY<STRING> columns, which will create columns with the desired type.
- Add an undocumented query context flag arrayIngestMode (values: none, mvd, array) to preserve the legacy behavior. The legacy behavior (mvd) is the default.
- Create MSQArraysInsertTest and refactor some of the tests in MSQInsertTest.
  • Loading branch information
LakshSingla authored Oct 9, 2023
1 parent 36edbce commit b0edbc3
Show file tree
Hide file tree
Showing 8 changed files with 965 additions and 387 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.input.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
Expand Down Expand Up @@ -1999,6 +2000,17 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
final Query<?> query
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the behaviour is incorrect, and is subject to
// deprecation and removal in future
if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) {
log.warn(
"'%s' is set to 'mvd' in the query's context. This ingests the string arrays as multi-value "
+ "strings instead of arrays, and is preserved for legacy reasons when MVDs were the only way to ingest string "
+ "arrays in Druid. It is incorrect behaviour and will likely be removed in the future releases of Druid",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE
);
}

final List<DimensionSchema> dimensions = new ArrayList<>();
final List<AggregatorFactory> aggregators = new ArrayList<>();

Expand Down Expand Up @@ -2076,7 +2088,8 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
type,
MultiStageQueryContext.useAutoColumnSchemas(query.context())
MultiStageQueryContext.useAutoColumnSchemas(query.context()),
MultiStageQueryContext.getArrayIngestMode(query.context())
)
);
} else if (!isRollupQuery) {
Expand Down Expand Up @@ -2125,7 +2138,8 @@ private static void populateDimensionsAndAggregators(
DimensionSchemaUtils.createDimensionSchema(
outputColumn,
type,
MultiStageQueryContext.useAutoColumnSchemas(context)
MultiStageQueryContext.useAutoColumnSchemas(context),
MultiStageQueryContext.getArrayIngestMode(context)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
new DimensionsSpec(
signature.getColumnNames().stream().map(
column ->
DimensionSchemaUtils.createDimensionSchema(
DimensionSchemaUtils.createDimensionSchemaForExtern(
column,
signature.getColumnType(column).orElse(null),
false
signature.getColumnType(column).orElse(null)
)
).collect(Collectors.toList())
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.util;

/**
* Values that the query context flag 'arrayIngestMode' can take to specify the behaviour of ingestion of arrays via
* MSQ's INSERT queries.
*
* The mode is consulted when a dimension schema is created for an ARRAY typed column without auto column schemas:
* depending on the mode, the column is either rejected with an invalid-input error, written as a multi-value string
* dimension (MVD), or written as a true array column.
*/
public enum ArrayIngestMode
{
/**
* Disables the ingestion of arrays via MSQ's INSERT queries. Both string arrays and numeric arrays are rejected
* in this mode. A string array can still be ingested as an MVD by explicitly casting it with the ARRAY_TO_MV
* function.
*/
NONE,

/**
* String arrays are ingested as MVDs. This is to preserve the legacy behaviour of Druid and will be removed in the
* future, since MVDs are not true array types and the behaviour is incorrect.
* This also disables the ingestion of numeric arrays: they are rejected in this mode.
*/
MVD,

/**
* Allows numeric and string arrays to be ingested as arrays (both are written using the auto-typed column schema).
* This should be the preferred method of ingestion, unless bound by compatibility reasons to use 'mvd'.
*/
ARRAY
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnCapabilities;
Expand All @@ -40,15 +42,31 @@
*/
public class DimensionSchemaUtils
{

/**
* Creates a dimension schema for creating {@link org.apache.druid.data.input.InputSourceReader}.
*
* Convenience wrapper around {@link #createDimensionSchema} that never uses auto column schemas and always uses
* {@link ArrayIngestMode#ARRAY}, so that array typed columns are not rejected while reading external input.
*
* @param column name of the column to create the schema for
* @param type   type of the column from the input signature, or null if the type is not known
* @return the dimension schema chosen by {@link #createDimensionSchema} for the given column and type
*/
public static DimensionSchema createDimensionSchemaForExtern(final String column, @Nullable final ColumnType type)
{
return createDimensionSchema(
column,
type,
// useAutoType is disabled: the schema is derived from the declared column type instead.
false,
// Least restrictive mode since we do not have any type restrictions while reading the extern files.
ArrayIngestMode.ARRAY
);
}

public static DimensionSchema createDimensionSchema(
final String column,
@Nullable final ColumnType type,
boolean useAutoType
boolean useAutoType,
ArrayIngestMode arrayIngestMode
)
{
if (useAutoType) {
// for complex types that are not COMPLEX<json>, we still want to use the handler since 'auto' typing
// only works for the 'standard' built-in typesg
// only works for the 'standard' built-in types
if (type != null && type.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(type)) {
final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type);
return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null)
Expand All @@ -57,35 +75,54 @@ public static DimensionSchema createDimensionSchema(

return new AutoTypeColumnSchema(column);
} else {
// if schema information not available, create a string dimension
// if schema information is not available, create a string dimension
if (type == null) {
return new StringDimensionSchema(column);
}

switch (type.getType()) {
case STRING:
return new StringDimensionSchema(column);
case LONG:
return new LongDimensionSchema(column);
case FLOAT:
return new FloatDimensionSchema(column);
case DOUBLE:
return new DoubleDimensionSchema(column);
case ARRAY:
switch (type.getElementType().getType()) {
case STRING:
return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null);
case LONG:
case FLOAT:
case DOUBLE:
return new AutoTypeColumnSchema(column);
default:
throw new ISE("Cannot create dimension for type [%s]", type.toString());
} else if (type.getType() == ValueType.STRING) {
return new StringDimensionSchema(column);
} else if (type.getType() == ValueType.LONG) {
return new LongDimensionSchema(column);
} else if (type.getType() == ValueType.FLOAT) {
return new FloatDimensionSchema(column);
} else if (type.getType() == ValueType.DOUBLE) {
return new DoubleDimensionSchema(column);
} else if (type.getType() == ValueType.ARRAY) {
ValueType elementType = type.getElementType().getType();
if (elementType == ValueType.STRING) {
if (arrayIngestMode == ArrayIngestMode.NONE) {
throw InvalidInput.exception(
"String arrays can not be ingested when '%s' is set to '%s'. Either set '%s' in query context "
+ "to 'array' to ingest the string array as an array, or ingest it as an MVD by explicitly casting the "
+ "array to an MVD with ARRAY_TO_MV function.",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE,
StringUtils.toLowerCase(arrayIngestMode.name()),
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE
);
} else if (arrayIngestMode == ArrayIngestMode.MVD) {
return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null);
} else {
// arrayIngestMode == ArrayIngestMode.ARRAY would be true
return new AutoTypeColumnSchema(column);
}
} else if (elementType.isNumeric()) {
// ValueType == LONG || ValueType == FLOAT || ValueType == DOUBLE
if (arrayIngestMode == ArrayIngestMode.ARRAY) {
return new AutoTypeColumnSchema(column);
} else {
throw InvalidInput.exception(
"Numeric arrays can only be ingested when '%s' is set to 'array' in the MSQ query's context. "
+ "Current value of the parameter [%s]",
MultiStageQueryContext.CTX_ARRAY_INGEST_MODE,
StringUtils.toLowerCase(arrayIngestMode.name())
);
}
default:
final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type);
return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null)
.getDimensionSchema(capabilities);
} else {
throw new ISE("Cannot create dimension for type [%s]", type.toString());
}
} else {
final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type);
return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null)
.getDimensionSchema(capabilities);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
* see {@link DimensionSchemaUtils#createDimensionSchema} for more details.
*
* <li><b>arrayIngestMode</b>: Tri-state query context parameter that controls whether and how arrays are ingested
* via MSQ. If set to 'none', arrays cannot be ingested. If set to 'array', string and numeric arrays are ingested
* as array typed columns. If set to 'mvd' (the default), numeric arrays cannot be ingested and string arrays are
* ingested as MVDs (this mode is kept for legacy purposes).
* </ol>
**/
public class MultiStageQueryContext
Expand Down Expand Up @@ -127,6 +131,11 @@ public class MultiStageQueryContext
public static final String CTX_INDEX_SPEC = "indexSpec";

public static final String CTX_USE_AUTO_SCHEMAS = "useAutoColumnSchemas";
public static final boolean DEFAULT_USE_AUTO_SCHEMAS = false;

public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode";
public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.MVD;


private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);

Expand Down Expand Up @@ -266,7 +275,12 @@ public static IndexSpec getIndexSpec(final QueryContext queryContext, final Obje

public static boolean useAutoColumnSchemas(final QueryContext queryContext)
{
return queryContext.getBoolean(CTX_USE_AUTO_SCHEMAS, false);
return queryContext.getBoolean(CTX_USE_AUTO_SCHEMAS, DEFAULT_USE_AUTO_SCHEMAS);
}

/**
* Reads the {@link #CTX_ARRAY_INGEST_MODE} parameter from the query context as an {@link ArrayIngestMode}, passing
* {@link #DEFAULT_ARRAY_INGEST_MODE} as the default value.
*
* @param queryContext query context to read the parameter from
* @return the array ingest mode to use for the query
*/
public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext)
{
return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE);
}

/**
Expand Down
Loading

0 comments on commit b0edbc3

Please sign in to comment.