Skip to content

Commit

Permalink
Radomize BlobContainer Path in Retries Tests (#76303) (#76384)
Browse files Browse the repository at this point in the history
Follow up to #76273 adding some randomization across all retries tests.
  • Loading branch information
original-brownbear authored Aug 11, 2021
1 parent 170ff7a commit 793cad0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
Expand All @@ -43,14 +42,14 @@ public static void setUpHttpClient() {
}

@AfterClass
public static void tearDownHttpClient() throws IOException {
public static void tearDownHttpClient() {
factory.close();
factory = null;
}

@Override
protected String downloadStorageEndpoint(String blob) {
return "/" + blob;
protected String downloadStorageEndpoint(BlobContainer container, String blob) {
return "/" + container.path().buildAsString() + blob;
}

@Override
Expand All @@ -75,8 +74,7 @@ protected Matcher<Object> readTimeoutExceptionMatcher() {
protected BlobContainer createBlobContainer(Integer maxRetries,
TimeValue readTimeout,
Boolean disableChunkedEncoding,
ByteSizeValue bufferSize,
BlobPath path) {
ByteSizeValue bufferSize) {
Settings.Builder settingsBuilder = Settings.builder();

if (maxRetries != null) {
Expand All @@ -92,7 +90,7 @@ protected BlobContainer createBlobContainer(Integer maxRetries,
final URLHttpClientSettings httpClientSettings = URLHttpClientSettings.fromSettings(settings);
URLBlobStore urlBlobStore =
new URLBlobStore(settings, new URL(getEndpointForServer()), factory.create(httpClientSettings), httpClientSettings);
return urlBlobStore.blobContainer(path == null ? BlobPath.EMPTY : path);
return urlBlobStore.blobContainer(BlobPath.EMPTY);
} catch (MalformedURLException e) {
throw new RuntimeException("Unable to create URLBlobStore", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public static void skipJava8() {
}

@Override
protected String downloadStorageEndpoint(String blob) {
return "/download/storage/v1/b/bucket/o/" + blob;
protected String downloadStorageEndpoint(BlobContainer container, String blob) {
return "/download/storage/v1/b/bucket/o/" + container.path().buildAsString() + blob;
}

@Override
Expand All @@ -113,8 +113,7 @@ protected Class<? extends Exception> unresponsiveExceptionType() {
protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize,
final @Nullable BlobPath path) {
final @Nullable ByteSizeValue bufferSize) {
final Settings.Builder clientSettings = Settings.builder();
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl());
Expand Down Expand Up @@ -157,16 +156,18 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service,
randomIntBetween(1, 8) * 1024);

return new GoogleCloudStorageBlobContainer(BlobPath.EMPTY, blobStore);
return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);
}

public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final AtomicInteger countDown = new AtomicInteger(maxRetries);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);

// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
final byte[] bytes = randomBytes(1 << 22);
httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "large_blob_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
final Tuple<Long, Long> range = getRange(exchange);
Expand All @@ -182,7 +183,6 @@ public void testReadLargeBlobWithRetries() throws Exception {
exchange.close();
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
}
Expand All @@ -192,13 +192,14 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
if (countDown.countDown()) {
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(exchange.getRequestBody());
assertThat(content.isPresent(), is(true));
assertThat(content.get().v1(), equalTo("write_blob_max_retries"));
assertThat(content.get().v1(), equalTo(blobContainer.path().buildAsString() + "write_blob_max_retries"));
if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
Expand All @@ -219,7 +220,6 @@ public void testWriteBlobWithRetries() throws Exception {
}
}));

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
}
Expand All @@ -229,7 +229,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);

// HTTP server does not send a response
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
Expand Down Expand Up @@ -276,6 +276,9 @@ public void testWriteLargeBlob() throws IOException {
final AtomicReference<String> sessionUploadId = new AtomicReference<>(UUIDs.randomBase64UUID());
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());

final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);

httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());

Expand All @@ -284,7 +287,7 @@ public void testWriteLargeBlob() throws IOException {
assertThat(params.get("uploadType"), equalTo("resumable"));

if ("POST".equals(exchange.getRequestMethod())) {
assertThat(params.get("name"), equalTo("write_large_blob"));
assertThat(params.get("name"), equalTo(blobContainer.path().buildAsString() + "write_large_blob"));
if (countInits.decrementAndGet() <= 0) {
byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
Expand Down Expand Up @@ -369,9 +372,6 @@ public void testWriteLargeBlob() throws IOException {
}
}));

final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;

final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);
if (randomBoolean()) {
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
blobContainer.writeBlob("write_large_blob", stream, data.length, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public void tearDown() throws Exception {
}

@Override
protected String downloadStorageEndpoint(String blob) {
return "/bucket/" + blob;
protected String downloadStorageEndpoint(BlobContainer container, String blob) {
return "/bucket/" + container.path().buildAsString() + blob;
}

@Override
Expand All @@ -94,8 +94,7 @@ protected Class<? extends Exception> unresponsiveExceptionType() {
protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize,
final @Nullable BlobPath path) {
final @Nullable ByteSizeValue bufferSize) {
final Settings.Builder clientSettings = Settings.builder();
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);

Expand Down Expand Up @@ -124,7 +123,7 @@ protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE,
Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build());

return new S3BlobContainer(path == null ? BlobPath.EMPTY : path, new S3BlobStore(service, "bucket",
return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), new S3BlobStore(service, "bucket",
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
Expand All @@ -146,8 +145,10 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);

final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/write_blob_max_retries", exchange -> {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
if ("PUT".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery() == null) {
if (countDown.countDown()) {
final BytesReference body = Streams.readFully(exchange.getRequestBody());
Expand All @@ -172,8 +173,6 @@ public void testWriteBlobWithRetries() throws Exception {
exchange.close();
}
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
}
Expand All @@ -183,10 +182,10 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);

// HTTP server does not send a response
httpServer.createContext("/bucket/write_blob_timeout", exchange -> {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
Expand All @@ -202,7 +201,8 @@ public void testWriteBlobWithReadTimeouts() {
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT),
containsString("unable to upload object [write_blob_timeout] using a single upload"));
containsString(
"unable to upload object [" + blobContainer.path().buildAsString() + "write_blob_timeout] using a single upload"));

assertThat(exception.getCause(), instanceOf(SdkClientException.class));
assertThat(exception.getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
Expand All @@ -215,7 +215,7 @@ public void testWriteLargeBlob() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand All @@ -226,7 +226,7 @@ public void testWriteLargeBlob() throws Exception {
final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * (parts + 1));
final CountDown countDownComplete = new CountDown(nbErrors);

httpServer.createContext("/bucket/write_large_blob", exchange -> {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob"), exchange -> {
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));

if ("POST".equals(exchange.getRequestMethod())
Expand Down Expand Up @@ -303,8 +303,7 @@ public void testWriteLargeBlobStreaming() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer =
createBlobContainer(null, readTimeout, true, bufferSize, randomBoolean() ? null : BlobPath.EMPTY.add("foo"));
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand All @@ -316,7 +315,7 @@ public void testWriteLargeBlobStreaming() throws Exception {
final AtomicLong bytesReceived = new AtomicLong(0L);
final CountDown countDownComplete = new CountDown(nbErrors);

httpServer.createContext("/bucket/" + blobContainer.path().buildAsString() + "write_large_blob_streaming", exchange -> {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob_streaming"), exchange -> {
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));

if ("POST".equals(exchange.getRequestMethod())
Expand Down
Loading

0 comments on commit 793cad0

Please sign in to comment.