From fa2a1e7b678c821ab28e5a172e2234bc529281ed Mon Sep 17 00:00:00 2001 From: satyajitg28 <95424630+satyajitg28@users.noreply.github.com> Date: Fri, 8 Jul 2022 18:06:55 +0530 Subject: [PATCH] Introducing TranslogManager implementations decoupled from the Engine [2.x] (#3820) * Introduce decoupled translog manager interfaces compatible with 2.x Co-authored-by: Bukhtawar Khan Co-authored-by: Satyajit Ganguly --- .../org/opensearch/index/engine/Engine.java | 2 +- .../index/engine/LifecycleAware.java | 20 ++ .../translog/InternalTranslogManager.java | 322 ++++++++++++++++++ .../index/translog/NoOpTranslogManager.java | 110 ++++++ .../index/translog/TranslogManager.java | 108 ++++++ .../translog/TranslogRecoveryRunner.java | 28 ++ .../translog/WriteOnlyTranslogManager.java | 69 ++++ .../CompositeTranslogEventListener.java | 110 ++++++ .../listener/TranslogEventListener.java | 50 +++ .../index/translog/listener/package-info.java | 11 + .../InternalTranslogManagerTests.java | 279 +++++++++++++++ .../translog/TranslogManagerTestCase.java | 217 ++++++++++++ .../listener/TranslogListenerTests.java | 126 +++++++ 13 files changed, 1451 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/index/engine/LifecycleAware.java create mode 100644 server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/TranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java create mode 100644 server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/listener/package-info.java create mode 100644 server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java create mode 100644 server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java create mode 100644 server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 87803f280260f..c0623fabf3a40 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -828,7 +828,7 @@ protected final void ensureOpen(Exception suppressed) { } } - protected final void ensureOpen() { + public final void ensureOpen() { ensureOpen(null); } diff --git a/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java b/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java new file mode 100644 index 0000000000000..06cfb8e7e73a5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +/** + * Interface that is aware of a component lifecycle. + */ +public interface LifecycleAware { + + /** + * Checks to ensure if the component is an open state + */ + void ensureOpen(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java new file mode 100644 index 0000000000000..22f72cc3d9acd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -0,0 +1,322 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.LifecycleAware; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.listener.TranslogEventListener; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * The {@link TranslogManager} implementation capable of orchestrating all read/write {@link Translog} operations while + * interfacing with the {@link org.opensearch.index.engine.InternalEngine} + * + * @opensearch.internal + */ +public class InternalTranslogManager implements TranslogManager { + + private final ReleasableLock readLock; + private final LifecycleAware engineLifeCycleAware; + private final ShardId shardId; + private final Translog translog; + private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); + private final TranslogEventListener translogEventListener; + private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class); + + public InternalTranslogManager( + TranslogConfig translogConfig, + LongSupplier primaryTermSupplier, + LongSupplier globalCheckpointSupplier, + TranslogDeletionPolicy translogDeletionPolicy, + ShardId shardId, + ReleasableLock readLock, + Supplier localCheckpointTrackerSupplier, + String translogUUID, + TranslogEventListener translogEventListener, + LifecycleAware engineLifeCycleAware + ) throws IOException { + this.shardId = shardId; + this.readLock = readLock; + this.engineLifeCycleAware = engineLifeCycleAware; + this.translogEventListener = translogEventListener; + Translog translog = openTranslog(translogConfig, primaryTermSupplier, translogDeletionPolicy, globalCheckpointSupplier, seqNo -> { + final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get(); + assert tracker != null || getTranslog(true).isOpen() == false; + if (tracker != null) { + tracker.markSeqNoAsPersisted(seqNo); + } + }, translogUUID); + assert translog.getGeneration() != null; + this.translog = translog; + assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; + // don't allow commits until we are done with recovering + pendingTranslogRecovery.set(true); + } + + /** + * Rolls the translog generation and cleans unneeded. + */ + @Override + public void rollTranslogGeneration() throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { + engineLifeCycleAware.ensureOpen(); + translog.rollGeneration(); + translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + translogEventListener.onTragicFailure(e); + throw e; + } catch (Exception e) { + try { + translogEventListener.onFailure("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new TranslogException(shardId, "failed to roll translog", e); + } + } + + /** + * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive). + * This operation will close the engine if the recovery fails. + * @param translogRecoveryRunner the translog recovery runner + * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered + * @return the total number of operations recovered + */ + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + int opsRecovered = 0; + translogEventListener.onBeginTranslogRecovery(); + try (ReleasableLock ignored = readLock.acquire()) { + engineLifeCycleAware.ensureOpen(); + if (pendingTranslogRecovery.get() == false) { + throw new IllegalStateException("Engine has already been recovered"); + } + try { + opsRecovered = recoverFromTranslogInternal(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo); + } catch (Exception e) { + try { + pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush + translogEventListener.onFailure("failed to recover from translog", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; + } + } + return opsRecovered; + } + + private int recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) { + final int opsRecovered; + if (localCheckpoint < recoverUpToSeqNo) { + try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + opsRecovered = translogRecoveryRunner.run(snapshot); + } catch (Exception e) { + throw new TranslogException(shardId, "failed to recover from translog", e); + } + } else { + opsRecovered = 0; + } + // flush if we recovered something or if we have references to older translogs + // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. + assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; + pendingTranslogRecovery.set(false); // we are good - now we can commit + logger.trace( + () -> new ParameterizedMessage( + "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", + opsRecovered, + translog.currentFileGeneration() + ) + ); + translogEventListener.onAfterTranslogRecovery(); + return opsRecovered; + } + + /** + * Checks if the underlying storage sync is required. + */ + @Override + public boolean isTranslogSyncNeeded() { + return getTranslog(true).syncNeeded(); + } + + /** + * Ensures that all locations in the given stream have been written to the underlying storage. + */ + @Override + public boolean ensureTranslogSynced(Stream locations) throws IOException { + final boolean synced = translog.ensureSynced(locations); + if (synced) { + translogEventListener.onAfterTranslogSync(); + } + return synced; + } + + /** + * Syncs the translog and invokes the listener + * @throws IOException the exception on sync failure + */ + @Override + public void syncTranslog() throws IOException { + translog.sync(); + translogEventListener.onAfterTranslogSync(); + } + + @Override + public TranslogStats getTranslogStats() { + return getTranslog(true).stats(); + } + + /** + * Returns the last location that the translog of this engine has written into. + */ + @Override + public Translog.Location getTranslogLastWriteLocation() { + return getTranslog(true).getLastWriteLocation(); + } + + /** + * checks and removes translog files that no longer need to be retained. See + * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details + */ + @Override + public void trimUnreferencedTranslogFiles() throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { + engineLifeCycleAware.ensureOpen(); + translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + translogEventListener.onTragicFailure(e); + throw e; + } catch (Exception e) { + try { + translogEventListener.onFailure("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new TranslogException(shardId, "failed to trim translog", e); + } + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. + * This test is based on the size of the current generation compared to the configured generation threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + @Override + public boolean shouldRollTranslogGeneration() { + return getTranslog(true).shouldRollGeneration(); + } + + /** + * Trims translog for terms below belowTerm and seq# above aboveSeqNo + * @see Translog#trimOperations(long, long) + */ + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { + engineLifeCycleAware.ensureOpen(); + translog.trimOperations(belowTerm, aboveSeqNo); + } catch (AlreadyClosedException e) { + translogEventListener.onTragicFailure(e); + throw e; + } catch (Exception e) { + try { + translogEventListener.onFailure("translog operations trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new TranslogException(shardId, "failed to trim translog operations", e); + } + } + + /** + * This method replays translog to restore the Lucene index which might be reverted previously. + * This ensures that all acknowledged writes are restored correctly when this engine is promoted. + * + * @return the number of translog operations have been recovered + */ + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { + engineLifeCycleAware.ensureOpen(); + try (Translog.Snapshot snapshot = getTranslog(true).newSnapshot(processedCheckpoint + 1, Long.MAX_VALUE)) { + return translogRecoveryRunner.run(snapshot); + } + } + } + + /** + * Ensures that the flushes can succeed if there are no pending translog recovery + */ + @Override + public void ensureCanFlush() { + // translog recovery happens after the engine is fully constructed. + // If we are in this stage we have to prevent flushes from this + // engine otherwise we might loose documents if the flush succeeds + // and the translog recovery fails when we "commit" the translog on flush. + if (pendingTranslogRecovery.get()) { + throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery"); + } + } + + /** + * Do not replay translog operations, but make the engine be ready. + */ + @Override + public void skipTranslogRecovery() { + assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; + pendingTranslogRecovery.set(false); // we are good - now we can commit + } + + private Translog openTranslog( + TranslogConfig translogConfig, + LongSupplier primaryTermSupplier, + TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier, + LongConsumer persistedSequenceNumberConsumer, + String translogUUID + ) throws IOException { + + return new Translog( + translogConfig, + translogUUID, + translogDeletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer + ); + } + + /** + * Returns the the translog instance + * @param ensureOpen check if the engine is open + * @return the {@link Translog} instance + */ + @Override + public Translog getTranslog(boolean ensureOpen) { + if (ensureOpen) { + this.engineLifeCycleAware.ensureOpen(); + } + return translog; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java new file mode 100644 index 0000000000000..07cae808ce071 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.stream.Stream; + +/** + * The no-op implementation of {@link TranslogManager} that doesn't perform any operation + * + * @opensearch.internal + */ +public class NoOpTranslogManager implements TranslogManager { + + private final Translog.Snapshot emptyTranslogSnapshot; + private final ReleasableLock readLock; + private final Runnable ensureOpen; + private final ShardId shardId; + private final TranslogStats translogStats; + + public NoOpTranslogManager( + ShardId shardId, + ReleasableLock readLock, + Runnable ensureOpen, + TranslogStats translogStats, + Translog.Snapshot emptyTranslogSnapshot + ) throws IOException { + this.emptyTranslogSnapshot = emptyTranslogSnapshot; + this.readLock = readLock; + this.shardId = shardId; + this.ensureOpen = ensureOpen; + this.translogStats = translogStats; + } + + @Override + public void rollTranslogGeneration() throws TranslogException {} + + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen.run(); + try (Translog.Snapshot snapshot = emptyTranslogSnapshot) { + translogRecoveryRunner.run(snapshot); + } catch (final Exception e) { + throw new TranslogException(shardId, "failed to recover from empty translog snapshot", e); + } + } + return emptyTranslogSnapshot.totalOperations(); + } + + @Override + public boolean isTranslogSyncNeeded() { + return false; + } + + @Override + public boolean ensureTranslogSynced(Stream locations) throws IOException { + return false; + } + + @Override + public void syncTranslog() throws IOException {} + + @Override + public TranslogStats getTranslogStats() { + return translogStats; + } + + @Override + public Translog.Location getTranslogLastWriteLocation() { + return new Translog.Location(0, 0, 0); + } + + @Override + public void trimUnreferencedTranslogFiles() throws TranslogException {} + + @Override + public boolean shouldRollTranslogGeneration() { + return false; + } + + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} + + @Override + public Translog getTranslog(boolean ensureOpen) { + return null; + } + + @Override + public void ensureCanFlush() {} + + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + return 0; + } + + @Override + public void skipTranslogRecovery() {} +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java new file mode 100644 index 0000000000000..988a88c5d2ae5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import java.io.IOException; +import java.util.stream.Stream; + +/** + * The interface that orchestrates Translog operations and manages the {@link Translog} and interfaces with the Engine + * + * @opensearch.internal + */ +public interface TranslogManager { + + /** + * Rolls the translog generation and cleans unneeded. + */ + void rollTranslogGeneration() throws TranslogException; + + /** + * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive). + * This operation will close the engine if the recovery fails. + * + * @param translogRecoveryRunner the translog recovery runner + * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered + * @return ops recovered + */ + int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException; + + /** + * Checks if the underlying storage sync is required. + */ + boolean isTranslogSyncNeeded(); + + /** + * Ensures that all locations in the given stream have been written to the underlying storage. + */ + boolean ensureTranslogSynced(Stream locations) throws IOException; + + /** + * Syncs translog to disk + * @throws IOException the exception while performing the sync operation + */ + void syncTranslog() throws IOException; + + /** + * Translog operation stats + * @return the translog stats + */ + TranslogStats getTranslogStats(); + + /** + * Returns the last location that the translog of this engine has written into. + */ + Translog.Location getTranslogLastWriteLocation(); + + /** + * checks and removes translog files that no longer need to be retained. See + * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details + */ + void trimUnreferencedTranslogFiles() throws TranslogException; + + /** + * Tests whether or not the translog generation should be rolled to a new generation. + * This test is based on the size of the current generation compared to the configured generation threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + boolean shouldRollTranslogGeneration(); + + /** + * Trims translog for terms below belowTerm and seq# above aboveSeqNo + * + * @see Translog#trimOperations(long, long) + */ + void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException; + + /** + * This method replays translog to restore the Lucene index which might be reverted previously. + * This ensures that all acknowledged writes are restored correctly when this engine is promoted. + * + * @return the number of translog operations have been recovered + */ + int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException; + + /** + * Do not replay translog operations, but make the engine be ready. + */ + void skipTranslogRecovery(); + + /** + * Returns the instance of the translog with a precondition + * @param ensureOpen check if the engine is open + * @return the translog instance + */ + Translog getTranslog(boolean ensureOpen); + + /** + * Checks if the translog has a pending recovery + */ + void ensureCanFlush(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java new file mode 100644 index 0000000000000..91c9a95b07d58 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import java.io.IOException; + +/** + * The interface that defines how {@link Translog.Snapshot} will get replayed into the Engine + * + * @opensearch.internal + */ +@FunctionalInterface +public interface TranslogRecoveryRunner { + + /** + * Recovers a translog snapshot + * @param snapshot the snapshot of translog operations + * @return recoveredOps + * @throws IOException exception while recovering operations + */ + int run(Translog.Snapshot snapshot) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java new file mode 100644 index 0000000000000..09f5f38a9f6a9 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.LifecycleAware; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.listener.TranslogEventListener; + +import java.io.IOException; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +/*** + * The implementation of {@link TranslogManager} that only orchestrates writes to the underlying {@link Translog} + * + * @opensearch.internal + */ +public class WriteOnlyTranslogManager extends InternalTranslogManager { + + public WriteOnlyTranslogManager( + TranslogConfig translogConfig, + LongSupplier primaryTermSupplier, + LongSupplier globalCheckpointSupplier, + TranslogDeletionPolicy translogDeletionPolicy, + ShardId shardId, + ReleasableLock readLock, + Supplier localCheckpointTrackerSupplier, + String translogUUID, + TranslogEventListener translogEventListener, + LifecycleAware engineLifecycleAware + ) throws IOException { + super( + translogConfig, + primaryTermSupplier, + globalCheckpointSupplier, + translogDeletionPolicy, + shardId, + readLock, + localCheckpointTrackerSupplier, + translogUUID, + translogEventListener, + engineLifecycleAware + ); + } + + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + return 0; + } + + @Override + public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) + throws IOException { + throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog."); + } + + @Override + public void skipTranslogRecovery() { + // Do nothing. + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java new file mode 100644 index 0000000000000..731b069ab0c74 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.listener; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The listener that multiplexes other {@link TranslogEventListener} + * + * @opensearch.internal + */ +public final class CompositeTranslogEventListener implements TranslogEventListener { + + private final List listeners; + private final Logger logger = LogManager.getLogger(CompositeTranslogEventListener.class); + + public CompositeTranslogEventListener(Collection listeners) { + for (TranslogEventListener listener : listeners) { + if (listener == null) { + throw new IllegalArgumentException("listeners must be non-null"); + } + } + this.listeners = Collections.unmodifiableList(new ArrayList<>(listeners)); + } + + @Override + public void onAfterTranslogSync() { + List exceptionList = new ArrayList<>(listeners.size()); + for (TranslogEventListener listener : listeners) { + try { + listener.onAfterTranslogSync(); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogSync listener"), ex); + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + @Override + public void onAfterTranslogRecovery() { + List exceptionList = new ArrayList<>(listeners.size()); + for (TranslogEventListener listener : listeners) { + try { + listener.onAfterTranslogRecovery(); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogRecovery listener"), ex); + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + @Override + public void onBeginTranslogRecovery() { + List exceptionList = new ArrayList<>(listeners.size()); + for (TranslogEventListener listener : listeners) { + try { + listener.onBeginTranslogRecovery(); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onBeginTranslogRecovery listener"), ex); + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + @Override + public void onFailure(String reason, Exception e) { + List exceptionList = new ArrayList<>(listeners.size()); + for (TranslogEventListener listener : listeners) { + try { + listener.onFailure(reason, e); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onFailure listener"), ex); + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } + + @Override + public void onTragicFailure(AlreadyClosedException e) { + List exceptionList = new ArrayList<>(listeners.size()); + for (TranslogEventListener listener : listeners) { + try { + listener.onTragicFailure(e); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex); + exceptionList.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java new file mode 100644 index 0000000000000..1862b4b9a62b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.listener; + +import org.apache.lucene.store.AlreadyClosedException; + +/** + * The listener that gets fired on events related to {@link org.opensearch.index.translog.TranslogManager} + * + * @opensearch.internal + */ +public interface TranslogEventListener { + + TranslogEventListener NOOP_TRANSLOG_EVENT_LISTENER = new TranslogEventListener() { + }; + + /** + * Invoked after translog sync operations + */ + default void onAfterTranslogSync() {} + + /** + * Invoked after recovering operations from translog + */ + default void onAfterTranslogRecovery() {} + + /** + * Invoked before recovering operations from translog + */ + default void onBeginTranslogRecovery() {} + + /** + * Invoked when translog operations run into accessing an already closed resource + * @param ex the exception thrown when accessing a closed resource + */ + default void onTragicFailure(AlreadyClosedException ex) {} + + /** + * Invoked when translog operations run into any other failure + * @param reason the failure reason + * @param ex the failure exception + */ + default void onFailure(String reason, Exception ex) {} +} diff --git a/server/src/main/java/org/opensearch/index/translog/listener/package-info.java b/server/src/main/java/org/opensearch/index/translog/listener/package-info.java new file mode 100644 index 0000000000000..bfb2415881c10 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/listener/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * Provides mechanism to listen into translog operations + */ +package org.opensearch.index.translog.listener; diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java new file mode 100644 index 0000000000000..4db792b4a3fc2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -0,0 +1,279 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.listener.TranslogEventListener; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; + +public class InternalTranslogManagerTests extends TranslogManagerTestCase { + + public void testRecoveryFromTranslog() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final AtomicBoolean beginTranslogRecoveryInvoked = new AtomicBoolean(false); + final AtomicBoolean onTranslogRecoveryInvoked = new AtomicBoolean(false); + TranslogManager translogManager = null; + + LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + try { + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> tracker, + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {} + ); + final int docs = randomIntBetween(1, 100); + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); + tracker.markSeqNoAsProcessed(i); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.rollTranslogGeneration(); + } + long maxSeqNo = tracker.getMaxSeqNo(); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); + + translogManager.syncTranslog(); + translogManager.getTranslog(false).close(); + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), + translogUUID, + new TranslogEventListener() { + @Override + public void onAfterTranslogRecovery() { + onTranslogRecoveryInvoked.set(true); + } + + @Override + public void onBeginTranslogRecovery() { + beginTranslogRecoveryInvoked.set(true); + } + }, + () -> {} + ); + AtomicInteger opsRecovered = new AtomicInteger(); + int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + opsRecovered.incrementAndGet(); + } + return opsRecovered.get(); + }, NO_OPS_PERFORMED, Long.MAX_VALUE); + + assertEquals(maxSeqNo + 1, opsRecovered.get()); + assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); + + assertTrue(beginTranslogRecoveryInvoked.get()); + assertTrue(onTranslogRecoveryInvoked.get()); + + } finally { + translogManager.getTranslog(false).close(); + } + } + + public void testTranslogRollsGeneration() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + TranslogManager translogManager = null; + LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + try { + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> tracker, + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {} + ); + final int docs = randomIntBetween(1, 100); + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); + tracker.markSeqNoAsProcessed(i); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.rollTranslogGeneration(); + } + long maxSeqNo = tracker.getMaxSeqNo(); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); + + translogManager.syncTranslog(); + translogManager.getTranslog(false).close(); + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {} + ); + AtomicInteger opsRecovered = new AtomicInteger(); + int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + opsRecovered.incrementAndGet(); + } + return opsRecovered.get(); + }, NO_OPS_PERFORMED, Long.MAX_VALUE); + + assertEquals(maxSeqNo + 1, opsRecovered.get()); + assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); + } finally { + translogManager.getTranslog(false).close(); + } + } + + public void testTrimOperationsFromTranslog() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + TranslogManager translogManager = null; + LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + try { + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> tracker, + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {} + ); + final int docs = randomIntBetween(1, 100); + for (int i = 0; i < docs; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); + tracker.markSeqNoAsProcessed(i); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + } + long maxSeqNo = tracker.getMaxSeqNo(); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); + assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); + + primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE)); + translogManager.rollTranslogGeneration(); + translogManager.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog + + translogManager.getTranslog(false).close(); + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED), + translogUUID, + TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, + () -> {} + ); + AtomicInteger opsRecovered = new AtomicInteger(); + int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + opsRecovered.incrementAndGet(); + } + return opsRecovered.get(); + }, NO_OPS_PERFORMED, Long.MAX_VALUE); + + assertEquals(0, opsRecovered.get()); + assertEquals(0, opsRecoveredFromTranslog); + } finally { + translogManager.getTranslog(false).close(); + } + } + + public void testTranslogSync() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + AtomicBoolean syncListenerInvoked = new AtomicBoolean(); + TranslogManager translogManager = null; + final AtomicInteger maxSeqNo = new AtomicInteger(randomIntBetween(0, 128)); + final AtomicInteger localCheckpoint = new AtomicInteger(randomIntBetween(0, maxSeqNo.get())); + try { + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + AtomicReference translogManagerAtomicReference = new AtomicReference<>(); + translogManager = new InternalTranslogManager( + new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + primaryTerm, + globalCheckpoint::get, + createTranslogDeletionPolicy(INDEX_SETTINGS), + shardId, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + () -> new LocalCheckpointTracker(maxSeqNo.get(), localCheckpoint.get()), + translogUUID, + new TranslogEventListener() { + @Override + public void onAfterTranslogSync() { + try { + translogManagerAtomicReference.get().getTranslog(false).trimUnreferencedReaders(); + syncListenerInvoked.set(true); + } catch (IOException ex) { + fail("Failed due to " + ex); + } + } + }, + () -> {} + ); + translogManagerAtomicReference.set(translogManager); + Engine.Index index = indexForDoc(doc); + Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), 1, false); + translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + + translogManager.syncTranslog(); + + assertThat(translogManager.getTranslog(true).currentFileGeneration(), equalTo(2L)); + assertThat(translogManager.getTranslog(true).getMinFileGeneration(), equalTo(2L)); + assertTrue(syncListenerInvoked.get()); + } finally { + translogManager.getTranslog(false).close(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java new file mode 100644 index 0000000000000..25867cdb666ad --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java @@ -0,0 +1,217 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.Term; +import org.apache.lucene.util.BytesRef; +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.Index; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.mapper.ParseContext; +import org.opensearch.index.mapper.Mapping; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.mapper.SourceFieldMapper; +import org.opensearch.index.mapper.SeqNoFieldMapper; +import org.opensearch.index.mapper.Uid; +import org.opensearch.index.mapper.IdFieldMapper; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.ShardId; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; + +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; + +public abstract class TranslogManagerTestCase extends OpenSearchTestCase { + + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); + protected final AllocationId allocationId = AllocationId.newInitializing(); + protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); + private AtomicLong globalCheckpoint; + protected ThreadPool threadPool; + protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L); + + protected IndexSettings defaultSettings; + protected String codecName; + protected Path primaryTranslogDir; + protected String translogUUID; + + protected static final BytesArray SOURCE = bytesArray("{}"); + protected static final BytesReference B_1 = new BytesArray(new byte[] { 1 }); + + protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOException { + return createTranslog(primaryTranslogDir, primaryTermSupplier); + } + + protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + String translogUUID = Translog.createEmptyTranslog( + translogPath, + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + primaryTermSupplier.getAsLong() + ); + return new Translog( + translogConfig, + translogUUID, + createTranslogDeletionPolicy(INDEX_SETTINGS), + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTermSupplier, + seqNo -> {} + ); + } + + private String create(Path path) throws IOException { + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + return Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + primaryTerm.set(randomIntBetween(1, 100)); + defaultSettings = IndexSettingsModule.newIndexSettings("test", indexSettings()); + threadPool = new TestThreadPool(getClass().getName()); + primaryTranslogDir = createTempDir("translog-primary"); + translogUUID = create(primaryTranslogDir); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + IOUtils.close(() -> terminate(threadPool)); + } + + protected Settings indexSettings() { + // TODO randomize more settings + return Settings.builder() + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put( + IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), + between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)) + ) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) + .build(); + } + + public static final class PrimaryTermSupplier implements LongSupplier { + private final AtomicLong term; + + PrimaryTermSupplier(long initialTerm) { + this.term = new AtomicLong(initialTerm); + } + + public long get() { + return term.get(); + } + + public void set(long newTerm) { + this.term.set(newTerm); + } + + @Override + public long getAsLong() { + return get(); + } + } + + protected static ParsedDocument testParsedDocument( + String id, + String routing, + ParseContext.Document document, + BytesReference source, + Mapping mappingUpdate + ) { + return testParsedDocument(id, routing, document, source, mappingUpdate, false); + } + + protected static ParsedDocument testParsedDocument( + String id, + String routing, + ParseContext.Document document, + BytesReference source, + Mapping mappingUpdate, + boolean recoverySource + ) { + Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE); + Field versionField = new NumericDocValuesField("_version", 0); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + BytesRef ref = source.toBytesRef(); + if (recoverySource) { + document.add(new StoredField(SourceFieldMapper.RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); + document.add(new NumericDocValuesField(SourceFieldMapper.RECOVERY_SOURCE_NAME, 1)); + } else { + document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length)); + } + return new ParsedDocument(versionField, seqID, id, routing, List.of(document), source, XContentType.JSON, mappingUpdate); + } + + protected static ParseContext.Document testDocumentWithTextField() { + return testDocumentWithTextField("test"); + } + + protected static ParseContext.Document testDocumentWithTextField(String value) { + ParseContext.Document document = testDocument(); + document.add(new TextField("value", value, Field.Store.YES)); + return document; + } + + protected static ParseContext.Document testDocument() { + return new ParseContext.Document(); + } + + protected Engine.Index indexForDoc(ParsedDocument doc) { + return new Engine.Index(newUid(doc), primaryTerm.get(), doc); + } + + public static Term newUid(String id) { + return new Term("_id", Uid.encodeId(id)); + } + + public static Term newUid(ParsedDocument doc) { + return newUid(doc.id()); + } + + protected static BytesArray bytesArray(String string) { + return new BytesArray(string.getBytes(Charset.defaultCharset())); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java new file mode 100644 index 0000000000000..79c243772b252 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.listener; + +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.test.OpenSearchTestCase; + +import java.lang.reflect.Proxy; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class TranslogListenerTests extends OpenSearchTestCase { + + public void testCompositeTranslogEventListener() { + AtomicInteger onTranslogSyncInvoked = new AtomicInteger(); + AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onFailureInvoked = new AtomicInteger(); + AtomicInteger onTragicFailureInvoked = new AtomicInteger(); + + TranslogEventListener listener = new TranslogEventListener() { + @Override + public void onAfterTranslogSync() { + onTranslogSyncInvoked.incrementAndGet(); + } + + @Override + public void onAfterTranslogRecovery() { + onTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onBeginTranslogRecovery() { + onBeginTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onFailure(String reason, Exception ex) { + onFailureInvoked.incrementAndGet(); + } + + @Override + public void onTragicFailure(AlreadyClosedException ex) { + onTragicFailureInvoked.incrementAndGet(); + } + }; + + final List translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener)); + Collections.shuffle(translogEventListeners, random()); + TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + compositeListener.onAfterTranslogRecovery(); + compositeListener.onAfterTranslogSync(); + compositeListener.onBeginTranslogRecovery(); + compositeListener.onFailure("reason", new RuntimeException("reason")); + compositeListener.onTragicFailure(new AlreadyClosedException("reason")); + + assertEquals(2, onBeginTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogSyncInvoked.get()); + assertEquals(2, onFailureInvoked.get()); + assertEquals(2, onTragicFailureInvoked.get()); + } + + public void testCompositeTranslogEventListenerOnExceptions() { + AtomicInteger onTranslogSyncInvoked = new AtomicInteger(); + AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onFailureInvoked = new AtomicInteger(); + AtomicInteger onTragicFailureInvoked = new AtomicInteger(); + + TranslogEventListener listener = new TranslogEventListener() { + @Override + public void onAfterTranslogSync() { + onTranslogSyncInvoked.incrementAndGet(); + } + + @Override + public void onAfterTranslogRecovery() { + onTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onBeginTranslogRecovery() { + onBeginTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onFailure(String reason, Exception ex) { + onFailureInvoked.incrementAndGet(); + } + + @Override + public void onTragicFailure(AlreadyClosedException ex) { + onTragicFailureInvoked.incrementAndGet(); + } + }; + + TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( + TranslogEventListener.class.getClassLoader(), + new Class[] { TranslogEventListener.class }, + (a, b, c) -> { throw new RuntimeException(); } + ); + + final List translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener)); + Collections.shuffle(translogEventListeners, random()); + TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery()); + expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync()); + expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery()); + expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason"))); + expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason"))); + + assertEquals(2, onBeginTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogRecoveryInvoked.get()); + assertEquals(2, onTranslogSyncInvoked.get()); + assertEquals(2, onFailureInvoked.get()); + assertEquals(2, onTragicFailureInvoked.get()); + + } +}