diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
index 43bae9ed0adf1..e4f8ccfe3f1b1 100644
--- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
@@ -21,8 +21,11 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SerialMergeScheduler;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
@@ -53,20 +56,21 @@
 import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
-import org.elasticsearch.xcontent.NamedXContentRegistry;
-import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentType;
 
 import java.io.Closeable;
 import java.io.IOError;
@@ -124,6 +128,9 @@ public class PersistedClusterStateService {
     private static final String INDEX_UUID_FIELD_NAME = "index_uuid";
     private static final int COMMIT_DATA_SIZE = 4;
 
+    private static final MergePolicy NO_MERGE_POLICY = noMergePolicy();
+    private static final MergePolicy DEFAULT_MERGE_POLICY = defaultMergePolicy();
+
     public static final String METADATA_DIRECTORY_NAME = MetadataStateFormat.STATE_DIR_NAME;
 
     public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
@@ -193,10 +200,13 @@ private static IndexWriter createIndexWriter(Directory directory, boolean openEx
         indexWriterConfig.setOpenMode(openExisting ? IndexWriterConfig.OpenMode.APPEND : IndexWriterConfig.OpenMode.CREATE);
         // only commit when specifically instructed, we must not write any intermediate states
         indexWriterConfig.setCommitOnClose(false);
-        // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
+        // most of the data goes into stored fields which are not buffered, so each doc written accounts for ~500B of indexing buffer
+        // (see e.g. BufferedUpdates#BYTES_PER_DEL_TERM); a 1MB buffer therefore gets flushed every ~2000 docs.
         indexWriterConfig.setRAMBufferSizeMB(1.0);
         // merge on the write thread (e.g. while flushing)
         indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());
+        // apply the adjusted merge policy
+        indexWriterConfig.setMergePolicy(DEFAULT_MERGE_POLICY);
 
         return new IndexWriter(directory, indexWriterConfig);
     }
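For the arithmetic behind the rewritten buffer comment above: at roughly 500B of indexing-buffer accounting per document, a 1MB buffer holds about 1,048,576 / 500 ≈ 2,100 documents before IndexWriter flushes, hence the "flushed every ~2000 docs" figure.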
@@ -481,6 +491,28 @@ private static void consumeFromType(IndexSearcher indexSearcher, String type,
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }
 
+    @SuppressForbidden(reason = "merges are only temporarily suppressed, the merge scheduler does not need changing")
+    private static MergePolicy noMergePolicy() {
+        return NoMergePolicy.INSTANCE;
+    }
+
+    private static MergePolicy defaultMergePolicy() {
+        final TieredMergePolicy mergePolicy = new TieredMergePolicy();
+
+        // don't worry about cleaning up deletes too much, segments will often get completely deleted once they're old enough
+        mergePolicy.setDeletesPctAllowed(50.0);
+        // more/smaller segments means there's a better chance they just get deleted before needing a merge
+        mergePolicy.setSegmentsPerTier(100);
+        // ... but if we do end up merging them then do them all
+        mergePolicy.setMaxMergeAtOnce(100);
+        // always use compound segments to avoid fsync overhead
+        mergePolicy.setNoCFSRatio(1.0);
+        // segments are mostly tiny, so don't pretend they are bigger
+        mergePolicy.setFloorSegmentMB(0.001);
+
+        return mergePolicy;
+    }
+
     /**
      * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these
      * for each data path.
@@ -522,7 +554,15 @@ void flush() throws IOException {
             this.indexWriter.flush();
         }
 
+        void startWrite() {
+            // Disable merges during indexing - many older segments will ultimately contain no live docs and simply get deleted.
+            indexWriter.getConfig().setMergePolicy(NO_MERGE_POLICY);
+        }
+
         void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException {
+            indexWriter.getConfig().setMergePolicy(DEFAULT_MERGE_POLICY);
+            indexWriter.maybeMerge();
+
             final Map<String, String> commitData = new HashMap<>(COMMIT_DATA_SIZE);
             commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm));
             commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion));
@@ -594,6 +634,11 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState)
         ensureOpen();
         try {
             final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
+            for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                metadataIndexWriter.startWrite();
+            }
+
             final WriterStats stats = overwriteMetadata(clusterState.metadata());
             commit(currentTerm, clusterState.version());
             fullStateWritten = true;
@@ -623,6 +668,11 @@ void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClust
 
         try {
             final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
+            for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                metadataIndexWriter.startWrite();
+            }
+
             final WriterStats stats = updateMetadata(previousClusterState.metadata(), clusterState.metadata());
             commit(currentTerm, clusterState.version());
             final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
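The core of this change is the pairing of startWrite() with prepareCommit(): merges are switched off while the cluster state documents are rewritten (many of the affected segments will lose all their live docs and simply be dropped), then the real policy is restored and any surviving segments are merged once, synchronously, before the commit. A minimal standalone sketch of the same toggle against plain Lucene, assuming Lucene 8+ (the class name, in-memory directory, analyzer, and sample document are illustrative only, not part of this change):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

import java.io.IOException;

public class MergeToggleSketch {
    public static void main(String[] args) throws IOException {
        final MergePolicy mergePolicy = new TieredMergePolicy(); // stands in for DEFAULT_MERGE_POLICY

        final IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
        config.setCommitOnClose(false);                       // only commit when instructed
        config.setMergeScheduler(new SerialMergeScheduler()); // merge on the writing thread
        config.setMergePolicy(mergePolicy);

        try (Directory directory = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(directory, config)) {

            // startWrite(): suppress merges while documents are added/updated/deleted,
            // since many of the affected segments may end up with no live docs at all
            writer.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);

            final Document doc = new Document();
            doc.add(new StringField("type", "global", Field.Store.NO));
            doc.add(new StoredField("data", "..."));
            writer.addDocument(doc);

            // prepareCommit(): restore the real policy and merge whatever survived,
            // before making the commit durable
            writer.getConfig().setMergePolicy(mergePolicy);
            writer.maybeMerge();
            writer.prepareCommit();
            writer.commit();
        }
    }
}

Because the scheduler is a SerialMergeScheduler, maybeMerge() runs any selected merges on the calling thread, so they complete before prepareCommit() writes anything durable.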
diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
index eabf62a8e3cf8..8f39021c62cb0 100644
--- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
@@ -53,6 +53,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -421,9 +422,8 @@ public void sync(Collection<String> names) {
                 assertFalse(writer.isOpen());
             }
 
-            // check if we can open writer again
+            // noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
             try (Writer ignored = persistedClusterStateService.createWriter()) {
-
             }
         }
     }
@@ -469,9 +469,8 @@ public void rename(String source, String dest) throws IOException {
                 assertFalse(writer.isOpen());
             }
 
-            // check if we can open writer again
+            // noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
             try (Writer ignored = persistedClusterStateService.createWriter()) {
-
             }
         }
     }
@@ -881,6 +880,66 @@ public void testFailsIfCorrupt() throws IOException {
         }
     }
 
+    public void testLimitsFileCount() throws IOException {
+        try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
+
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+
+                ClusterState clusterState = ClusterState.EMPTY_STATE;
+                writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
+
+                final int indexCount = between(2, usually() ? 20 : 1000);
+
+                final int maxSegmentCount = (indexCount / 100) + 100; // only expect to have two tiers, each with max 100 segments
+                final int filesPerSegment = 3; // .cfe, .cfs, .si
+                final int extraFiles = 2; // segments_*, write.lock
+                final int maxFileCount = (maxSegmentCount * filesPerSegment) + extraFiles;
+
+                logger.info("--> adding [{}] indices one-by-one, verifying file count does not exceed [{}]", indexCount, maxFileCount);
+                for (int i = 0; i < indexCount; i++) {
+                    final ClusterState previousClusterState = clusterState;
+
+                    clusterState = ClusterState.builder(clusterState)
+                        .metadata(Metadata.builder(clusterState.metadata())
+                            .version(i + 2)
+                            .put(IndexMetadata.builder("index-" + i)
+                                .settings(Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))))
+                        .incrementVersion().build();
+
+                    writer.writeIncrementalStateAndCommit(1, previousClusterState, clusterState);
+
+                    for (Path dataPath : nodeEnvironment.nodeDataPaths()) {
+                        try (DirectoryStream<Path> files
+                                 = Files.newDirectoryStream(dataPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) {
+
+                            int fileCount = 0;
+                            final List<String> fileNames = new ArrayList<>();
+                            for (Path filePath : files) {
+                                final String fileName = filePath.getFileName().toString();
+                                if (ExtrasFS.isExtra(fileName) == false) {
+                                    fileNames.add(fileName);
+                                    fileCount += 1;
+                                }
+                            }
+
+                            if (maxFileCount < fileCount) {
+                                // don't bother preparing the description unless we are failing
+                                fileNames.sort(Comparator.naturalOrder());
+                                fail("after " + indexCount + " indices have " + fileCount + " files vs max of " + maxFileCount + ": "
+                                    + fileNames);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
                                     PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
         throws IllegalAccessException, IOException {
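To put numbers on the bound asserted in testLimitsFileCount: at the largest indexCount of 1000, the test allows maxSegmentCount = (1000 / 100) + 100 = 110 segments and therefore maxFileCount = 110 × 3 + 2 = 332 files; at indexCount = 20 the bound is (20 / 100) + 100 = 100 segments, i.e. 302 files. The "two tiers" expectation follows from the new default merge policy, which sets both segmentsPerTier and maxMergeAtOnce to 100, and the three files per segment follow from forcing compound segments (.cfe, .cfs, .si).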