diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java index b1450b79fabe8..8183ee5043ec8 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java @@ -164,7 +164,12 @@ private static PathTrie defaultHandlers(final String endpoint, f if (destContainer == null) { return newContainerNotFoundError(requestId); } - destContainer.objects.put(destBlobName, body); + + byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, body); + if (existingBytes != null) { + return newBlobAlreadyExistsError(requestId); + } + return new Response(RestStatus.CREATED, emptyMap(), "text/plain", EMPTY_BYTE); }) ); @@ -363,6 +368,10 @@ private static Response newBlobNotFoundError(final long requestId) { return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist"); } + private static Response newBlobAlreadyExistsError(final long requestId) { + return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists"); + } + private static Response newInternalError(final long requestId) { return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "The server encountered an internal error"); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index dd85bfc218159..f5a0ed253c8e9 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -88,10 +88,8 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); - } logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); + try { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); } catch (URISyntaxException|StorageException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index fede20bfb764e..fc6d9d7e482a8 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.util.Locale; import java.util.Map; @@ -114,7 +115,8 @@ public Map listBlobsByPrefix(String keyPath, String prefix return this.client.listBlobsByPrefix(this.clientName, this.locMode, container, keyPath, prefix); } - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException, + FileAlreadyExistsException { this.client.writeBlob(this.clientName, this.locMode, container, blobName, inputStream, blobSize); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 8cd3835e3e689..1c2ca71fe7887 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.util.Map; /** @@ -58,7 +59,7 @@ Map listBlobsByPrefix(String account, LocationMode mode, St throws URISyntaxException, StorageException; void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws - URISyntaxException, StorageException; + URISyntaxException, StorageException, FileAlreadyExistsException; static InputStream giveSocketPermissionsToStream(InputStream stream) { return new InputStream() { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java index 339aec43c251a..9f059eaca11c6 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java @@ -19,11 +19,13 @@ package org.elasticsearch.repositories.azure; +import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicy; +import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; @@ -44,8 +46,10 @@ import org.elasticsearch.repositories.RepositoryException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -289,12 +293,21 @@ enumBlobListingDetails, null, generateOperationContext(account))) { @Override public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, FileAlreadyExistsException { logger.trace("writeBlob({}, stream, {})", blobName, blobSize); CloudBlobClient client = this.getSelectedClient(account, mode); CloudBlobContainer blobContainer = client.getContainerReference(container); CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName); - SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, null, null, generateOperationContext(account))); + try { + SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), + null, generateOperationContext(account))); + } catch (StorageException se) { + if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && + StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { + throw new FileAlreadyExistsException(blobName, null, se.getMessage()); + } + throw se; + } logger.trace("writeBlob({}, stream, {}) - done", blobName, blobSize); } } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java index 0e0f73446ba4b..4b111e549476c 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.net.SocketPermission; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.security.AccessController; import java.util.Locale; @@ -108,7 +109,10 @@ public Map listBlobsByPrefix(String account, LocationMode @Override public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, FileAlreadyExistsException { + if (blobs.containsKey(blobName)) { + throw new FileAlreadyExistsException(blobName); + } try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { blobs.put(blobName, outputStream); Streams.copy(inputStream, outputStream);