Skip to content

Commit

Permalink
fix translog base path and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
linuxpi committed Jun 7, 2023
1 parent 2ced9e2 commit 9773158
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
private static final String TRANSLOG = "translog";
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public TranslogTransferManager(
) {
this.shardId = shardId;
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteBaseTransferPath = remoteBaseTransferPath.add("data");
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
}
Expand Down Expand Up @@ -110,7 +110,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
fileSnapshot -> transferService.uploadBlobAsync(
ThreadPool.Names.TRANSLOG_TRANSFER,
fileSnapshot,
remoteBaseTransferPath.add("data").add(String.valueOf(fileSnapshot.getPrimaryTerm())),
remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
latchedActionListener
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;

Expand Down Expand Up @@ -484,20 +485,22 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(6, translog.allUploaded().size());

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
getTranslogDirectory().add("metadata")
);
assertEquals(2, mdFiles.size());
logger.info("All md files {}", mdFiles);

Set<String> tlogFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get()))
getTranslogDirectory().add("data").add(String.valueOf(primaryTerm.get()))
);
logger.info("All data files {}", tlogFiles);

// assert content of ckp and tlog files
BlobPath path = repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(TRANSLOG)
.add("data")
.add(String.valueOf(primaryTerm.get()));
for (TranslogReader reader : translog.readers) {
final long readerGeneration = reader.getGeneration();
Expand Down Expand Up @@ -537,6 +540,8 @@ public void testSimpleOperationsUpload() throws Exception {
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(TRANSLOG)
.add("data")
.add(String.valueOf(primaryTerm.get()))
).size()
);
Expand All @@ -555,6 +560,8 @@ public void testSimpleOperationsUpload() throws Exception {
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(TRANSLOG)
.add("data")
.add(String.valueOf(primaryTerm.get()))
).size()
);
Expand Down Expand Up @@ -587,7 +594,7 @@ public void testMetadataFileDeletion() throws Exception {
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
getTranslogDirectory().add(METADATA_DIR)
).size()
)
);
Expand All @@ -603,7 +610,7 @@ public void testMetadataFileDeletion() throws Exception {
() -> assertEquals(
1 + moreDocs,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
getTranslogDirectory().add(METADATA_DIR)
).size()
)
);
Expand All @@ -623,7 +630,7 @@ public void testMetadataFileDeletion() throws Exception {
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
getTranslogDirectory().add(METADATA_DIR)
).size()
)
);
Expand All @@ -643,7 +650,7 @@ public void testMetadataFileDeletion() throws Exception {

// Check all metadata files corresponds to old primary term
Set<String> mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
getTranslogDirectory().add(METADATA_DIR)
);
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__"))));

Expand All @@ -659,7 +666,7 @@ public void testMetadataFileDeletion() throws Exception {

// Check that all metadata files are belonging now to the new primary
mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
getTranslogDirectory().add(METADATA_DIR)
);
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__"))));

Expand All @@ -671,6 +678,10 @@ public void testMetadataFileDeletion() throws Exception {
}
}

private BlobPath getTranslogDirectory() {
return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG);
}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
Expand Down

0 comments on commit 9773158

Please sign in to comment.