Skip to content

Commit

Permalink
Reworking s3 connector with various improvements (#13960)
Browse files Browse the repository at this point in the history
* Reworking the S3 connector with:
1. Adding retries
2. Adding max fetch size
3. Using S3Utils for most of the API calls
4. Fixing bugs in DurableStorageCleaner
5. Moving to Iterator for listDir call
  • Loading branch information
cryptoe authored Mar 28, 2023
1 parent e8e8082 commit c2fe6a4
Show file tree
Hide file tree
Showing 12 changed files with 396 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.druid.frame.util.DurableStorageUtils;
Expand All @@ -37,6 +36,7 @@

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
return;
}
TaskRunner taskRunner = taskRunnerOptional.get();
Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
Iterator<String> allFiles = storageConnector.listDir("");
Set<String> runningTaskIds = taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
LOG.info(
"Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
unknownDirectories
);
for (String unknownDirectory : unknownDirectories) {
storageConnector.deleteRecursively(unknownDirectory);

Set<String> filesToRemove = new HashSet<>();
while (allFiles.hasNext()) {
String currentFile = allFiles.next();
String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
// do nothing
} else {
filesToRemove.add(currentFile);
}
}
}
if (filesToRemove.isEmpty()) {
LOG.info("DurableStorageCleaner did not find any left over directories to delete");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Number of files [%d] that do not have a corresponding MSQ task associated with it. These are:\n[%s]\nT",
filesToRemove.size(),
filesToRemove
);
} else {
LOG.info(
"Number of files [%d] that do not have a corresponding MSQ task associated with it.",
filesToRemove.size()
);
}
storageConnector.deleteFiles(filesToRemove);
}
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
Expand All @@ -34,7 +34,9 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


public class DurableStorageCleanerTest
Expand All @@ -44,39 +46,44 @@ public class DurableStorageCleanerTest
private static final TaskRunner TASK_RUNNER = EasyMock.mock(TaskRunner.class);
private static final StorageConnector STORAGE_CONNECTOR = EasyMock.mock(StorageConnector.class);
private static final TaskRunnerWorkItem TASK_RUNNER_WORK_ITEM = EasyMock.mock(TaskRunnerWorkItem.class);
private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1);
private static final String TASK_ID = "dummyTaskId";
private static final String STRAY_DIR = "strayDirectory";

@Test
public void testSchedule() throws IOException, InterruptedException
{
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), "strayDirectory"))
.anyTimes();
EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<String> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteRecursively(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);


durableStorageCleaner.schedule(Executors.newSingleThreadScheduledExecutor());
Thread.sleep(8000L);
Assert.assertEquals(STRAY_DIR, capturedArguments.getValue());
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
try {
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR)
.stream()
.iterator())
.anyTimes();
EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.schedule(executor);
Thread.sleep(8000L);
Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue());
}
finally {
executor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void killAll() throws IOException
try {
S3Utils.deleteObjectsInPath(
s3ClientSupplier.get(),
inputDataConfig,
inputDataConfig.getMaxListingLength(),
segmentPusherConfig.getBucket(),
segmentPusherConfig.getBaseKey(),
Predicates.alwaysTrue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void killOlderThan(long timestamp) throws IOException
try {
S3Utils.deleteObjectsInPath(
service,
inputDataConfig,
inputDataConfig.getMaxListingLength(),
config.getS3Bucket(),
config.getS3Prefix(),
(object) -> object.getLastModified().getTime() < timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public boolean apply(Throwable e)
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc) are not retried.
*/
static <T> T retryS3Operation(Task<T> f) throws Exception
public static <T> T retryS3Operation(Task<T> f) throws Exception
{
return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
}
Expand All @@ -102,7 +102,7 @@ static <T> T retryS3Operation(Task<T> f) throws Exception
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc.) are not retried. Also provides a way to set maxRetries, which can be useful, e.g. for testing.
*/
static <T> T retryS3Operation(Task<T> f, int maxRetries) throws Exception
public static <T> T retryS3Operation(Task<T> f, int maxRetries) throws Exception
{
return RetryUtils.retry(f, S3RETRY, maxRetries);
}
Expand Down Expand Up @@ -243,59 +243,73 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS
* Delete the files from S3 in a specified bucket, matching a specified prefix and filter
*
* @param s3Client s3 client
* @param config specifies the configuration to use when finding matching files in S3 to delete
* @param maxListingLength maximum number of keys to fetch and delete at a time
* @param bucket s3 bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
*
* @throws Exception
* @throws Exception in case of errors
*/

public static void deleteObjectsInPath(
ServerSideEncryptingAmazonS3 s3Client,
S3InputDataConfig config,
int maxListingLength,
String bucket,
String prefix,
Predicate<S3ObjectSummary> filter
)
throws Exception
{
final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
deleteObjectsInPath(s3Client, maxListingLength, bucket, prefix, filter, RetryUtils.DEFAULT_MAX_TRIES);
}

public static void deleteObjectsInPath(
ServerSideEncryptingAmazonS3 s3Client,
int maxListingLength,
String bucket,
String prefix,
Predicate<S3ObjectSummary> filter,
int maxRetries
)
throws Exception
{
final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(maxListingLength);
final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
s3Client,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
config.getMaxListingLength()
maxListingLength
);

while (iterator.hasNext()) {
final S3ObjectSummary nextObject = iterator.next();
if (filter.apply(nextObject)) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
if (keysToDelete.size() == config.getMaxListingLength()) {
deleteBucketKeys(s3Client, bucket, keysToDelete);
log.info("Deleted %d files", keysToDelete.size());
if (keysToDelete.size() == maxListingLength) {
deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);
keysToDelete.clear();
}
}
}

if (keysToDelete.size() > 0) {
deleteBucketKeys(s3Client, bucket, keysToDelete);
log.info("Deleted %d files", keysToDelete.size());
deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);
}
}

private static void deleteBucketKeys(
public static void deleteBucketKeys(
ServerSideEncryptingAmazonS3 s3Client,
String bucket,
List<DeleteObjectsRequest.KeyVersion> keysToDelete
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
int retries
)
throws Exception
{
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
S3Utils.retryS3Operation(() -> {
s3Client.deleteObjects(deleteRequest);
return null;
});
}, retries);
log.info("Deleted %d files", keysToDelete.size());
}

/**
Expand Down
Loading

0 comments on commit c2fe6a4

Please sign in to comment.