Support S3 Select on native readers #17522
Conversation
@trinodb/maintainers someone mind kicking off a build with secrets? I couldn't get the S3 tests running locally for some reason.
/test-with-secrets sha=71aa2ee869ce9fcf5b6da2e21d3ac395b6871f3f
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/4997055706
Force-pushed 34f1a78 to 677ad44
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class S3SelectPageSourceProvider
Name this S3SelectPageSourceFactory to match the others.
long start = request.getScanRange().getStart();
SelectObjectContentRequest contentRequest = request.withScanRange(new ScanRange()
        .withStart(start)
        .withEnd(start + length));
This seems wrong. readTail() is intended to read (up to) the last N bytes of the file. It is used for reading the footer of ORC and Parquet files.
Per the ScanRange docs: "If only the End parameter is supplied, it is interpreted to mean scan the last N bytes of the file."
I believe the correct implementation would be:
SelectObjectContentRequest contentRequest = request.withScanRange(new ScanRange().withEnd(length));
However, we shouldn't need to implement this method as it won't be used.
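For illustration, a minimal sketch of the end-only request described above, mirroring the request field and ScanRange builder style from the diff context; it is only a sketch, since the method is expected to go unused:

// Sketch: supplying only End makes S3 Select scan the last `length` bytes of the object,
// per the ScanRange documentation quoted above
SelectObjectContentRequest tailRequest = request.withScanRange(new ScanRange()
        .withEnd((long) length));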
}

@Override
public void readFully(long position, byte[] buffer, int offset, int length)
Is this method used? TextLineReader is created via TextLineReaderFactory, which does inputFile.newStream(), so I think we could change S3SelectInputFile.newInput() to throw UnsupportedOperationException. Text files don't use positioned reads.
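A minimal sketch of that suggestion, assuming S3SelectInputFile implements the TrinoInputFile interface:

@Override
public TrinoInput newInput()
{
    // Text formats read the S3 Select response sequentially through newStream();
    // positioned reads are only needed for ORC and Parquet footers
    throw new UnsupportedOperationException("S3 Select input does not support positioned reads");
}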
        TupleDomain<HiveColumnHandle> effectivePredicate,
        List<HiveColumnHandle> readerColumns)
{
    //There are no effective predicates and readercolumns and columntypes are identical to schema
Nit: space after //
private S3SelectUtils()
{ }
private S3SelectUtils() {}
    default -> throw new IllegalStateException("Unknown s3 select data type: " + s3SelectDataType);
}

if (!lineReaderFactory.getHiveOutputFormatClassName().equals(schema.getProperty(FILE_INPUT_FORMAT)) ||
When would this return false? S3SelectSerDeDataTypeMapper already maps from the serde, so it seems strange that we need to check this again.
private void closeStream()
{
    if (input == null) {
Let's remove this check. It's legal to use try-with-resources on a null reference, and we want to close the client regardless of whether there is an input stream. Also, we never actually set it to null.
catch (IOException ignored) {
}
finally {
    input = null;
We could remove this, since we otherwise never set it to null, and it's legal to close a Closeable multiple times (not that we do here).
    return;
}
closed = true;
closeStream();
We can inline this method since it's only used once
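Taken together, a sketch of what close() could look like after these simplifications; the input and client field names, and that both implement Closeable, are assumptions based on the surrounding diff:

@Override
public void close()
        throws IOException
{
    if (closed) {
        return;
    }
    closed = true;
    // try-with-resources tolerates null references and closes the client even
    // if closing the stream fails, so no null check, null-out, or separate
    // closeStream() helper is needed
    try (Closeable stream = input; Closeable selectClient = client) {
        // resources are closed on exit
    }
}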
}

// for negative seek, reopen the file
if (position < this.position) {
I don't think we need to support negative seek for text files. Also, it seems like TrinoS3SelectClient is designed such that you can only call getRecordsContent() once. Otherwise, the SelectObjectContentResult won't be closed and the requestComplete flag won't be handled correctly.
So, we should probably fail here:
throw new IOException("Negative seek is not supported for S3 Select");
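A sketch of where that guard could sit; the position field mirrors the diff context, and skipForward is a hypothetical helper for reading and discarding bytes:

@Override
public void seek(long newPosition)
        throws IOException
{
    if (newPosition < position) {
        // S3 Select responses stream forward only, and the client can only
        // fetch the records content once, so reopening is not an option
        throw new IOException("Negative seek is not supported for S3 Select");
    }
    // forward seeks can still be served by reading and discarding bytes
    skipForward(newPosition - position); // hypothetical helper
}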
@alexjo2144 Since this is a new implementation, shouldn't AWS SDK V2 be used to implement it? If it helps, I have a commit converting the current S3 Select code to V2: ankushChatterjee@1d67a14 (I have done some basic sanity testing). This is based off the branch from PR: #17866
Minio tests produced the correct results, but tests against a real S3 bucket did not.
S3 Select queries on CSV files have shown correctness problems. JSON files can still be enabled or disabled using the existing config and session properties.
@ankushChatterjee It is a new implementation, but it reuses parts of the existing S3 Select code. Note that the old S3 Select implementation will be removed when we remove all of the Hive reader code.
I may need to re-implement this anyway, because of conflicts with #18146. The TrinoS3ClientFactory works by passing S3-relevant properties through the Configuration object, but I'll need to switch it out. I'm not exactly sure how to pipe those in now, but I'll give it a stab.
Raised PR #18270 for the SDK v2 migration of the old code.
Force-pushed 677ad44 to 0406c54
@@ -44,16 +47,18 @@
import static com.google.common.collect.Multimaps.toMultimap;
import static java.util.Objects.requireNonNull;

-final class S3FileSystem
+public final class S3FileSystem
Need to make the constructor package private with this change
        config.isRequesterPays(),
        config.getSseType(),
        config.getSseKmsKeyId());

Remove trailing blank line
@@ -73,6 +73,11 @@
    </exclusions>
</dependency>

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>aws-crt-client</artifactId>
We don't want to use AWS CRT since it's in C and thus goes through JNI
@Override
public TrinoInputStream newStream()
{
    return null; // new S3InputStream(location(), client, newGetObjectRequest(), length);
This seems backwards. Since S3 Select is only for text formats, we should only need the input stream version. TrinoInput is only used by ORC and Parquet.
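A sketch of the stream-only shape described above; S3SelectInputStream and buildSelectRequest() are hypothetical names rather than types from this PR, while location() and client come from the commented-out line in the diff:

@Override
public TrinoInputStream newStream()
{
    // Only the sequential stream is needed: text line readers consume the
    // S3 Select response front to back, never by position
    return new S3SelectInputStream(location(), client, buildSelectRequest());
}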
    return selectObjectContentRequest.build();
}

private boolean headObject()
We could reuse or delegate to S3InputFile
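One possible shape of that delegation, as a sketch; the delegate field and its wiring are assumptions, not code from this PR:

// Instead of issuing HeadObject here, delegate object metadata to an S3InputFile
private final S3InputFile delegate;

@Override
public long length()
        throws IOException
{
    return delegate.length();
}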
    this.isDone = this.next.isEmpty();
}
catch (InterruptedException e) {
    throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Interrupted"); // TODO: Better error message
We can simply use RuntimeException here.
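A sketch of that change; restoring the interrupt status before rethrowing is a general Java idiom added here as an assumption, not something the review asked for:

catch (InterruptedException e) {
    // preserve the interrupt status so callers can still observe it
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while waiting for the S3 Select response", e);
}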
}

/**
 * Below classes are required for compatibility between AWS Java SDK 1.x and 2.x
This comment doesn't seem relevant in the context of a new implementation
Should we close this?
Description
Add S3 Select pushdown support to native readers for JSON and CSV.
Based on: #17563
I am not sure we want to do this until we fix the rest of the issues in #17775.
Additional context and related issues
Release notes
(x) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: