From 683d072ec454bdc10eaf97e52c2e8328de20c975 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Tue, 26 Oct 2021 13:02:07 +0100
Subject: [PATCH] Reduce merging in PersistedClusterStateService (#79793)

When writing the cluster state index we flush a segment every 2000 docs or so,
which sometimes triggers merging in the middle of the write process. This
merging is often unnecessary since many of the segments being merged would
have ended up containing no live docs at the end of the process and hence
could have just been deleted.

With this commit we adjust the merge policy to be much more relaxed about
merging, permitting up to 100 segments per tier, since we only read this index
very rarely and not on any hot paths. We also disable merging completely
during the write process, checking just before commit to see if any merging
should be done.

Relates #77466
---
 .../gateway/PersistedClusterStateService.java | 62 +++++++++++++++--
 .../PersistedClusterStateServiceTests.java    | 67 +++++++++++++++++--
 2 files changed, 119 insertions(+), 10 deletions(-)
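For reference while reading the diff, the relaxed merge policy described in the
commit message boils down to the following standalone sketch. It is purely
illustrative: the class and method names here are invented, and the
authoritative version is the defaultMergePolicy() helper added in the diff
below.

    import org.apache.lucene.index.TieredMergePolicy;

    public class RelaxedMergePolicyExample {
        // Sketch of the policy described above: tolerate many segments and many
        // deleted docs, always use compound files, and don't round tiny segment
        // sizes up when choosing merges.
        public static TieredMergePolicy relaxedMergePolicy() {
            final TieredMergePolicy mergePolicy = new TieredMergePolicy();
            mergePolicy.setDeletesPctAllowed(50.0); // old segments often end up with no live docs at all
            mergePolicy.setSegmentsPerTier(100);    // allow up to 100 segments per tier before merging
            mergePolicy.setMaxMergeAtOnce(100);     // ...but if we do merge, merge the whole tier at once
            mergePolicy.setNoCFSRatio(1.0);         // always use compound files to keep the fsync count down
            mergePolicy.setFloorSegmentMB(0.001);   // segments are mostly tiny, so don't pretend they are bigger
            return mergePolicy;
        }
    }
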
diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
index 43bae9ed0adf1..e4f8ccfe3f1b1 100644
--- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
@@ -21,8 +21,11 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SerialMergeScheduler;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
@@ -53,20 +56,21 @@
 import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
-import org.elasticsearch.xcontent.NamedXContentRegistry;
-import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentType;
 
 import java.io.Closeable;
 import java.io.IOError;
@@ -124,6 +128,9 @@ public class PersistedClusterStateService {
     private static final String INDEX_UUID_FIELD_NAME = "index_uuid";
     private static final int COMMIT_DATA_SIZE = 4;
 
+    private static final MergePolicy NO_MERGE_POLICY = noMergePolicy();
+    private static final MergePolicy DEFAULT_MERGE_POLICY = defaultMergePolicy();
+
     public static final String METADATA_DIRECTORY_NAME = MetadataStateFormat.STATE_DIR_NAME;
 
     public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
@@ -193,10 +200,13 @@ private static IndexWriter createIndexWriter(Directory directory, boolean openEx
         indexWriterConfig.setOpenMode(openExisting ? IndexWriterConfig.OpenMode.APPEND : IndexWriterConfig.OpenMode.CREATE);
         // only commit when specifically instructed, we must not write any intermediate states
         indexWriterConfig.setCommitOnClose(false);
-        // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
+        // most of the data goes into stored fields which are not buffered, so each doc written accounts for ~500B of indexing buffer
+        // (see e.g. BufferedUpdates#BYTES_PER_DEL_TERM); a 1MB buffer therefore gets flushed every ~2000 docs.
         indexWriterConfig.setRAMBufferSizeMB(1.0);
         // merge on the write thread (e.g. while flushing)
         indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());
+        // apply the adjusted merge policy
+        indexWriterConfig.setMergePolicy(DEFAULT_MERGE_POLICY);
 
         return new IndexWriter(directory, indexWriterConfig);
     }
@@ -481,6 +491,28 @@ private static void consumeFromType(IndexSearcher indexSearcher, String type,
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }
 
+    @SuppressForbidden(reason = "merges are only temporarily suppressed, the merge scheduler does not need changing")
+    private static MergePolicy noMergePolicy() {
+        return NoMergePolicy.INSTANCE;
+    }
+
+    private static MergePolicy defaultMergePolicy() {
+        final TieredMergePolicy mergePolicy = new TieredMergePolicy();
+
+        // don't worry about cleaning up deletes too much, segments will often get completely deleted once they're old enough
+        mergePolicy.setDeletesPctAllowed(50.0);
+        // more/smaller segments means there's a better chance they just get deleted before needing a merge
+        mergePolicy.setSegmentsPerTier(100);
+        // ... but if we do end up merging them then do them all
+        mergePolicy.setMaxMergeAtOnce(100);
+        // always use compound segments to avoid fsync overhead
+        mergePolicy.setNoCFSRatio(1.0);
+        // segments are mostly tiny, so don't pretend they are bigger
+        mergePolicy.setFloorSegmentMB(0.001);
+
+        return mergePolicy;
+    }
+
     /**
      * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these
      * for each data path.
@@ -522,7 +554,15 @@ void flush() throws IOException {
             this.indexWriter.flush();
         }
 
+        void startWrite() {
+            // Disable merges during indexing - many older segments will ultimately contain no live docs and simply get deleted.
+            indexWriter.getConfig().setMergePolicy(NO_MERGE_POLICY);
+        }
+
         void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException {
+            indexWriter.getConfig().setMergePolicy(DEFAULT_MERGE_POLICY);
+            indexWriter.maybeMerge();
+
             final Map<String, String> commitData = new HashMap<>(COMMIT_DATA_SIZE);
             commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm));
             commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion));
@@ -594,6 +634,11 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState)
             ensureOpen();
             try {
                 final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.startWrite();
+                }
+
                 final WriterStats stats = overwriteMetadata(clusterState.metadata());
                 commit(currentTerm, clusterState.version());
                 fullStateWritten = true;
@@ -623,6 +668,11 @@ void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClust
             try {
                 final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.startWrite();
+                }
+
                 final WriterStats stats = updateMetadata(previousClusterState.metadata(), clusterState.metadata());
                 commit(currentTerm, clusterState.version());
                 final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
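The write-time flow introduced above can be summarised with a small
self-contained sketch against plain Lucene APIs. This is illustrative only and
not part of the patch: the names are invented, and the real code routes these
calls through MetadataIndexWriter#startWrite and MetadataIndexWriter#prepareCommit
as shown in the diff above.

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.NoMergePolicy;
    import org.apache.lucene.index.SerialMergeScheduler;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;

    import java.io.IOException;

    public class SuppressedMergeWriteExample {
        public static void main(String[] args) throws IOException {
            final TieredMergePolicy mergePolicy = new TieredMergePolicy(); // relaxed settings omitted for brevity
            final IndexWriterConfig config = new IndexWriterConfig()
                .setCommitOnClose(false)                        // only commit when explicitly asked to
                .setMergeScheduler(new SerialMergeScheduler())  // merge on the writing thread
                .setMergePolicy(mergePolicy);

            try (Directory directory = new ByteBuffersDirectory();
                 IndexWriter indexWriter = new IndexWriter(directory, config)) {

                // equivalent of startWrite(): suppress merges while the documents are (re)written,
                // since many of the segments produced here will soon contain no live docs anyway
                indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);

                for (int i = 0; i < 10_000; i++) {
                    final Document document = new Document();
                    document.add(new StringField("type", "example", Field.Store.NO));
                    indexWriter.addDocument(document);
                }

                // equivalent of prepareCommit(): restore the relaxed policy and merge
                // whatever still needs merging, just before the single commit
                indexWriter.getConfig().setMergePolicy(mergePolicy);
                indexWriter.maybeMerge();
                indexWriter.commit();
            }
        }
    }

Because merges can only run at this one point, segments that are already full
of deleted documents from earlier flushes can simply be dropped rather than
rewritten.
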
diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
index eabf62a8e3cf8..8f39021c62cb0 100644
--- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
@@ -53,6 +53,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -421,9 +422,8 @@ public void sync(Collection<String> names) {
                     assertFalse(writer.isOpen());
                 }
 
-                // check if we can open writer again
+                // noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
                 try (Writer ignored = persistedClusterStateService.createWriter()) {
-
                 }
             }
         }
@@ -469,9 +469,8 @@ public void rename(String source, String dest) throws IOException {
                     assertFalse(writer.isOpen());
                 }
 
-                // check if we can open writer again
+                // noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
                 try (Writer ignored = persistedClusterStateService.createWriter()) {
-
                 }
             }
         }
@@ -881,6 +880,66 @@ public void testFailsIfCorrupt() throws IOException {
         }
     }
 
+    public void testLimitsFileCount() throws IOException {
+        try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
+
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+
+                ClusterState clusterState = ClusterState.EMPTY_STATE;
+                writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
+
+                final int indexCount = between(2, usually() ? 20 : 1000);
+
+                final int maxSegmentCount = (indexCount / 100) + 100; // only expect to have two tiers, each with max 100 segments
+                final int filesPerSegment = 3; // .cfe, .cfs, .si
+                final int extraFiles = 2; // segments_*, write.lock
+                final int maxFileCount = (maxSegmentCount * filesPerSegment) + extraFiles;
+
+                logger.info("--> adding [{}] indices one-by-one, verifying file count does not exceed [{}]", indexCount, maxFileCount);
+                for (int i = 0; i < indexCount; i++) {
+                    final ClusterState previousClusterState = clusterState;
+
+                    clusterState = ClusterState.builder(clusterState)
+                        .metadata(Metadata.builder(clusterState.metadata())
+                            .version(i + 2)
+                            .put(IndexMetadata.builder("index-" + i)
+                                .settings(Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))))
+                        .incrementVersion().build();
+
+                    writer.writeIncrementalStateAndCommit(1, previousClusterState, clusterState);
+
+                    for (Path dataPath : nodeEnvironment.nodeDataPaths()) {
+                        try (DirectoryStream<Path> files
+                                 = Files.newDirectoryStream(dataPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) {
+
+                            int fileCount = 0;
+                            final List<String> fileNames = new ArrayList<>();
+                            for (Path filePath : files) {
+                                final String fileName = filePath.getFileName().toString();
+                                if (ExtrasFS.isExtra(fileName) == false) {
+                                    fileNames.add(fileName);
+                                    fileCount += 1;
+                                }
+                            }
+
+                            if (maxFileCount < fileCount) {
+                                // don't bother preparing the description unless we are failing
+                                fileNames.sort(Comparator.naturalOrder());
+                                fail("after " + indexCount + " indices have " + fileCount + " files vs max of " + maxFileCount + ": "
+                                    + fileNames);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
                                     PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
         throws IllegalAccessException, IOException {