
Remove legacy Hive readers and writers #18241

Merged · 12 commits · Sep 6, 2023
82 changes: 0 additions & 82 deletions docs/src/main/sphinx/connector/hive-s3.md
@@ -312,85 +312,3 @@ classpath and must be able to communicate with your custom key management system
the `org.apache.hadoop.conf.Configurable` interface from the Hadoop Java API, then the Hadoop configuration
is passed in after the object instance is created, and before it is asked to provision or retrieve any
encryption keys.

(s3selectpushdown)=

## S3 Select pushdown

S3 Select pushdown enables pushing down projection (SELECT) and predicate (WHERE)
processing to [S3 Select](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html).
With S3 Select Pushdown, Trino only retrieves the required data from S3 instead
of entire S3 objects, reducing both latency and network usage.

### Is S3 Select a good fit for my workload?

Performance of S3 Select pushdown depends on the amount of data filtered by the
query. Filtering a large number of rows should result in better performance. If
the query doesn't filter any data, pushdown may not add value, and you are
still charged for S3 Select requests. We therefore recommend benchmarking your
workloads with and without S3 Select to determine whether it is suitable. S3
Select Pushdown is disabled by default; enable it in production only after
proper benchmarking and cost analysis. For more information on S3 Select
request cost, see
[Amazon S3 Cloud Storage Pricing](https://aws.amazon.com/s3/pricing/).

Use the following guidelines to determine if S3 Select is a good fit for your
workload:

- Your query filters out more than half of the original data set.
- Your query filter predicates use columns that have a data type supported by
  Trino and S3 Select. The `TIMESTAMP`, `DECIMAL`, `REAL`, and `DOUBLE` data
  types are not supported by S3 Select Pushdown. For more information about
  supported data types for S3 Select, see the
  [Data Types documentation](https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-data-types.html).
- Your network connection between Amazon S3 and the Amazon EMR cluster has good
transfer speed and available bandwidth. Amazon S3 Select does not compress
HTTP responses, so the response size may increase for compressed input files.

### Considerations and limitations

- Only objects stored in JSON format are supported. Objects can be uncompressed,
or optionally compressed with gzip or bzip2.
- The `AllowQuotedRecordDelimiters` property is not supported. If this property
  is specified, the query fails.
- Amazon S3 server-side encryption with customer-provided encryption keys
(SSE-C) and client-side encryption are not supported.
- S3 Select Pushdown is not a substitute for using columnar or compressed file
formats such as ORC and Parquet.

### Enabling S3 Select pushdown

You can enable S3 Select Pushdown using the `s3_select_pushdown_enabled`
Hive session property, or using the `hive.s3select-pushdown.enabled`
configuration property. The session property overrides the config
property, allowing you to enable or disable it on a per-query basis.
Non-filtering queries (`SELECT * FROM table`) are not pushed down to S3 Select,
as they retrieve the entire object content.
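As a sketch (assuming a catalog named `hive` backed by the Hive connector), the
session property can be toggled for a single session:

```sql
-- Enable S3 Select pushdown for the current session only;
-- "hive" is an assumed catalog name.
SET SESSION hive.s3_select_pushdown_enabled = true;
```

The equivalent catalog-wide setting is `hive.s3select-pushdown.enabled=true` in
the catalog properties file.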

For uncompressed files, S3 Select scans ranges of bytes in parallel. The scan range
requests run across the byte ranges of the internal Hive splits for the query fragments
pushed down to S3 Select. Changes in the Hive connector {ref}`performance tuning
configuration properties <hive-performance-tuning-configuration>` are likely to impact
S3 Select pushdown performance.

S3 Select can be enabled for TEXTFILE data using the
`hive.s3select-pushdown.experimental-textfile-pushdown-enabled` configuration
property; however, this has been shown to produce incorrect results. For more
information, see [the GitHub issue](https://github.com/trinodb/trino/issues/17775).

### Understanding and tuning the maximum connections

Trino can use its native S3 file system or EMRFS. When using the native FS, the
maximum connections is configured via the `hive.s3.max-connections`
configuration property. When using EMRFS, the maximum connections is configured
via the `fs.s3.maxConnections` Hadoop configuration property.

S3 Select Pushdown bypasses the file systems when accessing Amazon S3 for
predicate operations. In this case, the value of
`hive.s3select-pushdown.max-connections` determines the maximum number of
client connections allowed for those operations from worker nodes.

If your workload experiences the error *Timeout waiting for connection from
pool*, increase the value of both `hive.s3select-pushdown.max-connections` and
the maximum connections configuration for the file system you are using.
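As an illustrative sketch only (the values are placeholders, not
recommendations), the related settings in a Hive catalog properties file could
look like:

```text
# S3 Select connection pool used for predicate operations
hive.s3select-pushdown.max-connections=500
# Native S3 file system pool; raise it in tandem when pushdown is enabled
hive.s3.max-connections=500
```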
10 changes: 0 additions & 10 deletions docs/src/main/sphinx/connector/hive.md
@@ -253,16 +253,6 @@ Hive connector documentation.
- Enables automatic column level statistics collection on write. See
`Table Statistics <#table-statistics>`__ for details.
- ``true``
* - ``hive.s3select-pushdown.enabled``
- Enable query pushdown to JSON files using the AWS S3 Select service.
- ``false``
* - ``hive.s3select-pushdown.experimental-textfile-pushdown-enabled``
- Enable query pushdown to TEXTFILE tables using the AWS S3 Select service.
- ``false``
* - ``hive.s3select-pushdown.max-connections``
- Maximum number of simultaneously open connections to S3 for
:ref:`s3selectpushdown`.
- 500
* - ``hive.file-status-cache-tables``
- Cache directory listing for specific tables. Examples:
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/release/release-300.md
@@ -46,7 +46,7 @@
(e.g., min > max). To disable this behavior, set the configuration
property `hive.parquet.fail-on-corrupted-statistics`
or session property `parquet_fail_with_corrupted_statistics` to false.
- Add support for {ref}`s3selectpushdown`, which enables pushing down
- Add support for S3 Select pushdown, which enables pushing down
column selection and range filters into S3 for text files.

## Kudu connector
@@ -30,15 +30,13 @@ final class AzureBlobFileIterator
{
private final AzureLocation location;
private final Iterator<BlobItem> iterator;
private final String base;
private final Location baseLocation;

AzureBlobFileIterator(AzureLocation location, Iterator<BlobItem> iterator)
{
this.location = requireNonNull(location, "location is null");
this.iterator = requireNonNull(iterator, "iterator is null");
this.base = "abfs://%s%s.dfs.core.windows.net".formatted(
location.container().map(container -> container + "@").orElse(""),
location.account());
this.baseLocation = location.baseLocation();
}

@Override
@@ -60,7 +58,7 @@ public FileEntry next()
try {
BlobItem blobItem = iterator.next();
return new FileEntry(
Location.of(base + "/" + blobItem.getName()),
baseLocation.appendPath(blobItem.getName()),
blobItem.getProperties().getContentLength(),
blobItem.getProperties().getLastModified().toInstant(),
Optional.empty());
@@ -23,21 +23,20 @@
import java.util.Optional;

import static io.trino.filesystem.azure.AzureUtils.handleAzureException;
import static java.util.Objects.requireNonNull;

final class AzureDataLakeFileIterator
implements FileIterator
{
private final AzureLocation location;
private final Iterator<PathItem> iterator;
private final String base;
private final Location baseLocation;

AzureDataLakeFileIterator(AzureLocation location, Iterator<PathItem> iterator)
{
this.location = location;
this.iterator = iterator;
this.base = "abfs://%s%s.dfs.core.windows.net".formatted(
location.container().map(container -> container + "@").orElse(""),
location.account());
this.location = requireNonNull(location, "location is null");
this.iterator = requireNonNull(iterator, "iterator is null");
this.baseLocation = location.baseLocation();
}

@Override
@@ -59,7 +58,7 @@ public FileEntry next()
try {
PathItem pathItem = iterator.next();
return new FileEntry(
Location.of(base + "/" + pathItem.getName()),
baseLocation.appendPath(pathItem.getName()),
pathItem.getContentLength(),
pathItem.getLastModified().toInstant(),
Optional.empty());
@@ -33,6 +33,7 @@
import com.azure.storage.file.datalake.models.ListPathsOptions;
import com.azure.storage.file.datalake.models.PathItem;
import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
@@ -44,9 +45,11 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static com.azure.storage.common.implementation.Constants.HeaderConstants.ETAG_WILDCARD;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.filesystem.azure.AzureUtils.handleAzureException;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
@@ -220,13 +223,10 @@ public FileIterator listFiles(Location location)
{
AzureLocation azureLocation = new AzureLocation(location);
try {
// blob api returns directories as blobs, so it can not be used when Gen2 is enabled
if (isHierarchicalNamespaceEnabled(azureLocation)) {
return listGen2Files(azureLocation);
}
else {
return listBlobFiles(azureLocation);
}
// blob API returns directories as blobs, so it cannot be used when Gen2 is enabled
return (isHierarchicalNamespaceEnabled(azureLocation))
? listGen2Files(azureLocation)
: listBlobFiles(azureLocation);
}
catch (RuntimeException e) {
throw handleAzureException(e, "listing files", azureLocation);
@@ -258,7 +258,7 @@ private FileIterator listGen2Files(AzureLocation location)
.iterator());
}

private AzureBlobFileIterator listBlobFiles(AzureLocation location)
private FileIterator listBlobFiles(AzureLocation location)
{
String path = location.path();
if (!path.isEmpty() && !path.endsWith("/")) {
@@ -354,6 +354,60 @@ public void renameDirectory(Location source, Location target)
}
}

@Override
public Set<Location> listDirectories(Location location)
throws IOException
{
AzureLocation azureLocation = new AzureLocation(location);
try {
// blob API returns directories as blobs, so it cannot be used when Gen2 is enabled
return (isHierarchicalNamespaceEnabled(azureLocation))
? listGen2Directories(azureLocation)
: listBlobDirectories(azureLocation);
}
catch (RuntimeException e) {
throw handleAzureException(e, "listing files", azureLocation);
}
}

private Set<Location> listGen2Directories(AzureLocation location)
throws IOException
{
DataLakeFileSystemClient fileSystemClient = createFileSystemClient(location);
PagedIterable<PathItem> pathItems;
if (location.path().isEmpty()) {
pathItems = fileSystemClient.listPaths();
}
else {
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(location.path());
if (!directoryClient.exists()) {
return ImmutableSet.of();
}
if (!directoryClient.getProperties().isDirectory()) {
throw new IOException("Location is not a directory: " + location);
}
pathItems = directoryClient.listPaths(true, false, null, null);
}
Location baseLocation = location.baseLocation();
return pathItems.stream()
.filter(PathItem::isDirectory)
.map(item -> baseLocation.appendPath(item.getName()))
.collect(toImmutableSet());
}

private Set<Location> listBlobDirectories(AzureLocation location)
{
String path = location.path();
if (!path.isEmpty() && !path.endsWith("/")) {
path += "/";
}
return createBlobContainerClient(location)
.listBlobsByHierarchy(path).stream()
.filter(BlobItem::isPrefix)
.map(item -> Location.of(location + "/" + item.getName()))
.collect(toImmutableSet());
}

private boolean isHierarchicalNamespaceEnabled(AzureLocation location)
throws IOException
{
@@ -113,4 +113,11 @@ public String toString()
{
return location.toString();
}

public Location baseLocation()
{
return Location.of("abfs://%s%s.dfs.core.windows.net/".formatted(
container().map(container -> container + "@").orElse(""),
account()));
}
}
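The `baseLocation()` helper above assembles the abfs root URI from the optional
container and the account. A minimal plain-Java illustration of that format
string (class and method names invented, no Trino dependencies):

```java
import java.util.Optional;

public class AbfsBaseDemo
{
    // Mirrors the format used by baseLocation(): an optional "container@"
    // prefix followed by the account host, with a trailing slash.
    static String base(Optional<String> container, String account)
    {
        return "abfs://%s%s.dfs.core.windows.net/".formatted(
                container.map(c -> c + "@").orElse(""),
                account);
    }

    public static void main(String[] args)
    {
        System.out.println(base(Optional.of("mycontainer"), "myaccount"));
        // prints abfs://mycontainer@myaccount.dfs.core.windows.net/
    }
}
```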
@@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -130,6 +131,13 @@ public void renameDirectory(Location source, Location target)
fileSystem(source).renameDirectory(source, target);
}

@Override
public Set<Location> listDirectories(Location location)
throws IOException
{
return fileSystem(location).listDirectories(location);
}

private TrinoFileSystem fileSystem(Location location)
{
return createFileSystem(determineFactory(location));
@@ -31,13 +31,13 @@ final class S3FileIterator
{
private final S3Location location;
private final Iterator<S3Object> iterator;
private final String base;
private final Location baseLocation;

public S3FileIterator(S3Location location, Iterator<S3Object> iterator)
{
this.location = requireNonNull(location, "location is null");
this.iterator = requireNonNull(iterator, "iterator is null");
this.base = "%s://%s/".formatted(location.scheme(), location.bucket());
this.baseLocation = location.baseLocation();
}

@Override
@@ -62,7 +62,7 @@ public FileEntry next()
verify(object.key().startsWith(location.key()), "S3 listed key [%s] does not start with prefix [%s]", object.key(), location.key());

return new FileEntry(
Location.of(base + object.key()),
baseLocation.appendPath(object.key()),
object.size(),
object.lastModified(),
Optional.empty());
@@ -22,6 +22,7 @@
import io.trino.filesystem.TrinoOutputFile;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
Expand All @@ -39,7 +40,9 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.partition;
import static com.google.common.collect.Multimaps.toMultimap;
import static java.util.Objects.requireNonNull;
@@ -211,6 +214,36 @@ public void renameDirectory(Location source, Location target)
throw new IOException("S3 does not support directory renames");
}

@Override
public Set<Location> listDirectories(Location location)
throws IOException
{
S3Location s3Location = new S3Location(location);
Location baseLocation = s3Location.baseLocation();

String key = s3Location.key();
if (!key.isEmpty() && !key.endsWith("/")) {
key += "/";
}

ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(s3Location.bucket())
.prefix(key)
.delimiter("/")
.build();

try {
return client.listObjectsV2Paginator(request)
.commonPrefixes().stream()
.map(CommonPrefix::prefix)
.map(baseLocation::appendPath)
.collect(toImmutableSet());
}
catch (SdkException e) {
throw new IOException("Failed to list location: " + location, e);
}
}
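The `delimiter("/")` request above makes S3 return immediate "subdirectories"
as common prefixes. As an illustration only (plain Java over an in-memory key
list; no AWS SDK, names invented), the grouping the service performs is
equivalent to:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class CommonPrefixDemo
{
    // Emulates S3's delimiter-based listing: collect the distinct
    // "common prefixes" (immediate child directories) under a prefix.
    static Set<String> listDirectories(List<String> keys, String prefix)
    {
        String p = (!prefix.isEmpty() && !prefix.endsWith("/")) ? prefix + "/" : prefix;
        return keys.stream()
                .filter(key -> key.startsWith(p))
                .map(key -> key.substring(p.length()))
                .filter(rest -> rest.indexOf('/') >= 0)
                .map(rest -> p + rest.substring(0, rest.indexOf('/') + 1))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args)
    {
        List<String> keys = List.of("data/a/file1", "data/a/file2", "data/b/x/file3", "data/file4");
        System.out.println(new TreeSet<>(listDirectories(keys, "data")));
        // prints [data/a/, data/b/]
    }
}
```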

@SuppressWarnings("ResultOfObjectAllocationIgnored")
private static void validateS3Location(Location location)
{