Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #3732 #3734

Merged
merged 9 commits into from
Mar 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.io.IOException;
import java.io.InputStream;

public class PageBlobInputStream extends InputStream {
public class AppendBlobInputStream extends InputStream {

private BlobInputStream inputStream;
private long fileSize;
private long totalRead = 0;

public PageBlobInputStream(BlobInputStream inputStream, long fileSize) {
public AppendBlobInputStream(BlobInputStream inputStream, long fileSize) {
this.inputStream = inputStream;
this.fileSize = fileSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,57 @@ public AzureFileNameParser() {
}

@Override
public FileName parseUri(final VfsComponentContext context, FileName base, String filename)
public FileName parseUri(final VfsComponentContext context, FileName base, String uri)
throws FileSystemException {
final StringBuilder name = new StringBuilder();
Authority auth = null;
String path = null;
FileType fileType;

int eidx = filename.indexOf("@/");
if (eidx != -1)
filename =
filename.substring(0, eidx + 1) + "windowsazure.com" + filename.substring(eidx + 1);

String scheme;
try {
auth = extractToPath(filename, name);
if (auth.getUserName() == null) {
scheme = UriParser.extractScheme(filename, name);
UriParser.canonicalizePath(name, 0, name.length(), this);
UriParser.fixSeparators(name);
} else {
scheme = auth.getScheme();
}
fileType = UriParser.normalisePath(name);
path = name.toString();
if (path.equals("")) {
path = "/";

StringBuilder sb = new StringBuilder(uri);

UriParser.normalisePath(sb);

String normalizedUri = sb.toString();
String scheme = normalizedUri.substring(0, normalizedUri.indexOf(':'));
String absPath = "/";
FileType fileType = FileType.IMAGINARY;
String[] s = normalizedUri.split("/");

if (s.length > 1) {
if (scheme.equals("azure")) {

String container = s[1];
for (int i = 1; i < s.length; i++) {
absPath += s[i];

if (s.length > 1 && i != s.length - 1) {
absPath += "/";
}
}

if (uri.endsWith("/")) {
fileType = FileType.FOLDER;
} else if (!absPath.endsWith("/")) {
fileType = FileType.FILE;
}

} else if (scheme.equals("azfs")) {

String account = s[1];
String container = s[2];

for (int i = 2; i < s.length; i++) {
absPath += s[i];

if (s.length > 1 && i != s.length - 1) {
absPath += "/";
}
}

if (uri.endsWith("/")) {
fileType = FileType.FOLDER;
} else if (!absPath.endsWith("/")) {
fileType = FileType.FILE;
}
}
} catch (FileSystemException fse) {
scheme = UriParser.extractScheme(filename, name);
UriParser.canonicalizePath(name, 0, name.length(), this);
UriParser.fixSeparators(name);
fileType = UriParser.normalisePath(name);
path = name.toString();
}
return new AzureFileName(scheme, path, fileType);
return new AzureFileName(scheme, absPath, fileType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,15 @@

package org.apache.hop.vfs.azure;

import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CloudPageBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.blob.*;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileType;
import org.apache.commons.vfs2.provider.AbstractFileName;
import org.apache.commons.vfs2.provider.AbstractFileObject;
import org.apache.commons.vfs2.provider.UriParser;
import org.apache.hop.core.Const;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.core.variables.Variables;
import org.apache.hop.vfs.azure.config.AzureConfigSingleton;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -52,30 +40,20 @@
import java.util.Map;

public class AzureFileObject extends AbstractFileObject<AzureFileSystem> {
public static final int DEFAULT_BLOB_SIZE = 1024;

private class PageBlobOutputStream extends OutputStream {
public class AppendBlobOutputStream extends OutputStream {

private CloudAppendBlob ab;
private final OutputStream outputStream;
long written = 0;
private CloudPageBlob pb;
private int blockIncrement;
private long blobSize;

private PageBlobOutputStream(
CloudPageBlob pb, OutputStream outputStream, int blockIncrement, long blobSize) {
if (blockIncrement % Constants.PAGE_SIZE != 0)
throw new IllegalArgumentException("Block increment must be in multiple of 512.");
if (blobSize % Constants.PAGE_SIZE != 0)
throw new IllegalArgumentException("Block increment must be in multiple of 512.");
this.blockIncrement = blockIncrement;
this.outputStream = new BufferedOutputStream(outputStream, blockIncrement);
this.pb = pb;
this.blobSize = blobSize;

public AppendBlobOutputStream(CloudAppendBlob ab, OutputStream outputStream) {
this.ab = ab;
this.outputStream = outputStream;
}

@Override
public void write(int b) throws IOException {
checkBlobSize(1);
outputStream.write(b);
written(1);
}
Expand All @@ -85,67 +63,39 @@ public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
checkBlobSize(len);
outputStream.write(b, off, len);
written(len);
}

private void checkBlobSize(int length) throws IOException {
if (written + length > blobSize) {
while (written + length > blobSize) {
blobSize += blockIncrement;
}
try {
pb.resize(blobSize);
} catch (StorageException e) {
throw new IOException("Failed to resize blob.", e);
}
}
}

protected void written(int len) {
written += len;
lastModified = System.currentTimeMillis();
size = written;
}

@Override
public void flush() throws IOException {}
public void write(byte[] b, int off, int len) throws IOException {
outputStream.write(b, off, len);
written(len);
}

@Override
public void flush() throws IOException {
super.flush();
}

@Override
public void close() throws IOException {
HashMap<String, String> md = new HashMap<>(pb.getMetadata());

HashMap<String, String> md = new HashMap<>(ab.getMetadata());
md.put("ActualLength", String.valueOf(written));
pb.getProperties().setContentDisposition("vfs ; length=\"" + written + "\"");
pb.setMetadata(md);
ab.getProperties().setContentDisposition("vfs ; length=\"" + written + "\"");
ab.setMetadata(md);
try {
pb.uploadProperties();
ab.uploadProperties();
} catch (StorageException e) {
throw new IOException("Failed to update meta-data.", e);
}
checkPageBoundary();

outputStream.close();
URI uri = pb.getUri();
try {
uri = pb.getServiceClient().getCredentials().transformUri(uri);
} catch (URISyntaxException e) {
throw new IOException(e);
} catch (StorageException e) {
throw new IOException(e);
}
closeBlob();
}

private void checkPageBoundary() throws IOException {
long spare = written % Constants.PAGE_SIZE;
if (spare != 0) {
byte[] b = new byte[Constants.PAGE_SIZE - (int) spare];
write(b);
outputStream.flush();
}
}
}

private final CloudBlobClient service;
Expand Down Expand Up @@ -436,21 +386,15 @@ protected long doGetLastModifiedTime() throws Exception {
protected OutputStream doGetOutputStream(boolean bAppend) throws Exception {
if (container != null && !containerPath.equals("")) {
if (bAppend) throw new UnsupportedOperationException();
final CloudPageBlob pb = container.getPageBlobReference(removeLeadingSlash(containerPath));

// Get the block increment...
//
IVariables variables = Variables.getADefaultVariableSpace();
String configBlockIncrement = AzureConfigSingleton.getConfig().getBlockIncrement();
int blockIncrement = Const.toInt(variables.resolve(configBlockIncrement), 4096);
final CloudAppendBlob cab = container.getAppendBlobReference(removeLeadingSlash(containerPath));

if (type == FileType.IMAGINARY) {
type = FileType.FILE;
return new PageBlobOutputStream(
pb, pb.openWriteNew(DEFAULT_BLOB_SIZE), blockIncrement, DEFAULT_BLOB_SIZE);
return new AppendBlobOutputStream(
cab, cab.openWriteNew());
} else {
return new PageBlobOutputStream(
pb, pb.openWriteExisting(), blockIncrement, pb.getProperties().getLength());
return new AppendBlobOutputStream(
cab, cab.openWriteExisting());
}
} else {
throw new UnsupportedOperationException();
Expand All @@ -460,7 +404,7 @@ protected OutputStream doGetOutputStream(boolean bAppend) throws Exception {
@Override
protected InputStream doGetInputStream() throws Exception {
if (container != null && !containerPath.equals("") && type == FileType.FILE) {
return new PageBlobInputStream(cloudBlob.openInputStream(), size);
return new AppendBlobInputStream(cloudBlob.openInputStream(), size);
} else {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class AzureVfsPlugin implements IVfs {
@Override
public String[] getUrlSchemes() {
return new String[] {"azure"};
return new String[] {"azure", "azfs"};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,7 @@ public class AzureConfigPlugin implements IConfigOptions, IGuiPluginCompositeWid
description = "The key to use for the Azure VFS")
private String key;

@GuiWidgetElement(
id = WIDGET_ID_AZURE_BLOCK_INCREMENT,
parentId = ConfigPluginOptionsTab.GUI_WIDGETS_PARENT_ID,
type = GuiElementType.TEXT,
variables = true,
label = "i18n::AzureVFS.FileBlockSize.Label",
toolTip = "i18n::AzureVFS.FileBlockSize.Description")
@CommandLine.Option(
names = {"-azi", "--azure-block-increment"},
description = "The block increment size for new files on Azure, multiples of 512 only.")
private String blockIncrement;

/**
* Gets instance
*
Expand All @@ -96,7 +86,6 @@ public static AzureConfigPlugin getInstance() {
AzureConfig config = AzureConfigSingleton.getConfig();
instance.account = config.getAccount();
instance.key = config.getKey();
instance.blockIncrement = config.getBlockIncrement();

return instance;
}
Expand All @@ -121,12 +110,6 @@ public boolean handleOption(
changed = true;
}

if (blockIncrement != null) {
config.setBlockIncrement(blockIncrement);
log.logBasic("The Azure file block increment is set to '" + blockIncrement + "'");
changed = true;
}

// Save to file if anything changed
//
if (changed) {
Expand Down Expand Up @@ -163,10 +146,6 @@ public void persistContents(GuiCompositeWidgets compositeWidgets) {
key = ((TextVar) control).getText();
AzureConfigSingleton.getConfig().setKey(key);
break;
case WIDGET_ID_AZURE_BLOCK_INCREMENT:
blockIncrement = ((TextVar) control).getText();
AzureConfigSingleton.getConfig().setKey(blockIncrement);
break;
}
}
// Save the project...
Expand Down Expand Up @@ -210,19 +189,4 @@ public void setKey(String key) {
this.key = key;
}

/**
* Gets blockIncrement
*
* @return value of blockIncrement
*/
public String getBlockIncrement() {
return blockIncrement;
}

/**
* @param blockIncrement The blockIncrement to set
*/
public void setBlockIncrement(String blockIncrement) {
this.blockIncrement = blockIncrement;
}
}
Loading
Loading