Datalake inputstream #21322

Merged 28 commits on Jun 29, 2021

Changes from 12 commits

Commits:
ef843cf
DataLakeFileClient add openInputStream()
jaschrep-msft May 11, 2021
474b2f3
datalake inputstream tests
jaschrep-msft May 12, 2021
cc4ef6e
changelog
jaschrep-msft May 12, 2021
67aa7c1
Fixed blob artifacts from feature port
jaschrep-msft May 12, 2021
60dbe03
FileInputStream fix and docs issues
jaschrep-msft May 12, 2021
bf4fa8b
Casts throwable to match checked exception type
jaschrep-msft May 12, 2021
bd18062
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
jaschrep-msft May 12, 2021
04cf919
CI fixes
jaschrep-msft May 12, 2021
a6cddf9
fixes
jaschrep-msft May 12, 2021
4036751
checkstyle
jaschrep-msft May 13, 2021
316f15a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
jaschrep-msft May 13, 2021
b0e5443
Refactored StorageInputStream
jaschrep-msft May 17, 2021
04bec4d
checkstyle suppression and pr feedback
jaschrep-msft May 17, 2021
3219ba3
PR feedback
jaschrep-msft May 18, 2021
33142ad
import cleanup
jaschrep-msft May 24, 2021
9b07d27
Undo abstract method shifting
jaschrep-msft Jun 1, 2021
ecb524e
import cleanup
jaschrep-msft Jun 1, 2021
fd4359b
imports and checkstyle supression to match blobs
jaschrep-msft Jun 1, 2021
4683228
minor fixes
jaschrep-msft Jun 3, 2021
5dee388
PR comments
jaschrep-msft Jun 16, 2021
2325932
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
jaschrep-msft Jun 16, 2021
0a04c1a
docstring
jaschrep-msft Jun 21, 2021
5c43a92
api rewrite
jaschrep-msft Jun 28, 2021
c5fa811
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-java in…
jaschrep-msft Jun 28, 2021
e0bac3f
PR feedback
jaschrep-msft Jun 29, 2021
529e09a
Styling
jaschrep-msft Jun 29, 2021
18939cc
reverted *Result type implements Closable
jaschrep-msft Jun 29, 2021
8b7c0a4
cleanup imports
jaschrep-msft Jun 29, 2021
@@ -148,6 +148,8 @@ the main ServiceBusClientBuilder. -->
<!-- Suppress IO exception for now, which need code owner's attention -->
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck"
files="com.azure.storage.blob.BlobInputStream.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck"
files="com.azure.storage.file.datalake.DataLakeFileInputStream.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck"
files="com.azure.storage.blob.BlobOutputStream.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck"
@@ -8,6 +8,7 @@
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.common.StorageInputStream;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -54,31 +55,11 @@ public final class BlobInputStream extends StorageInputStream {
this.properties = blobProperties;
}

/**
* Dispatches a read operation of N bytes. When using sparse page blobs, the page ranges are evaluated and zero
* bytes may be generated on the client side for some ranges that do not exist.
*
* @param readLength An <code>int</code> which represents the number of bytes to read.
* @throws IOException If an I/O error occurs.
*/
@Override
protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) throws IOException {
try {
ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(offset,
(long) readLength), null, this.accessCondition, false)
.flatMap(response -> {
return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap);
})
.block();

this.bufferSize = readLength;
this.bufferStartOffset = offset;
return currentBuffer;
} catch (final BlobStorageException e) {
this.streamFaulted = true;
this.lastError = new IOException(e);
throw this.lastError;
}
protected Mono<ByteBuffer> executeRead(int readLength, long offset) {
return this.blobClient.downloadWithResponse(
new BlobRange(offset, (long) readLength), null, this.accessCondition, false)
.flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap));
}

/**
@@ -3,8 +3,11 @@

package com.azure.storage.common;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.implementation.Constants;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
@@ -149,12 +152,45 @@ public synchronized void close() {
/**
* Dispatches a read operation of N bytes.
*
* Subclasses are intended to implement either this method or {@link StorageInputStream#executeRead(int, long)},
* but not both. This method is responsible for reading from a Storage resource as well as updating stream state
* and transitioning from async back to sync. Its default implementation calls
* {@link StorageInputStream#executeRead(int, long)} and updates stream state accordingly.
*
* @param readLength An <code>int</code> which represents the number of bytes to read.
* @param offset The start point of data to be acquired.
* @return The bytebuffer which store one chunk size of data.
* @throws IOException If an I/O error occurs.
*/
protected abstract ByteBuffer dispatchRead(int readLength, long offset) throws IOException;
protected ByteBuffer dispatchRead(int readLength, long offset) throws IOException {
try {
ByteBuffer currentBuffer = executeRead(readLength, offset).block();

this.bufferSize = readLength;
this.bufferStartOffset = offset;
return currentBuffer;
} catch (final HttpResponseException e) {
this.streamFaulted = true;
this.lastError = new IOException(e);

throw this.lastError;
}
}

/**
* Executes a read call and returns content in a byte buffer.
*
* Subclasses are intended to implement either {@link StorageInputStream#dispatchRead(int, long)} or this method,
* but not both. This method is responsible for asynchronously making a low-level client read and returning the
* content. Its default implementation throws a runtime exception.
*
* @param readLength An <code>int</code> which represents the number of bytes to read.
* @param offset The start point of data to be acquired.
* @return The ByteBuffer which stores one chunk of data.
*/
protected Mono<ByteBuffer> executeRead(int readLength, long offset) {
throw new RuntimeException("com.azure.storage.common.StorageInputStream.executeRead() not implemented.");
}

/**
* Marks the current position in this input stream. A subsequent call to the reset method repositions this stream at
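As a rough illustration of the extension point described in the new docstrings, a hypothetical subclass would override executeRead with its async service call and inherit the default dispatchRead, which handles blocking, buffer state, and error translation. The class name, constructor arguments, and the exact protected signature of StorageInputStream below are assumptions modeled on DataLakeFileInputStream later in this diff, not part of this change:

    import com.azure.storage.common.StorageInputStream;
    import reactor.core.publisher.Mono;

    import java.nio.ByteBuffer;

    // Hypothetical subclass: only executeRead is overridden; dispatchRead's default
    // implementation blocks on the returned Mono and updates the stream state.
    final class MyServiceInputStream extends StorageInputStream {
        MyServiceInputStream(long rangeOffset, Long rangeLength, int chunkSize, long resourceLength) {
            // Constructor arguments follow the pattern used by DataLakeFileInputStream below.
            super(rangeOffset, rangeLength, chunkSize, resourceLength);
        }

        @Override
        protected Mono<ByteBuffer> executeRead(int readLength, long offset) {
            // A real subclass would call its service's async download API for the range
            // [offset, offset + readLength) and collect the response into a ByteBuffer.
            return Mono.just(ByteBuffer.allocate(readLength));
        }
    }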
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-file-datalake/CHANGELOG.md
@@ -5,6 +5,7 @@
- Added support to undelete a file or directory
- Added support to list deletedPaths
- Added support to get/set service properties
- Added support for openInputStream to data lake file clients
- Deprecated support to undelete a file system to a new name.

## 12.5.0 (2021-04-29)
@@ -25,9 +25,12 @@
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.implementation.UploadUtils;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.models.ConsistentReadControl;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.models.FileQueryResponse;
@@ -496,6 +499,57 @@ public FileReadResponse readWithResponse(OutputStream stream, FileRange range, D
}, logger);
}

/**
* Opens a file input stream to download the file.
* <p>
*
* @return An <code>InputStream</code> object that represents the stream to use for reading from the file.
* @throws DataLakeStorageException If a storage service error occurred.
*/
public DataLakeFileInputStream openInputStream() {
return openInputStream(null);
}

/**
* Opens a file input stream to download the specified range of the file.
*
* @param options {@link DataLakeFileInputStreamOptions}
* @return An <code>InputStream</code> object that represents the stream to use for reading from the file.
* @throws DataLakeStorageException If a storage service error occurred.
*/
public DataLakeFileInputStream openInputStream(DataLakeFileInputStreamOptions options) {
options = options == null ? new DataLakeFileInputStreamOptions() : options;

// default to etag locking
ConsistentReadControl consistentReadControl = options.getConsistentReadControl() == null
? ConsistentReadControl.ETAG : options.getConsistentReadControl();

PathProperties properties = getPropertiesWithResponse(options.getRequestConditions(), null, null).getValue();
String eTag = properties.getETag();

FileRange range = options.getRange() == null ? new FileRange(0) : options.getRange();
int chunkSize = options.getBlockSize() == null ? 4 * Constants.MB : options.getBlockSize();

DataLakeRequestConditions requestConditions = options.getRequestConditions() == null
? new DataLakeRequestConditions() : options.getRequestConditions();

switch (consistentReadControl) {
case NONE:
break;
case ETAG:
// Target the user specified eTag by default. If not provided, target the latest eTag.
if (requestConditions.getIfMatch() == null) {
requestConditions.setIfMatch(eTag);
}
break;
default:
throw logger.logExceptionAsError(new IllegalArgumentException("Concurrency control type not "
+ "supported."));
}

return new DataLakeFileInputStream(this.dataLakeFileAsyncClient, range.getOffset(), range.getCount(), chunkSize,
requestConditions, properties);
}

/**
* Reads the entire file into a file specified by the path.
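A minimal usage sketch of the new openInputStream API added above, assuming an existing DataLakeFileClient named fileClient; the helper method and buffer size are illustrative only:

    import com.azure.storage.file.datalake.DataLakeFileClient;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public final class OpenInputStreamSample {
        // Reads the entire file backing the given client into memory.
        static byte[] readAll(DataLakeFileClient fileClient) throws IOException {
            ByteArrayOutputStream content = new ByteArrayOutputStream();
            // try-with-resources closes the stream once the file has been read.
            try (InputStream stream = fileClient.openInputStream()) {
                byte[] buffer = new byte[8192];
                int read;
                while ((read = stream.read(buffer)) != -1) {
                    content.write(buffer, 0, read);
                }
            }
            return content.toByteArray();
        }
    }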
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake;

import com.azure.core.util.FluxUtil;
import com.azure.storage.common.StorageInputStream;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.FileRange;
import com.azure.storage.file.datalake.models.PathProperties;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Provides an input stream to read a given Data Lake file resource.
*/
public final class DataLakeFileInputStream extends StorageInputStream {
/**
* Holds the reference to the file this stream is associated with.
*/
private final DataLakeFileAsyncClient fileClient;

/**
* Holds the {@link DataLakeRequestConditions} object that represents the access conditions for the file.
*/
private final DataLakeRequestConditions accessCondition;

/**
* Holds the {@link PathProperties} object that represents the file's properties.
*/
private final PathProperties properties;

/**
* Initializes a new instance of the DataLakeFileInputStream class. Note that if {@code fileRangeOffset} is not {@code 0} or
* {@code fileRangeLength} is not {@code null}, there will be no content MD5 verification.
*
* @param fileClient A {@link DataLakeFileAsyncClient} object which represents the file that this stream is associated
* with.
* @param fileRangeOffset The offset of file data to begin stream.
* @param fileRangeLength How much data the stream should return after fileRangeOffset.
* @param chunkSize The size of the chunk to download.
* @param accessCondition An {@link DataLakeRequestConditions} object which represents the access conditions for the
* file.
* @throws DataLakeStorageException An exception representing any error which occurred during the operation.
*/
DataLakeFileInputStream(final DataLakeFileAsyncClient fileClient, long fileRangeOffset, Long fileRangeLength,
int chunkSize, final DataLakeRequestConditions accessCondition, final PathProperties pathProperties)
throws DataLakeStorageException {
super(fileRangeOffset, fileRangeLength, chunkSize, pathProperties.getFileSize());

this.fileClient = fileClient;
this.accessCondition = accessCondition;
this.properties = pathProperties;
}

@Override
protected Mono<ByteBuffer> executeRead(int readLength, long offset) {
return this.fileClient.readWithResponse(
new FileRange(offset, (long) readLength), null, this.accessCondition, false)
.flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap));
}

/**
* Gets the file properties.
* <p>
* If no data has been read from the stream, a network call is made to get properties. Otherwise, the file
* properties obtained from the download are stored.
*
* @return {@link PathProperties}
*/
public PathProperties getProperties() {
Member: Does this need to be part of the public interface?

Member Author: Yes we're kinda locked into this based on the equivalent API in blobs.

return this.properties;
}
}
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.models;

/**
* Defines values to indicate what strategy the SDK should use when reading from a file to ensure the view of the data
* is consistent and not changed during the read.
* {@link #NONE}
* {@link #ETAG}
*/
public enum ConsistentReadControl {
Member: Should this consistency control graduate into azure-storage-common, since it should be re-usable in Blobs and maybe Files if we choose to add open read/write functionality there as well?

Member Author: The issue is that the blob package already has its own copy of this, so if we put one in common there will be confusion in the blobs package. Something we'd need to have caught before that API GA'd.

Contributor: Datalake takes a dependency on blobs directly? Don't we end up having both types on the classpath anyway?

Member Author: I may be wrong, but I think we go out of our way to avoid using blob types in the datalake public API.

/**
* No consistent read control. The client will honor user provided {@link DataLakeRequestConditions#getIfMatch()}
*/
NONE,

/**
* Default value. Consistent read control based on eTag.
* If {@link DataLakeRequestConditions#getIfMatch()} is set, the client will honor this value.
* Otherwise, {@link DataLakeRequestConditions#getIfMatch()} is set to the latest eTag.
* Note: Modification of the base blob will result in an {@code IOException} or a {@code BlobStorageException} if
* eTag is the only form of consistent read control being employed.
*/
ETAG,
}
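To illustrate the ETAG behavior documented above: the stream is pinned to the file's eTag when it is opened, so a concurrent overwrite of the file surfaces as an IOException on a later read. A small sketch, assuming an existing DataLakeFileClient named fileClient:

    import com.azure.storage.file.datalake.DataLakeFileClient;

    import java.io.IOException;
    import java.io.InputStream;

    public final class EtagPinningSample {
        static void readPinnedToEtag(DataLakeFileClient fileClient) {
            // ETAG is the default consistent read control, so no options are needed here.
            try (InputStream stream = fileClient.openInputStream()) {
                byte[] chunk = new byte[4096];
                while (stream.read(chunk) != -1) {
                    // If another writer replaces the file mid-read, the pinned If-Match
                    // condition fails and the next chunk download throws IOException.
                }
            } catch (IOException e) {
                System.err.println("File changed during read or another I/O error occurred: " + e);
            }
        }
    }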
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.options;

import com.azure.core.annotation.Fluent;
import com.azure.storage.file.datalake.models.ConsistentReadControl;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.FileRange;

/**
* Extended options that may be passed when opening a file input stream.
*/
@Fluent
public class DataLakeFileInputStreamOptions {

private FileRange range;
private DataLakeRequestConditions requestConditions;
private Integer blockSize;
private ConsistentReadControl consistentReadControl;

/**
* @return {@link FileRange}
*/
public FileRange getRange() {
return range;
}

/**
* @param range {@link FileRange}
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setRange(FileRange range) {
this.range = range;
return this;
}

/**
* @return {@link DataLakeRequestConditions}
*/
public DataLakeRequestConditions getRequestConditions() {
return requestConditions;
}

/**
* @param requestConditions {@link DataLakeRequestConditions}
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setRequestConditions(DataLakeRequestConditions requestConditions) {
this.requestConditions = requestConditions;
return this;
}

/**
* @return The size of each data chunk returned from the service. If block size is large, input stream will make
* fewer network calls, but each individual call will send more data and will therefore take longer.
* The default value is 4 MB.
*/
public Integer getBlockSize() {
return blockSize;
}

/**
* @param blockSize The size of each data chunk returned from the service. If block size is large, input stream
* will make fewer network calls, but each individual call will send more data and will therefore take longer.
* The default value is 4 MB.
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setBlockSize(Integer blockSize) {
this.blockSize = blockSize;
return this;
}

/**
* @return {@link ConsistentReadControl} Default is E-Tag.
*/
public ConsistentReadControl getConsistentReadControl() {
return consistentReadControl;
}

/**
* @param consistentReadControl {@link ConsistentReadControl} Default is E-Tag.
* @return The updated options.
*/
public DataLakeFileInputStreamOptions setConsistentReadControl(ConsistentReadControl consistentReadControl) {
this.consistentReadControl = consistentReadControl;
return this;
}
}
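A short sketch of configuring the fluent options type above and handing it to openInputStream; the offset and 16 MB block size are illustrative values, assuming an existing DataLakeFileClient named fileClient:

    import com.azure.storage.file.datalake.DataLakeFileClient;
    import com.azure.storage.file.datalake.DataLakeFileInputStream;
    import com.azure.storage.file.datalake.models.ConsistentReadControl;
    import com.azure.storage.file.datalake.models.FileRange;
    import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;

    public final class InputStreamOptionsSample {
        static DataLakeFileInputStream openWithOptions(DataLakeFileClient fileClient) {
            DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions()
                .setRange(new FileRange(1024))      // start reading at a 1 KB offset
                .setBlockSize(16 * 1024 * 1024)     // 16 MB chunks instead of the 4 MB default
                .setConsistentReadControl(ConsistentReadControl.ETAG);
            return fileClient.openInputStream(options);
        }
    }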