Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make remote translog store path consistent to remote segment store #7947

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);
Expand Down Expand Up @@ -167,7 +168,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
return new TranslogTransferManager(
shardId,
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG),
fileTransferTracker
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class TranslogTransferManager {

private final ShardId shardId;
private final TransferService transferService;
private final BlobPath remoteBaseTransferPath;
private final BlobPath remoteDataTransferPath;
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;

Expand All @@ -59,17 +59,18 @@ public class TranslogTransferManager {
private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class);

private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

public TranslogTransferManager(
ShardId shardId,
TransferService transferService,
BlobPath remoteBaseTransferPath,
BlobPath remoteDataTransferPath,
FileTransferTracker fileTransferTracker
) {
this.shardId = shardId;
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.remoteDataTransferPath = remoteDataTransferPath.add(DATA_DIR);
this.remoteMetadataTransferPath = remoteDataTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
}

Expand Down Expand Up @@ -110,7 +111,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
fileSnapshot -> transferService.uploadBlobAsync(
ThreadPool.Names.TRANSLOG_TRANSFER,
fileSnapshot,
remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
latchedActionListener
)
);
Expand Down Expand Up @@ -164,7 +165,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
if (Files.exists(filePath)) {
Files.delete(filePath);
}
try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) {
try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) {
Files.copy(inputStream, filePath);
}
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
Expand Down Expand Up @@ -238,7 +239,7 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
*/
public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId);
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() {
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> folders) {
Set<Long> primaryTermsInRemote = folders.stream().filter(folderName -> {
Expand Down Expand Up @@ -271,7 +272,7 @@ public void onFailure(Exception e) {
private void deletePrimaryTermAsync(long primaryTerm) {
transferService.deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteBaseTransferPath.add(String.valueOf(primaryTerm)),
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand Down Expand Up @@ -317,7 +318,7 @@ private void deleteTranslogFilesAsync(long primaryTerm, List<String> files, Runn
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteBaseTransferPath.add(String.valueOf(primaryTerm)),
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;

Expand All @@ -111,6 +112,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase {
private final AtomicReference<LongConsumer> persistedSeqNoConsumer = new AtomicReference<>();
private ThreadPool threadPool;
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";
BlobStoreRepository repository;

BlobStoreTransferService blobStoreTransferService;
Expand Down Expand Up @@ -483,22 +485,17 @@ public void testSimpleOperationsUpload() throws Exception {
translog.rollGeneration();
assertEquals(6, translog.allUploaded().size());

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
);
Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertEquals(2, mdFiles.size());
logger.info("All md files {}", mdFiles);

Set<String> tlogFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get()))
getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))
);
logger.info("All data files {}", tlogFiles);

// assert content of ckp and tlog files
BlobPath path = repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()));
BlobPath path = getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()));
for (TranslogReader reader : translog.readers) {
final long readerGeneration = reader.getGeneration();
logger.error("Asserting content of {}", readerGeneration);
Expand Down Expand Up @@ -533,12 +530,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size()
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

Expand All @@ -551,12 +543,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size()
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

Expand All @@ -583,14 +570,7 @@ public void testMetadataFileDeletion() throws Exception {
assertEquals(1, translog.readers.size());
}
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
int moreDocs = randomIntBetween(3, 10);
logger.info("numDocs={} moreDocs={}", numDocs, moreDocs);
for (int i = numDocs; i < numDocs + moreDocs; i++) {
Expand All @@ -599,14 +579,7 @@ public void testMetadataFileDeletion() throws Exception {
translog.trimUnreferencedReaders();
assertEquals(1 + moreDocs, translog.readers.size());
assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size()));
assertBusy(
() -> assertEquals(
1 + moreDocs,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

int totalDocs = numDocs + moreDocs;
translog.setMinSeqNoToKeep(totalDocs - 1);
Expand All @@ -619,14 +592,7 @@ public void testMetadataFileDeletion() throws Exception {
);
translog.setMinSeqNoToKeep(totalDocs);
translog.trimUnreferencedReaders();
assertBusy(
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

// Change primary term and test the deletion of older primaries
String translogUUID = translog.translogUUID;
Expand All @@ -642,9 +608,7 @@ public void testMetadataFileDeletion() throws Exception {
long newPrimaryTerm = primaryTerm.incrementAndGet();

// Check all metadata files corresponds to old primary term
Set<String> mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
);
Set<String> mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__"))));

// Creating RemoteFsTranslog with the same location
Expand All @@ -658,9 +622,7 @@ public void testMetadataFileDeletion() throws Exception {
}

// Check that all metadata files are belonging now to the new primary
mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
);
mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__"))));

try {
Expand All @@ -671,6 +633,10 @@ public void testMetadataFileDeletion() throws Exception {
}
}

private BlobPath getTranslogDirectory() {
return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG);
}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
Expand Down