[trinodb#10] Added Iceberg BloomFilter & Bitmap index support
向阿鲲 authored and fengguangyuan committed Mar 9, 2022
1 parent f4adbff commit 9c3cca8
Showing 11 changed files with 326 additions and 12 deletions.
8 changes: 7 additions & 1 deletion plugin/trino-iceberg/pom.xml
@@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.iceberg.version>0.12.0</dep.iceberg.version>
<dep.iceberg.version>0.11.1-bili-0.5.2</dep.iceberg.version>
<dep.caffeine.version>3.0.3</dep.caffeine.version>
</properties>

@@ -167,6 +167,12 @@
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-index</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
IcebergConfig.java
@@ -38,6 +38,7 @@ public class IcebergConfig
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
private boolean readIndicesSwitchOn;

public CatalogType getCatalogType()
{
@@ -167,4 +168,16 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

public boolean isReadIndicesSwitchOn()
{
return readIndicesSwitchOn;
}

@Config("iceberg.read-indices-switch-on")
public IcebergConfig setReadIndicesSwitchOn(boolean readIndicesSwitchOn)
{
this.readIndicesSwitchOn = readIndicesSwitchOn;
return this;
}
}
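
With this change, index-based pruning can be switched on per catalog. A minimal sketch of the catalog configuration, assuming the conventional Trino catalog file layout (the file name etc/catalog/iceberg.properties is an example, not part of this commit):

# Enable reading of Iceberg BloomFilter/Bitmap indices for this catalog (defaults to false)
iceberg.read-indices-switch-on=true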
IcebergPageSourceProvider.java
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.graph.Traverser;
import io.airlift.log.Logger;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.orc.OrcColumn;
import io.trino.orc.OrcCorruptionException;
@@ -59,6 +60,7 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
@@ -74,6 +76,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.MappedFields;
import org.apache.iceberg.mapping.NameMapping;
@@ -147,6 +150,8 @@
public class IcebergPageSourceProvider
implements ConnectorPageSourceProvider
{
private static final Logger log = Logger.get(IcebergPageSourceProvider.class);

private final HdfsEnvironment hdfsEnvironment;
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final OrcReaderOptions orcReaderOptions;
@@ -180,6 +185,18 @@ public ConnectorPageSource createPageSource(
IcebergSplit split = (IcebergSplit) connectorSplit;
IcebergTableHandle table = (IcebergTableHandle) connectorTable;

HdfsContext hdfsContext = new HdfsContext(session);
FileScanTask fileScanTask = split.decodeFileScanTask();
if (fileScanTask != null) {
long start = System.currentTimeMillis();
HdfsFileIo hdfsFileIo = new HdfsFileIo(hdfsEnvironment, hdfsContext);
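// When isRequired() reports the file as not required, the indices ruled out any matching
// rows for this split, so it can be skipped entirely (see the debug messages below).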
if (!fileScanTask.isRequired(hdfsFileIo, false)) {
log.debug("Indices hit for file : %s, split skipped, time spent : %s ms", fileScanTask.file().path(), System.currentTimeMillis() - start);
return new EmptyPageSource();
}
log.debug("Indices missed for file : %s, time spent : %s ms", fileScanTask.file().path(), System.currentTimeMillis() - start);
}

List<IcebergColumnHandle> icebergColumns = columns.stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList());
@@ -194,7 +211,6 @@ public ConnectorPageSource createPageSource(
.intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);

HdfsContext hdfsContext = new HdfsContext(session);
ReaderPageSource dataPageSource = createDataPageSource(
session,
hdfsContext,
IcebergSessionProperties.java
@@ -69,6 +69,7 @@ public final class IcebergSessionProperties
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String STATISTICS_ENABLED = "statistics_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String READ_INDICES_SWITCH_ON = "read_indices_switch_on";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -210,6 +211,11 @@ public IcebergSessionProperties(
"Read only required fields from a struct",
icebergConfig.isProjectionPushdownEnabled(),
false))
.add(booleanProperty(
READ_INDICES_SWITCH_ON,
"Switch to read iceberg indices, default false",
icebergConfig.isReadIndicesSwitchOn(),
false))
.build();
}

@@ -345,4 +351,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
}

public static boolean isReadIndicesSwitchOn(ConnectorSession session)
{
return session.getProperty(READ_INDICES_SWITCH_ON, Boolean.class);
}
}
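
The catalog default can also be overridden per query through the new session property. For example, assuming the catalog is named iceberg:

-- enable index-based split pruning for the current session only
SET SESSION iceberg.read_indices_switch_on = true;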
IcebergSplit.java
@@ -17,10 +17,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -31,13 +36,17 @@
public class IcebergSplit
implements ConnectorSplit
{
private static final Logger log = Logger.get(IcebergSplit.class);

private final String path;
private final long start;
private final long length;
private final long fileSize;
private final FileFormat fileFormat;
private final List<HostAddress> addresses;
private final Map<Integer, Optional<String>> partitionKeys;
private final String fileScanTaskEncode;
private FileScanTask fileScanTask;

@JsonCreator
public IcebergSplit(
@@ -47,7 +56,8 @@ public IcebergSplit(
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("partitionKeys") Map<Integer, Optional<String>> partitionKeys)
@JsonProperty("partitionKeys") Map<Integer, Optional<String>> partitionKeys,
@JsonProperty("fileScanTaskEncode") String fileScanTaskEncode)
{
this.path = requireNonNull(path, "path is null");
this.start = start;
@@ -56,6 +66,7 @@ public IcebergSplit(
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.partitionKeys = ImmutableMap.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
this.fileScanTaskEncode = fileScanTaskEncode;
}

@Override
@@ -107,6 +118,30 @@ public Map<Integer, Optional<String>> getPartitionKeys()
return partitionKeys;
}

@JsonProperty
public String getFileScanTaskEncode()
{
return fileScanTaskEncode;
}

public FileScanTask decodeFileScanTask()
{
if (fileScanTask != null) {
return fileScanTask;
}

if (getFileScanTaskEncode() != null && !getFileScanTaskEncode().isBlank()) {
byte[] decode = Base64.getDecoder().decode(getFileScanTaskEncode());
try (ObjectInputStream ob = new ObjectInputStream(new ByteArrayInputStream(decode))) {
fileScanTask = (FileScanTask) ob.readObject();
}
catch (Exception e) {
log.error(e, "Decode fileScanTask error");
}
}
return fileScanTask;
}

@Override
public Object getInfo()
{
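
For context, fileScanTaskEncode is expected to be produced on the coordinator side with the mirror image of decodeFileScanTask(). A minimal sketch of such an encoder, assuming FileScanTask is java.io.Serializable in this Iceberg build (the class and method names below are illustrative, not part of this commit):

import org.apache.iceberg.FileScanTask;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;

public final class FileScanTaskCodec
{
    private FileScanTaskCodec() {}

    // Serialize the task with plain Java serialization and wrap it in Base64 so it can
    // travel as a JSON string field inside IcebergSplit.
    public static String encodeFileScanTask(FileScanTask task)
    {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
            out.flush();
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}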
IcebergSplitManager.java
@@ -25,16 +25,20 @@
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.IndexSpec;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.util.PropertyUtil;

import javax.inject.Inject;

import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isReadIndicesSwitchOn;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions;
import static java.util.Objects.requireNonNull;
@@ -81,14 +85,34 @@ public ConnectorSplitSource getSplits(

TableScan tableScan = icebergTable.newScan()
.useSnapshot(table.getSnapshotId().get());
boolean indicesEnabled = isIndicesEnabled(tableScan, session);
if (indicesEnabled) {
tableScan = tableScan.includeIndexStats();
}

IcebergSplitSource splitSource = new IcebergSplitSource(
table,
identityPartitionColumns,
tableScan,
dynamicFilter,
dynamicFilteringWaitTimeout,
constraint);
constraint,
indicesEnabled);

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}

private boolean isIndicesEnabled(TableScan tableScan, ConnectorSession session)
{
// Indices are only used when both the table-level index property and the Trino session property are enabled
boolean indicesEnable = PropertyUtil.propertyAsBoolean(tableScan.table().properties(), TableProperties.HEURISTIC_INDEX_ENABLED_ON_READ,
TableProperties.HEURISTIC_INDEX_ENABLED_ON_READ_DEFAULT) && isReadIndicesSwitchOn(session);
if (indicesEnable) {
// check the table has a valid IndexSpec
IndexSpec indexSpec = tableScan.table().indexSpec();
return indexSpec != null && !indexSpec.isUnIndexed();
}

return false;
}
}
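
The table-level half of the switch checked above is a regular Iceberg table property. A hedged sketch of turning it on through the standard Iceberg API, assuming the forked build exposes TableProperties.HEURISTIC_INDEX_ENABLED_ON_READ as the property key read here:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public final class IndexReadToggle
{
    private IndexReadToggle() {}

    // updateProperties() is standard Iceberg; the property constant comes from the forked build.
    public static void enableIndexReads(Table table)
    {
        table.updateProperties()
                .set(TableProperties.HEURISTIC_INDEX_ENABLED_ON_READ, "true")
                .commit();
    }
}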
(Diffs for the remaining changed files did not load in this view.)
