Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SemanticCreator #16700

Merged
merged 8 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ public void testSegmentProviderFindSegmentsWithEmptySegmentsThrowException()
);
provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
}

@Test
public void testCreateIngestionSchema() throws IOException
{
Expand Down Expand Up @@ -2033,14 +2033,6 @@ private static class TestIndexIO extends IndexIO
}
}

final Metadata metadata = new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
);

queryableIndexMap.put(
entry.getValue(),
new SimpleQueryableIndex(
Expand All @@ -2049,7 +2041,13 @@ private static class TestIndexIO extends IndexIO
null,
columnMap,
null,
metadata,
() -> new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
),
false
)
);
Expand All @@ -2074,7 +2072,7 @@ void removeMetadata(File file)
index.getBitmapFactoryForDimensions(),
index.getColumns(),
index.getFileMapper(),
null,
() -> null,
false
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
* under the License.
*/

package org.apache.druid.query.rowsandcols;
package org.apache.druid.common.semantic;

import org.apache.druid.query.rowsandcols.RowsAndColumns;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand All @@ -26,8 +28,8 @@

/**
* Annotation used to indicate that the method is used as a creator for a semantic interface.
*
* Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of
* <p>
* Used in conjuction with {@link SemanticUtils#makeAsMap(Class)} to build maps for simplified implementation of
* the {@link RowsAndColumns#as(Class)} method.
*/
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.common.semantic;

import org.apache.druid.error.DruidException;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class SemanticUtils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this Utils could be renamed to be the interface containing the as for these things; the static method could remain as those are providing services...

please also add apidoc how this as stuff works/etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea about creating the interface for .as() and having these utils be static helpers on it. As a refactor, that should probably be done for all of the places that are using an .as() so I think I'd like to let this current PR go through (as it's following the current pattern in the code) and one of us can take on the refactor as you defined it as I do think that's an improvement.

{
private static final Map<Class<?>, Map<Class<?>, Function<?, ?>>> OVERRIDES = new LinkedHashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to put everything into a centralized map?
there is no point in caching this as all filed which are calling makeAsMap are static fields for which only once that method will be invoked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exists to allow extensions to register new interfaces and implementations without needing to impact the core code. This is probably worthy of javadoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a javadoc

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean to override default behaviour?
could you give an example? how this supposed to work if 2-3 classes want to override the same?

what's the problem you are trying to solve?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially, we could have a class A that has a SemanticCreator toB(). This allows A.as(B.class) to work.

However, there could be extensions that want a different implementation to be bound. The extension would want to use that implementation only if the extension is loaded and also, it would not be able to change the first binding. Overrides will allow the extension to bind something like:

SemanticUtils.registerAsOverride(
        A.class,
        B.class,
        (a) -> new C(a) // C extends B
    );

to modify
If there are multiple bindings for A and B from different places, SemanticUtils would throw an exception.


/**
* Allows the registration of overrides, which allows overriding of already existing mappings.
* This allows extensions to register mappings.
*/
@SuppressWarnings("unused")
public static <C, T> void registerAsOverride(Class<C> clazz, Class<T> asInterface, Function<C, T> fn)
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
{
final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.computeIfAbsent(
clazz,
theClazz -> new LinkedHashMap<>()
);

final Function<?, ?> oldVal = classOverrides.get(asInterface);
if (oldVal != null) {
throw DruidException.defensive(
"Attempt to side-override the same interface [%s] multiple times for the same class [%s].",
asInterface,
clazz
);
} else {
classOverrides.put(asInterface, fn);
}
}

public static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
{
final Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();

for (Method method : clazz.getMethods()) {
if (method.isAnnotationPresent(SemanticCreator.class)) {
if (method.getParameterCount() != 0) {
throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
}

retVal.put(method.getReturnType(), arg -> {
try {
return method.invoke(arg);
}
catch (InvocationTargetException | IllegalAccessException e) {
throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
}
});
}
}

final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.get(clazz);
if (classOverrides != null) {
for (Map.Entry<Class<?>, Function<?, ?>> overrideEntry : classOverrides.entrySet()) {
//noinspection unchecked
retVal.put(overrideEntry.getKey(), (Function<T, ?>) overrideEntry.getValue());
}
}

return retVal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ public static DruidException defensive(String format, Object... args)
return defensive().build(format, args);
}

/**
* Build a "defensive" exception, this is an exception that should never actually be triggered, but we are
* throwing it inside a defensive check.
*
* @return A builder for a defensive exception.
*/
public static DruidException defensive(Throwable cause, String format, Object... args)
{
return defensive().build(cause, format, args);
}

/**
* Build a "defensive" exception, this is an exception that should never actually be triggered. Throw to
* allow messages to be seen by developers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntComparator;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.operator.ColumnWithDirection;
Expand Down Expand Up @@ -73,7 +75,7 @@
public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumns
{
@SuppressWarnings("rawtypes")
private static final Map<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = RowsAndColumns
private static final Map<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(ArrayListRowsAndColumns.class);

private final ArrayList<RowType> rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.druid.query.rowsandcols;

import com.google.common.collect.ImmutableList;
import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.key.KeyColumn;
Expand Down Expand Up @@ -66,7 +68,7 @@

public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
{
private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP = RowsAndColumns
private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(LazilyDecoratedRowsAndColumns.class);

private RowsAndColumns base;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,13 @@

package org.apache.druid.query.rowsandcols;

import org.apache.druid.error.DruidException;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
* An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows
Expand Down Expand Up @@ -75,31 +69,6 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input)
return retVal;
}

static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
{
Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();

for (Method method : clazz.getMethods()) {
if (method.isAnnotationPresent(SemanticCreator.class)) {
if (method.getParameterCount() != 0) {
throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
}

retVal.put(method.getReturnType(), arg -> {
try {
return method.invoke(arg);
}
catch (InvocationTargetException | IllegalAccessException e) {
throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
}
});
}
}

return retVal;
}


/**
* The set of column names available from the RowsAndColumns
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

package org.apache.druid.query.rowsandcols.concrete;

import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.SemanticCreator;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.QueryableIndex;
Expand All @@ -41,7 +42,7 @@

public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{
private static final Map<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> AS_MAP = RowsAndColumns
private static final Map<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(QueryableIndexRowsAndColumns.class);

private final QueryableIndex index;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.rowsandcols.semantic;

import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.RowSignature;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

public class DefaultFrameMaker implements FrameMaker
{
private final RowsAndColumns rac;

public DefaultFrameMaker(RowsAndColumns rac)
{
this.rac = rac;
}

@Override
public RowSignature computeSignature()
{
final RowSignature.Builder signatureBuilder = RowSignature.builder();
for (String column : rac.getColumnNames()) {
final Column racColumn = rac.findColumn(column);
if (racColumn == null) {
continue;
}
signatureBuilder.add(column, racColumn.toAccessor().getType());
}

return signatureBuilder.build();
}

@Override
public Frame toColumnBasedFrame()
{
final AtomicInteger rowId = new AtomicInteger(0);
final int numRows = rac.numRows();
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
final ColumnSelectorFactory selectorFactory = csfm.make(rowId);

final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20); // 200 MB

final FrameWriter frameWriter = FrameWriters.makeColumnBasedFrameWriterFactory(
memFactory,
computeSignature(),
Collections.emptyList()
).newFrameWriter(selectorFactory);

rowId.set(0);
for (; rowId.get() < numRows; rowId.incrementAndGet()) {
frameWriter.addSelection();
}

return Frame.wrap(frameWriter.toByteArray());
}
}
Loading
Loading