Skip to content

Commit

Permalink
fix translog base path and update tests
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Jun 7, 2023
1 parent 2ced9e2 commit 0b4640b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
private static final String TRANSLOG = "translog";
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public TranslogTransferManager(
) {
this.shardId = shardId;
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteBaseTransferPath = remoteBaseTransferPath.add("data");
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
}
Expand Down Expand Up @@ -110,7 +110,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
fileSnapshot -> transferService.uploadBlobAsync(
ThreadPool.Names.TRANSLOG_TRANSFER,
fileSnapshot,
remoteBaseTransferPath.add("data").add(String.valueOf(fileSnapshot.getPrimaryTerm())),
remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
latchedActionListener
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;

Expand Down Expand Up @@ -483,21 +484,19 @@ public void testSimpleOperationsUpload() throws Exception {
translog.rollGeneration();
assertEquals(6, translog.allUploaded().size());

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
);
Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add("metadata"));
assertEquals(2, mdFiles.size());
logger.info("All md files {}", mdFiles);

Set<String> tlogFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get()))
);
Set<String> tlogFiles = blobStoreTransferService.listAll(getTranslogDirectory().add("data").add(String.valueOf(primaryTerm.get())));
logger.info("All data files {}", tlogFiles);

// assert content of ckp and tlog files
BlobPath path = repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(TRANSLOG)
.add("data")
.add(String.valueOf(primaryTerm.get()));
for (TranslogReader reader : translog.readers) {
final long readerGeneration = reader.getGeneration();
Expand Down Expand Up @@ -537,6 +536,8 @@ public void testSimpleOperationsUpload() throws Exception {
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(TRANSLOG)
.add("data")
.add(String.valueOf(primaryTerm.get()))
).size()
);
Expand All @@ -555,6 +556,8 @@ public void testSimpleOperationsUpload() throws Exception {
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(TRANSLOG)
.add("data")
.add(String.valueOf(primaryTerm.get()))
).size()
);
Expand Down Expand Up @@ -583,14 +586,7 @@ public void testMetadataFileDeletion() throws Exception {
assertEquals(1, translog.readers.size());
}
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
int moreDocs = randomIntBetween(3, 10);
logger.info("numDocs={} moreDocs={}", numDocs, moreDocs);
for (int i = numDocs; i < numDocs + moreDocs; i++) {
Expand All @@ -599,14 +595,7 @@ public void testMetadataFileDeletion() throws Exception {
translog.trimUnreferencedReaders();
assertEquals(1 + moreDocs, translog.readers.size());
assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size()));
assertBusy(
() -> assertEquals(
1 + moreDocs,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

int totalDocs = numDocs + moreDocs;
translog.setMinSeqNoToKeep(totalDocs - 1);
Expand All @@ -619,14 +608,7 @@ public void testMetadataFileDeletion() throws Exception {
);
translog.setMinSeqNoToKeep(totalDocs);
translog.trimUnreferencedReaders();
assertBusy(
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

// Change primary term and test the deletion of older primaries
String translogUUID = translog.translogUUID;
Expand All @@ -642,9 +624,7 @@ public void testMetadataFileDeletion() throws Exception {
long newPrimaryTerm = primaryTerm.incrementAndGet();

// Check all metadata files corresponds to old primary term
Set<String> mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
);
Set<String> mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__"))));

// Creating RemoteFsTranslog with the same location
Expand All @@ -658,9 +638,7 @@ public void testMetadataFileDeletion() throws Exception {
}

// Check that all metadata files are belonging now to the new primary
mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
);
mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__"))));

try {
Expand All @@ -671,6 +649,10 @@ public void testMetadataFileDeletion() throws Exception {
}
}

private BlobPath getTranslogDirectory() {
return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG);
}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
Expand Down

0 comments on commit 0b4640b

Please sign in to comment.