Skip to content

Commit

Permalink
Fix for SystemTables in allowSplittingReadIntoMultipleSubQueries
Browse files Browse the repository at this point in the history
  • Loading branch information
Dith3r committed Sep 23, 2024
1 parent 57ec0d2 commit 82b55e5
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4058,8 +4058,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// delta lake supports only a columnar (parquet) storage format
return true;
// dont split to subqueries if tableHandle is systemTableHandle, delta lake supports only a columnar (parquet) storage format
return tableHandle instanceof DeltaLakeTableHandle;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3984,7 +3984,12 @@ private static Optional<CatalogSchemaTableName> redirectTableToHudi(Optional<Str
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
SchemaTableName tableName = ((HiveTableHandle) tableHandle).getSchemaTableName();
// dont split to subqueries if tableHandle is systemTableHandle
if (!(tableHandle instanceof HiveTableHandle hiveTableHandle)) {
return false;
}

SchemaTableName tableName = hiveTableHandle.getSchemaTableName();

Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

import io.trino.testing.AbstractTestAggregations;
import io.trino.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;

import static io.trino.testing.TestingNames.randomNameSuffix;

public class TestHiveDistributedAggregations
extends AbstractTestAggregations
Expand All @@ -27,4 +31,20 @@ protected QueryRunner createQueryRunner()
.setInitialTables(REQUIRED_TPCH_TABLES)
.build();
}

@Test
public void testDistinctAggregationWithSystemTable()
{
String tableName = "test_dist_aggr_" + randomNameSuffix();
@Language("SQL") String createTable = """
CREATE TABLE %s
WITH (
partitioned_by = ARRAY[ 'regionkey', 'nationkey' ]
) AS (SELECT name, comment, regionkey, nationkey FROM nation)
""".formatted(tableName);

assertUpdate(getSession(), createTable, 25);

assertQuerySucceeds("SELECT count(distinct regionkey), count(distinct nationkey) FROM \"%s$partitions\"".formatted(tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// hudi supports only a columnar (parquet) storage format
return true;
// dont split to subqueries if tableHandle is systemTableHandle, hudi supports only a columnar (parquet) storage format
return tableHandle instanceof HudiTableHandle;
}

HiveMetastore getMetastore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3312,8 +3312,11 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle connectorTableHandle)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;
IcebergFileFormat storageFormat = getFileFormat(tableHandle.getStorageProperties());
// dont split to subqueries if tableHandle is systemTableHandle
if (!(connectorTableHandle instanceof IcebergTableHandle icebergTableHandle)) {
return false;
}
IcebergFileFormat storageFormat = getFileFormat(icebergTableHandle.getStorageProperties());

return storageFormat == IcebergFileFormat.ORC || storageFormat == IcebergFileFormat.PARQUET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ public Optional<SampleApplicationResult<ConnectorTableHandle>> applySample(Conne
@Override
public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return true;
// dont split to subqueries if tableHandle is systemTableHandle
return tableHandle instanceof MemoryTableHandle;
}

@Override
Expand Down

0 comments on commit 82b55e5

Please sign in to comment.