diff --git a/.github/workflows/github_actions.yml b/.github/workflows/github_actions.yml index fbbcff0574..8c815e4274 100644 --- a/.github/workflows/github_actions.yml +++ b/.github/workflows/github_actions.yml @@ -17,42 +17,30 @@ jobs: os: [ ubuntu-latest, macos-11 ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 + + - name: Set up Java + uses: actions/setup-java@v3 + with: + distribution: 'adopt' + java-version: '17' + check-latest: true - run: | - export JAVA_HOME=$JAVA_HOME_11_X64 mvn checkstyle:checkstyle shell: bash name: checkStyle - run: | - export JAVA_HOME=$JAVA_HOME_11_X64 - mvn clean package -DskipTests + mvn clean package -e -DskipTests shell: bash name: checkFormat - run: | - export JAVA_HOME=$JAVA_HOME_11_X64 mvn test shell: bash name: test - - run: | - set -e pipefail - # Display log files if the build failed - echo "Dumping log files for failed build" - echo "----------------------------------" - for f in $(find $BUILD_REPOSITORY_LOCALPATH -name *.dumpstream); - do echo "------" - echo $f - echo "======" - cat $f - done; - shell: bash - name: error_print - if: ${{ failure() }} - - Release: if: startsWith(github.ref, 'refs/tags/') needs: [Test] diff --git a/pom.xml b/pom.xml index d770aa22d5..43932eaf13 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ trino-root io.trino - 357 + 405 trino-tiledb @@ -54,7 +54,13 @@ com.github.oshi oshi-core - 3.10.0 + 6.4.0 + + + org.slf4j + slf4j-api + + @@ -145,7 +151,7 @@ io.trino trino-testing - 357 + 405 test @@ -191,6 +197,7 @@ org.testng testng + 7.3.0 test diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBClient.java b/src/main/java/io/trino/plugin/tiledb/TileDBClient.java index 3a35fa0c1a..c64415e815 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBClient.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBClient.java @@ -62,7 +62,6 @@ public class TileDBClient protected Context ctx; protected TileDBConfig config; - protected oshi.SystemInfo systemInfo; protected HardwareAbstractionLayer hardwareAbstractionLayer; @@ -101,6 +100,11 @@ public TileDBClient(TileDBConnectorId connectorId, TileDBConfig config) hardwareAbstractionLayer = systemInfo.getHardware(); } + public HardwareAbstractionLayer getHardwareAbstractionLayer() + { + return hardwareAbstractionLayer; + } + /** * Get plugin configuration * @return TileDB plugin configuration @@ -229,11 +233,6 @@ public Context getCtx() return ctx; } - public HardwareAbstractionLayer getHardwareAbstractionLayer() - { - return hardwareAbstractionLayer; - } - /** * Rollback a create table statement, this just drops the array * @param handle tiledb table handler diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBConfig.java b/src/main/java/io/trino/plugin/tiledb/TileDBConfig.java index 340cd0a390..9b50032d6e 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBConfig.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBConfig.java @@ -35,7 +35,7 @@ public class TileDBConfig private int readBufferSize = 1024 * 1024 * 10; - private int writeBufferSize = 1024 * 1024 * 10; + private int writeBufferSize = 1024 * 1024 * 1; private String awsAccessKeyId; diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBConnector.java b/src/main/java/io/trino/plugin/tiledb/TileDBConnector.java index 9ed3176f20..e4c36d2478 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBConnector.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBConnector.java @@ -21,6 +21,7 @@ import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorRecordSetProvider; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.session.PropertyMetadata; @@ -78,7 +79,7 @@ public TileDBConnector( } @Override - public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit) { TileDBTransactionHandle transaction = new TileDBTransactionHandle(); transactions.put(transaction, metadata); @@ -100,7 +101,7 @@ public void rollback(ConnectorTransactionHandle transactionHandle) } @Override - public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) + public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle) { return metadata; } diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBConnectorFactory.java b/src/main/java/io/trino/plugin/tiledb/TileDBConnectorFactory.java index f08f5fbcd7..58cb13faec 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBConnectorFactory.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBConnectorFactory.java @@ -22,7 +22,6 @@ import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; -import io.trino.spi.connector.ConnectorHandleResolver; import java.util.Map; @@ -42,12 +41,6 @@ public String getName() return "tiledb"; } - @Override - public ConnectorHandleResolver getHandleResolver() - { - return new TileDBHandleResolver(); - } - @Override public Connector create(String catalogName, Map requiredConfig, ConnectorContext context) { @@ -62,7 +55,6 @@ public Connector create(String catalogName, Map requiredConfig, }); Injector injector = app - .strictConfig() .doNotInitializeLogging() .setRequiredConfigurationProperties(requiredConfig) .initialize(); diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBHandleResolver.java b/src/main/java/io/trino/plugin/tiledb/TileDBHandleResolver.java deleted file mode 100644 index 5f13b89d57..0000000000 --- a/src/main/java/io/trino/plugin/tiledb/TileDBHandleResolver.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.tiledb; - -import io.trino.spi.connector.ColumnHandle; -import io.trino.spi.connector.ConnectorHandleResolver; -import io.trino.spi.connector.ConnectorInsertTableHandle; -import io.trino.spi.connector.ConnectorOutputTableHandle; -import io.trino.spi.connector.ConnectorSplit; -import io.trino.spi.connector.ConnectorTableHandle; -import io.trino.spi.connector.ConnectorTableLayoutHandle; -import io.trino.spi.connector.ConnectorTransactionHandle; - -/** - * TileDBHandleResolver returns the class types for each handler component so prestodb can load the correct - * classes - */ -public class TileDBHandleResolver - implements ConnectorHandleResolver -{ - @Override - public Class getTableLayoutHandleClass() - { - return TileDBTableLayoutHandle.class; - } - - @Override - public Class getTableHandleClass() - { - return TileDBTableHandle.class; - } - - @Override - public Class getColumnHandleClass() - { - return TileDBColumnHandle.class; - } - - @Override - public Class getSplitClass() - { - return TileDBSplit.class; - } - - @Override - public Class getTransactionHandleClass() - { - return TileDBTransactionHandle.class; - } - - @Override - public Class getOutputTableHandleClass() - { - return TileDBOutputTableHandle.class; - } - - @Override - public Class getInsertTableHandleClass() - { - return TileDBOutputTableHandle.class; - } -} diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBMetadata.java b/src/main/java/io/trino/plugin/tiledb/TileDBMetadata.java index 973ee49a46..532c5d44dd 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBMetadata.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBMetadata.java @@ -13,11 +13,9 @@ */ package io.trino.plugin.tiledb; -import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.tiledb.java.api.Array; @@ -35,28 +33,19 @@ import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMetadata; -import io.trino.spi.connector.ConnectorNewTableLayout; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableLayout; -import io.trino.spi.connector.ConnectorTableLayoutHandle; -import io.trino.spi.connector.ConnectorTableLayoutResult; import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.Constraint; -import io.trino.spi.connector.LocalProperty; +import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.TableNotFoundException; -import io.trino.spi.predicate.Domain; -import io.trino.spi.predicate.Range; -import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.predicate.ValueSet; import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; -import org.apache.commons.beanutils.ConvertUtils; import javax.inject.Inject; @@ -65,36 +54,26 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static io.airlift.slice.Slices.utf8Slice; import static io.tiledb.java.api.ArrayType.TILEDB_SPARSE; import static io.tiledb.java.api.Constants.TILEDB_VAR_NUM; -import static io.tiledb.java.api.QueryType.TILEDB_READ; import static io.trino.plugin.tiledb.TileDBColumnProperties.getDimension; import static io.trino.plugin.tiledb.TileDBColumnProperties.getExtent; import static io.trino.plugin.tiledb.TileDBColumnProperties.getFilterList; import static io.trino.plugin.tiledb.TileDBColumnProperties.getLowerBound; -import static io.trino.plugin.tiledb.TileDBColumnProperties.getNullable; import static io.trino.plugin.tiledb.TileDBColumnProperties.getUpperBound; import static io.trino.plugin.tiledb.TileDBErrorCode.TILEDB_CREATE_TABLE_ERROR; -import static io.trino.plugin.tiledb.TileDBErrorCode.TILEDB_RECORD_SET_ERROR; import static io.trino.plugin.tiledb.TileDBModule.tileDBTypeFromTrinoType; import static io.trino.plugin.tiledb.TileDBSessionProperties.getEncryptionKey; -import static io.trino.plugin.tiledb.TileDBSessionProperties.getSplitOnlyPredicates; import static io.trino.plugin.tiledb.TileDBTableProperties.getEncryptionKey; -import static io.trino.spi.type.RealType.REAL; -import static java.lang.Float.floatToRawIntBits; import static java.util.Objects.requireNonNull; /** @@ -153,140 +132,6 @@ public TileDBTableHandle getTableHandle(ConnectorSession session, SchemaTableNam return new TileDBTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName(), table.getURI().toString()); } - @Override - public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns) - { - TileDBTableHandle tableHandle = (TileDBTableHandle) table; - - // Set the dimensions as the partition columns - Optional> partitioningColumns = Optional.empty(); - ImmutableList.Builder> localProperties = ImmutableList.builder(); - - Map columns = getColumnHandles(session, tableHandle); - boolean hasPredicate = constraint.predicate().isPresent(); - - // Predicates are fetched as summary of constraints - TupleDomain effectivePredicate = constraint.getSummary(); -// Set columnHandles = new HashSet<>(); //TODO check if possible in the future -// for (ColumnHandle e : columns.values()) { -// if (hasPredicate) { -// columnsWithPredicates.add(e); -// } -// //columns that are not included in the columnHandles are filtered by presto, not tileDB. Strings and queries with 'OR' are not pushed down for now. -// if ((((effectivePredicate.getDomains().get().get(e) != null) && -// (effectivePredicate.getDomains().get().get(e).getValues().getRanges().getOrderedRanges().size() > 1)))) { //having more than one range effectively means it is an OR condition, which is not yet supported by the core library. -// LOG.info("Column %s has an OR condition which is not yet supported by TileDB's native QueryCondition. The filtering will happen by Trino.", ((TileDBColumnHandle) e).getColumnName()); -// } -// else if (columnsWithPredicates.contains(e)) { //Predicates are not supported by the QueryCondition, thus we need to leave this to Presto. -// LOG.info("Column %s uses a Predicate which is not yet supported by TileDB's native QueryCondition. The filtering will happen by Trino.", ((TileDBColumnHandle) e).getColumnName()); -// } -// else { -// columnHandles.add(e); -// } -// } - - Set dimensionHandles = columns.values().stream() - .filter(e -> ((TileDBColumnHandle) e).getIsDimension()) - .collect(Collectors.toSet()); - - List columnsInLayout; - if (desiredColumns.isPresent()) { - // Add all dimensions since dimensions will always be returned by tiledb - Set desiredColumnsWithDimension = new HashSet<>(desiredColumns.get()); - desiredColumnsWithDimension.addAll(dimensionHandles); -// desiredColumnsWithDimension.addAll(columnHandles); - columnsInLayout = new ArrayList<>(desiredColumnsWithDimension); - } - else { - columnsInLayout = new ArrayList<>(columns.values()); - } - - // The only enforceable constraints are ones for dimension columns - Map enforceableDimensionDomains = new HashMap<>(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.in(dimensionHandles))); - - if (!getSplitOnlyPredicates(session)) { - try { - Array array; - String key = getEncryptionKey(session); - if (key == null) { - array = new Array(tileDBClient.buildContext(session), tableHandle.getURI(), TILEDB_READ); - } - else { - array = new Array(tileDBClient.buildContext(session), tableHandle.getURI(), TILEDB_READ, EncryptionType.TILEDB_AES_256_GCM, key.getBytes()); - } - - HashMap nonEmptyDomain = array.nonEmptyDomain(); - // Find any dimension which do not have predicates and add one for the entire domain. - // This is required so we can later split on the predicates - for (ColumnHandle dimensionHandle : dimensionHandles) { - if (!enforceableDimensionDomains.containsKey(dimensionHandle)) { - TileDBColumnHandle columnHandle = ((TileDBColumnHandle) dimensionHandle); - if (nonEmptyDomain.containsKey(columnHandle.getColumnName())) { - Pair domain = nonEmptyDomain.get(columnHandle.getColumnName()); - Object nonEmptyMin = domain.getFirst(); - Object nonEmptyMax = domain.getSecond(); - Type type = columnHandle.getColumnType(); - if (nonEmptyMin == null || nonEmptyMax == null || nonEmptyMin.equals("") || nonEmptyMax.equals("")) { - continue; - } - - Range range; - if (REAL.equals(type)) { - range = Range.range(type, ((Integer) floatToRawIntBits((Float) nonEmptyMin)).longValue(), true, - ((Integer) floatToRawIntBits((Float) nonEmptyMax)).longValue(), true); - } - else if (type instanceof VarcharType) { - range = Range.range(type, utf8Slice(nonEmptyMin.toString()), true, - utf8Slice(nonEmptyMax.toString()), true); - } - else { - range = Range.range(type, - ConvertUtils.convert(nonEmptyMin, type.getJavaType()), true, - ConvertUtils.convert(nonEmptyMax, type.getJavaType()), true); - } - - enforceableDimensionDomains.put( - dimensionHandle, - Domain.create(ValueSet.ofRanges(range), false)); - } - } - } - array.close(); - } - catch (TileDBError tileDBError) { - throw new TrinoException(TILEDB_RECORD_SET_ERROR, tileDBError); - } - } - - TupleDomain enforceableTupleDomain = TupleDomain.withColumnDomains(enforceableDimensionDomains); - TupleDomain remainingTupleDomain; - - // The remaining tuples non-enforced by TileDB are attributes - remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.not(Predicates.in(dimensionHandles)))); - - ConnectorTableLayout layout = new ConnectorTableLayout( - new TileDBTableLayoutHandle(tableHandle, enforceableTupleDomain, dimensionHandles), - Optional.of(columnsInLayout), - TupleDomain.all(), - Optional.empty(), - partitioningColumns, - Optional.empty(), - localProperties.build()); - - return ImmutableList.of(new ConnectorTableLayoutResult(layout, remainingTupleDomain)); - } - - @Override - public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) - { - TileDBTableLayoutHandle layout = (TileDBTableLayoutHandle) handle; - - // tables in this connector have a single layout - return getTableLayouts(session, layout.getTable(), Constraint.alwaysTrue(), Optional.empty()) - .get(0) - .getTableLayout(); - } - @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { @@ -400,7 +245,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe * @return output table handles */ @Override - public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { TileDBOutputTableHandle handle = beginCreateArray(session, tableMetadata); setRollback(() -> tileDBClient.rollbackCreateTable(handle)); @@ -461,7 +306,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle } @Override - public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) + public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode) { // Get schema/table names ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableHandle); @@ -592,7 +437,7 @@ public TileDBOutputTableHandle beginCreateArray(ConnectorSession session, Connec if (filterPairs.isPresent()) { attribute.setFilterList(Util.createTileDBFilterList(localCtx, filterPairs.get())); } - attribute.setNullable(getNullable(columnProperties)); + attribute.setNullable(column.isNullable()); arraySchema.addAttribute(attribute); } @@ -672,10 +517,4 @@ public TileDBOutputTableHandle beginCreateArray(ConnectorSession session, Connec throw new TrinoException(TILEDB_CREATE_TABLE_ERROR, e); } } - - @Override - public boolean usesLegacyTableLayouts() - { - return true; - } } diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBModule.java b/src/main/java/io/trino/plugin/tiledb/TileDBModule.java index d4fc2deab2..4985d8a387 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBModule.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBModule.java @@ -43,7 +43,7 @@ import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; import static io.trino.spi.type.SmallintType.SMALLINT; -import static io.trino.spi.type.TimestampType.TIMESTAMP; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; @@ -137,7 +137,7 @@ public static Type prestoTypeFromTileDBType(Datatype type) throws TileDBError case TILEDB_DATETIME_SEC: case TILEDB_DATETIME_MIN: case TILEDB_DATETIME_HR: - return TIMESTAMP; + return TIMESTAMP_MILLIS; case TILEDB_DATETIME_DAY: case TILEDB_DATETIME_WEEK: case TILEDB_DATETIME_MONTH: @@ -187,7 +187,7 @@ else if (type.equals(DOUBLE)) { else if (type.equals(DATE)) { return TILEDB_DATETIME_DAY; } - else if (type.equals(TIMESTAMP)) { + else if (type.equals(TIMESTAMP_MILLIS)) { return Datatype.TILEDB_DATETIME_MS; } //TODO: HANDLE ANY and other types diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBPageSink.java b/src/main/java/io/trino/plugin/tiledb/TileDBPageSink.java index 0d647db38b..f811c65b75 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBPageSink.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBPageSink.java @@ -69,7 +69,7 @@ import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; import static io.trino.spi.type.SmallintType.SMALLINT; -import static io.trino.spi.type.TimestampType.TIMESTAMP; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static java.lang.Float.intBitsToFloat; @@ -176,7 +176,7 @@ private void resetBuffers(Map> buffers) t buffers.clear(); query.resetBuffers(); - validityMaps = new short[columnHandles.size()][maxBufferSize]; + validityMaps = new short[columnHandles.size()][]; // Loop through each column for (int channel = 0; channel < columnHandles.size(); channel++) { // Datatype @@ -195,9 +195,16 @@ private void resetBuffers(Map> buffers) t if (isVariableLength) { offsets = new NativeArray(ctx, maxBufferSize, Datatype.TILEDB_UINT64); } - if (!columnHandle.getIsDimension()) { - Arrays.fill(validityMaps[channel], (short) 1); // all valid + + // allocate validity buffers for nullable attributes only. + if (array.getSchema().hasAttribute(columnName)) { + Attribute attribute = array.getSchema().getAttribute(columnHandle.getColumnName()); + if (attribute.getNullable()) { + validityMaps[channel] = new short[maxBufferSize / type.getNativeSize()]; //allocate for the max amount of values + Arrays.fill(validityMaps[channel], (short) 1); // all valid + } } + buffers.put(columnName, new Pair<>(offsets, values)); } } @@ -528,7 +535,7 @@ else if (DATE.equals(type)) { columnBuffer.setItem(bufferPosition, value); } - else if (TIMESTAMP.equals(type)) { + else if (TIMESTAMP_MILLIS.equals(type)) { columnBuffer.setItem(bufferPosition, type.getLong(block, position)); } else { diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBPageSinkProvider.java b/src/main/java/io/trino/plugin/tiledb/TileDBPageSinkProvider.java index 2aa8376097..63b4a6ab3c 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBPageSinkProvider.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBPageSinkProvider.java @@ -16,6 +16,7 @@ import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.connector.ConnectorPageSinkId; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -39,14 +40,14 @@ public TileDBPageSinkProvider(TileDBClient tileDBClient) } @Override - public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle) + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId) { - return new TileDBPageSink((TileDBOutputTableHandle) tableHandle, tileDBClient, session); + return new TileDBPageSink((TileDBOutputTableHandle) outputTableHandle, tileDBClient, session); } @Override - public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle) + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId) { - return new TileDBPageSink((TileDBOutputTableHandle) tableHandle, tileDBClient, session); + return new TileDBPageSink((TileDBOutputTableHandle) insertTableHandle, tileDBClient, session); } } diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBRecordCursor.java b/src/main/java/io/trino/plugin/tiledb/TileDBRecordCursor.java index 5d68cebd40..d06c672421 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBRecordCursor.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBRecordCursor.java @@ -919,7 +919,7 @@ public long getCompletedBytes() } @Override - public long getSystemMemoryUsage() + public long getMemoryUsage() { return calculateNativeArrayByteSizes(); } diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBRecordSetProvider.java b/src/main/java/io/trino/plugin/tiledb/TileDBRecordSetProvider.java index 196479b446..fdc56cc8e8 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBRecordSetProvider.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBRecordSetProvider.java @@ -18,6 +18,7 @@ import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.RecordSet; @@ -44,7 +45,7 @@ public TileDBRecordSetProvider(TileDBClient tileDBClient) } @Override - public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns) + public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns) { TileDBSplit tileDBSplit = (TileDBSplit) split; diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBSplitManager.java b/src/main/java/io/trino/plugin/tiledb/TileDBSplitManager.java index 3b6d8ae0a8..ac1da5e569 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBSplitManager.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBSplitManager.java @@ -13,7 +13,9 @@ */ package io.trino.plugin.tiledb; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; import io.airlift.log.Logger; import io.tiledb.java.api.Array; import io.tiledb.java.api.EncryptionType; @@ -26,13 +28,17 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorSplitSource; -import io.trino.spi.connector.ConnectorTableLayoutHandle; +import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.FixedSplitSource; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.predicate.ValueSet; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarcharType; import org.apache.commons.beanutils.ConvertUtils; import javax.inject.Inject; @@ -41,12 +47,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import static io.airlift.slice.Slices.utf8Slice; +import static io.tiledb.java.api.QueryType.TILEDB_READ; +import static io.trino.plugin.tiledb.TileDBErrorCode.TILEDB_RECORD_SET_ERROR; import static io.trino.plugin.tiledb.TileDBErrorCode.TILEDB_SPLIT_MANAGER_ERROR; import static io.trino.plugin.tiledb.TileDBSessionProperties.getEncryptionKey; +import static io.trino.plugin.tiledb.TileDBSessionProperties.getSplitOnlyPredicates; import static io.trino.spi.predicate.Range.range; import static io.trino.spi.type.RealType.REAL; +import static java.lang.Float.floatToRawIntBits; import static java.util.Objects.requireNonNull; /** @@ -58,23 +70,23 @@ public class TileDBSplitManager private static final Logger LOG = Logger.get(TileDBSplitManager.class); private final String connectorId; private final TileDBClient tileDBClient; - private TileDBTableLayoutHandle layoutHandle; private final NodeManager nodeManager; + private final TileDBMetadata tileDBMetadata; + @Inject public TileDBSplitManager(TileDBConnectorId connectorId, TileDBClient tileDBClient, NodeManager nodeManager) { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); this.tileDBClient = requireNonNull(tileDBClient, "client is null"); + this.tileDBMetadata = new TileDBMetadata(connectorId, tileDBClient); } @Override - public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingStrategy splitSchedulingStrategy) + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, DynamicFilter dynamicFilter, Constraint constraint) { - this.layoutHandle = (TileDBTableLayoutHandle) layout; - - TileDBTableHandle tableHandle = layoutHandle.getTable(); + TileDBTableHandle tableHandle = (TileDBTableHandle) table; try { Array array; @@ -90,7 +102,12 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand if (numSplits == -1) { numSplits = nodeManager.getWorkerNodes().size(); } - List> domainRangeSplits = splitTupleDomainOnRanges(layoutHandle.getTupleDomain(), numSplits, TileDBSessionProperties.getSplitOnlyPredicates(session), array.nonEmptyDomain()); + + Object[] domainAndColumnHandlesSize = getTupleDomain(session, tableHandle, constraint); + TupleDomain tupleDomain = (TupleDomain) domainAndColumnHandlesSize[0]; + int dimensionCount = ((Set) domainAndColumnHandlesSize[1]).size(); + + List> domainRangeSplits = splitTupleDomainOnRanges(tupleDomain, numSplits, TileDBSessionProperties.getSplitOnlyPredicates(session), array.nonEmptyDomain(), dimensionCount); List splits = domainRangeSplits.stream().map( tuple -> new TileDBSplit( @@ -106,14 +123,98 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand } } - /** + /** + * Returns the tuple domain and the column handles + * @param session The connector session + * @param tableHandle The table handle + * @param constraint The constraint + * @return an Object[] array with a size of 2 to return both objects + */ + private Object[] getTupleDomain(ConnectorSession session, TileDBTableHandle tableHandle, Constraint constraint) + { + Object[] result = new Object[2]; + + TupleDomain effectivePredicate = constraint.getSummary(); + + Map columns = tileDBMetadata.getColumnHandles(session, tableHandle); + + Set dimensionHandles = columns.values().stream() + .filter(e -> ((TileDBColumnHandle) e).getIsDimension()) + .collect(Collectors.toSet()); + + // The only enforceable constraints are ones for dimension columns + Map enforceableDimensionDomains = new HashMap<>(Maps.filterKeys(effectivePredicate.getDomains().get(), Predicates.in(dimensionHandles))); + + if (!getSplitOnlyPredicates(session)) { + try { + Array array; + String key = getEncryptionKey(session); + if (key == null) { + array = new Array(tileDBClient.buildContext(session), tableHandle.getURI(), TILEDB_READ); + } + else { + array = new Array(tileDBClient.buildContext(session), tableHandle.getURI(), TILEDB_READ, EncryptionType.TILEDB_AES_256_GCM, key.getBytes()); + } + + HashMap nonEmptyDomain = array.nonEmptyDomain(); + // Find any dimension which do not have predicates and add one for the entire domain. + // This is required so we can later split on the predicates + for (ColumnHandle dimensionHandle : dimensionHandles) { + if (!enforceableDimensionDomains.containsKey(dimensionHandle)) { + TileDBColumnHandle columnHandle = ((TileDBColumnHandle) dimensionHandle); + if (nonEmptyDomain.containsKey(columnHandle.getColumnName())) { + Pair domain = nonEmptyDomain.get(columnHandle.getColumnName()); + Object nonEmptyMin = domain.getFirst(); + Object nonEmptyMax = domain.getSecond(); + Type type = columnHandle.getColumnType(); + if (nonEmptyMin == null || nonEmptyMax == null || nonEmptyMin.equals("") || nonEmptyMax.equals("")) { + continue; + } + + Range range; + if (REAL.equals(type)) { + range = Range.range(type, ((Integer) floatToRawIntBits((Float) nonEmptyMin)).longValue(), true, + ((Integer) floatToRawIntBits((Float) nonEmptyMax)).longValue(), true); + } + else if (type instanceof VarcharType) { + range = Range.range(type, utf8Slice(nonEmptyMin.toString()), true, + utf8Slice(nonEmptyMax.toString()), true); + } + else { + range = Range.range(type, + ConvertUtils.convert(nonEmptyMin, type.getJavaType()), true, + ConvertUtils.convert(nonEmptyMax, type.getJavaType()), true); + } + + enforceableDimensionDomains.put( + dimensionHandle, + Domain.create(ValueSet.ofRanges(range), false)); + } + } + } + array.close(); + } + catch (TileDBError tileDBError) { + throw new TrinoException(TILEDB_RECORD_SET_ERROR, tileDBError); + } + } + + TupleDomain enforceableTupleDomain = TupleDomain.withColumnDomains(enforceableDimensionDomains); + result[0] = enforceableTupleDomain; + result[1] = dimensionHandles; + return result; + } + + /** * Split tuple domain if there are multiple ranges + * * @param tupleDomain * @param splitOnlyPredicates * @param nonEmptyDomains - * @return + * @param dimensionCount + * @return */ - private List> splitTupleDomainOnRanges(TupleDomain tupleDomain, int splits, boolean splitOnlyPredicates, HashMap nonEmptyDomains) + private List> splitTupleDomainOnRanges(TupleDomain tupleDomain, int splits, boolean splitOnlyPredicates, HashMap nonEmptyDomains, int dimensionCount) { List>> domainList = new ArrayList<>(); // Loop through each column handle's domain to see if there are ranges to split @@ -123,7 +224,6 @@ private List> splitTupleDomainOnRanges(TupleDomain i.getValue().getValues().getRanges().getRangeCount()) .sum(); - int dimensionCount = this.layoutHandle.getDimensionColumnHandles().size(); if (splitOnlyPredicates) { dimensionCount = ((Long) tupleDomain.getDomains().get().entrySet().stream() .filter(e -> ((TileDBColumnHandle) e.getKey()).getIsDimension()).count()).intValue(); @@ -171,7 +271,7 @@ private List> splitTupleDomainOnRanges(TupleDomain> results = new ArrayList<>(); // Create combination for all tuple domains - GenerateCombinationTupleDomains(domainList, intermediateResults, 0, new HashMap<>()); + generateCombinationTupleDomains(domainList, intermediateResults, 0, new HashMap<>()); for (Map domain : intermediateResults) { results.add(TupleDomain.withColumnDomains(domain)); } @@ -186,7 +286,7 @@ private List> splitTupleDomainOnRanges(TupleDomain>> lists, List> result, int depth, Map current) + private void generateCombinationTupleDomains(List>> lists, List> result, int depth, Map current) { if (depth == lists.size()) { result.add(current); @@ -200,7 +300,7 @@ private void GenerateCombinationTupleDomains(List currentCopy = current.entrySet().stream() .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); - GenerateCombinationTupleDomains(lists, result, depth + 1, currentCopy); + generateCombinationTupleDomains(lists, result, depth + 1, currentCopy); } } diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBTable.java b/src/main/java/io/trino/plugin/tiledb/TileDBTable.java index 847e4d847b..8dc747fd2e 100644 --- a/src/main/java/io/trino/plugin/tiledb/TileDBTable.java +++ b/src/main/java/io/trino/plugin/tiledb/TileDBTable.java @@ -30,6 +30,7 @@ import java.net.URI; import java.util.List; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; @@ -73,7 +74,15 @@ public TileDBTable( // Add dimensions as a column for (Dimension dimension : domain.getDimensions()) { Type type = prestoTypeFromTileDBType(dimension.getType()); - columnsMetadata.add(new ColumnMetadata(dimension.getName(), type, "Dimension", null, false)); + ColumnMetadata columnMetadata = ColumnMetadata.builder() + .setName(dimension.getName()) + .setType(type) + .setExtraInfo(Optional.empty()) + .setComment(Optional.of("Dimension")) + .setHidden(false) + .build(); + + columnsMetadata.add(columnMetadata); columns.add(new TileDBColumn(dimension.getName(), type, dimension.getType(), dimension.isVar(), true, false)); dimension.close(); } @@ -84,7 +93,14 @@ public TileDBTable( if (attribute.getType() == Datatype.TILEDB_CHAR && !attribute.isVar()) { type = VarcharType.createVarcharType(toIntExact(attribute.getCellValNum())); } - columnsMetadata.add(new ColumnMetadata(attribute.getName(), type, "Attribute", null, false)); + ColumnMetadata columnMetadata = ColumnMetadata.builder() + .setName(attribute.getName()) + .setType(type) + .setExtraInfo(Optional.empty()) + .setComment(Optional.of("Attribute")) + .setHidden(false) + .build(); + columnsMetadata.add(columnMetadata); columns.add(new TileDBColumn(attribute.getName(), type, attribute.getType(), attribute.isVar(), false, attribute.getNullable())); attribute.close(); } diff --git a/src/main/java/io/trino/plugin/tiledb/TileDBTableLayoutHandle.java b/src/main/java/io/trino/plugin/tiledb/TileDBTableLayoutHandle.java deleted file mode 100644 index 170f3e3761..0000000000 --- a/src/main/java/io/trino/plugin/tiledb/TileDBTableLayoutHandle.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.tiledb; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import io.trino.spi.connector.ColumnHandle; -import io.trino.spi.connector.ConnectorTableLayoutHandle; -import io.trino.spi.predicate.TupleDomain; - -import java.util.Objects; -import java.util.Set; - -import static java.util.Objects.requireNonNull; - -/** - * TileDBTableLayoutHandle implements the thin interface for a connector table handler. The main purpose is to - * hold the table handler instance and the tupleDomain which contains the constraints for each column - */ -public class TileDBTableLayoutHandle - implements ConnectorTableLayoutHandle -{ - private final TileDBTableHandle table; - private final TupleDomain tupleDomain; - private final Set dimensionColumnHandles; - - @JsonCreator - public TileDBTableLayoutHandle( - @JsonProperty("table") TileDBTableHandle table, - @JsonProperty("tupleDomain") TupleDomain domain, - @JsonProperty("dimensionColumnHandles") Set dimensionColumnHandles) - { - this.table = table; - this.tupleDomain = requireNonNull(domain, "tupleDomain is null"); - this.dimensionColumnHandles = requireNonNull(dimensionColumnHandles, "dimensionColumnHandles is null"); - } - - @JsonProperty - public TileDBTableHandle getTable() - { - return table; - } - - @JsonProperty - public TupleDomain getTupleDomain() - { - return tupleDomain; - } - - @JsonProperty - public Set getDimensionColumnHandles() - { - return dimensionColumnHandles; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TileDBTableLayoutHandle that = (TileDBTableLayoutHandle) o; - return Objects.equals(table, that.table) && - Objects.equals(tupleDomain, that.tupleDomain); - } - - @Override - public int hashCode() - { - return Objects.hash(table, tupleDomain); - } - - @Override - public String toString() - { - return table.toString(); - } -} diff --git a/src/test/java/io/trino/plugin/tiledb/TestTileDBConfig.java b/src/test/java/io/trino/plugin/tiledb/TestTileDBConfig.java index 80dd6b99a3..fd35591188 100644 --- a/src/test/java/io/trino/plugin/tiledb/TestTileDBConfig.java +++ b/src/test/java/io/trino/plugin/tiledb/TestTileDBConfig.java @@ -30,7 +30,7 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(TileDBConfig.class) .setArrayURIs(null) .setReadBufferSize(10485760) - .setWriteBufferSize(10485760) + .setWriteBufferSize(1048576) .setAwsAccessKeyId(null) .setAwsSecretAccessKey(null) .setTileDBConfig(null)); @@ -64,7 +64,7 @@ public void testExplicitPropertyMappingsWithMultipleArrays() Map properties = new ImmutableMap.Builder() .put("array-uris", "file:///test,s3:///test-bucket/array") .put("read-buffer-size", "1048576") - .put("write-buffer-size", "1048576") + .put("write-buffer-size", "104857") .put("aws-access-key-id", "123") .put("aws-secret-access-key", "abc") .put("tiledb-config", "key1=value1,key2=value2").build(); @@ -73,7 +73,7 @@ public void testExplicitPropertyMappingsWithMultipleArrays() expected = new TileDBConfig() .setArrayURIs("file:///test,s3:///test-bucket/array") .setReadBufferSize(1048576) - .setWriteBufferSize(1048576) + .setWriteBufferSize(104857) .setAwsAccessKeyId("123") .setAwsSecretAccessKey("abc") .setTileDBConfig("key1=value1,key2=value2"); diff --git a/src/test/java/io/trino/plugin/tiledb/TestTileDBIntegrationSmokeTest.java b/src/test/java/io/trino/plugin/tiledb/TestTileDBIntegrationSmokeTest.java index 934b5cba31..200d1fc1aa 100644 --- a/src/test/java/io/trino/plugin/tiledb/TestTileDBIntegrationSmokeTest.java +++ b/src/test/java/io/trino/plugin/tiledb/TestTileDBIntegrationSmokeTest.java @@ -16,14 +16,15 @@ import io.airlift.tpch.TpchTable; import io.tiledb.java.api.Context; import io.tiledb.java.api.TileDBError; +import io.tiledb.java.api.TileDBObject; import io.trino.spi.TrinoException; -import io.trino.testing.AbstractTestIntegrationSmokeTest; +import io.trino.spi.type.VarcharType; +import io.trino.testing.AbstractTestQueries; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; -import static io.tiledb.java.api.TileDBObject.remove; import static io.trino.plugin.tiledb.TileDBErrorCode.TILEDB_UNEXPECTED_ERROR; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.assertions.Assert.assertEquals; @@ -31,19 +32,11 @@ @Test public class TestTileDBIntegrationSmokeTest - extends AbstractTestIntegrationSmokeTest + extends AbstractTestQueries { - private Context ctx; - public TestTileDBIntegrationSmokeTest() { super(); - try { - ctx = new Context(); - } - catch (TileDBError tileDBError) { - throw new TrinoException(TILEDB_UNEXPECTED_ERROR, tileDBError); - } } protected boolean isParameterizedVarcharSupported() @@ -57,6 +50,23 @@ protected QueryRunner createQueryRunner() throws Exception return TileDBQueryRunner.createTileDBQueryRunner(); } + @Override + public void testShowColumns() + { + MaterializedResult actual = this.computeActual("SHOW COLUMNS FROM orders"); + MaterializedResult expected = MaterializedResult.resultBuilder(this.getSession(), VarcharType.VARCHAR, VarcharType.VARCHAR, VarcharType.VARCHAR, VarcharType.VARCHAR) + .row("orderkey", "bigint", "", "Dimension") + .row("custkey", "bigint", "", "Dimension") + .row("orderstatus", "varchar(1)", "", "Attribute") + .row("totalprice", "double", "", "Attribute") + .row("orderdate", "date", "", "Attribute") + .row("orderpriority", "varchar", "", "Attribute") + .row("clerk", "varchar", "", "Attribute") + .row("shippriority", "integer", "", "Attribute") + .row(new Object[]{"comment", "varchar", "", "Attribute"}).build(); + assertThat(actual.equals(expected)); + } + @Test public void testDescribeTable() { @@ -84,16 +94,18 @@ public void testShowCreateTable() } @AfterClass(alwaysRun = true) - public final void destroy() + public final void destroy() throws TileDBError { + Context context = new Context(); for (TpchTable table : TpchTable.getTables()) { try { - remove(ctx, table.getTableName()); + TileDBObject.remove(context, table.getTableName()); } catch (TileDBError tileDBError) { throw new TrinoException(TILEDB_UNEXPECTED_ERROR, tileDBError); } } + context.close(); } private MaterializedResult getExpectedOrdersTableDescription(boolean parametrizedVarchar) diff --git a/src/test/java/io/trino/plugin/tiledb/TestTileDBQueries.java b/src/test/java/io/trino/plugin/tiledb/TestTileDBQueries.java index f09eb51356..8c9138b3bd 100644 --- a/src/test/java/io/trino/plugin/tiledb/TestTileDBQueries.java +++ b/src/test/java/io/trino/plugin/tiledb/TestTileDBQueries.java @@ -30,13 +30,10 @@ import io.tiledb.java.api.Query; import io.tiledb.java.api.TileDBError; import io.tiledb.java.api.TileDBObject; -import io.trino.spi.TrinoException; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.File; @@ -58,7 +55,6 @@ import static io.tiledb.java.api.Layout.TILEDB_GLOBAL_ORDER; import static io.tiledb.java.api.Layout.TILEDB_ROW_MAJOR; import static io.tiledb.java.api.QueryType.TILEDB_WRITE; -import static io.trino.plugin.tiledb.TileDBErrorCode.TILEDB_UNEXPECTED_ERROR; import static io.trino.plugin.tiledb.TileDBQueryRunner.createTileDBQueryRunner; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateType.DATE; @@ -66,7 +62,7 @@ import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; import static io.trino.spi.type.SmallintType.SMALLINT; -import static io.trino.spi.type.TimestampType.TIMESTAMP; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.assertions.Assert.assertEquals; @@ -78,37 +74,8 @@ public class TestTileDBQueries extends AbstractTestQueryFramework { - private Context ctx; - private String denseURI; - private String sparseURI; - - @BeforeClass - public void setup() throws Exception - { - denseURI = "dense_array"; - sparseURI = "sparse_array"; - if (Files.exists(Paths.get(denseURI))) { - TileDBObject.remove(ctx, denseURI); - } - denseArrayNullableCreate(); - denseArrayNullableWrite(); - if (Files.exists(Paths.get(sparseURI))) { - TileDBObject.remove(ctx, sparseURI); - } - sparseArrayNullableCreate(); - sparseArrayNullableWrite(); - } - - public TestTileDBQueries() - { - super(); - try { - ctx = new Context(); - } - catch (TileDBError tileDBError) { - throw new TrinoException(TILEDB_UNEXPECTED_ERROR, tileDBError); - } - } + private final String denseURI = "dense_array"; + private final String sparseURI = "sparse_array"; @Override protected QueryRunner createQueryRunner() throws Exception @@ -116,17 +83,6 @@ protected QueryRunner createQueryRunner() throws Exception return createTileDBQueryRunner(); } - @AfterClass(alwaysRun = true) - public final void destroy() throws TileDBError - { - if (Files.exists(Paths.get(denseURI))) { - TileDBObject.remove(ctx, denseURI); - } - if (Files.exists(Paths.get(sparseURI))) { - TileDBObject.remove(ctx, sparseURI); - } - } - @Test public void testCreate1DVector() { @@ -463,7 +419,7 @@ public void testCreate1DVectorTimestamp() String selectSql = format("SELECT * FROM %s ORDER BY x ASC", arrayName); MaterializedResult selectResult = computeActual(selectSql); - assertEquals(selectResult, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), TIMESTAMP, INTEGER) + assertEquals(selectResult, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), TIMESTAMP_MILLIS, INTEGER) .row(LocalDateTime.of(2012, 10, 10, 10, 0, 0), 10) .row(LocalDateTime.of(2012, 11, 10, 10, 0, 0), 13) .row(LocalDateTime.of(2012, 12, 10, 10, 0, 0), 15) @@ -471,7 +427,7 @@ public void testCreate1DVectorTimestamp() selectSql = format("SELECT * FROM %s WHERE x > timestamp '2012-11-10 10:00' ORDER BY x ASC", arrayName); selectResult = computeActual(selectSql); - assertEquals(selectResult, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), TIMESTAMP, INTEGER) + assertEquals(selectResult, MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), TIMESTAMP_MILLIS, INTEGER) .row(LocalDateTime.of(2012, 12, 10, 10, 0, 0), 15) .build()); @@ -1163,64 +1119,6 @@ public void testInsertOrderIssue() ====================================================== */ - /** - * Dense array with nullable and fixed size attributes. - * - * @throws Exception - */ - private void denseArrayNullableCreate() throws Exception - { - Dimension rows = - new Dimension<>(ctx, "rows", Integer.class, new Pair(1, 2), 2); - Dimension cols = - new Dimension<>(ctx, "cols", Integer.class, new Pair(1, 2), 2); - - // Create and set getDomain - Domain domain = new Domain(ctx); - domain.addDimension(rows); - domain.addDimension(cols); - - Attribute a1 = new Attribute(ctx, "a1", Float.class); - Attribute a2 = new Attribute(ctx, "a2", Integer.class); - a2.setCellValNum(1); - - a1.setNullable(true); - a2.setNullable(true); - - ArraySchema schema = new ArraySchema(ctx, TILEDB_DENSE); - schema.setTileOrder(TILEDB_ROW_MAJOR); - schema.setCellOrder(TILEDB_ROW_MAJOR); - schema.setDomain(domain); - schema.addAttribute(a1); - schema.addAttribute(a2); - - Array.create(denseURI, schema); - } - - /** - * Populating the dense array with nullable attributes. - * @throws Exception - */ - private void denseArrayNullableWrite() throws Exception - { - // Prepare cell buffers - NativeArray a1 = new NativeArray(ctx, new float[] {2.0f, 3.0f, 4.0f, 1.0f}, Float.class); - NativeArray a2 = new NativeArray(ctx, new int[] {1, 4, 2, 2}, Integer.class); - - // Create query - try (Array array = new Array(ctx, denseURI, TILEDB_WRITE); Query query = new Query(array)) { - query.setLayout(TILEDB_ROW_MAJOR); - NativeArray a1Bytemap = new NativeArray(ctx, new short[] {0, 1, 1, 0}, Datatype.TILEDB_UINT8); - NativeArray a2Bytemap = new NativeArray(ctx, new short[] {1, 1, 0, 1}, Datatype.TILEDB_UINT8); - - query.setBufferNullable("a1", a1, a1Bytemap); - query.setBufferNullable("a2", a2, a2Bytemap); - - // Submit query - query.submit(); - } - } - /** * Creates an one-dimensional array to test dimension filtering. */ @@ -1298,8 +1196,52 @@ public void testDimensionFiltering() * Reads a two-dimensional dense array with nullable attributes. */ @Test - public void test2DVectorNullableDense() + public void test2DVectorNullableDense() throws Exception { + Context ctx = new Context(); + Dimension rows = + new Dimension<>(ctx, "rows", Integer.class, new Pair(1, 2), 2); + Dimension cols = + new Dimension<>(ctx, "cols", Integer.class, new Pair(1, 2), 2); + + // Create and set getDomain + Domain domain = new Domain(ctx); + domain.addDimension(rows); + domain.addDimension(cols); + + Attribute a1 = new Attribute(ctx, "a1", Float.class); + Attribute a2 = new Attribute(ctx, "a2", Integer.class); + a2.setCellValNum(1); + + a1.setNullable(true); + a2.setNullable(true); + + ArraySchema schema = new ArraySchema(ctx, TILEDB_DENSE); + schema.setTileOrder(TILEDB_ROW_MAJOR); + schema.setCellOrder(TILEDB_ROW_MAJOR); + schema.setDomain(domain); + schema.addAttribute(a1); + schema.addAttribute(a2); + + Array.create(denseURI, schema); + + // Prepare cell buffers + NativeArray na1 = new NativeArray(ctx, new float[] {2.0f, 3.0f, 4.0f, 1.0f}, Float.class); + NativeArray na2 = new NativeArray(ctx, new int[] {1, 4, 2, 2}, Integer.class); + + // Create query + try (Array array = new Array(ctx, denseURI, TILEDB_WRITE); Query query = new Query(array)) { + query.setLayout(TILEDB_ROW_MAJOR); + NativeArray a1Bytemap = new NativeArray(ctx, new short[] {0, 1, 1, 0}, Datatype.TILEDB_UINT8); + NativeArray a2Bytemap = new NativeArray(ctx, new short[] {1, 1, 0, 1}, Datatype.TILEDB_UINT8); + + query.setBufferNullable("a1", na1, a1Bytemap); + query.setBufferNullable("a2", na2, a2Bytemap); + + // Submit query + query.submit(); + } + String selectSql = format("SELECT * FROM %s ORDER BY rows ASC", denseURI); MaterializedResult selectResult = computeActual(selectSql); List resultRows = selectResult.getMaterializedRows(); @@ -1310,15 +1252,20 @@ public void test2DVectorNullableDense() .row(2, 2, null, 2) .build(); assertEquals(expected.toString(), selectResult.toString()); + + if (Files.exists(Paths.get(denseURI))) { + TileDBObject.remove(ctx, denseURI); + } + ctx.close(); } /** - * Sparse array with nullable and variable sized attributes. - * - * @throws TileDBError + * Reads a two-dimensional sparse array with nullable attributes. */ - private void sparseArrayNullableCreate() throws TileDBError + @Test + public void test2DVectorNullableSparse() throws TileDBError { + Context ctx = new Context(); Dimension d1 = new Dimension<>(ctx, "d1", Integer.class, new Pair(1, 8), 2); @@ -1341,18 +1288,13 @@ private void sparseArrayNullableCreate() throws TileDBError schema.addAttribute(a2); Array.create(sparseURI, schema); - } - /** - * Populating the sparse array with nullable attributes. - * @throws TileDBError - */ - private void sparseArrayNullableWrite() throws TileDBError - { + //write array + NativeArray data = new NativeArray(ctx, new int[] {1, 2, 3, 4, 5}, Integer.class); // Prepare cell buffers - NativeArray a1 = new NativeArray(ctx, new int[] {1, 2, 3, 4, 5}, Integer.class); + NativeArray na1 = new NativeArray(ctx, new int[] {1, 2, 3, 4, 5}, Integer.class); NativeArray a2Data = new NativeArray(ctx, "aabbccddee", Datatype.TILEDB_STRING_ASCII); NativeArray a2Off = new NativeArray(ctx, new long[] {0, 2, 4, 6, 8}, Datatype.TILEDB_UINT64); @@ -1368,7 +1310,7 @@ private void sparseArrayNullableWrite() throws TileDBError new NativeArray(ctx, new short[] {1, 1, 1, 0, 0}, Datatype.TILEDB_UINT8); query.setBuffer("d1", data); - query.setBufferNullable("a1", a1, a1ByteMap); + query.setBufferNullable("a1", na1, a1ByteMap); query.setBufferNullable("a2", a2Off, a2Data, a2ByteMap); // Submit query @@ -1377,14 +1319,6 @@ private void sparseArrayNullableWrite() throws TileDBError query.finalizeQuery(); query.close(); array.close(); - } - - /** - * Reads a two-dimensional sparse array with nullable attributes. - */ - @Test - public void test2DVectorNullableSparse() - { String selectSql = format("SELECT * FROM %s ORDER BY d1 ASC", sparseURI); MaterializedResult selectResult = computeActual(selectSql); MaterializedResult expected = MaterializedResult.resultBuilder(getQueryRunner().getDefaultSession(), INTEGER, INTEGER, VARCHAR) @@ -1394,8 +1328,12 @@ public void test2DVectorNullableSparse() .row(4, 4, null) .row(5, 5, null) .build(); - assertEquals(expected.toString(), selectResult.toString()); + + if (Files.exists(Paths.get(sparseURI))) { + TileDBObject.remove(ctx, sparseURI); + } + ctx.close(); } /** @@ -1735,19 +1673,9 @@ private void dropArray(String arrayName) queryRunner.execute(dropSql); } - private void removeArrayIfExists(String arrayName) - { - try { - TileDBObject.remove(ctx, arrayName); - } - catch (Exception e) { - // Do nothing - } - } - private void createYearArray(String arrayName) throws TileDBError { - removeArrayIfExists(arrayName); + Context ctx = new Context(); // Create dimensions Dimension d1 = new Dimension(ctx, "d1", Datatype.TILEDB_INT32, new Pair(0, 3), 4); @@ -1844,5 +1772,6 @@ private void createYearArray(String arrayName) throws TileDBError // Submit query query.submit(); } + ctx.close(); } }