Skip to content

Commit

Permalink
Fix file reading in ccr restore service (#38117) (#38143)
Browse files Browse the repository at this point in the history
Currently we use the raw byte array length when calling the IndexInput
read call to determine how many bytes we want to read. However, due to
how BigArrays works, the array length might be longer than the reference
length. This commit fixes the issue and uses the BytesRef length when
calling read. Additionally, it expands the index follow test to index
many more documents. These documents should potentially lead to large
enough segment files to trigger scenarios where this fix matters.
  • Loading branch information
Tim-Brooks authored Feb 1, 2019
1 parent 7077cb3 commit 9c2d1e6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ private long readFileBytes(String fileName, BytesReference reference) throws IOE
BytesRefIterator refIterator = reference.iterator();
BytesRef ref;
while ((ref = refIterator.next()) != null) {
byte[] refBytes = ref.bytes;
indexInput.readBytes(refBytes, 0, refBytes.length);
indexInput.readBytes(ref.bytes, ref.offset, ref.length);
}

long offsetAfterRead = indexInput.getFilePointer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
Expand Down Expand Up @@ -554,27 +553,6 @@ protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index fol
});
}

protected void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : getFollowerCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null && shard.routingEntry().primary()) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}

static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -101,9 +102,30 @@ public void testFollowIndex() throws Exception {
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");

final int firstBatchNumDocs = randomIntBetween(2, 64);
final int firstBatchNumDocs;
// Sometimes we want to index a lot of documents to ensure that the recovery works with larger files
if (rarely()) {
firstBatchNumDocs = randomIntBetween(1800, 2000);
} else {
firstBatchNumDocs = randomIntBetween(10, 64);
}
final int flushPoint = (int) (firstBatchNumDocs * 0.75);

logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk();
for (int i = 0; i < flushPoint; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i))
.source(source, XContentType.JSON)
.timeout(TimeValue.timeValueSeconds(1));
bulkRequestBuilder.add(indexRequest);
}
bulkRequestBuilder.get();

leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get();

// Index some docs after the flush that might be recovered in the normal index following operations
for (int i = flushPoint; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
Expand Down Expand Up @@ -147,7 +169,7 @@ public void testFollowIndex() throws Exception {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);

pauseFollow("index2");
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
Expand All @@ -172,8 +194,6 @@ public void testFollowIndex() throws Exception {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards,
firstBatchNumDocs + secondBatchNumDocs);
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}
Expand Down Expand Up @@ -287,7 +307,6 @@ public void testFollowIndexWithoutWaitForComplete() throws Exception {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
pauseFollow("index2");
}

Expand Down Expand Up @@ -431,8 +450,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
assertIndexFullyReplicatedToFollower("index1", "index2");
pauseFollow("index2");
leaderClient().admin().indices().prepareRefresh("index1").get();
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfShards,
leaderClient().prepareSearch("index1").get().getHits().totalHits);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards);
}

Expand Down Expand Up @@ -474,7 +491,6 @@ public void testFollowIndexWithNestedField() throws Exception {
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}

public void testUnfollowNonExistingIndex() {
Expand Down Expand Up @@ -537,7 +553,6 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}

public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {
Expand Down

0 comments on commit 9c2d1e6

Please sign in to comment.