diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index d10690379eddd..addb16d58d031 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -191,6 +191,13 @@ private static int indexOfKeptCommits(List commits, long return 0; } + /** + * Checks whether the deletion policy is holding on to snapshotted commits + */ + synchronized boolean hasSnapshottedCommits() { + return snapshottedCommits.isEmpty() == false; + } + /** * Checks if the deletion policy can release some index commits with the latest global checkpoint. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index acedd8356ea9e..b2143dcc0407f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -476,6 +476,11 @@ Translog getTranslog() { return translog; } + // Package private for testing purposes only + boolean hasSnapshottedCommits() { + return combinedDeletionPolicy.hasSnapshottedCommits(); + } + @Override public boolean isTranslogSyncNeeded() { return getTranslog().syncNeeded(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java new file mode 100644 index 0000000000000..87a6d18671a6f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java @@ -0,0 +1,213 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.recovery; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentMap; + +public class MultiFileWriter implements Releasable { + + public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) { + this.store = store; + this.indexState = indexState; + this.tempFilePrefix = tempFilePrefix; + this.logger = logger; + this.ensureOpen = ensureOpen; + } + + private final Runnable ensureOpen; + private final Logger logger; + private final Store store; + private final RecoveryState.Index indexState; + private final String tempFilePrefix; + + private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap fileChunkWriters = ConcurrentCollections.newConcurrentMap(); + + + final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); + + public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk) + throws IOException { + final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter()); + writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk)); + } + + /** Get a temporary name for the provided file name. */ + String getTempNameForFile(String origFile) { + return tempFilePrefix + origFile; + } + + public IndexOutput getOpenIndexOutput(String key) { + ensureOpen.run(); + return openIndexOutputs.get(key); + } + + /** remove and {@link IndexOutput} for a given file. It is the caller's responsibility to close it */ + public IndexOutput removeOpenIndexOutputs(String name) { + ensureOpen.run(); + return openIndexOutputs.remove(name); + } + + /** + * Creates an {@link IndexOutput} for the given file name. Note that the + * IndexOutput actually point at a temporary file. + *

+ * Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput + * at a later stage + */ + public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException { + ensureOpen.run(); + String tempFileName = getTempNameForFile(fileName); + if (tempFileNames.containsKey(tempFileName)) { + throw new IllegalStateException("output for file [" + fileName + "] has already been created"); + } + // add first, before it's created + tempFileNames.put(tempFileName, fileName); + IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT); + openIndexOutputs.put(fileName, indexOutput); + return indexOutput; + } + + private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position, + BytesReference content, boolean lastChunk) throws IOException { + final String name = fileMetaData.name(); + IndexOutput indexOutput; + if (position == 0) { + indexOutput = openAndPutIndexOutput(name, fileMetaData, store); + } else { + indexOutput = getOpenIndexOutput(name); + } + assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position; + BytesRefIterator iterator = content.iterator(); + BytesRef scratch; + while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls + indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); + } + indexState.addRecoveredBytesToFile(name, content.length()); + if (indexOutput.getFilePointer() >= fileMetaData.length() || lastChunk) { + try { + Store.verify(indexOutput); + } finally { + // we are done + indexOutput.close(); + } + final String temporaryFileName = getTempNameForFile(name); + assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) : + "expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll()); + store.directory().sync(Collections.singleton(temporaryFileName)); + IndexOutput remove = removeOpenIndexOutputs(name); + assert remove == null || remove == indexOutput; // remove maybe null if we got finished + } + } + + @Override + public void close() { + fileChunkWriters.clear(); + // clean open index outputs + Iterator> iterator = openIndexOutputs.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + logger.trace("closing IndexOutput file [{}]", entry.getValue()); + try { + entry.getValue().close(); + } catch (Exception e) { + logger.debug(() -> new ParameterizedMessage("error while closing recovery output [{}]", entry.getValue()), e); + } + iterator.remove(); + } + if (Strings.hasText(tempFilePrefix)) { + // trash temporary files + for (String file : tempFileNames.keySet()) { + logger.trace("cleaning temporary file [{}]", file); + store.deleteQuiet(file); + } + } + } + + /** renames all temporary files to their true name, potentially overriding existing files */ + public void renameAllTempFiles() throws IOException { + ensureOpen.run(); + store.renameTempFilesSafe(tempFileNames); + } + + static final class FileChunk { + final StoreFileMetaData md; + final BytesReference content; + final long position; + final boolean lastChunk; + FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) { + this.md = md; + this.content = content; + this.position = position; + this.lastChunk = lastChunk; + } + } + + private final class FileChunkWriter { + // chunks can be delivered out of order, we need to buffer chunks if there's a gap between them. + final PriorityQueue pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position)); + long lastPosition = 0; + + void writeChunk(FileChunk newChunk) throws IOException { + synchronized (this) { + pendingChunks.add(newChunk); + } + while (true) { + final FileChunk chunk; + synchronized (this) { + chunk = pendingChunks.peek(); + if (chunk == null || chunk.position != lastPosition) { + return; + } + pendingChunks.remove(); + } + innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk); + synchronized (this) { + assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position; + lastPosition += chunk.content.length(); + if (chunk.lastChunk) { + assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]"; + fileChunkWriters.remove(chunk.md.name()); + assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed"; + } + } + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index e63b9ba8fd5ea..76f2200a47d82 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -20,14 +20,9 @@ package org.elasticsearch.indices.recovery; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -39,7 +34,6 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; @@ -55,15 +49,7 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.PriorityQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -85,15 +71,13 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final long recoveryId; private final IndexShard indexShard; private final DiscoveryNode sourceNode; - private final String tempFilePrefix; + private final MultiFileWriter multiFileWriter; private final Store store; private final PeerRecoveryTargetService.RecoveryListener listener; private final LongConsumer ensureClusterStateVersionCallback; private final AtomicBoolean finished = new AtomicBoolean(); - private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap fileChunkWriters = ConcurrentCollections.newConcurrentMap(); private final CancellableThreads cancellableThreads; // last time this status was accessed @@ -102,8 +86,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); - private final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); - /** * Creates a new recovery target object that represents a recovery to the provided shard. * @@ -126,7 +108,9 @@ public RecoveryTarget(final IndexShard indexShard, this.indexShard = indexShard; this.sourceNode = sourceNode; this.shardId = indexShard.shardId(); - this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + "."; + final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + "."; + this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger, + this::ensureRefCount); this.store = indexShard.store(); this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback; // make sure the store is not released until we are done. @@ -187,12 +171,6 @@ public RecoveryState.Stage stage() { return state().getStage(); } - /** renames all temporary files to their true name, potentially overriding existing files */ - public void renameAllTempFiles() throws IOException { - ensureRefCount(); - store.renameTempFilesSafe(tempFileNames); - } - /** * Closes the current recovery target and waits up to a certain timeout for resources to be freed. * Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done. @@ -274,7 +252,7 @@ public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) /** mark the current recovery as done */ public void markAsDone() { if (finished.compareAndSet(false, true)) { - assert tempFileNames.isEmpty() : "not all temporary files are renamed"; + assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed"; try { // this might still throw an exception ie. if the shard is CLOSED due to some other event. // it's safer to decrement the reference in a try finally here. @@ -287,65 +265,12 @@ public void markAsDone() { } } - /** Get a temporary name for the provided file name. */ - public String getTempNameForFile(String origFile) { - return tempFilePrefix + origFile; - } - - public IndexOutput getOpenIndexOutput(String key) { - ensureRefCount(); - return openIndexOutputs.get(key); - } - - /** remove and {@link org.apache.lucene.store.IndexOutput} for a given file. It is the caller's responsibility to close it */ - public IndexOutput removeOpenIndexOutputs(String name) { - ensureRefCount(); - return openIndexOutputs.remove(name); - } - - /** - * Creates an {@link org.apache.lucene.store.IndexOutput} for the given file name. Note that the - * IndexOutput actually point at a temporary file. - *

- * Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput - * at a later stage - */ - public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException { - ensureRefCount(); - String tempFileName = getTempNameForFile(fileName); - if (tempFileNames.containsKey(tempFileName)) { - throw new IllegalStateException("output for file [" + fileName + "] has already been created"); - } - // add first, before it's created - tempFileNames.put(tempFileName, fileName); - IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT); - openIndexOutputs.put(fileName, indexOutput); - return indexOutput; - } - @Override protected void closeInternal() { try { - // clean open index outputs - Iterator> iterator = openIndexOutputs.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - logger.trace("closing IndexOutput file [{}]", entry.getValue()); - try { - entry.getValue().close(); - } catch (Exception e) { - logger.debug(() -> new ParameterizedMessage("error while closing recovery output [{}]", entry.getValue()), e); - } - iterator.remove(); - } - // trash temporary files - for (String file : tempFileNames.keySet()) { - logger.trace("cleaning temporary file [{}]", file); - store.deleteQuiet(file); - } + multiFileWriter.close(); } finally { // free store. increment happens in constructor - fileChunkWriters.clear(); store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); @@ -470,7 +395,7 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas // to recover from in case of a full cluster shutdown just when this code executes... - renameAllTempFiles(); + multiFileWriter.renameAllTempFiles(); final Store store = store(); store.incRef(); try { @@ -511,96 +436,21 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } } - private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position, - BytesReference content, boolean lastChunk) throws IOException { - final Store store = store(); - final String name = fileMetaData.name(); - final RecoveryState.Index indexState = state().getIndex(); - IndexOutput indexOutput; - if (position == 0) { - indexOutput = openAndPutIndexOutput(name, fileMetaData, store); - } else { - indexOutput = getOpenIndexOutput(name); - } - assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position; - BytesRefIterator iterator = content.iterator(); - BytesRef scratch; - while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls - indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); - } - indexState.addRecoveredBytesToFile(name, content.length()); - if (indexOutput.getFilePointer() >= fileMetaData.length() || lastChunk) { - try { - Store.verify(indexOutput); - } finally { - // we are done - indexOutput.close(); - } - final String temporaryFileName = getTempNameForFile(name); - assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) : - "expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll()); - store.directory().sync(Collections.singleton(temporaryFileName)); - IndexOutput remove = removeOpenIndexOutputs(name); - assert remove == null || remove == indexOutput; // remove maybe null if we got finished - } - } - @Override public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { try { state().getTranslog().totalOperations(totalTranslogOps); - final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter()); - writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk)); + multiFileWriter.writeFileChunk(fileMetaData, position, content, lastChunk); listener.onResponse(null); } catch (Exception e) { listener.onFailure(e); } } - private static final class FileChunk { - final StoreFileMetaData md; - final BytesReference content; - final long position; - final boolean lastChunk; - FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) { - this.md = md; - this.content = content; - this.position = position; - this.lastChunk = lastChunk; - } - } - - private final class FileChunkWriter { - // chunks can be delivered out of order, we need to buffer chunks if there's a gap between them. - final PriorityQueue pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position)); - long lastPosition = 0; - - void writeChunk(FileChunk newChunk) throws IOException { - synchronized (this) { - pendingChunks.add(newChunk); - } - while (true) { - final FileChunk chunk; - synchronized (this) { - chunk = pendingChunks.peek(); - if (chunk == null || chunk.position != lastPosition) { - return; - } - pendingChunks.remove(); - } - innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk); - synchronized (this) { - assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position; - lastPosition += chunk.content.length(); - if (chunk.lastChunk) { - assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]"; - fileChunkWriters.remove(chunk.md.name()); - assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed"; - } - } - } - } + /** Get a temporary name for the provided file name. */ + public String getTempNameForFile(String origFile) { + return multiFileWriter.getTempNameForFile(origFile); } Path translogLocation() { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 2f837812ae2e2..c8185fea89523 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -62,14 +62,14 @@ */ public abstract class FileRestoreContext { - private static final Logger logger = LogManager.getLogger(FileRestoreContext.class); + protected static final Logger logger = LogManager.getLogger(FileRestoreContext.class); - private final String repositoryName; - private final IndexShard indexShard; - private final RecoveryState recoveryState; - private final SnapshotId snapshotId; - private final ShardId shardId; - private final int bufferSize; + protected final String repositoryName; + protected final IndexShard indexShard; + protected final RecoveryState recoveryState; + protected final SnapshotId snapshotId; + protected final ShardId shardId; + protected final int bufferSize; /** * Constructs new restore context @@ -183,7 +183,6 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException { // list of all existing store files final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); - // restore the files from the snapshot to the Lucene store for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { // if a file with a same physical name already exist in the store we need to delete it // before restoring it from the snapshot. We could be lenient and try to reuse the existing @@ -196,10 +195,9 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException { logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName); store.directory().deleteFile(physicalName); } - - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); } + + restoreFiles(filesToRecover, store); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } @@ -234,6 +232,14 @@ public void restore(SnapshotFiles snapshotFiles) throws IOException { } } + protected void restoreFiles(List filesToRecover, Store store) throws IOException { + // restore the files from the snapshot to the Lucene store + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover, store); + } + } + protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo); @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index a2ec88cf7b58c..41ea9a8bea74b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -189,7 +189,7 @@ public void testWriteFileChunksConcurrently() throws Exception { for (Thread sender : senders) { sender.join(); } - recoveryTarget.renameAllTempFiles(); + recoveryTarget.cleanFiles(0, sourceSnapshot); recoveryTarget.decRef(); Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata(); Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java index ed1ee7708522d..b974d42d826bb 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java @@ -20,8 +20,6 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.IndexOutput; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -32,9 +30,6 @@ import java.util.Set; import java.util.regex.Pattern; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; - public class RecoveryStatusTests extends ESSingleNodeTestCase { private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT .minimumIndexCompatibilityVersion().luceneVersion; @@ -42,35 +37,27 @@ public void testRenameTempFiles() throws IOException { IndexService service = createIndex("foo"); IndexShard indexShard = service.getShardOrNull(0); - DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - RecoveryTarget status = new RecoveryTarget(indexShard, node, new PeerRecoveryTargetService.RecoveryListener() { - @Override - public void onRecoveryDone(RecoveryState state) { - } - - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - } - }, version -> {}); - try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength() - , "9z51nw", MIN_SUPPORTED_LUCENE_VERSION), status.store())) { + MultiFileWriter multiFileWriter = new MultiFileWriter(indexShard.store(), + indexShard.recoveryState().getIndex(), "recovery.test.", logger, () -> {}); + try (IndexOutput indexOutput = multiFileWriter.openAndPutIndexOutput("foo.bar", + new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw", MIN_SUPPORTED_LUCENE_VERSION), indexShard.store())) { indexOutput.writeInt(1); - IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar"); + IndexOutput openIndexOutput = multiFileWriter.getOpenIndexOutput("foo.bar"); assertSame(openIndexOutput, indexOutput); openIndexOutput.writeInt(1); CodecUtil.writeFooter(indexOutput); } try { - status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw", - MIN_SUPPORTED_LUCENE_VERSION), status.store()); + multiFileWriter.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw", + MIN_SUPPORTED_LUCENE_VERSION), indexShard.store()); fail("file foo.bar is already opened and registered"); } catch (IllegalStateException ex) { assertEquals("output for file [foo.bar] has already been created", ex.getMessage()); // all well = it's already registered } - status.removeOpenIndexOutputs("foo.bar"); - Set strings = Sets.newHashSet(status.store().directory().listAll()); + multiFileWriter.removeOpenIndexOutputs("foo.bar"); + Set strings = Sets.newHashSet(indexShard.store().directory().listAll()); String expectedFile = null; for (String file : strings) { if (Pattern.compile("recovery[.][\\w-]+[.]foo[.]bar").matcher(file).matches()) { @@ -80,12 +67,10 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo } assertNotNull(expectedFile); indexShard.close("foo", false);// we have to close it here otherwise rename fails since the write.lock is held by the engine - status.renameAllTempFiles(); - strings = Sets.newHashSet(status.store().directory().listAll()); + multiFileWriter.renameAllTempFiles(); + strings = Sets.newHashSet(indexShard.store().directory().listAll()); assertTrue(strings.toString(), strings.contains("foo.bar")); assertFalse(strings.toString(), strings.contains(expectedFile)); - // we must fail the recovery because marking it as done will try to move the shard to POST_RECOVERY, - // which will fail because it's started - status.fail(new RecoveryFailedException(status.state(), "end of test. OK.", null), false); + multiFileWriter.close(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index e09455b55bd52..855f1b2e2fd72 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1088,6 +1088,12 @@ public static Translog getTranslog(Engine engine) { return internalEngine.getTranslog(); } + public static boolean hasSnapshottedCommits(Engine engine) { + assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); + InternalEngine internalEngine = (InternalEngine) engine; + return internalEngine.hasSnapshottedCommits(); + } + public static final class PrimaryTermSupplier implements LongSupplier { private final AtomicLong term; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index a7fa69e7abd39..d4c73c1c6e503 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -127,6 +127,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce restoreSourceService = new SetOnce<>(); private final SetOnce ccrSettings = new SetOnce<>(); + private final SetOnce threadPool = new SetOnce<>(); private Client client; private final boolean transportClientMode; @@ -171,6 +172,7 @@ public Collection createComponents( CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); this.ccrSettings.set(ccrSettings); + this.threadPool.set(threadPool); CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings); this.restoreSourceService.set(restoreSourceService); return Arrays.asList( @@ -307,7 +309,7 @@ public List> getExecutorBuilders(Settings settings) { @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { Repository.Factory repositoryFactory = - (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get()); + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get()); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 0e147f66d6ebc..9abcfb86e2b7c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -57,6 +57,12 @@ public final class CcrSettings { new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * Controls the maximum number of file chunk requests that are sent concurrently per recovery to the leader. + */ + public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING = + Setting.intSetting("ccr.indices.recovery.max_concurrent_file_chunks", 5, 1, 10, Property.Dynamic, Property.NodeScope); + /** * The leader must open resources for a ccr recovery. If there is no activity for this interval of time, * the leader will close the restore session. @@ -77,7 +83,7 @@ public final class CcrSettings { * * @return the settings */ - static List> getSettings() { + public static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, @@ -86,6 +92,7 @@ static List> getSettings() { INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, RECOVERY_CHUNK_SIZE, + INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, CCR_WAIT_FOR_METADATA_TIMEOUT); } @@ -93,14 +100,17 @@ static List> getSettings() { private volatile TimeValue recoveryActivityTimeout; private volatile TimeValue recoveryActionTimeout; private volatile ByteSizeValue chunkSize; + private volatile int maxConcurrentFileChunks; public CcrSettings(Settings settings, ClusterSettings clusterSettings) { this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings); this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); this.chunkSize = RECOVERY_MAX_BYTES_PER_SECOND.get(settings); + this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout); } @@ -109,6 +119,10 @@ private void setChunkSize(ByteSizeValue chunkSize) { this.chunkSize = chunkSize; } + private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) { + this.maxConcurrentFileChunks = maxConcurrentFileChunks; + } + private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { ccrRateLimiter.setMBPerSec(maxBytesPerSec); } @@ -125,6 +139,10 @@ public ByteSizeValue getChunkSize() { return chunkSize; } + public int getMaxConcurrentFileChunks() { + return maxConcurrentFileChunks; + } + public CombinedRateLimiter getRateLimiter() { return ccrRateLimiter; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index cf8d2e5c55f48..37dfc84f46a01 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -51,7 +51,6 @@ public static class TransportGetCcrRestoreFileChunkAction extends HandledTransportAction { private final CcrRestoreSourceService restoreSourceService; - private final ThreadPool threadPool; private final BigArrays bigArrays; @Inject @@ -59,7 +58,6 @@ public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportServi CcrRestoreSourceService restoreSourceService) { super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC); TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new); - this.threadPool = transportService.getThreadPool(); this.restoreSourceService = restoreSourceService; this.bigArrays = bigArrays; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 833cd474450ff..356dcd1439b85 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -9,7 +9,10 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -22,26 +25,27 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.CombinedRateLimiter; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -51,6 +55,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; @@ -65,15 +70,20 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.function.Supplier; +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; + /** * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to @@ -92,17 +102,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final String remoteClusterAlias; private final Client client; private final CcrLicenseChecker ccrLicenseChecker; + private final ThreadPool threadPool; private final CounterMetric throttledTime = new CounterMetric(); public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings, - CcrSettings ccrSettings) { + CcrSettings ccrSettings, ThreadPool threadPool) { this.metadata = metadata; this.ccrSettings = ccrSettings; assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX; this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1]; this.ccrLicenseChecker = ccrLicenseChecker; this.client = client; + this.threadPool = threadPool; } @Override @@ -325,7 +337,7 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout()); return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, - response.getStoreFileMetaData(), response.getMappingVersion(), ccrSettings, throttledTime::inc); + response.getStoreFileMetaData(), response.getMappingVersion(), threadPool, ccrSettings, throttledTime::inc); } private static class RestoreSession extends FileRestoreContext implements Closeable { @@ -337,107 +349,186 @@ private static class RestoreSession extends FileRestoreContext implements Closea private final long mappingVersion; private final CcrSettings ccrSettings; private final LongConsumer throttleListener; + private final ThreadPool threadPool; RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, - CcrSettings ccrSettings, LongConsumer throttleListener) { + ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) { super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.sourceMetaData = sourceMetaData; this.mappingVersion = mappingVersion; + this.threadPool = threadPool; this.ccrSettings = ccrSettings; this.throttleListener = throttleListener; } void restoreFiles() throws IOException { - ArrayList fileInfos = new ArrayList<>(); + ArrayList fileInfos = new ArrayList<>(); for (StoreFileMetaData fileMetaData : sourceMetaData) { ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length()); - fileInfos.add(new BlobStoreIndexShardSnapshot.FileInfo(fileMetaData.name(), fileMetaData, fileSize)); + fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize)); } SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos); restore(snapshotFiles); } - @Override - protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener); - } + private static class FileSession { + FileSession(long lastTrackedSeqNo, long lastOffset) { + this.lastTrackedSeqNo = lastTrackedSeqNo; + this.lastOffset = lastOffset; + } - @Override - public void close() { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); - ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = - remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); + final long lastTrackedSeqNo; + final long lastOffset; } - } - - private static class RestoreFileInputStream extends InputStream { - private final Client remoteClient; - private final String sessionUUID; - private final DiscoveryNode node; - private final StoreFileMetaData fileToRecover; - private final CombinedRateLimiter rateLimiter; - private final CcrSettings ccrSettings; - private final LongConsumer throttleListener; - - private long pos = 0; + @Override + protected void restoreFiles(List filesToRecover, Store store) throws IOException { + logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); + + try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) { + final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + final AtomicReference> error = new AtomicReference<>(); + + final ArrayDeque remainingFiles = new ArrayDeque<>(filesToRecover); + final Map inFlightRequests = new HashMap<>(); + final Object mutex = new Object(); + + while (true) { + if (error.get() != null) { + break; + } + final FileInfo fileToRecover; + final FileSession fileSession; + synchronized (mutex) { + if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) { + break; + } + final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks(); + if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) { + for (int i = 0; i < maxConcurrentFileChunks; i++) { + if (remainingFiles.isEmpty()) { + break; + } + inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0)); + } + } + final Map.Entry minEntry = + inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get(); + fileSession = minEntry.getValue(); + fileToRecover = minEntry.getKey(); + } + try { + requestSeqIdTracker.waitForOpsToComplete(fileSession.lastTrackedSeqNo); + synchronized (mutex) { + // if file has been removed in the mean-while, it means that restore of this file completed, so start working + // on the next one + if (inFlightRequests.containsKey(fileToRecover) == false) { + continue; + } + } + final long requestSeqId = requestSeqIdTracker.generateSeqNo(); + try { + synchronized (mutex) { + inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset)); + } + final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), + fileToRecover.length() - fileSession.lastOffset)); + final GetCcrRestoreFileChunkRequest request = + new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested); + logger.trace("[{}] [{}] fetching chunk for file [{}]", shardId, snapshotId, fileToRecover.name()); + + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, + ActionListener.wrap( + r -> threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); + } + + @Override + protected void doRun() throws Exception { + final int actualChunkSize = r.getChunk().length(); + final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); + throttleListener.accept(nanosPaused); + final long newOffset = r.getOffset() + actualChunkSize; + assert newOffset <= fileToRecover.length(); + final boolean lastChunk = newOffset >= fileToRecover.length(); + multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(), + lastChunk); + if (lastChunk) { + synchronized (mutex) { + final FileSession session = inFlightRequests.remove(fileToRecover); + assert session != null : "session disappeared for " + fileToRecover.name(); + } + } else { + synchronized (mutex) { + final FileSession replaced = inFlightRequests.replace(fileToRecover, + new FileSession(requestSeqId, newOffset)); + assert replaced != null : "session disappeared for " + fileToRecover.name(); + } + } + requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); + } + }), + e -> { + error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); + } + )); + } catch (Exception e) { + error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); + throw e; + } + } catch (Exception e) { + error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); + break; + } + + } + try { + requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ElasticsearchException(e); + } + if (error.get() != null) { + handleError(store, error.get().v2()); + } + } - private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover, - CcrSettings ccrSettings, LongConsumer throttleListener) { - this.remoteClient = remoteClient; - this.sessionUUID = sessionUUID; - this.node = node; - this.fileToRecover = fileToRecover; - this.ccrSettings = ccrSettings; - this.rateLimiter = ccrSettings.getRateLimiter(); - this.throttleListener = throttleListener; + logger.trace("[{}] completed CCR restore", shardId); } + private void handleError(Store store, Exception e) throws IOException { + final IOException corruptIndexException; + if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { + try { + store.markStoreCorrupted(corruptIndexException); + } catch (IOException ioe) { + logger.warn("store cannot be marked as corrupted", e); + } + throw corruptIndexException; + } else { + ExceptionsHelper.reThrowIfNotNull(e); + } + } @Override - public int read() throws IOException { + protected InputStream fileInputStream(FileInfo fileInfo) { throw new UnsupportedOperationException(); } @Override - public int read(byte[] bytes, int off, int len) throws IOException { - long remainingBytes = fileToRecover.length() - pos; - if (remainingBytes <= 0) { - return 0; - } - - int bytesRequested = (int) Math.min(remainingBytes, len); - - long nanosPaused = rateLimiter.maybePause(bytesRequested); - throttleListener.accept(nanosPaused); - - String fileName = fileToRecover.name(); - GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); - GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response = - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout()); - BytesReference fileChunk = response.getChunk(); - - int bytesReceived = fileChunk.length(); - if (bytesReceived > bytesRequested) { - throw new IOException("More bytes [" + bytesReceived + "] received than requested [" + bytesRequested + "]"); - } - - long leaderOffset = response.getOffset(); - assert pos == leaderOffset : "Position [" + pos + "] should be equal to the leader file offset [" + leaderOffset + "]."; - - try (StreamInput streamInput = fileChunk.streamInput()) { - int bytesRead = streamInput.read(bytes, 0, bytesReceived); - assert bytesRead == bytesReceived : "Did not read the correct number of bytes"; - } - - pos += bytesReceived; - - return bytesReceived; + public void close() { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); + ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } - } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index f093143112d3d..65fb14fdb95ec 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -42,8 +42,6 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; import java.util.function.LongConsumer; public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener { @@ -52,7 +50,6 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); - private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); private final ThreadPool threadPool; private final CcrSettings ccrSettings; private final CounterMetric throttleTime = new CounterMetric(); @@ -93,12 +90,6 @@ protected synchronized void doClose() throws IOException { onGoingRestores.clear(); } - // TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested, - // these should be removed. - public void addCloseSessionListener(Consumer listener) { - closeSessionListeners.add(listener); - } - public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { boolean success = false; RestoreSession restore = null; @@ -165,9 +156,7 @@ private void internalCloseSession(String sessionUUID, boolean throwIfSessionMiss } } } - closeSessionListeners.forEach(c -> c.accept(sessionUUID)); restore.decRef(); - } private Scheduler.Cancellable scheduleTimeout(String sessionUUID) { @@ -255,6 +244,7 @@ protected void closeInternal() { assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing"; timeoutTask.cancel(); IOUtils.closeWhileHandlingException(cachedInputs.values()); + IOUtils.closeWhileHandlingException(commitRef); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 80bded6a5d1d3..d5c3304e966e6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -114,7 +114,7 @@ public void testFollowIndex() throws Exception { final int firstBatchNumDocs; // Sometimes we want to index a lot of documents to ensure that the recovery works with larger files if (rarely()) { - firstBatchNumDocs = randomIntBetween(1800, 2000); + firstBatchNumDocs = randomIntBetween(1800, 10000); } else { firstBatchNumDocs = randomIntBetween(10, 64); } @@ -127,6 +127,7 @@ public void testFollowIndex() throws Exception { waitForDocs(firstBatchNumDocs, indexer); indexer.assertNoFailures(); + logger.info("Executing put follow"); boolean waitOnAll = randomBoolean(); final PutFollowAction.Request followRequest; @@ -176,6 +177,8 @@ public void testFollowIndex() throws Exception { logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); indexer.continueIndexing(secondBatchNumDocs); + waitForDocs(firstBatchNumDocs + secondBatchNumDocs, indexer); + final Map secondBatchNumDocsPerShard = new HashMap<>(); final ShardStats[] secondBatchShardStats = leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); @@ -194,6 +197,7 @@ public void testFollowIndex() throws Exception { assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); }); } + pauseFollow("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 1c3c0da3d3c8a..2b5011d45139f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -12,10 +12,9 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -25,7 +24,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; @@ -39,10 +38,8 @@ public void setUp() throws Exception { super.setUp(); Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); taskQueue = new DeterministicTaskQueue(settings, random()); - Set> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, - CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, - CcrSettings.RECOVERY_CHUNK_SIZE); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, CcrSettings.getSettings() + .stream().filter(s -> s.hasNodeScope()).collect(Collectors.toSet())); restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings)); } @@ -202,7 +199,10 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10])); } + assertTrue(EngineTestCase.hasSnapshottedCommits(IndexShardTestCase.getEngine(indexShard))); restoreSourceService.closeSession(sessionUUID); + assertFalse(EngineTestCase.hasSnapshottedCommits(IndexShardTestCase.getEngine(indexShard))); + closeShards(indexShard); // Exception will be thrown if file is not closed. }