diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 47d10513f4aaa..30147ebbb0b5a 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -69,7 +69,6 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Arrays; @@ -484,7 +483,7 @@ private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Reposito FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( blobStoreRepository, - indexShard.getThreadPool().executor(ThreadPool.Names.TRANSLOG_TRANSFER), + indexShard.getThreadPool(), shardId, fileTransferTracker ); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 272e560991386..e439a56581c14 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -15,7 +15,6 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.concurrent.ExecutorService; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -30,7 +29,7 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final Repository repository; - private final ExecutorService executorService; + private final ThreadPool threadPool; public RemoteBlobStoreInternalTranslogFactory( Supplier repositoriesServiceSupplier, @@ -44,7 +43,7 @@ public RemoteBlobStoreInternalTranslogFactory( throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex); } this.repository = repository; - this.executorService = threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER); + this.threadPool = threadPool; } @Override @@ -68,7 +67,7 @@ public Translog newTranslog( primaryTermSupplier, persistedSequenceNumberConsumer, blobStoreRepository, - executorService, + threadPool, primaryModeSupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index a3a6eba39e126..ac270f4e99f98 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -8,8 +8,6 @@ package org.opensearch.index.translog; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.action.ActionListener; import org.opensearch.common.SetOnce; import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; @@ -25,6 +23,7 @@ import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Files; @@ -32,12 +31,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; -import java.util.stream.Collectors; -import java.util.stream.LongStream; /** * A Translog implementation which syncs local FS with a remote store @@ -71,14 +67,14 @@ public RemoteFsTranslog( LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, - ExecutorService executorService, + ThreadPool threadPool, BooleanSupplier primaryModeSupplier ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); this.blobStoreRepository = blobStoreRepository; this.primaryModeSupplier = primaryModeSupplier; fileTransferTracker = new FileTransferTracker(shardId); - this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker); + this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); try { download(translogTransferManager, location); @@ -141,12 +137,12 @@ public static void download(TranslogTransferManager translogTransferManager, Pat public static TranslogTransferManager buildTranslogTransferManager( BlobStoreRepository blobStoreRepository, - ExecutorService executorService, + ThreadPool threadPool, ShardId shardId, FileTransferTracker fileTransferTracker ) { return new TranslogTransferManager( - new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService), + new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), fileTransferTracker ); @@ -340,14 +336,6 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } - private void deleteRemoteGeneration(Set generations) { - try { - translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations); - } catch (IOException e) { - logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e); - } - } - @Override public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers @@ -363,49 +351,27 @@ public void trimUnreferencedReaders() throws IOException { generationsToDelete.add(generation); } if (generationsToDelete.isEmpty() == false) { - deleteRemoteGeneration(generationsToDelete); - deleteOlderPrimaryTranslogFilesFromRemoteStore(); + deleteRemoteGenerationAsync(generationsToDelete); + deleteOlderPrimaryTranslogFiles(); } } + private void deleteRemoteGenerationAsync(Set generations) { + translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations); + } + /** * This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures * implicitly that minimum primary term in latest translog metadata in remote store is the current primary term. */ - private void deleteOlderPrimaryTranslogFilesFromRemoteStore() { + private void deleteOlderPrimaryTranslogFiles() { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. if (olderPrimaryCleaned.trySet(Boolean.TRUE)) { - logger.info("Cleaning up translog uploaded by previous primaries"); - long minPrimaryTermInMetadata = current.getPrimaryTerm(); - Set primaryTermsInRemote; - try { - primaryTermsInRemote = translogTransferManager.listPrimaryTerms(); - } catch (IOException e) { - logger.error("Exception occurred while getting primary terms from remote store", e); - // If there are exceptions encountered, then we try to delete all older primary terms lesser than the - // minimum referenced primary term in remote translog metadata. - primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet()); - } - // Delete all primary terms that are no more referenced by the metadata file and exists in the - Set primaryTermsToDelete = primaryTermsInRemote.stream() - .filter(term -> term < minPrimaryTermInMetadata) - .collect(Collectors.toSet()); - primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() { - @Override - public void onResponse(Void response) { - // NO-OP - } - - @Override - public void onFailure(Exception e) { - logger.error( - () -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term), - e - ); - } - })); + assert readers.isEmpty() == false : "Expected non-empty readers"; + long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get(); + translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 08a98a491a035..20447b4de65b7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -16,12 +16,12 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; /** * Service that handles remote transfer of translog and checkpoint files @@ -31,24 +31,25 @@ public class BlobStoreTransferService implements TransferService { private final BlobStore blobStore; - private final ExecutorService executorService; + private final ThreadPool threadPool; private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); - public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) { + public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { this.blobStore = blobStore; - this.executorService = executorService; + this.threadPool = threadPool; } @Override public void uploadBlobAsync( + String threadpoolName, final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, ActionListener listener ) { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; - executorService.execute(ActionRunnable.wrap(listener, l -> { + threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> { try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath) .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); @@ -66,8 +67,6 @@ public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable BlobPath blobPath = (BlobPath) remoteTransferPath; try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); - } catch (Exception ex) { - throw ex; } } @@ -82,10 +81,10 @@ public void deleteBlobs(Iterable path, List fileNames) throws IO } @Override - public void deleteBlobsAsync(Iterable path, List fileNames, ActionListener listener) { - executorService.execute(() -> { + public void deleteBlobsAsync(String threadpoolName, Iterable path, List fileNames, ActionListener listener) { + threadPool.executor(threadpoolName).execute(() -> { try { - blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); + deleteBlobs(path, fileNames); listener.onResponse(null); } catch (IOException e) { listener.onFailure(e); @@ -94,10 +93,15 @@ public void deleteBlobsAsync(Iterable path, List fileNames, Acti } @Override - public void deleteAsync(Iterable path, ActionListener listener) { - executorService.execute(() -> { + public void delete(Iterable path) throws IOException { + blobStore.blobContainer((BlobPath) path).delete(); + } + + @Override + public void deleteAsync(String threadpoolName, Iterable path, ActionListener listener) { + threadPool.executor(threadpoolName).execute(() -> { try { - blobStore.blobContainer((BlobPath) path).delete(); + delete(path); listener.onResponse(null); } catch (IOException e) { listener.onFailure(e); @@ -114,4 +118,15 @@ public Set listAll(Iterable path) throws IOException { public Set listFolders(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).children().keySet(); } + + @Override + public void listFoldersAsync(String threadpoolName, Iterable path, ActionListener> listener) { + threadPool.executor(threadpoolName).execute(() -> { + try { + listener.onResponse(listFolders(path)); + } catch (IOException e) { + listener.onFailure(e); + } + }); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 5ba15ad01d44e..c198f02f61df8 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -25,11 +25,13 @@ public interface TransferService { /** * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked + * @param threadpoolName threadpool type which will be used to upload blobs asynchronously * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made * @param listener the callback to be invoked once upload completes successfully/fails */ void uploadBlobAsync( + String threadpoolName, final TransferFileSnapshot fileSnapshot, Iterable remotePath, ActionListener listener @@ -45,9 +47,30 @@ void uploadBlobAsync( void deleteBlobs(Iterable path, List fileNames) throws IOException; - void deleteBlobsAsync(Iterable path, List fileNames, ActionListener listener); + /** + * Deletes the list of files in async and uses the listener to propagate success or failure. + * @param threadpoolName threadpool type which will be used to perform the deletion asynchronously. + * @param path the path where the deletion would occur on remote store. + * @param fileNames list of all files that are to be deleted within the path. + * @param listener the callback to be invoked once delete completes successfully/fails. + */ + void deleteBlobsAsync(String threadpoolName, Iterable path, List fileNames, ActionListener listener); + + /** + * Deletes all contents with-in a path. + * @param path the path in remote which needs to be deleted completely. + * @throws IOException the exception while transferring the data. + */ + void delete(Iterable path) throws IOException; - void deleteAsync(Iterable path, ActionListener listener); + /** + * Deletes all contents with-in a path and invokes the listener on success or failure. + * + * @param threadpoolName threadpool type which will be used to perform the deletion asynchronously. + * @param path path in remote store. + * @param listener the callback to be invoked once delete completes successfully/fails. + */ + void deleteAsync(String threadpoolName, Iterable path, ActionListener listener); /** * Lists the files @@ -65,6 +88,15 @@ void uploadBlobAsync( */ Set listFolders(Iterable path) throws IOException; + /** + * Invokes the listener with the list of folders inside the path. For exception, invokes the {@code listener.onFailure}. + * + * @param threadpoolName threadpool type which will be used to perform the deletion asynchronously. + * @param path path in remote store + * @param listener the callback to be invoked once list folders succeeds or fails. + */ + void listFoldersAsync(String threadpoolName, Iterable path, ActionListener> listener); + /** * * @param path the remote path from where download should be made diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 48331f6528606..e9083c38e533f 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -18,6 +18,7 @@ import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -97,6 +98,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans ); toUpload.forEach( fileSnapshot -> transferService.uploadBlobAsync( + ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener @@ -197,12 +199,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) /** * This method handles deletion of multiple generations for a single primary term. - * TODO: Take care of metadata file cleanup. Github Issue #5677 + * TODO: Take care of metadata file cleanup. Github Issue #5677 * - * @param primaryTerm primary term - * @param generations set of generation + * @param primaryTerm primary term where the generations will be deleted. + * @param generations set of generation to delete. */ - public void deleteTranslogAsync(long primaryTerm, Set generations) throws IOException { + public void deleteTranslogAsync(long primaryTerm, Set generations) { if (generations.isEmpty()) { return; } @@ -212,51 +214,86 @@ public void deleteTranslogAsync(long primaryTerm, Set generations) throws String translogFilename = Translog.getFilename(generation); files.addAll(List.of(ckpFileName, translogFilename)); }); - transferService.deleteBlobsAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() { + transferService.deleteBlobsAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteBaseTransferPath.add(String.valueOf(primaryTerm)), + files, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fileTransferTracker.delete(files); + logger.trace("Deleted translogs for primaryTerm {} generations {}", primaryTerm, generations); + } + + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage( + "Exception occurred while deleting translog for primary_term={} generations={}", + primaryTerm, + generations + ), + e + ); + } + } + ); + } + + /** + * Deletes all primary terms from remote store that are more than the given {@code minPrimaryTermToKeep}. The caller + * of the method must ensure that the value is lesser than equal to the minimum primary term referenced by the remote + * translog metadata. + * + * @param minPrimaryTermToKeep all primary terms below this primary term are deleted. + */ + public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { + logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep); + transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { @Override - public void onResponse(Void unused) { - fileTransferTracker.delete(files); + public void onResponse(Set folders) { + Set primaryTermsInRemote = folders.stream().filter(folderName -> { + try { + Long.parseLong(folderName); + return true; + } catch (Exception ignored) { + // NO-OP + } + return false; + }).map(Long::parseLong).collect(Collectors.toSet()); + Set primaryTermsToDelete = primaryTermsInRemote.stream() + .filter(term -> term < minPrimaryTermToKeep) + .collect(Collectors.toSet()); + primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term)); } @Override public void onFailure(Exception e) { - logger.error( - () -> new ParameterizedMessage( - "Exception occurred while deleting translog for primary_term={} generations={}", - primaryTerm, - generations - ), - e - ); + logger.error("Exception occurred while getting primary terms from remote store", e); } }); } /** - * Handles deletion of translog files for a particular primary term. + * Handles deletion of all translog files associated with a primary term. * - * @param primaryTerm primary term - * @param listener listener for response and failure + * @param primaryTerm primary term. */ - public void deleteTranslogAsync(long primaryTerm, ActionListener listener) { - transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener); - } + private void deletePrimaryTermAsync(long primaryTerm) { + transferService.deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteBaseTransferPath.add(String.valueOf(primaryTerm)), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted primary term {}", primaryTerm); + } - /** - * Lists all primary terms existing on remote store. - * - * @return the list of primary terms. - * @throws IOException is thrown if it can read the data. - */ - public Set listPrimaryTerms() throws IOException { - return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> { - try { - Long.parseLong(s); - return true; - } catch (Exception ignored) { - // NO-OP + @Override + public void onFailure(Exception e) { + logger.error(new ParameterizedMessage("Exception occurred while deleting primary term {}", primaryTerm), e); + } } - return false; - }).map(Long::parseLong).collect(Collectors.toSet()); + ); } } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index dd52e0f7ecf76..c2884feca9157 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -110,6 +110,7 @@ public static class Names { public static final String SYSTEM_WRITE = "system_write"; public static final String TRANSLOG_TRANSFER = "translog_transfer"; public static final String TRANSLOG_SYNC = "translog_sync"; + public static final String REMOTE_PURGE = "remote_purge"; } /** @@ -176,6 +177,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); + map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -253,6 +255,7 @@ public ThreadPool( new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000)); + builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index cb3affb71b3dc..2b9efb15cbcca 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -159,10 +159,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin final TranslogConfig translogConfig = getTranslogConfig(path); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); - blobStoreTransferService = new BlobStoreTransferService( - repository.blobStore(), - threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER) - ); + blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); return new RemoteFsTranslog( translogConfig, translogUUID, @@ -171,7 +168,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin primaryTerm::get, getPersistedSeqNoConsumer(), repository, - threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER), + threadPool, primaryMode::get ); @@ -1164,7 +1161,7 @@ public int write(ByteBuffer src) throws IOException { primaryTerm::get, persistedSeqNos::add, repository, - threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER), + threadPool, () -> Boolean.TRUE ) { @Override diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 5dc5ac92070ea..196fbd58c2c20 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -22,20 +22,20 @@ import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class BlobStoreTransferServiceTests extends OpenSearchTestCase { - private ExecutorService executorService; + private ThreadPool threadPool; private BlobStoreRepository repository; @@ -43,14 +43,14 @@ public class BlobStoreTransferServiceTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); repository = createRepository(); - executorService = Executors.newFixedThreadPool(1); + threadPool = new TestThreadPool(getClass().getName()); } public void testUploadBlob() throws IOException { Path testFile = createTempFile(); Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); - TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); transferService.uploadBlob(transferFileSnapshot, repository.basePath()); } @@ -60,7 +60,7 @@ public void testUploadBlobFromByteArray() throws IOException { randomByteArrayOfLength(128), 1 ); - TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); transferService.uploadBlob(transferFileSnapshot, repository.basePath()); } @@ -70,20 +70,25 @@ public void testUploadBlobAsync() throws IOException, InterruptedException { AtomicBoolean succeeded = new AtomicBoolean(false); FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); CountDownLatch latch = new CountDownLatch(1); - TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); - transferService.uploadBlobAsync(transferFileSnapshot, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { - assert succeeded.compareAndSet(false, true); - assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); - assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); - } + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + transferService.uploadBlobAsync( + ThreadPool.Names.TRANSLOG_TRANSFER, + transferFileSnapshot, + repository.basePath(), + new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + assert succeeded.compareAndSet(false, true); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); + } - @Override - public void onFailure(Exception e) { - throw new AssertionError("Failed to perform uploadBlobAsync", e); - } - }, latch)); + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch) + ); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(succeeded.get()); } @@ -92,8 +97,7 @@ public void onFailure(Exception e) { public void tearDown() throws Exception { super.tearDown(); repository.stop(); - executorService.shutdown(); - executorService.awaitTermination(1000, TimeUnit.MILLISECONDS); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } /** Create a {@link Repository} with a random name **/ diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0abbfcd3eb69c..23cd5931742c0 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -15,7 +15,6 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -24,6 +23,8 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -47,6 +48,7 @@ public class TranslogTransferManagerTests extends OpenSearchTestCase { private TransferService transferService; private BlobPath remoteBaseTransferPath; + private ThreadPool threadPool; private long primaryTerm; private long generation; private long minTranslogGeneration; @@ -61,6 +63,13 @@ public void setUp() throws Exception { minTranslogGeneration = randomLongBetween(0, generation); remoteBaseTransferPath = new BlobPath().add("base_path"); transferService = mock(TransferService.class); + threadPool = new TestThreadPool(getClass().getName()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + terminate(threadPool); } @SuppressWarnings("unchecked") @@ -73,10 +82,11 @@ public void testTransferSnapshot() throws IOException { doNothing().when(transferService) .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[0]); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; + listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[1]); return null; - }).when(transferService).uploadBlobAsync(any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + }).when(transferService) + .uploadBlobAsync(any(String.class), any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override @@ -309,10 +319,7 @@ public void testDeleteTranslogSuccess() throws Exception { BlobStore blobStore = mock(BlobStore.class); BlobContainer blobContainer = mock(BlobContainer.class); when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); - BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( - blobStore, - OpenSearchExecutors.newDirectExecutorService() - ); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); TranslogTransferManager translogTransferManager = new TranslogTransferManager( blobStoreTransferService, remoteBaseTransferPath, @@ -335,10 +342,7 @@ public void testDeleteTranslogFailure() throws Exception { BlobContainer blobContainer = mock(BlobContainer.class); doAnswer(invocation -> { throw new IOException("test exception"); }).when(blobStore).blobContainer(any(BlobPath.class)); // when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); - BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( - blobStore, - OpenSearchExecutors.newDirectExecutorService() - ); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); TranslogTransferManager translogTransferManager = new TranslogTransferManager( blobStoreTransferService, remoteBaseTransferPath, diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index eca5a8eb19e47..d1e7e25369b12 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -134,6 +134,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); + sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); return sizes.get(threadPoolName).apply(numberOfProcessors); }