Remove usage of InputFormat for generating symlink splits
electrum committed Sep 1, 2023
1 parent 13268e9 commit 59732e7
Showing 15 changed files with 103 additions and 466 deletions.
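Context, not part of the diff: this commit retires the Hadoop mapred InputFormat path for generating splits from symlink tables. Reconstructed from the helpers deleted below (toJobConf, and the FileSplit overload of createInternalHiveSplit), the old flow looked roughly like the following sketch; all names here are illustrative, not the actual Trino code.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import java.io.IOException;

// Illustrative: ask Hadoop's InputFormat to compute splits over the symlink target files.
static FileSplit[] splitsViaInputFormat(JobConf jobConf, Path targetDir)
        throws IOException
{
    TextInputFormat format = new TextInputFormat();
    format.configure(jobConf);                          // JobConfigurable setup (compression codecs, etc.)
    FileInputFormat.setInputPaths(jobConf, targetDir);  // point the format at the target directory
    InputSplit[] splits = format.getSplits(jobConf, 0); // numSplits hint; Hadoop picks block-aligned splits
    FileSplit[] fileSplits = new FileSplit[splits.length];
    for (int i = 0; i < splits.length; i++) {
        fileSplits[i] = (FileSplit) splits[i];          // TextInputFormat produces FileSplit instances
    }
    return fileSplits;
}

Each FileSplit then had to be turned back into an InternalHiveSplit, which is what the deleted InternalHiveSplitFactory overload further down did.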
@@ -15,7 +15,6 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

import java.io.File;
import java.util.List;
@@ -61,14 +60,6 @@ public static void copy(Configuration from, Configuration to)
}
}

public static JobConf toJobConf(Configuration conf)
{
if (conf instanceof JobConf) {
return (JobConf) conf;
}
return new JobConf(conf);
}

public static Configuration readConfiguration(List<File> resourcePaths)
{
Configuration result = newEmptyConfiguration();
@@ -173,7 +173,6 @@ public S3SelectTestHelper(String host,
hivePartitionManager,
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
new HdfsNamenodeStats(),
hdfsEnvironment,
new BoundedExecutor(executorService, this.hiveConfig.getMaxSplitIteratorThreads()),
new CounterStat(),
this.hiveConfig.getMaxOutstandingSplits(),
5 changes: 0 additions & 5 deletions plugin/trino-hive/pom.xml
@@ -76,11 +76,6 @@
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>

Large diffs are not rendered by default.
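The unrendered file above carries the bulk of the replacement logic. Going by the commit message and the TrinoFileSystemFactory wiring that survives in HiveSplitManager below, the new path reads each symlink manifest and resolves targets through Trino's own filesystem API rather than an InputFormat. A hypothetical sketch of that shape, assuming the TrinoFileSystem and Location APIs; readSymlinkTargets and its parameters are invented names:

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;

// Hypothetical: read one symlink manifest; each non-blank line is the URI of a target data file.
static List<Location> readSymlinkTargets(TrinoFileSystem fileSystem, Location manifest)
        throws IOException
{
    List<Location> targets = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fileSystem.newInputFile(manifest).newStream(), UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.isBlank()) {
                targets.add(Location.of(line));
            }
        }
    }
    return targets;
}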

@@ -58,6 +58,7 @@
"hive.assume-canonical-partition-keys",
"hive.partition-use-column-names",
"hive.allow-corrupt-writes-for-testing",
"hive.optimize-symlink-listing",
})
public class HiveConfig
{
@@ -163,8 +164,6 @@ public class HiveConfig

private HiveTimestampPrecision timestampPrecision = HiveTimestampPrecision.DEFAULT_PRECISION;

private boolean optimizeSymlinkListing = true;

private Optional<String> icebergCatalogName = Optional.empty();
private Optional<String> deltaLakeCatalogName = Optional.empty();
private Optional<String> hudiCatalogName = Optional.empty();
@@ -1186,19 +1185,6 @@ public HiveConfig setTimestampPrecision(HiveTimestampPrecisio
return this;
}

public boolean isOptimizeSymlinkListing()
{
return this.optimizeSymlinkListing;
}

@Config("hive.optimize-symlink-listing")
@ConfigDescription("Optimize listing for SymlinkTextFormat tables with files in a single directory")
public HiveConfig setOptimizeSymlinkListing(boolean optimizeSymlinkListing)
{
this.optimizeSymlinkListing = optimizeSymlinkListing;
return this;
}

public Optional<String> getIcebergCatalogName()
{
return icebergCatalogName;
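With the getter and setter gone, hive.optimize-symlink-listing is added to the annotation list in the first HiveConfig hunk above, which by its contents is the class's defunct-properties list. For reference, airlift's pattern works like this (ExampleConfig is a made-up name):

import io.airlift.configuration.DefunctConfig;

// A property named in @DefunctConfig fails configuration binding at startup if still set,
// rather than being silently ignored after the feature's removal.
@DefunctConfig("hive.optimize-symlink-listing")
public class ExampleConfig
{
}

So clusters that still set the removed property get an explicit startup error instead of a silent no-op.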
@@ -114,7 +114,6 @@ public final class HiveSessionProperties
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TIMESTAMP_PRECISION = "timestamp_precision";
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String OPTIMIZE_SYMLINK_LISTING = "optimize_symlink_listing";
private static final String HIVE_VIEWS_LEGACY_TRANSLATION = "hive_views_legacy_translation";
private static final String ICEBERG_CATALOG_NAME = "iceberg_catalog_name";
public static final String DELTA_LAKE_CATALOG_NAME = "delta_lake_catalog_name";
@@ -485,11 +484,6 @@ public HiveSessionProperties(
"Duration to wait for completion of dynamic filters during split generation",
hiveConfig.getDynamicFilteringWaitTimeout(),
false),
booleanProperty(
OPTIMIZE_SYMLINK_LISTING,
"Optimize listing for SymlinkTextFormat tables with files in a single directory",
hiveConfig.isOptimizeSymlinkListing(),
false),
booleanProperty(
HIVE_VIEWS_LEGACY_TRANSLATION,
"Use legacy Hive view translation mechanism",
@@ -835,11 +829,6 @@ public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
}

public static boolean isOptimizeSymlinkListing(ConnectorSession session)
{
return session.getProperty(OPTIMIZE_SYMLINK_LISTING, Boolean.class);
}

public static boolean isHiveViewsLegacyTranslation(ConnectorSession session)
{
return session.getProperty(HIVE_VIEWS_LEGACY_TRANSLATION, Boolean.class);
@@ -23,7 +23,6 @@
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.HdfsNamenodeStats;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
@@ -75,7 +74,6 @@
import static io.trino.plugin.hive.HiveSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions;
import static io.trino.plugin.hive.HiveSessionProperties.isOptimizeSymlinkListing;
import static io.trino.plugin.hive.HiveSessionProperties.isPropagateTableScanSortingProperties;
import static io.trino.plugin.hive.HiveSessionProperties.isUseOrcColumnNames;
import static io.trino.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
@@ -105,7 +103,6 @@ public class HiveSplitManager
private final HivePartitionManager partitionManager;
private final TrinoFileSystemFactory fileSystemFactory;
private final HdfsNamenodeStats hdfsNamenodeStats;
private final HdfsEnvironment hdfsEnvironment;
private final Executor executor;
private final int maxOutstandingSplits;
private final DataSize maxOutstandingSplitsSize;
@@ -126,7 +123,6 @@ public HiveSplitManager(
HivePartitionManager partitionManager,
TrinoFileSystemFactory fileSystemFactory,
HdfsNamenodeStats hdfsNamenodeStats,
HdfsEnvironment hdfsEnvironment,
ExecutorService executorService,
VersionEmbedder versionEmbedder,
TypeManager typeManager)
@@ -136,7 +132,6 @@ public HiveSplitManager(
partitionManager,
fileSystemFactory,
hdfsNamenodeStats,
hdfsEnvironment,
versionEmbedder.embedVersion(new BoundedExecutor(executorService, hiveConfig.getMaxSplitIteratorThreads())),
new CounterStat(),
hiveConfig.getMaxOutstandingSplits(),
@@ -156,7 +151,6 @@ public HiveSplitManager(
HivePartitionManager partitionManager,
TrinoFileSystemFactory fileSystemFactory,
HdfsNamenodeStats hdfsNamenodeStats,
HdfsEnvironment hdfsEnvironment,
Executor executor,
CounterStat highMemorySplitSourceCounter,
int maxOutstandingSplits,
@@ -174,7 +168,6 @@ public HiveSplitManager(
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.hdfsNamenodeStats = requireNonNull(hdfsNamenodeStats, "hdfsNamenodeStats is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.executor = new ErrorCodedExecutor(executor);
this.highMemorySplitSourceCounter = requireNonNull(highMemorySplitSourceCounter, "highMemorySplitSourceCounter is null");
checkArgument(maxOutstandingSplits >= 1, "maxOutstandingSplits must be at least 1");
@@ -259,14 +252,12 @@ public ConnectorSplitSource getSplits(
createBucketSplitInfo(bucketHandle, bucketFilter),
session,
fileSystemFactory,
hdfsEnvironment,
hdfsNamenodeStats,
transactionalMetadata.getDirectoryLister(),
executor,
splitLoaderConcurrency,
recursiveDfsWalkerEnabled,
!hiveTable.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session),
isOptimizeSymlinkListing(session),
metastore.getValidWriteIds(session, hiveTable)
.map(value -> value.getTableValidWriteIdList(table.getDatabaseName() + "." + table.getTableName())),
hiveTable.getMaxScannedFileSize(),
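Dropping hdfsEnvironment from both HiveSplitManager constructors and from the getSplits() call means split loading now reaches storage only through the injected TrinoFileSystemFactory. The assumed usage, sketched with invented names:

import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.connector.ConnectorSession;

// Assumed shape: one session-scoped filesystem from the factory; no HdfsEnvironment in the path.
static TrinoFileSystem fileSystemFor(TrinoFileSystemFactory factory, ConnectorSession session)
{
    return factory.create(session);
}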

This file was deleted.

@@ -33,11 +33,7 @@
import io.trino.spi.HostAddress;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.FileSplit;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -54,7 +50,6 @@

public class InternalHiveSplitFactory
{
private final FileSystem fileSystem;
private final String partitionName;
private final HiveStorageFormat storageFormat;
private final Properties strippedSchema;
@@ -70,7 +65,6 @@ public class InternalHiveSplitFactory
private final boolean s3SelectPushdownEnabled;

public InternalHiveSplitFactory(
FileSystem fileSystem,
String partitionName,
HiveStorageFormat storageFormat,
Properties schema,
@@ -85,7 +79,6 @@ public InternalHiveSplitFactory(
boolean s3SelectPushdownEnabled,
Optional<Long> maxSplitFileSize)
{
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.partitionName = requireNonNull(partitionName, "partitionName is null");
this.storageFormat = requireNonNull(storageFormat, "storageFormat is null");
this.strippedSchema = stripUnnecessaryProperties(requireNonNull(schema, "schema is null"));
@@ -134,23 +127,6 @@ public Optional<InternalHiveSplit> createInternalHiveSplit(TrinoFileStatus statu
acidInfo);
}

public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
throws IOException
{
FileStatus file = fileSystem.getFileStatus(split.getPath());
return createInternalHiveSplit(
split.getPath().toString(),
BlockLocation.fromHiveBlockLocations(fileSystem.getFileBlockLocations(file, split.getStart(), split.getLength())),
split.getStart(),
split.getLength(),
file.getLen(),
file.getModificationTime(),
OptionalInt.empty(),
OptionalInt.empty(),
false,
Optional.empty());
}

private Optional<InternalHiveSplit> createInternalHiveSplit(
String path,
List<BlockLocation> blockLocations,
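The deleted FileSplit overload cost two extra namenode round trips per split — getFileStatus() and getFileBlockLocations() — because a mapred FileSplit carries only path, start, and length. The surviving overload is fed from directory listings whose TrinoFileStatus entries already include size, modification time, and block locations. An assumed call shape, extrapolated from the hunk header above (the argument list is a guess):

import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical caller: listing results already carry file metadata, so no per-split RPCs.
Optional<InternalHiveSplit> split = splitFactory.createInternalHiveSplit(
        status,                // TrinoFileStatus from the directory listing
        OptionalInt.empty(),   // readBucketNumber
        OptionalInt.empty(),   // tableBucketNumber
        true,                  // splittable
        Optional.empty());     // acidInfo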
@@ -908,7 +908,6 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
partitionManager,
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
new HdfsNamenodeStats(),
hdfsEnvironment,
executor,
new CounterStat(),
100,
Expand Down
@@ -246,7 +246,6 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
hivePartitionManager,
new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS),
new HdfsNamenodeStats(),
hdfsEnvironment,
new BoundedExecutor(executor, config.getMaxSplitIteratorThreads()),
new CounterStat(),
config.getMaxOutstandingSplits(),
