Skip to content

Commit

Permalink
remix nested columns (#14014)
Browse files Browse the repository at this point in the history
changes:
* introduce ColumnFormat to separate physical storage format from logical type. ColumnFormat is now used instead of ColumnCapabilities to get column handlers for segment creation
* introduce new 'auto' type indexer and merger which produces a new common nested format of columns, which is the next logical iteration of the nested column stuff. Essentially this is an automatic type column indexer that produces the most appropriate column for the given inputs, making either STRING, ARRAY<STRING>, LONG, ARRAY<LONG>, DOUBLE, ARRAY<DOUBLE>, or COMPLEX<json>.
* revert NestedDataColumnIndexer, NestedDataColumnMerger, and NestedDataColumnSerializer to their pre-#13803 behavior (v4) for backwards compatibility
* fix a bug in RoaringBitmapSerdeFactory that surfaced if anything ever actually wrote out an empty bitmap using toBytes and then later tried to read it back (the nerve!)
  • Loading branch information
clintropolis authored Apr 5, 2023
1 parent f60f377 commit d21babc
Show file tree
Hide file tree
Showing 113 changed files with 10,271 additions and 1,541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.TransformingInputEntityReader;
Expand Down Expand Up @@ -269,14 +269,14 @@ public void testParseNestedData() throws Exception
timestampSpec,
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid"),
new StringDimensionSchema("eventType"),
new NestedDataDimensionSchema("foo"),
new NestedDataDimensionSchema("bar"),
new StringDimensionSchema("someBytesColumn")
new AutoTypeColumnSchema("event"),
new AutoTypeColumnSchema("id"),
new AutoTypeColumnSchema("someOtherId"),
new AutoTypeColumnSchema("isValid"),
new AutoTypeColumnSchema("eventType"),
new AutoTypeColumnSchema("foo"),
new AutoTypeColumnSchema("bar"),
new AutoTypeColumnSchema("someBytesColumn")
)
),
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -292,7 +292,7 @@ private static IncrementalIndex makeIncrementalIndex(
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
Iterable<String> oldDimOrder,
Map<String, ColumnCapabilities> oldCapabilities
Map<String, ColumnFormat> oldCapabilities
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
Expand Down Expand Up @@ -456,7 +456,7 @@ protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, f
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnHandlerCapabilities());
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnFormats());
}

index.add(value);
Expand Down Expand Up @@ -752,7 +752,7 @@ public void doRun()
combiningAggs,
config,
allDimensionNames,
persistIndex.getColumnHandlerCapabilities()
persistIndex.getColumnFormats()
);
startTime = System.currentTimeMillis();
++indexCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
Expand Down Expand Up @@ -80,11 +76,9 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -869,47 +863,6 @@ public void close()
);
}

/**
 * Chooses the {@link DimensionSchema} for a dimension based on the value type reported by its
 * column capabilities.
 *
 * <p>FLOAT, LONG and DOUBLE columns map to their numeric schema and require
 * {@code multiValueHandling} to be {@code null}. STRING columns map to a
 * {@link StringDimensionSchema} that carries over the multi-value handling and the bitmap-index
 * flag. Any other type is delegated to the handler obtained from
 * {@link DimensionHandlerUtils#getHandlerFromCapabilities}.
 *
 * @param name               dimension name
 * @param capabilities       capabilities of the existing column
 * @param multiValueHandling multi-value handling of the dimension, or {@code null}
 * @return the schema to use for the dimension
 * @throws IllegalArgumentException if a numeric column is given a non-null multi-value handling
 */
@VisibleForTesting
static DimensionSchema createDimensionSchema(
    String name,
    ColumnCapabilities capabilities,
    DimensionSchema.MultiValueHandling multiValueHandling
)
{
  // numeric dimensions cannot be multi-valued, so they only accept an absent handling
  final boolean noMultiValueHandling = multiValueHandling == null;

  switch (capabilities.getType()) {
    case STRING:
      return new StringDimensionSchema(name, multiValueHandling, capabilities.hasBitmapIndexes());
    case LONG:
      Preconditions.checkArgument(
          noMultiValueHandling,
          "multi-value dimension [%s] is not supported for long type yet",
          name
      );
      return new LongDimensionSchema(name);
    case FLOAT:
      Preconditions.checkArgument(
          noMultiValueHandling,
          "multi-value dimension [%s] is not supported for float type yet",
          name
      );
      return new FloatDimensionSchema(name);
    case DOUBLE:
      Preconditions.checkArgument(
          noMultiValueHandling,
          "multi-value dimension [%s] is not supported for double type yet",
          name
      );
      return new DoubleDimensionSchema(name);
    default:
      // everything else (e.g. complex types) is resolved through the handler lookup
      DimensionHandler resolvedHandler = DimensionHandlerUtils.getHandlerFromCapabilities(
          name,
          capabilities,
          multiValueHandling
      );
      return resolvedHandler.getDimensionSchema(capabilities);
  }
}

/**
* Class for fetching and analyzing existing segments in order to generate reingestion specs.
*/
Expand Down Expand Up @@ -1109,7 +1062,7 @@ private void processDimensionsSpec(final QueryableIndex index)
);

if (!uniqueDims.containsKey(dimension)) {
final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
dimension
Expand All @@ -1118,11 +1071,7 @@ private void processDimensionsSpec(final QueryableIndex index)
uniqueDims.put(dimension, uniqueDims.size());
dimensionSchemaMap.put(
dimension,
createDimensionSchema(
dimension,
columnHolder.getHandlerCapabilities(),
dimensionHandler.getMultivalueHandling()
)
columnHolder.getColumnFormat().getColumnSchema(dimension)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,12 @@ public SamplerResponse sample(
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) {
final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType();
signatureBuilder.add(dimensionDesc.getName(), columnType);
// for now, use legacy types instead of standard type
logicalDimensionSchemas.add(
DimensionSchema.getDefaultSchemaForBuiltInType(dimensionDesc.getName(), dimensionDesc.getCapabilities())
);
physicalDimensionSchemas.add(
dimensionDesc.getHandler().getDimensionSchema(dimensionDesc.getCapabilities())
dimensionDesc.getIndexer().getFormat().getColumnSchema(dimensionDesc.getName())
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DoubleDimensionHandler;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
Expand Down Expand Up @@ -1670,107 +1669,6 @@ public void testChooseFinestGranularityAllNulls()
Assert.assertNull(chooseFinestGranularityHelper(input));
}

/**
 * Checks that {@code CompactionTask.createDimensionSchema} returns the schema class matching
 * the column type for string, float, double, long, and a complex type backed by a registered
 * extension handler.
 */
@Test
public void testCreateDimensionSchema()
{
  final String dimensionName = "dim";
  // register a provider for the extension type so the complex case below can resolve a handler
  DimensionHandlerUtils.registerDimensionHandlerProvider(
      ExtensionDimensionHandler.TYPE_NAME,
      d -> new ExtensionDimensionHandler(d)
  );
  // NOTE(review): a second, redundant setDictionaryValuesUnique(true) call was dropped from this
  // chain; setDictionaryValuesSorted(true) may have been what was intended -- confirm.
  DimensionSchema stringSchema = CompactionTask.createDimensionSchema(
      dimensionName,
      ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
                            .setHasBitmapIndexes(true)
                            .setDictionaryEncoded(true)
                            .setDictionaryValuesUnique(true),
      DimensionSchema.MultiValueHandling.SORTED_SET
  );

  Assert.assertTrue(stringSchema instanceof StringDimensionSchema);

  DimensionSchema floatSchema = CompactionTask.createDimensionSchema(
      dimensionName,
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.FLOAT),
      null
  );
  Assert.assertTrue(floatSchema instanceof FloatDimensionSchema);

  DimensionSchema doubleSchema = CompactionTask.createDimensionSchema(
      dimensionName,
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE),
      null
  );
  Assert.assertTrue(doubleSchema instanceof DoubleDimensionSchema);

  DimensionSchema longSchema = CompactionTask.createDimensionSchema(
      dimensionName,
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG),
      null
  );
  Assert.assertTrue(longSchema instanceof LongDimensionSchema);

  // complex type: expected to go through the extension handler registered above
  DimensionSchema extensionSchema = CompactionTask.createDimensionSchema(
      dimensionName,
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(
          ColumnType.ofComplex(ExtensionDimensionHandler.TYPE_NAME)
      ),
      null
  );
  Assert.assertTrue(extensionSchema instanceof ExtensionDimensionSchema);
}

/**
 * A FLOAT column combined with a non-null multi-value handling must be rejected with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testCreateDimensionSchemaIllegalFloat()
{
  final String expectedMessage = "multi-value dimension [foo] is not supported for float type yet";
  final DimensionSchema.MultiValueHandling handling = DimensionSchema.MultiValueHandling.SORTED_SET;

  expectedException.expect(IllegalArgumentException.class);
  expectedException.expectMessage(expectedMessage);

  CompactionTask.createDimensionSchema(
      "foo",
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.FLOAT),
      handling
  );
}

/**
 * A DOUBLE column combined with a non-null multi-value handling must be rejected with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testCreateDimensionSchemaIllegalDouble()
{
  final String expectedMessage = "multi-value dimension [foo] is not supported for double type yet";
  final DimensionSchema.MultiValueHandling handling = DimensionSchema.MultiValueHandling.SORTED_SET;

  expectedException.expect(IllegalArgumentException.class);
  expectedException.expectMessage(expectedMessage);

  CompactionTask.createDimensionSchema(
      "foo",
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE),
      handling
  );
}

/**
 * A LONG column combined with a non-null multi-value handling must be rejected with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testCreateDimensionSchemaIllegalLong()
{
  final String expectedMessage = "multi-value dimension [foo] is not supported for long type yet";
  final DimensionSchema.MultiValueHandling handling = DimensionSchema.MultiValueHandling.SORTED_SET;

  expectedException.expect(IllegalArgumentException.class);
  expectedException.expectMessage(expectedMessage);

  CompactionTask.createDimensionSchema(
      "foo",
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG),
      handling
  );
}

/**
 * A complex type named "unknown" is expected to fail with an {@code ISE} reporting that no
 * DimensionHandlerProvider can be found for that type name.
 */
@Test
public void testCreateDimensionSchemaIllegalComplex()
{
  final String expectedMessage = "Can't find DimensionHandlerProvider for typeName [unknown]";
  final DimensionSchema.MultiValueHandling handling = DimensionSchema.MultiValueHandling.SORTED_SET;

  expectedException.expect(ISE.class);
  expectedException.expectMessage(expectedMessage);

  CompactionTask.createDimensionSchema(
      "foo",
      ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.ofComplex("unknown")),
      handling
  );
}

private Granularity chooseFinestGranularityHelper(List<Granularity> granularities)
{
SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;

import java.util.Objects;
Expand All @@ -47,7 +51,8 @@
@JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class),
@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class)
@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class),
@JsonSubTypes.Type(name = AutoTypeColumnSchema.TYPE, value = AutoTypeColumnSchema.class)
})
public abstract class DimensionSchema
{
Expand Down Expand Up @@ -150,6 +155,17 @@ public boolean hasBitmapIndex()
/**
 * Column type of this dimension, supplied by each concrete schema. The default
 * {@link #getDimensionHandler()} derives its column capabilities from this value.
 */
@JsonIgnore
public abstract ColumnType getColumnType();

/**
 * Default handler lookup, kept for backwards compatibility: builds default capabilities from
 * {@link #getColumnType()} and asks {@link DimensionHandlerUtils} for the matching handler.
 * Subclasses may override this to supply their handler directly.
 *
 * @return the handler for this dimension
 */
@JsonIgnore
public DimensionHandler getDimensionHandler()
{
  // default implementation for backwards compatibility
  final ColumnType columnType = getColumnType();
  return DimensionHandlerUtils.getHandlerFromCapabilities(
      name,
      IncrementalIndex.makeDefaultCapabilitiesFromValueType(columnType),
      multiValueHandling
  );
}

@Override
public boolean equals(final Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.DoubleDimensionHandler;
import org.apache.druid.segment.column.ColumnType;

public class DoubleDimensionSchema extends DimensionSchema
Expand All @@ -42,4 +44,10 @@ public ColumnType getColumnType()
{
return ColumnType.DOUBLE;
}

/**
 * Supplies a {@link DoubleDimensionHandler} created for this dimension's name.
 */
@Override
public DimensionHandler getDimensionHandler()
{
  final String dimensionName = getName();
  return new DoubleDimensionHandler(dimensionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.FloatDimensionHandler;
import org.apache.druid.segment.column.ColumnType;

public class FloatDimensionSchema extends DimensionSchema
Expand All @@ -46,4 +48,10 @@ public ColumnType getColumnType()
{
return ColumnType.FLOAT;
}

/**
 * Supplies a {@link FloatDimensionHandler} created for this dimension's name.
 */
@Override
public DimensionHandler getDimensionHandler()
{
  final String dimensionName = getName();
  return new FloatDimensionHandler(dimensionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.LongDimensionHandler;
import org.apache.druid.segment.column.ColumnType;


Expand All @@ -47,4 +49,10 @@ public ColumnType getColumnType()
{
return ColumnType.LONG;
}

/**
 * Supplies a {@link LongDimensionHandler} created for this dimension's name.
 */
@Override
public DimensionHandler getDimensionHandler()
{
  final String dimensionName = getName();
  return new LongDimensionHandler(dimensionName);
}
}
Loading

0 comments on commit d21babc

Please sign in to comment.