Datalake inputstream #21322

Merged Jun 29, 2021 (28 commits)
Commits
ef843cf
DataLakeFileClient add openInputStream()
jaschrep-msft May 11, 2021
474b2f3
datalake inputstream tests
jaschrep-msft May 12, 2021
cc4ef6e
changelog
jaschrep-msft May 12, 2021
67aa7c1
Fixed blob artifacts from feature port
jaschrep-msft May 12, 2021
60dbe03
FileInputStream fix and docs issues
jaschrep-msft May 12, 2021
bf4fa8b
Casts throwable to match checked exception type
jaschrep-msft May 12, 2021
bd18062
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
jaschrep-msft May 12, 2021
04cf919
CI fixes
jaschrep-msft May 12, 2021
a6cddf9
fixes
jaschrep-msft May 12, 2021
4036751
checkstyle
jaschrep-msft May 13, 2021
316f15a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
jaschrep-msft May 13, 2021
b0e5443
Refactored StorageInputStream
jaschrep-msft May 17, 2021
04bec4d
checkstyle suppression and pr feedback
jaschrep-msft May 17, 2021
3219ba3
PR feedback
jaschrep-msft May 18, 2021
33142ad
import cleanup
jaschrep-msft May 24, 2021
9b07d27
Undo abstract method shifting
jaschrep-msft Jun 1, 2021
ecb524e
import cleanup
jaschrep-msft Jun 1, 2021
fd4359b
imports and checkstyle supression to match blobs
jaschrep-msft Jun 1, 2021
4683228
minor fixes
jaschrep-msft Jun 3, 2021
5dee388
PR comments
jaschrep-msft Jun 16, 2021
2325932
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
jaschrep-msft Jun 16, 2021
0a04c1a
docstring
jaschrep-msft Jun 21, 2021
5c43a92
api rewrite
jaschrep-msft Jun 28, 2021
c5fa811
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
jaschrep-msft Jun 28, 2021
e0bac3f
PR feedback
jaschrep-msft Jun 29, 2021
529e09a
Styling
jaschrep-msft Jun 29, 2021
18939cc
reverted *Result type implements Closable
jaschrep-msft Jun 29, 2021
8b7c0a4
cleanup imports
jaschrep-msft Jun 29, 2021
@@ -64,11 +64,9 @@ public final class BlobInputStream extends StorageInputStream {
@Override
protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) throws IOException {
try {
- ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(offset,
- (long) readLength), null, this.accessCondition, false)
- .flatMap(response -> {
- return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap);
- })
+ ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(
+ new BlobRange(offset, (long) readLength), null, this.accessCondition, false)
+ .flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap))
.block();

this.bufferSize = readLength;
@@ -77,15 +75,13 @@ protected synchronized ByteBuffer dispatchRead(final int readLength, final long
} catch (final BlobStorageException e) {
this.streamFaulted = true;
this.lastError = new IOException(e);

throw this.lastError;
}
}

/**
- * Gets the blob properties.
- * <p>
- * If no data has been read from the stream, a network call is made to get properties. Otherwise, the blob
- * properties obtained from the download are stored.
+ * Gets the blob properties as fetched upon download.
*
* @return {@link BlobProperties}
*/
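
To make the `dispatchRead` pattern in the hunk above easier to follow, here is a minimal, self-contained sketch of an input stream that refills its buffer with one ranged read per chunk and remembers a failure as an `IOException`. It is not the SDK's `StorageInputStream`: `RangeSource`, `ChunkedRangeStream`, and `ChunkedRangeDemo` are hypothetical names, and an in-memory byte array stands in for the storage service.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for a remote ranged download (offset + length -> bytes).
interface RangeSource {
    byte[] download(long offset, int length);

    long length();
}

// Sketch of a stream that issues one ranged read each time its buffer runs dry.
class ChunkedRangeStream extends InputStream {
    private final RangeSource source;
    private final int chunkSize;
    private long offset;                                  // start of the next chunk to request
    private ByteBuffer current = ByteBuffer.allocate(0);  // bytes already fetched, not yet consumed
    private IOException lastError;                        // remembered so later reads fail the same way
    int dispatchCount;                                    // number of ranged reads issued so far

    ChunkedRangeStream(RangeSource source, int chunkSize) {
        this.source = source;
        this.chunkSize = chunkSize;
    }

    // Issues one ranged read and updates stream state accordingly.
    private ByteBuffer dispatchRead(int readLength, long readOffset) throws IOException {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(source.download(readOffset, readLength));
            dispatchCount++;
            offset = readOffset + readLength;
            return buffer;
        } catch (RuntimeException e) {
            lastError = new IOException(e);
            throw lastError;
        }
    }

    @Override
    public synchronized int read() throws IOException {
        if (lastError != null) {
            throw lastError;
        }
        if (!current.hasRemaining()) {
            long remaining = source.length() - offset;
            if (remaining <= 0) {
                return -1; // end of data
            }
            current = dispatchRead((int) Math.min(chunkSize, remaining), offset);
        }
        return current.get() & 0xFF;
    }
}

class ChunkedRangeDemo {
    // Builds a source backed by an in-memory array.
    static RangeSource inMemory(byte[] data) {
        return new RangeSource() {
            public byte[] download(long offset, int length) {
                return Arrays.copyOfRange(data, (int) offset, (int) offset + length);
            }

            public long length() {
                return data.length;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        ChunkedRangeStream stream = new ChunkedRangeStream(inMemory(new byte[10]), 4);
        int count = 0;
        while (stream.read() != -1) {
            count++;
        }
        // 10 bytes with a chunk size of 4 need three ranged reads (4 + 4 + 2).
        System.out.println(count + " bytes in " + stream.dispatchCount + " ranged reads");
    }
}
```

The sketch keeps the "faulted" state in `lastError`, mirroring the `streamFaulted`/`lastError` fields visible in the diff, so that a stream which has failed once keeps failing instead of silently skipping data.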
@@ -147,7 +147,7 @@ public synchronized void close() {
}

/**
- * Dispatches a read operation of N bytes.
+ * Dispatches a read operation of N bytes and updates stream state accordingly.
*
* @param readLength An <code>int</code> which represents the number of bytes to read.
* @param offset The start point of data to be acquired.
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-file-datalake/CHANGELOG.md
@@ -1,6 +1,7 @@
# Release History

## 12.7.0-beta.1 (Unreleased)
- Added support for openInputStream to sync data lake file clients
- Added support for the 2020-10-02 service version.
- Added support to specify Parquet Input Serialization when querying a file.
- Updated DownloadRetryOptions.maxRetryRequests to default downloads to retry 5 times.
@@ -253,7 +253,6 @@ public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions para

/**
* Creates a new file.
* <p>
* To avoid overwriting, pass "*" to {@link DataLakeRequestConditions#setIfNoneMatch(String)}.
*
* <p><strong>Code Samples</strong></p>
@@ -17,17 +17,23 @@
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobQueryResponse;
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.FluxInputStream;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.implementation.UploadUtils;
import com.azure.storage.file.datalake.implementation.models.InternalDataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.models.FileQueryResponse;
@@ -207,7 +213,6 @@ public PathInfo upload(InputStream data, long length, boolean overwrite) {

/**
* Creates a new file.
* <p>
* To avoid overwriting, pass "*" to {@link DataLakeRequestConditions#setIfNoneMatch(String)}.
*
* <p><strong>Code Samples</strong></p>
@@ -496,6 +501,30 @@ public FileReadResponse readWithResponse(OutputStream stream, FileRange range, D
}, logger);
}

/**
* Opens a file input stream to download the file. Locks on ETags.
*
* @return A {@link DataLakeFileOpenInputStreamResult} object that contains the stream to use for reading from the file.
* @throws DataLakeStorageException If a storage service error occurred.
*/
public DataLakeFileOpenInputStreamResult openInputStream() {
return openInputStream(null);
}

/**
* Opens a file input stream to download the specified range of the file. Defaults to ETag locking if the option
* is not specified.
*
* @param options {@link DataLakeFileInputStreamOptions}
* @return A {@link DataLakeFileOpenInputStreamResult} object that contains the stream to use for reading from the file.
* @throws DataLakeStorageException If a storage service error occurred.
*/
public DataLakeFileOpenInputStreamResult openInputStream(DataLakeFileInputStreamOptions options) {
BlobInputStreamOptions convertedOptions = Transforms.toBlobInputStreamOptions(options);
BlobInputStream inputStream = blockBlobClient.openInputStream(convertedOptions);
return new InternalDataLakeFileOpenInputStreamResult(inputStream,
Transforms.toPathProperties(inputStream.getProperties()));
}

/**
* Reads the entire file into a file specified by the path.
@@ -34,8 +34,10 @@
import com.azure.storage.blob.models.BlobRetentionPolicy;
import com.azure.storage.blob.models.BlobServiceProperties;
import com.azure.storage.blob.models.BlobSignedIdentifier;
import com.azure.storage.blob.models.ConsistentReadControl;
import com.azure.storage.blob.models.ListBlobContainersOptions;
import com.azure.storage.blob.models.StaticWebsite;
import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlobQueryOptions;
import com.azure.storage.blob.options.UndeleteBlobContainerOptions;
import com.azure.storage.file.datalake.implementation.models.BlobItemInternal;
@@ -84,6 +86,7 @@
import com.azure.storage.file.datalake.models.PathProperties;
import com.azure.storage.file.datalake.models.PublicAccessType;
import com.azure.storage.file.datalake.models.UserDelegationKey;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.options.FileSystemUndeleteOptions;

@@ -224,6 +227,32 @@ static BlobHttpHeaders toBlobHttpHeaders(PathHttpHeaders pathHTTPHeaders) {
.setContentMd5(pathHTTPHeaders.getContentMd5());
}

static BlobInputStreamOptions toBlobInputStreamOptions(DataLakeFileInputStreamOptions options) {
if (options == null) {
return null;
}
return new BlobInputStreamOptions()
.setBlockSize(options.getBlockSize())
.setRange(toBlobRange(options.getRange()))
.setRequestConditions(toBlobRequestConditions(options.getRequestConditions()))
.setConsistentReadControl(toBlobConsistentReadControl(options.getConsistentReadControl()));
}

static com.azure.storage.blob.models.ConsistentReadControl toBlobConsistentReadControl(
com.azure.storage.file.datalake.models.ConsistentReadControl datalakeConsistentReadControl) {
if (datalakeConsistentReadControl == null) {
return null;
}
switch (datalakeConsistentReadControl) {
case NONE:
return ConsistentReadControl.NONE;
case ETAG:
return ConsistentReadControl.ETAG;
default:
throw new IllegalArgumentException("Could not convert ConsistentReadControl");
}
}

static BlobRange toBlobRange(FileRange fileRange) {
if (fileRange == null) {
return null;
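
The conversion helpers in this hunk translate data lake types into their blob counterparts so that blob types stay out of the data lake public API. As a rough illustration of that mapping style only, the sketch below uses two hypothetical stand-in enums (`LakeReadControl`, `BlobReadControl`) rather than the SDK's types: `null` passes through unchanged, and an unrecognized constant fails loudly.

```java
// Hypothetical stand-ins for two packages that each define the same enum.
enum LakeReadControl { NONE, ETAG }

enum BlobReadControl { NONE, ETAG }

class ReadControlMapper {
    // Maps the data-lake-facing value to its blob-facing twin; null passes through.
    static BlobReadControl toBlob(LakeReadControl value) {
        if (value == null) {
            return null;
        }
        switch (value) {
            case NONE:
                return BlobReadControl.NONE;
            case ETAG:
                return BlobReadControl.ETAG;
            default:
                // Reached only if a constant is added without updating this mapping.
                throw new IllegalArgumentException("Could not convert " + value);
        }
    }

    public static void main(String[] args) {
        for (LakeReadControl control : LakeReadControl.values()) {
            System.out.println(control + " -> " + toBlob(control));
        }
    }
}
```

An explicit `switch` does not rely on the two enums sharing constant names, which is one plausible reason to prefer it over a name-based lookup.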
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.implementation.models;

import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.PathProperties;

import java.io.InputStream;

public class InternalDataLakeFileOpenInputStreamResult implements DataLakeFileOpenInputStreamResult {

private final InputStream inputStream;
private final PathProperties properties;

public InternalDataLakeFileOpenInputStreamResult(InputStream inputStream, PathProperties properties) {
this.inputStream = inputStream;
this.properties = properties;
}

@Override
public InputStream getInputStream() {
return inputStream;
}

@Override
public PathProperties getProperties() {
return properties;
}
}
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.models;

/**
* Defines values to indicate what strategy the SDK should use when reading from a blob to ensure the view of the data
* is consistent and not changed during the read.
*/
public enum ConsistentReadControl {
Member:
Should this consistency control graduate into azure-storage-common? It should be reusable in Blobs, and maybe Files if we choose to add open read/write functionality there as well.

Member Author:
The issue is that the blob package already has its own copy of this, so putting one in common would cause confusion in the blobs package. It is something we would have needed to catch before that API GA'd.

Contributor:
Datalake takes a dependency on blobs directly? Don't we end up having both types on the classpath anyway?

Member Author:
I may be wrong, but I think we go out of our way to avoid using blob types in the datalake public API.

/**
* No consistent read control. The client will honor user provided {@link DataLakeRequestConditions#getIfMatch()}
*/
NONE,

/**
* Default value. Consistent read control based on eTag.
* If {@link DataLakeRequestConditions#getIfMatch()} is set, the client will honor this value.
* Otherwise, {@link DataLakeRequestConditions#getIfMatch()} is set to the latest eTag.
* Note: Modification of the base blob will result in an {@code IOException} or a {@code BlobStorageException} if
* eTag is the only form of consistent read control being employed.
*/
ETAG,
}
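
The difference between `NONE` and `ETAG` can be simulated without any service. The sketch below is an assumption-laden model, not SDK code: `VersionedStore`, `PinnedReader`, and `EtagPinningDemo` are hypothetical, the "ETag" is just a version counter, and a mismatch is reported as a plain `IOException`. It shows that a reader which pins the ETag seen at open time rejects chunked reads after a concurrent overwrite, while an unpinned reader does not.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical in-memory "service": content plus an ETag that changes on every write.
class VersionedStore {
    private byte[] content;
    private int version;

    VersionedStore(byte[] content) {
        this.content = content.clone();
    }

    synchronized String etag() {
        return "v" + version;
    }

    synchronized void overwrite(byte[] newContent) {
        content = newContent.clone();
        version++;
    }

    // Ranged read that fails when ifMatch is set and no longer matches the current ETag.
    synchronized byte[] read(int offset, int length, String ifMatch) throws IOException {
        if (ifMatch != null && !ifMatch.equals(etag())) {
            throw new IOException("Condition not met: ETag changed from " + ifMatch + " to " + etag());
        }
        int end = Math.min(content.length, offset + length);
        return Arrays.copyOfRange(content, Math.min(offset, end), end);
    }
}

// Reader that either pins the ETag seen at open time or applies no consistency control.
class PinnedReader {
    private final VersionedStore store;
    private final String pinnedEtag; // null means no consistency control

    PinnedReader(VersionedStore store, boolean pinEtag) {
        this.store = store;
        this.pinnedEtag = pinEtag ? store.etag() : null;
    }

    byte[] readChunk(int offset, int length) throws IOException {
        return store.read(offset, length, pinnedEtag);
    }
}

class EtagPinningDemo {
    // Returns true if a chunk read issued after a concurrent overwrite is rejected.
    static boolean failsAfterOverwrite(boolean pinEtag) {
        VersionedStore store = new VersionedStore(new byte[] {1, 2, 3, 4});
        PinnedReader reader = new PinnedReader(store, pinEtag);
        try {
            reader.readChunk(0, 2);
            store.overwrite(new byte[] {9, 9, 9, 9});
            reader.readChunk(2, 2);
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ETag-pinned reader rejected: " + failsAfterOverwrite(true));
        System.out.println("Unpinned reader rejected: " + failsAfterOverwrite(false));
    }
}
```

In this model the unpinned reader returns bytes from two different versions of the content, which is the inconsistency that ETag-based control is meant to surface as an error.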
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.models;

import java.io.InputStream;

/**
* Result of opening an {@link InputStream} to a datalake file.
*/
public interface DataLakeFileOpenInputStreamResult {
/**
* @return the {@link InputStream} of the target file.
*/
InputStream getInputStream();

/**
* @return the {@link PathProperties} of the target file.
*/
PathProperties getProperties();
}
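
Because the result interface above exposes the stream through a getter and is not itself `Closeable`, a caller would presumably close the returned `InputStream` directly. The sketch below illustrates that consumption pattern with hypothetical stand-ins (`OpenStreamResult`, `OpenStreamResultDemo`, and a `getFileSize()` accessor in place of `PathProperties`); the `open` method fakes the client call with an in-memory stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a result that pairs a stream with metadata known at open time.
interface OpenStreamResult {
    InputStream getInputStream();

    long getFileSize();
}

class OpenStreamResultDemo {
    // Stand-in for an "open" call; a real client would contact the service here.
    static OpenStreamResult open(byte[] content) {
        InputStream stream = new ByteArrayInputStream(content);
        return new OpenStreamResult() {
            public InputStream getInputStream() {
                return stream;
            }

            public long getFileSize() {
                return content.length;
            }
        };
    }

    // Reads the whole stream; the result is not Closeable, so the stream itself is closed.
    static byte[] readAll(OpenStreamResult result) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream((int) result.getFileSize());
        try (InputStream in = result.getInputStream()) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        OpenStreamResult result = open("hello data lake".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = readAll(result);
        System.out.println(bytes.length + " of " + result.getFileSize() + " bytes read");
    }
}
```

Returning the properties alongside the stream lets the caller size buffers or validate length up front without a second metadata request.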
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.options;

import com.azure.core.annotation.Fluent;
import com.azure.storage.file.datalake.models.ConsistentReadControl;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.FileRange;

/**
* Extended options that may be passed when opening a file input stream.
*/
@Fluent
public class DataLakeFileInputStreamOptions {

private FileRange range;
private DataLakeRequestConditions requestConditions;
private Integer blockSize;
private ConsistentReadControl consistentReadControl;

/**
* @return {@link FileRange}
*/
public FileRange getRange() {
return range;
}

/**
* @param range {@link FileRange}
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setRange(FileRange range) {
this.range = range;
return this;
}

/**
* @return {@link DataLakeRequestConditions}
*/
public DataLakeRequestConditions getRequestConditions() {
return requestConditions;
}

/**
* @param requestConditions {@link DataLakeRequestConditions}
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setRequestConditions(DataLakeRequestConditions requestConditions) {
this.requestConditions = requestConditions;
return this;
}

/**
* @return The size of each data chunk returned from the service. If block size is large, input stream will make
* fewer network calls, but each individual call will send more data and will therefore take longer.
* The default value is 4 MB.
*/
public Integer getBlockSize() {
return blockSize;
}

/**
* @param blockSize The size of each data chunk returned from the service. If block size is large, input stream
* will make fewer network calls, but each individual call will send more data and will therefore take longer.
* The default value is 4 MB.
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setBlockSize(Integer blockSize) {
this.blockSize = blockSize;
return this;
}

/**
* @return {@link ConsistentReadControl} Default is E-Tag.
*/
public ConsistentReadControl getConsistentReadControl() {
return consistentReadControl;
}

/**
* @param consistentReadControl {@link ConsistentReadControl} Default is E-Tag.
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setConsistentReadControl(ConsistentReadControl consistentReadControl) {
this.consistentReadControl = consistentReadControl;
return this;
}
}
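
The block-size trade-off described in the Javadoc above comes down to ceiling division: a larger block means fewer, larger requests. The sketch below works through that arithmetic. `BlockSizeMath` is a hypothetical helper, and treating a `null` block size as the documented 4 MB default is an assumption based on the Javadoc, not confirmed SDK behavior.

```java
class BlockSizeMath {
    // 4 MB, the default stated in the Javadoc above.
    static final int DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024;

    // Number of ranged requests needed to read fileSize bytes in blockSize chunks.
    // Assumption: a null block size falls back to the default.
    static long requestCount(long fileSize, Integer blockSize) {
        int effective = blockSize == null ? DEFAULT_BLOCK_SIZE : blockSize;
        if (effective <= 0 || fileSize < 0) {
            throw new IllegalArgumentException("blockSize must be positive and fileSize non-negative");
        }
        return (fileSize + effective - 1) / effective; // ceiling division
    }

    public static void main(String[] args) {
        long fileSize = 100L * 1024 * 1024; // a 100 MB file
        System.out.println("default (4 MB): " + requestCount(fileSize, null));              // 25
        System.out.println("1 MB blocks:    " + requestCount(fileSize, 1024 * 1024));       // 100
        System.out.println("16 MB blocks:   " + requestCount(fileSize, 16 * 1024 * 1024));  // 7
    }
}
```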
@@ -8,15 +8,12 @@ import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.BlobUrlParts
import com.azure.storage.blob.models.BlobErrorCode
import com.azure.storage.blob.models.BlobStorageException
import com.azure.storage.blob.options.BlobParallelUploadOptions
import com.azure.storage.common.ParallelTransferOptions
import com.azure.storage.common.ProgressReceiver
import com.azure.storage.common.implementation.Constants
import com.azure.storage.blob.models.BlockListType
import com.azure.storage.common.test.shared.extensions.LiveOnly
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
import com.azure.storage.file.datalake.models.DownloadRetryOptions
import com.azure.storage.file.datalake.models.AccessTier
import com.azure.storage.file.datalake.models.DataLakeRequestConditions
import com.azure.storage.file.datalake.models.DataLakeStorageException