diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 02d91974b652a..190ca6948f42a 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -65,6 +65,7 @@ public class RemoteFsTranslog extends Translog { private final SetOnce olderPrimaryCleaned = new SetOnce<>(); private static final int REMOTE_DELETION_PERMITS = 2; + public static final String TRANSLOG = "translog"; // Semaphore used to allow only single remote generation to happen at a time private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); @@ -167,7 +168,7 @@ public static TranslogTransferManager buildTranslogTransferManager( return new TranslogTransferManager( shardId, new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), - blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), + blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG), fileTransferTracker ); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 243fd8801a562..352e7dc2cc0e6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -50,7 +50,7 @@ public class TranslogTransferManager { private final ShardId shardId; private final TransferService transferService; - private final BlobPath remoteBaseTransferPath; + private final BlobPath remoteDataTransferPath; private final BlobPath remoteMetadataTransferPath; private final FileTransferTracker fileTransferTracker; @@ -59,17 +59,18 @@ public class TranslogTransferManager { private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); private final static String METADATA_DIR = "metadata"; + private final static String DATA_DIR = "data"; public TranslogTransferManager( ShardId shardId, TransferService transferService, - BlobPath remoteBaseTransferPath, + BlobPath remoteDataTransferPath, FileTransferTracker fileTransferTracker ) { this.shardId = shardId; this.transferService = transferService; - this.remoteBaseTransferPath = remoteBaseTransferPath; - this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); + this.remoteDataTransferPath = remoteDataTransferPath.add(DATA_DIR); + this.remoteMetadataTransferPath = remoteDataTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; } @@ -110,7 +111,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans fileSnapshot -> transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, - remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), + remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener ) ); @@ -164,7 +165,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th if (Files.exists(filePath)) { Files.delete(filePath); } - try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) { + try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { Files.copy(inputStream, filePath); } // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync @@ -238,7 +239,7 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna */ public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId); - transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { + transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { @Override public void onResponse(Set folders) { Set primaryTermsInRemote = folders.stream().filter(folderName -> { @@ -271,7 +272,7 @@ public void onFailure(Exception e) { private void deletePrimaryTermAsync(long primaryTerm) { transferService.deleteAsync( ThreadPool.Names.REMOTE_PURGE, - remoteBaseTransferPath.add(String.valueOf(primaryTerm)), + remoteDataTransferPath.add(String.valueOf(primaryTerm)), new ActionListener<>() { @Override public void onResponse(Void unused) { @@ -317,7 +318,7 @@ private void deleteTranslogFilesAsync(long primaryTerm, List files, Runn try { transferService.deleteBlobsAsync( ThreadPool.Names.REMOTE_PURGE, - remoteBaseTransferPath.add(String.valueOf(primaryTerm)), + remoteDataTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() { @Override diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 24dae6f5be9ab..d963830e9e736 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -93,6 +93,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -111,6 +112,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase { private final AtomicReference persistedSeqNoConsumer = new AtomicReference<>(); private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; + private final static String DATA_DIR = "data"; BlobStoreRepository repository; BlobStoreTransferService blobStoreTransferService; @@ -483,22 +485,17 @@ public void testSimpleOperationsUpload() throws Exception { translog.rollGeneration(); assertEquals(6, translog.allUploaded().size()); - Set mdFiles = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata") - ); + Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); Set tlogFiles = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get())) + getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get())) ); logger.info("All data files {}", tlogFiles); // assert content of ckp and tlog files - BlobPath path = repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())); + BlobPath path = getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get())); for (TranslogReader reader : translog.readers) { final long readerGeneration = reader.getGeneration(); logger.error("Asserting content of {}", readerGeneration); @@ -533,12 +530,7 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(4, translog.allUploaded().size()); assertEquals( 4, - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size() + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); @@ -551,12 +543,7 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(4, translog.allUploaded().size()); assertEquals( 4, - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size() + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); @@ -583,14 +570,7 @@ public void testMetadataFileDeletion() throws Exception { assertEquals(1, translog.readers.size()); } assertBusy(() -> assertEquals(4, translog.allUploaded().size())); - assertBusy( - () -> assertEquals( - 2, - blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ).size() - ) - ); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); for (int i = numDocs; i < numDocs + moreDocs; i++) { @@ -599,14 +579,7 @@ public void testMetadataFileDeletion() throws Exception { translog.trimUnreferencedReaders(); assertEquals(1 + moreDocs, translog.readers.size()); assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size())); - assertBusy( - () -> assertEquals( - 1 + moreDocs, - blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ).size() - ) - ); + assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int totalDocs = numDocs + moreDocs; translog.setMinSeqNoToKeep(totalDocs - 1); @@ -619,14 +592,7 @@ public void testMetadataFileDeletion() throws Exception { ); translog.setMinSeqNoToKeep(totalDocs); translog.trimUnreferencedReaders(); - assertBusy( - () -> assertEquals( - 2, - blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ).size() - ) - ); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); // Change primary term and test the deletion of older primaries String translogUUID = translog.translogUUID; @@ -642,9 +608,7 @@ public void testMetadataFileDeletion() throws Exception { long newPrimaryTerm = primaryTerm.incrementAndGet(); // Check all metadata files corresponds to old primary term - Set mdFileNames = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ); + Set mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__")))); // Creating RemoteFsTranslog with the same location @@ -658,9 +622,7 @@ public void testMetadataFileDeletion() throws Exception { } // Check that all metadata files are belonging now to the new primary - mdFileNames = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ); + mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__")))); try { @@ -671,6 +633,10 @@ public void testMetadataFileDeletion() throws Exception { } } + private BlobPath getTranslogDirectory() { + return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG); + } + private Long populateTranslogOps(boolean withMissingOps) throws IOException { long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;