Use separate threadpool in remote deletion for remote-backed indexes (#6109)

Signed-off-by: Ashish Singh <[email protected]>
ashking94 authored Feb 9, 2023
1 parent 140829a commit 82fdb2f
Showing 11 changed files with 202 additions and 145 deletions.
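At a high level, the change stops resolving a single shared executor up front and instead hands the ThreadPool itself down the translog transfer stack, so remote deletions can be dispatched to their own pool rather than competing with uploads on TRANSLOG_TRANSFER. The registration of that pool lives in files not loaded in this view. The sketch below only illustrates the dispatch pattern the diff adopts; the pool name "remote_purge" and the class and method names are assumptions, not code from the commit.

```java
// Minimal sketch of the dispatch pattern: callers pass a threadpool *name*, and the
// executor is resolved per call instead of being fixed at construction time.
import org.opensearch.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

final class RemotePurgeDispatchSketch {

    private static final String REMOTE_PURGE = "remote_purge"; // assumed pool name

    private final ThreadPool threadPool;

    RemotePurgeDispatchSketch(ThreadPool threadPool) {
        this.threadPool = threadPool;
    }

    void deleteAsync(Runnable blobDeletion, ActionListener<Void> listener) {
        // Deletions land on their own pool, so a large purge cannot starve translog
        // uploads running on ThreadPool.Names.TRANSLOG_TRANSFER.
        threadPool.executor(REMOTE_PURGE).execute(() -> {
            try {
                blobDeletion.run();          // e.g. blobContainer(path).delete()
                listener.onResponse(null);   // report success to the caller
            } catch (Exception e) {
                listener.onFailure(e);       // surface failures through the listener
            }
        });
    }
}
```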
@@ -69,7 +69,6 @@
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
@@ -484,7 +483,7 @@ private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Reposito
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
blobStoreRepository,
indexShard.getThreadPool().executor(ThreadPool.Names.TRANSLOG_TRANSFER),
indexShard.getThreadPool(),
shardId,
fileTransferTracker
);
@@ -15,7 +15,6 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
@@ -30,7 +29,7 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final Repository repository;

private final ExecutorService executorService;
private final ThreadPool threadPool;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
@@ -44,7 +43,7 @@ public RemoteBlobStoreInternalTranslogFactory(
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex);
}
this.repository = repository;
this.executorService = threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER);
this.threadPool = threadPool;
}

@Override
@@ -68,7 +67,7 @@ public Translog newTranslog(
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
executorService,
threadPool,
primaryModeSupplier
);
}
@@ -8,8 +8,6 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
@@ -25,19 +23,17 @@
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/**
* A Translog implementation which syncs local FS with a remote store
@@ -71,14 +67,14 @@ public RemoteFsTranslog(
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ExecutorService executorService,
ThreadPool threadPool,
BooleanSupplier primaryModeSupplier
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);

try {
download(translogTransferManager, location);
@@ -141,12 +137,12 @@ public static void download(TranslogTransferManager translogTransferManager, Pat

public static TranslogTransferManager buildTranslogTransferManager(
BlobStoreRepository blobStoreRepository,
ExecutorService executorService,
ThreadPool threadPool,
ShardId shardId,
FileTransferTracker fileTransferTracker
) {
return new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker
);
@@ -340,14 +336,6 @@ protected void setMinSeqNoToKeep(long seqNo) {
this.minSeqNoToKeep = seqNo;
}

private void deleteRemoteGeneration(Set<Long> generations) {
try {
translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e);
}
}

@Override
public void trimUnreferencedReaders() throws IOException {
// clean up local translog files and updates readers
@@ -363,49 +351,27 @@ public void trimUnreferencedReaders() throws IOException {
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
deleteOlderPrimaryTranslogFilesFromRemoteStore();
deleteRemoteGenerationAsync(generationsToDelete);
deleteOlderPrimaryTranslogFiles();
}
}

private void deleteRemoteGenerationAsync(Set<Long> generations) {
translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations);
}

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
* implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
*/
private void deleteOlderPrimaryTranslogFilesFromRemoteStore() {
private void deleteOlderPrimaryTranslogFiles() {
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
logger.info("Cleaning up translog uploaded by previous primaries");
long minPrimaryTermInMetadata = current.getPrimaryTerm();
Set<Long> primaryTermsInRemote;
try {
primaryTermsInRemote = translogTransferManager.listPrimaryTerms();
} catch (IOException e) {
logger.error("Exception occurred while getting primary terms from remote store", e);
// If there are exceptions encountered, then we try to delete all older primary terms lesser than the
// minimum referenced primary term in remote translog metadata.
primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet());
}
// Delete all primary terms that are no more referenced by the metadata file and exists in the
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermInMetadata)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() {
@Override
public void onResponse(Void response) {
// NO-OP
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term),
e
);
}
}));
assert readers.isEmpty() == false : "Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
}
}
}
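trimUnreferencedReaders now hands old-primary cleanup to TranslogTransferManager#deletePrimaryTermsAsync, whose implementation is in a file not shown in this excerpt. The sketch below is one plausible shape for that call, assuming it is layered on the listFoldersAsync and deleteAsync primitives added to TransferService further down; the class name, fields, and pool handling are hypothetical.

```java
// Hypothetical sketch of a primary-term purge built on the new TransferService primitives.
// The real logic lives in TranslogTransferManager, which is not part of this excerpt.
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.translog.transfer.TransferService;

import java.util.Set;

final class PrimaryTermCleanupSketch {

    private final TransferService transferService;
    private final BlobPath remoteBaseTransferPath; // <base>/<index-uuid>/<shard-id>
    private final String threadpoolName;           // the dedicated deletion pool

    PrimaryTermCleanupSketch(TransferService transferService, BlobPath remoteBaseTransferPath, String threadpoolName) {
        this.transferService = transferService;
        this.remoteBaseTransferPath = remoteBaseTransferPath;
        this.threadpoolName = threadpoolName;
    }

    void deletePrimaryTermsAsync(long minimumReferencedPrimaryTerm) {
        // List the per-primary-term folders under the shard's translog path, then delete
        // every folder whose term is older than the minimum term still referenced by readers.
        transferService.listFoldersAsync(threadpoolName, remoteBaseTransferPath, new ActionListener<Set<String>>() {
            @Override
            public void onResponse(Set<String> folders) {
                folders.stream()
                    .map(Long::parseLong)
                    .filter(term -> term < minimumReferencedPrimaryTerm)
                    .forEach(term -> transferService.deleteAsync(
                        threadpoolName,
                        remoteBaseTransferPath.add(String.valueOf(term)),
                        ActionListener.wrap(r -> {}, e -> {}) // best-effort; failures are only logged in practice
                    ));
            }

            @Override
            public void onFailure(Exception e) {
                // Best-effort cleanup: a listing failure just leaves the old files for a later attempt.
            }
        });
    }
}
```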
@@ -16,12 +16,12 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/**
* Service that handles remote transfer of translog and checkpoint files
@@ -31,24 +31,25 @@
public class BlobStoreTransferService implements TransferService {

private final BlobStore blobStore;
private final ExecutorService executorService;
private final ThreadPool threadPool;

private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class);

public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) {
public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) {
this.blobStore = blobStore;
this.executorService = executorService;
this.threadPool = threadPool;
}

@Override
public void uploadBlobAsync(
String threadpoolName,
final TransferFileSnapshot fileSnapshot,
Iterable<String> remoteTransferPath,
ActionListener<TransferFileSnapshot> listener
) {
assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath;
executorService.execute(ActionRunnable.wrap(listener, l -> {
threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> {
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath)
.writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
@@ -66,8 +67,6 @@ public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String>
BlobPath blobPath = (BlobPath) remoteTransferPath;
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
} catch (Exception ex) {
throw ex;
}
}

@@ -82,10 +81,10 @@ public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IO
}

@Override
public void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener) {
executorService.execute(() -> {
public void deleteBlobsAsync(String threadpoolName, Iterable<String> path, List<String> fileNames, ActionListener<Void> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
deleteBlobs(path, fileNames);
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
@@ -94,10 +93,15 @@ public void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, Acti
}

@Override
public void deleteAsync(Iterable<String> path, ActionListener<Void> listener) {
executorService.execute(() -> {
public void delete(Iterable<String> path) throws IOException {
blobStore.blobContainer((BlobPath) path).delete();
}

@Override
public void deleteAsync(String threadpoolName, Iterable<String> path, ActionListener<Void> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
blobStore.blobContainer((BlobPath) path).delete();
delete(path);
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
@@ -114,4 +118,15 @@ public Set<String> listAll(Iterable<String> path) throws IOException {
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
}

@Override
public void listFoldersAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
listener.onResponse(listFolders(path));
} catch (IOException e) {
listener.onFailure(e);
}
});
}
}
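The service now exposes a symmetric sync/async surface in which every async variant names the pool it should run on. A usage sketch follows; the blob store wiring, shard path, file names, and pool name are illustrative rather than taken from the commit.

```java
// Usage sketch for the threadpool-aware async surface of BlobStoreTransferService.
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;

final class TransferServiceUsageSketch {

    void purgeShardTranslog(BlobStore blobStore, ThreadPool threadPool, String purgePool) {
        BlobStoreTransferService transferService = new BlobStoreTransferService(blobStore, threadPool);
        BlobPath shardPath = BlobPath.cleanPath().add("index-uuid").add("0"); // illustrative path

        // Delete a known set of files; the work runs on the named pool, not on the caller thread.
        transferService.deleteBlobsAsync(purgePool, shardPath, List.of("translog-5.tlog", "translog-5.ckp"),
            ActionListener.wrap(r -> {}, e -> {}));

        // Delete everything under the path, again dispatched to the named pool.
        transferService.deleteAsync(purgePool, shardPath, ActionListener.wrap(r -> {}, e -> {}));
    }
}
```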
@@ -25,11 +25,13 @@ public interface TransferService {

/**
* Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked
* @param threadpoolName threadpool type which will be used to upload blobs asynchronously
* @param fileSnapshot the file snapshot to upload
* @param remotePath the remote path where upload should be made
* @param listener the callback to be invoked once upload completes successfully/fails
*/
void uploadBlobAsync(
String threadpoolName,
final TransferFileSnapshot fileSnapshot,
Iterable<String> remotePath,
ActionListener<TransferFileSnapshot> listener
@@ -45,9 +47,30 @@ void uploadBlobAsync(

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener);
/**
* Deletes the list of files in async and uses the listener to propagate success or failure.
* @param threadpoolName threadpool type which will be used to perform the deletion asynchronously.
* @param path the path where the deletion would occur on remote store.
* @param fileNames list of all files that are to be deleted within the path.
* @param listener the callback to be invoked once delete completes successfully/fails.
*/
void deleteBlobsAsync(String threadpoolName, Iterable<String> path, List<String> fileNames, ActionListener<Void> listener);

/**
* Deletes all contents with-in a path.
* @param path the path in remote which needs to be deleted completely.
* @throws IOException the exception while transferring the data.
*/
void delete(Iterable<String> path) throws IOException;

void deleteAsync(Iterable<String> path, ActionListener<Void> listener);
/**
* Deletes all contents with-in a path and invokes the listener on success or failure.
*
* @param threadpoolName threadpool type which will be used to perform the deletion asynchronously.
* @param path path in remote store.
* @param listener the callback to be invoked once delete completes successfully/fails.
*/
void deleteAsync(String threadpoolName, Iterable<String> path, ActionListener<Void> listener);

/**
* Lists the files
@@ -65,6 +88,15 @@
*/
Set<String> listFolders(Iterable<String> path) throws IOException;

/**
* Invokes the listener with the list of folders inside the path. For exception, invokes the {@code listener.onFailure}.
*
* @param threadpoolName threadpool type which will be used to perform the deletion asynchronously.
* @param path path in remote store
* @param listener the callback to be invoked once list folders succeeds or fails.
*/
void listFoldersAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener);

/**
*
* @param path the remote path from where download should be made
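Registration of the dedicated deletion pool happens outside the files loaded above, presumably in the node's ThreadPool setup. As a rough sketch of how such a pool could be stood up and exercised in a test, assuming a scaling pool named "remote_purge" (the name, sizing, and test wiring are all assumptions):

```java
// Test-style wiring sketch: declare a dedicated scaling pool and submit work to it by name.
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

final class RemotePurgePoolTestWiring {

    static void example() {
        // Dedicated scaling pool for remote deletions, separate from translog uploads.
        ThreadPool threadPool = new TestThreadPool(
            "remote-purge-test",
            new ScalingExecutorBuilder("remote_purge", 1, 4, TimeValue.timeValueSeconds(30))
        );
        try {
            // Anything submitted via threadPool.executor("remote_purge") now runs on this pool.
            threadPool.executor("remote_purge").execute(() -> { /* deletion work */ });
        } finally {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}
```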