diff --git a/docs/changelog/88479.yaml b/docs/changelog/88479.yaml
new file mode 100644
index 0000000000000..5febaf0ab1232
--- /dev/null
+++ b/docs/changelog/88479.yaml
@@ -0,0 +1,5 @@
+pr: 88479
+summary: Deduplicate mappings in persisted cluster state
+area: Cluster Coordination
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 4f73fbe1ed64b..ab0f3d91e1e33 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -72,6 +72,7 @@ import java.util.function.Function;
 
 import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
+import static org.elasticsearch.cluster.metadata.Metadata.DEDUPLICATED_MAPPINGS_PARAM;
 import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
 import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
 import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.validateIpValue;
@@ -480,6 +481,7 @@ public Iterator<Setting<?>> settings() {
     static final String KEY_SETTINGS = "settings";
     static final String KEY_STATE = "state";
     static final String KEY_MAPPINGS = "mappings";
+    static final String KEY_MAPPINGS_HASH = "mappings_hash";
     static final String KEY_ALIASES = "aliases";
     static final String KEY_ROLLOVER_INFOS = "rollover_info";
     static final String KEY_SYSTEM = "system";
@@ -1301,6 +1303,10 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
         return Builder.fromXContent(parser);
     }
 
+    public static IndexMetadata fromXContent(XContentParser parser, Map<String, MappingMetadata> mappingsByHash) throws IOException {
+        return Builder.fromXContent(parser, mappingsByHash);
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         Builder.toXContent(this, builder, params);
@@ -2028,7 +2034,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
             }
             builder.endObject();
 
-            if (context != Metadata.XContentContext.API) {
+            if (context == Metadata.XContentContext.GATEWAY && params.paramAsBoolean(DEDUPLICATED_MAPPINGS_PARAM, false)) {
+                MappingMetadata mmd = indexMetadata.mapping();
+                if (mmd != null) {
+                    builder.field(KEY_MAPPINGS_HASH, mmd.source().getSha256());
+                }
+            } else if (context != Metadata.XContentContext.API) {
                 builder.startArray(KEY_MAPPINGS);
                 MappingMetadata mmd = indexMetadata.mapping();
                 if (mmd != null) {
@@ -2109,6 +2120,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
         }
 
         public static IndexMetadata fromXContent(XContentParser parser) throws IOException {
+            return fromXContent(parser, null);
+        }
+
+        public static IndexMetadata fromXContent(XContentParser parser, Map<String, MappingMetadata> mappingsByHash) throws IOException {
             if (parser.currentToken() == null) { // fresh parser? move to the first token
                 parser.nextToken();
             }
@@ -2224,6 +2239,13 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
                     }
                     case KEY_ROUTING_NUM_SHARDS -> builder.setRoutingNumShards(parser.intValue());
                     case KEY_SYSTEM -> builder.system(parser.booleanValue());
+                    case KEY_MAPPINGS_HASH -> {
+                        assert mappingsByHash != null : "no deduplicated mappings given";
+                        if (mappingsByHash.containsKey(parser.text()) == false) {
+                            throw new IllegalArgumentException("mapping with hash [" + parser.text() + "] not found");
+                        }
+                        builder.putMapping(mappingsByHash.get(parser.text()));
+                    }
                     default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
                 }
             } else {
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
index 685a7a8d1cf4f..506581c7ad5cf 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
@@ -192,6 +192,7 @@ default boolean isRestorable() {
 
     public static final String CONTEXT_MODE_API = XContentContext.API.toString();
 
+    public static final String DEDUPLICATED_MAPPINGS_PARAM = "deduplicated_mappings";
     public static final String GLOBAL_STATE_FILE_PREFIX = "global-";
 
     private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
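Taken together, the serialization changes above make the persisted mappings content-addressed: in the GATEWAY context, with the deduplicated_mappings param set, each index document records only the SHA-256 of its mapping source, and each distinct mapping body is stored once under that hash. A minimal sketch of the idea (a standalone illustration, not code from this PR):

    // Content-addressed deduplication: many indices, one stored copy per distinct mapping.
    Map<String, MappingMetadata> mappingsByHash = new HashMap<>();
    for (IndexMetadata indexMetadata : indices) {
        MappingMetadata mapping = indexMetadata.mapping();
        if (mapping != null) {
            // getSha256() is computed over the compressed mapping source, so indices
            // with equal mappings collapse onto the same key.
            mappingsByHash.putIfAbsent(mapping.source().getSha256(), mapping);
        }
    }
    // Persist mappingsByHash.values() once, and only the hash per index.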
diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
index e743de41950f5..52d0215556ba4 100644
--- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
@@ -43,12 +43,14 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.logging.Loggers;
@@ -69,6 +71,7 @@
 import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
@@ -92,6 +95,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.function.IntPredicate;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -103,12 +107,13 @@
  * to record the last-accepted cluster state during publication. The metadata is written incrementally where possible, leaving alone any
  * documents that have not changed. The index has the following fields:
  *
- * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+
- * | "type" (string field)        | "index_uuid" (string field) | "data" (stored binary field in SMILE format) | "page" | "last_page" |
- * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+
- * | GLOBAL_TYPE_NAME == "global" | (omitted)                   | Global metadata                              | large docs are       |
- * | INDEX_TYPE_NAME == "index"   | Index UUID                  | Index metadata                               | split into pages     |
- * +------------------------------+-----------------------------+----------------------------------------------+--------+-------------+
+ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+
+ * | "type" (string field)          | ID (string) field | "data" (stored binary field in SMILE format) | "page" | "last_page" |
+ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+
+ * | GLOBAL_TYPE_NAME == "global"   | (none)            | Global metadata                              | large docs are       |
+ * | INDEX_TYPE_NAME == "index"     | "index_uuid"      | Index metadata                               | split into pages     |
+ * | MAPPING_TYPE_NAME == "mapping" | "mapping_hash"    | Mapping metadata                             |                      |
+ * +--------------------------------+-------------------+----------------------------------------------+--------+-------------+
  *
  * Additionally each commit has the following user data:
  *
@@ -133,8 +138,10 @@ public class PersistedClusterStateService {
     public static final String TYPE_FIELD_NAME = "type";
     public static final String GLOBAL_TYPE_NAME = "global";
     public static final String INDEX_TYPE_NAME = "index";
+    public static final String MAPPING_TYPE_NAME = "mapping";
     private static final String DATA_FIELD_NAME = "data";
     private static final String INDEX_UUID_FIELD_NAME = "index_uuid";
+    private static final String MAPPING_HASH_FIELD_NAME = "mapping_hash";
     public static final String PAGE_FIELD_NAME = "page";
     public static final String LAST_PAGE_FIELD_NAME = "last_page";
     public static final int IS_LAST_PAGE = 1;
@@ -531,7 +538,7 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw
         searcher.setQueryCache(null);
 
         final SetOnce<Metadata.Builder> builderReference = new SetOnce<>();
-        consumeFromType(searcher, GLOBAL_TYPE_NAME, bytes -> {
+        consumeFromType(searcher, GLOBAL_TYPE_NAME, ignored -> GLOBAL_TYPE_NAME, bytes -> {
             final Metadata metadata = readXContent(bytes, Metadata.Builder::fromXContent);
             logger.trace("found global metadata with last-accepted term [{}]", metadata.coordinationMetadata().term());
             if (builderReference.get() != null) {
@@ -545,11 +552,50 @@ private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throw
             throw new CorruptStateException("no global metadata found in [" + dataPath + "]");
         }
 
-        logger.trace("got global metadata, now reading index metadata");
+        logger.trace("got global metadata, now reading mapping metadata");
+
+        final Map<String, MappingMetadata> mappingsByHash = new HashMap<>();
+        consumeFromType(searcher, MAPPING_TYPE_NAME, document -> document.getField(MAPPING_HASH_FIELD_NAME).stringValue(), bytes -> {
+            final var mappingMetadata = readXContent(bytes, parser -> {
+                if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                    throw new CorruptStateException(
+                        "invalid mapping metadata: expected START_OBJECT but got [" + parser.currentToken() + "]"
+                    );
+                }
+                if (parser.nextToken() != XContentParser.Token.FIELD_NAME) {
+                    throw new CorruptStateException(
+                        "invalid mapping metadata: expected FIELD_NAME but got [" + parser.currentToken() + "]"
+                    );
+                }
+                final var fieldName = parser.currentName();
+                if ("content".equals(fieldName) == false) {
+                    throw new CorruptStateException("invalid mapping metadata: unknown field [" + fieldName + "]");
+                }
+                if (parser.nextToken() != XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
+                    throw new CorruptStateException(
+                        "invalid mapping metadata: expected VALUE_EMBEDDED_OBJECT but got [" + parser.currentToken() + "]"
+                    );
+                }
+                return new MappingMetadata(new CompressedXContent(parser.binaryValue()));
+            });
+            final var hash = mappingMetadata.source().getSha256();
+            logger.trace("found mapping metadata with hash {}", hash);
+            if (mappingsByHash.put(hash, mappingMetadata) != null) {
+                throw new CorruptStateException("duplicate metadata found for mapping hash [" + hash + "]");
+            }
+        });
+
+        logger.trace("got metadata for [{}] mappings, now reading index metadata", mappingsByHash.size());
 
         final Set<String> indexUUIDs = new HashSet<>();
-        consumeFromType(searcher, INDEX_TYPE_NAME, bytes -> {
-            final IndexMetadata indexMetadata = readXContent(bytes, IndexMetadata::fromXContent);
+        consumeFromType(searcher, INDEX_TYPE_NAME, document -> document.getField(INDEX_UUID_FIELD_NAME).stringValue(), bytes -> {
+            final IndexMetadata indexMetadata = readXContent(bytes, parser -> {
+                try {
+                    return IndexMetadata.fromXContent(parser, mappingsByHash);
+                } catch (Exception e) {
+                    throw new CorruptStateException(e);
+                }
+            });
             logger.trace("found index metadata for {}", indexMetadata.getIndex());
             if (indexUUIDs.add(indexMetadata.getIndexUUID()) == false) {
                 throw new CorruptStateException("duplicate metadata found for " + indexMetadata.getIndex() + " in [" + dataPath + "]");
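The loader above makes three passes over the metadata index: global metadata, then mapping documents (collected into mappingsByHash), then index documents, whose mappings_hash fields are resolved against that map. Because all mappings are read before any index, a missing hash can only mean a corrupted commit, never an ordering problem. The resolution step amounts to the following (a simplified sketch; resolveMapping is a hypothetical name, the real lookup lives in IndexMetadata.fromXContent):

    // Sketch of resolving a "mappings_hash" value back to a shared mapping instance.
    static MappingMetadata resolveMapping(String hash, Map<String, MappingMetadata> mappingsByHash) {
        MappingMetadata mapping = mappingsByHash.get(hash);
        if (mapping == null) {
            throw new IllegalArgumentException("mapping with hash [" + hash + "] not found");
        }
        return mapping; // the same instance is reused by every index carrying this mapping
    }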
@@ -585,6 +631,7 @@ private <T> T readXContent(BytesReference bytes, CheckedFunction<XContentParser
     private static void consumeFromType(
         IndexSearcher indexSearcher,
         String type,
+        Function<Document, String> keyFunction,
         CheckedConsumer<BytesReference, IOException> bytesReferenceConsumer
     ) throws IOException {
@@ -630,13 +677,7 @@ private static void consumeFromType(
                     // startup, on the main thread and before most other services have started, and we will need space to serialize the
                     // whole cluster state in memory later on.
 
-                    final String key;
-                    if (type.equals(GLOBAL_TYPE_NAME)) {
-                        key = GLOBAL_TYPE_NAME;
-                    } else {
-                        key = document.getField(INDEX_UUID_FIELD_NAME).stringValue();
-                    }
-
+                    final var key = keyFunction.apply(document);
                     final PaginatedDocumentReader reader = documentReaders.computeIfAbsent(key, k -> new PaginatedDocumentReader());
                     final BytesReference bytesReference = reader.addPage(key, documentData, pageIndex, isLastPage);
                     if (bytesReference != null) {
@@ -670,6 +711,7 @@ private static BytesReference uncompress(BytesReference bytesReference) throws I
         Map<String, String> params = Maps.newMapWithExpectedSize(2);
         params.put("binary", "true");
         params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY);
+        params.put(Metadata.DEDUPLICATED_MAPPINGS_PARAM, Boolean.TRUE.toString());
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }
 
@@ -728,6 +770,11 @@ void deleteIndexMetadata(String indexUUID) throws IOException {
             indexWriter.deleteDocuments(new Term(INDEX_UUID_FIELD_NAME, indexUUID));
         }
 
+        public void deleteMappingMetadata(String mappingHash) throws IOException {
+            this.logger.trace("removing mapping metadata for [{}]", mappingHash);
+            indexWriter.deleteDocuments(new Term(MAPPING_HASH_FIELD_NAME, mappingHash));
+        }
+
         void flush() throws IOException {
             this.logger.trace("flushing");
             this.indexWriter.flush();
@@ -906,6 +953,27 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata
                 addGlobalMetadataDocuments(metadata);
             }
 
+            int numMappingsAdded = 0;
+            int numMappingsRemoved = 0;
+            int numMappingsUnchanged = 0;
+            final var previousMappingHashes = new HashSet<>(previouslyWrittenMetadata.getMappingsByHash().keySet());
+            for (final var entry : metadata.getMappingsByHash().entrySet()) {
+                if (previousMappingHashes.remove(entry.getKey()) == false) {
+                    addMappingDocuments(entry.getKey(), entry.getValue());
+                    numMappingsAdded++;
+                } else {
+                    logger.trace("no action required for mapping [{}]", entry.getKey());
+                    numMappingsUnchanged++;
+                }
+            }
+
+            for (final var unusedMappingHash : previousMappingHashes) {
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.deleteMappingMetadata(unusedMappingHash);
+                }
+                numMappingsRemoved++;
+            }
+
             final Map<String, Long> indexMetadataVersionByUUID = Maps.newMapWithExpectedSize(previouslyWrittenMetadata.indices().size());
             previouslyWrittenMetadata.indices().forEach((name, indexMetadata) -> {
                 final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion());
@@ -938,7 +1006,7 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata
                     addIndexMetadataDocuments(indexMetadata);
                 } else {
                     numIndicesUnchanged++;
-                    logger.trace("no action required for [{}]", indexMetadata.getIndex());
+                    logger.trace("no action required for index [{}]", indexMetadata.getIndex());
                 }
                 indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID());
             }
@@ -956,13 +1024,41 @@ private WriterStats updateMetadata(Metadata previouslyWrittenMetadata, Metadata
                 metadataIndexWriter.flush();
             }
 
-            return new WriterStats(false, updateGlobalMeta, numIndicesUnchanged, numIndicesAdded, numIndicesUpdated, numIndicesRemoved);
+            return new WriterStats(
+                false,
+                updateGlobalMeta,
+                numMappingsUnchanged,
+                numMappingsAdded,
+                numMappingsRemoved,
+                numIndicesUnchanged,
+                numIndicesAdded,
+                numIndicesUpdated,
+                numIndicesRemoved
+            );
         }
 
         private static int lastPageValue(boolean isLastPage) {
             return isLastPage ? IS_LAST_PAGE : IS_NOT_LAST_PAGE;
         }
 
+        private void addMappingDocuments(String key, MappingMetadata mappingMetadata) throws IOException {
+            logger.trace("writing mapping metadata with hash [{}]", key);
+            writePages(
+                (builder, params) -> builder.field("content", mappingMetadata.source().compressed()),
+                (bytesRef, pageIndex, isLastPage) -> {
+                    final Document document = new Document();
+                    document.add(new StringField(TYPE_FIELD_NAME, MAPPING_TYPE_NAME, Field.Store.NO));
+                    document.add(new StringField(MAPPING_HASH_FIELD_NAME, key, Field.Store.YES));
+                    document.add(new StoredField(PAGE_FIELD_NAME, pageIndex));
+                    document.add(new StoredField(LAST_PAGE_FIELD_NAME, lastPageValue(isLastPage)));
+                    document.add(new StoredField(DATA_FIELD_NAME, bytesRef));
+                    for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                        metadataIndexWriter.indexWriter.addDocument(document);
+                    }
+                }
+            );
+        }
+
         private void addIndexMetadataDocuments(IndexMetadata indexMetadata) throws IOException {
             final String indexUUID = indexMetadata.getIndexUUID();
             assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false;
@@ -994,7 +1090,7 @@ private void addGlobalMetadataDocuments(Metadata metadata) throws IOException {
             });
         }
 
-        private void writePages(ToXContent metadata, PageWriter pageWriter) throws IOException {
+        private void writePages(ToXContentFragment metadata, PageWriter pageWriter) throws IOException {
             try (
                 PageWriterOutputStream paginatedStream = new PageWriterOutputStream(documentBuffer, pageWriter);
                 OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(paginatedStream);
@@ -1022,6 +1118,10 @@ private WriterStats overwriteMetadata(Metadata metadata) throws IOException {
         private WriterStats addMetadata(Metadata metadata) throws IOException {
             addGlobalMetadataDocuments(metadata);
 
+            for (final var entry : metadata.getMappingsByHash().entrySet()) {
+                addMappingDocuments(entry.getKey(), entry.getValue());
+            }
+
             for (IndexMetadata indexMetadata : metadata.indices().values()) {
                 addIndexMetadataDocuments(indexMetadata);
             }
@@ -1032,7 +1132,7 @@ private WriterStats addMetadata(Metadata metadata) throws IOException {
                 metadataIndexWriter.flush();
             }
 
-            return new WriterStats(true, true, 0, 0, metadata.indices().size(), 0);
+            return new WriterStats(true, true, 0, metadata.getMappingsByHash().size(), 0, 0, metadata.indices().size(), 0, 0);
         }
 
         public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion, Version oldestIndexVersion)
@@ -1130,6 +1230,9 @@ public void close() throws IOException {
         private record WriterStats(
             boolean isFullWrite,
             boolean globalMetaUpdated,
+            int numMappingsUnchanged,
+            int numMappingsAdded,
+            int numMappingsRemoved,
             int numIndicesUnchanged,
             int numIndicesAdded,
             int numIndicesUpdated,
@@ -1138,14 +1241,24 @@ private record WriterStats(
             @Override
             public String toString() {
                 if (isFullWrite) {
-                    return String.format(Locale.ROOT, "wrote global metadata and metadata for [%d] indices", numIndicesUpdated);
+                    return String.format(
+                        Locale.ROOT,
+                        "wrote global metadata, [%d] mappings, and metadata for [%d] indices",
+                        numMappingsAdded,
+                        numIndicesAdded
+                    );
                 } else {
                     return String.format(
                         Locale.ROOT,
                         """
-                            [%s] global metadata, wrote metadata for [%d] new indices and [%d] existing indices, \
+                            [%s] global metadata, \
+                            wrote [%d] new mappings, removed [%d] mappings and skipped [%d] unchanged mappings, \
+                            wrote metadata for [%d] new indices and [%d] existing indices, \
                             removed metadata for [%d] indices and skipped [%d] unchanged indices""",
                         globalMetaUpdated ? "wrote" : "skipped writing",
+                        numMappingsAdded,
+                        numMappingsRemoved,
+                        numMappingsUnchanged,
                         numIndicesAdded,
                         numIndicesUpdated,
                         numIndicesRemoved,
diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
index 21ccaff0f646f..f09257922aee1 100644
--- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java
@@ -19,6 +19,7 @@
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -41,6 +42,7 @@
 import org.elasticsearch.cluster.coordination.CoordinationMetadata;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -57,6 +59,7 @@
 import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.test.CorruptionUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
@@ -74,8 +77,10 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.IntPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -87,6 +92,7 @@
 import static org.elasticsearch.gateway.PersistedClusterStateService.IS_LAST_PAGE;
 import static org.elasticsearch.gateway.PersistedClusterStateService.IS_NOT_LAST_PAGE;
 import static org.elasticsearch.gateway.PersistedClusterStateService.LAST_PAGE_FIELD_NAME;
+import static org.elasticsearch.gateway.PersistedClusterStateService.MAPPING_TYPE_NAME;
 import static org.elasticsearch.gateway.PersistedClusterStateService.METADATA_DIRECTORY_NAME;
 import static org.elasticsearch.gateway.PersistedClusterStateService.PAGE_FIELD_NAME;
 import static org.elasticsearch.gateway.PersistedClusterStateService.TYPE_FIELD_NAME;
@@ -704,6 +710,7 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException {
                 .put(
                     IndexMetadata.builder(indexName)
                         .version(1L)
+                        .putMapping(randomMappingMetadataOrNull())
                         .settings(
                             Settings.builder()
                                 .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -726,7 +733,8 @@ public void testFailsIfIndexMetadataIsDuplicated() throws IOException {
             Directory dupDirectory = newFSDirectory(dupPath.resolve(METADATA_DIRECTORY_NAME))
         ) {
             try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
-                indexWriter.deleteDocuments(new Term("type", "global")); // do not duplicate global metadata
+                indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME)); // do not duplicate global metadata
+                indexWriter.deleteDocuments(new Term(TYPE_FIELD_NAME, MAPPING_TYPE_NAME)); // do not duplicate mappings
                 indexWriter.addIndexes(dupDirectory);
                 indexWriter.commit();
             }
@@ -773,6 +781,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws
                 .put(
                     IndexMetadata.builder("test")
                         .version(indexMetadataVersion - 1) // -1 because it's incremented in .put()
+                        .putMapping(randomMappingMetadataOrNull())
                         .settings(
                             Settings.builder()
                                 .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -801,6 +810,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws
                 Metadata.builder(clusterState.metadata())
                     .put(
                         IndexMetadata.builder(indexMetadata)
+                            .putMapping(indexMetadata.mapping())
                            .settings(
                                Settings.builder()
                                    .put(indexMetadata.getSettings())
@@ -828,6 +838,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws
                 Metadata.builder(clusterState.metadata())
                    .put(
                        IndexMetadata.builder(indexMetadata)
+                            .putMapping(randomMappingMetadataOrNull())
                            .settings(
                                Settings.builder()
                                    .put(indexMetadata.getSettings())
@@ -858,6 +869,7 @@ public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws
                     )
                    .put(
                        IndexMetadata.builder(indexMetadata)
+                            .putMapping(randomMappingMetadataOrNull())
                            .settings(
                                Settings.builder()
                                    .put(indexMetadata.getSettings())
@@ -902,6 +914,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc
                 .coordinationMetadata(CoordinationMetadata.builder(clusterState.coordinationMetadata()).term(term).build())
                 .put(
                     IndexMetadata.builder("updated")
+                        .putMapping(randomMappingMetadataOrNull())
                         .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put()
                         .settings(
                             Settings.builder()
@@ -913,6 +926,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc
                 )
                 .put(
                     IndexMetadata.builder("deleted")
+                        .putMapping(randomMappingMetadataOrNull())
                         .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put()
                         .settings(
                             Settings.builder()
@@ -950,6 +964,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc
                     .remove("deleted")
                     .put(
                         IndexMetadata.builder("updated")
+                            .putMapping(randomMappingMetadataOrNull())
                            .settings(
                                Settings.builder()
                                    .put(clusterState.metadata().index("updated").getSettings())
@@ -959,6 +974,7 @@ public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOExc
                     .put(
                         IndexMetadata.builder("added")
                             .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put()
+                            .putMapping(randomMappingMetadataOrNull())
                            .settings(
                                Settings.builder()
                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
@@ -1008,6 +1024,7 @@ public void testReloadsMetadataAcrossMultipleSegments() throws IOException {
                     .version(i + 2)
                     .put(
                         IndexMetadata.builder(index.getName())
+                            .putMapping(randomMappingMetadataOrNull())
                            .settings(
                                Settings.builder()
                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1041,6 +1058,7 @@ public void testHandlesShuffledDocuments() throws IOException {
         for (int i = between(5, 20); i >= 0; i--) {
             metadata.put(
                 IndexMetadata.builder("test-" + i)
+                    .putMapping(randomMappingMetadataOrNull())
                     .settings(
                         Settings.builder()
                             .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1071,40 +1089,12 @@ public void testHandlesShuffledDocuments() throws IOException {
             DirectoryReader reader = DirectoryReader.open(directory)
         ) {
             commitUserData = reader.getIndexCommit().getUserData();
-            final IndexSearcher indexSearcher = new IndexSearcher(reader);
-            indexSearcher.setQueryCache(null);
-            for (String typeName : new String[] { GLOBAL_TYPE_NAME, INDEX_TYPE_NAME }) {
-                final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName));
-                final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f);
-                for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) {
-                    final Scorer scorer = weight.scorer(leafReaderContext);
-                    final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
-                    final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
-                    final DocIdSetIterator docIdSetIterator = scorer.iterator();
-                    while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
-                        if (isLiveDoc.test(docIdSetIterator.docID())) {
-                            final Document document = leafReaderContext.reader().document(docIdSetIterator.docID());
-                            document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO));
-                            documents.add(document);
-                        }
-                    }
-                }
-            }
+            forEachDocument(reader, Set.of(GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME), documents::add);
         }
 
         Randomness.shuffle(documents);
 
-        try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) {
-            final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer());
-            indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
-            try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
-                for (Document document : documents) {
-                    indexWriter.addDocument(document);
-                }
-                indexWriter.setLiveCommitData(commitUserData.entrySet());
-                indexWriter.commit();
-            }
-        }
+        writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents);
 
         final ClusterState loadedState = loadPersistedClusterState(persistedClusterStateService);
         assertEquals(clusterState.metadata().indices(), loadedState.metadata().indices());
@@ -1119,7 +1109,11 @@ public void testHandlesShuffledDocuments() throws IOException {
             final boolean isOnlyPageForIndex = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(INDEX_TYPE_NAME)
                 && corruptDocPage == 0
                 && corruptDocIsLastPage;
+            final boolean isOnlyPageForMapping = corruptDocument.getField(TYPE_FIELD_NAME).stringValue().equals(MAPPING_TYPE_NAME)
+                && corruptDocPage == 0
+                && corruptDocIsLastPage;
             if (isOnlyPageForIndex == false // don't remove the only doc for an index, this just loses the index and doesn't corrupt
+                && isOnlyPageForMapping == false // similarly, don't remove the only doc for a mapping, this causes an AssertionError
                 && rarely()) {
                 documents.remove(corruptIndex);
             } else {
@@ -1134,17 +1128,7 @@ && rarely()) {
             }
         }
 
-        try (Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME))) {
-            final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer());
-            indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
-            try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
-                for (Document document : documents) {
-                    indexWriter.addDocument(document);
-                }
-                indexWriter.setLiveCommitData(commitUserData.entrySet());
-                indexWriter.commit();
-            }
-        }
+        writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents);
 
         expectThrows(CorruptStateException.class, () -> loadPersistedClusterState(persistedClusterStateService));
     }
@@ -1194,7 +1178,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing full cluster state took [*] which is above the warn threshold of [*]; \
-                        wrote global metadata and metadata for [0] indices"""
+                        wrote global metadata, [0] mappings, and metadata for [0] indices"""
                 )
             );
 
@@ -1210,7 +1194,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing full cluster state took [*] which is above the warn threshold of [*]; \
-                        wrote global metadata and metadata for [0] indices"""
+                        wrote global metadata, [0] mappings, and metadata for [0] indices"""
                 )
            );
 
@@ -1244,7 +1228,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing full cluster state took [*] which is above the warn threshold of [*]; \
-                        wrote global metadata and metadata for [0] indices"""
+                        wrote global metadata, [0] mappings, and metadata for [0] indices"""
                 )
             );
 
@@ -1254,6 +1238,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                 .version(clusterState.version())
                 .put(
                     IndexMetadata.builder("test")
+                        .putMapping(randomMappingMetadata())
                         .settings(
                             Settings.builder()
                                 .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1277,12 +1262,27 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                     Level.WARN,
                     """
                         writing cluster state took [*] which is above the warn threshold of [*]; [skipped writing] global metadata, \
+                        wrote [1] new mappings, removed [0] mappings and skipped [0] unchanged mappings, \
                         wrote metadata for [1] new indices and [0] existing indices, removed metadata for [0] indices and \
                         skipped [0] unchanged indices"""
                 )
            );
 
+            // force a full write, so that the next write is an actual incremental write from clusterState->newClusterState
             writeDurationMillis.set(randomLongBetween(0, writeDurationMillis.get() - 1));
+            assertExpectedLogs(
+                1L,
+                null,
+                clusterState,
+                writer,
+                new MockLogAppender.UnseenEventExpectation(
+                    "should not see warning below threshold",
+                    PersistedClusterStateService.class.getCanonicalName(),
+                    Level.WARN,
+                    "*"
+                )
+            );
+
             assertExpectedLogs(
                 1L,
                 clusterState,
@@ -1296,7 +1296,7 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
                 )
             );
 
-            assertThat(currentTime.get(), lessThan(startTimeMillis + 14 * slowWriteLoggingThresholdMillis)); // ensure no overflow
+            assertThat(currentTime.get(), lessThan(startTimeMillis + 16 * slowWriteLoggingThresholdMillis)); // ensure no overflow
         }
     }
 
@@ -1353,6 +1353,7 @@ public void testLimitsFileCount() throws IOException {
                     .version(i + 2)
                     .put(
                         IndexMetadata.builder("index-" + i)
+                            .putMapping(randomMappingMetadataOrNull())
                            .settings(
                                Settings.builder()
                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
@@ -1484,6 +1485,7 @@ public void testOldestIndexVersionIsCorrectlySerialized() throws IOException {
         for (Version indexVersion : indexVersions) {
             String indexUUID = UUIDs.randomBase64UUID(random());
             IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName("index", lastIndexNum))
+                .putMapping(randomMappingMetadataOrNull())
                 .settings(settings(indexVersion).put(IndexMetadata.SETTING_INDEX_UUID, indexUUID))
                 .numberOfShards(1)
                 .numberOfReplicas(1)
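The tests that follow rely on the mapping hash being a pure function of the compressed mapping source, so a state's getMappingsByHash() keys can be compared directly against what is found on disk. For example (a sketch reusing this test class's own helpers, not an assertion from the PR):

    // Equal mapping sources yield equal SHA-256 keys, so such mappings are stored once.
    MappingMetadata original = randomMappingMetadata();
    MappingMetadata copy = new MappingMetadata(original.source());
    assertEquals(original.source().getSha256(), copy.source().getSha256());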
@@ -1569,6 +1571,269 @@ public void testDebugLogging() throws IOException, IllegalAccessException {
         }
     }
 
+    public void testFailsIfMappingIsDuplicated() throws IOException {
+        final Path dataPath = createTempDir();
+        try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) {
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
+
+            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+                .metadata(
+                    Metadata.builder()
+                        .put(
+                            IndexMetadata.builder("test-1")
+                                .putMapping(randomMappingMetadata())
+                                .settings(
+                                    Settings.builder()
+                                        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                        .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                        .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
+                                )
+                        )
+                )
+                .build();
+
+            String hash = clusterState.metadata().getMappingsByHash().keySet().iterator().next();
+
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+                writer.writeFullStateAndCommit(0L, clusterState);
+            }
+
+            final List<Document> documents = new ArrayList<>();
+            final Map<String, String> commitUserData;
+
+            try (
+                Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                DirectoryReader reader = DirectoryReader.open(directory)
+            ) {
+                commitUserData = reader.getIndexCommit().getUserData();
+                forEachDocument(reader, Set.of(GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME), documents::add);
+            }
+
+            // duplicate all documents associated with the mapping in question
+            for (Document document : new ArrayList<>(documents)) { // iterating a copy
+                IndexableField mappingHash = document.getField("mapping_hash");
+                if (mappingHash != null && mappingHash.stringValue().equals(hash)) {
+                    documents.add(document);
+                }
+            }
+
+            writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents);
+
+            final String message = expectThrows(CorruptStateException.class, () -> persistedClusterStateService.loadBestOnDiskState())
+                .getMessage();
+            assertEquals("duplicate metadata found for mapping hash [" + hash + "]", message);
+        }
+    }
+
+    public void testFailsIfMappingIsMissing() throws IOException {
+        final Path dataPath = createTempDir();
+        try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) {
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
+
+            ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+                .metadata(
+                    Metadata.builder()
+                        .put(
+                            IndexMetadata.builder("test-1")
+                                .putMapping(randomMappingMetadata())
+                                .settings(
+                                    Settings.builder()
+                                        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                        .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                        .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
+                                )
+                        )
+                )
+                .build();
+
+            String hash = clusterState.metadata().getMappingsByHash().keySet().iterator().next();
+
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+                writer.writeFullStateAndCommit(0L, clusterState);
+            }
+
+            final List<Document> documents = new ArrayList<>();
+            final Map<String, String> commitUserData;
+
+            try (
+                Directory directory = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                DirectoryReader reader = DirectoryReader.open(directory)
+            ) {
+                commitUserData = reader.getIndexCommit().getUserData();
+                forEachDocument(reader, Set.of(GLOBAL_TYPE_NAME, MAPPING_TYPE_NAME, INDEX_TYPE_NAME), documents::add);
+            }
+
+            // remove all documents associated with the mapping in question
+            for (Document document : new ArrayList<>(documents)) { // iterating a copy
+                IndexableField mappingHash = document.getField("mapping_hash");
+                if (mappingHash != null && mappingHash.stringValue().equals(hash)) {
+                    documents.remove(document);
+                }
+            }
+
+            writeDocumentsAndCommit(dataPath.resolve(METADATA_DIRECTORY_NAME), commitUserData, documents);
+
+            final String message = expectThrows(CorruptStateException.class, () -> persistedClusterStateService.loadBestOnDiskState())
+                .getCause()
+                .getMessage();
+            assertEquals("java.lang.IllegalArgumentException: mapping with hash [" + hash + "] not found", message);
+        }
+    }
+
+    public void testDeduplicatedMappings() throws IOException {
+        final Path dataPath = createTempDir();
+        try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[] { dataPath })) {
+            final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);
+            try (Writer writer = persistedClusterStateService.createWriter()) {
+
+                Set<String> hashes;
+                Metadata.Builder metadata;
+                ClusterState clusterState;
+                ClusterState previousState;
+
+                // generate two mappings
+                MappingMetadata mapping1 = randomMappingMetadata();
+                MappingMetadata mapping2 = randomValueOtherThan(mapping1, () -> randomMappingMetadata());
+
+                // build and write a cluster state with metadata that has all indices using a single mapping
+                metadata = Metadata.builder();
+                for (int i = between(5, 20); i >= 0; i--) {
+                    metadata.put(
+                        IndexMetadata.builder("test-" + i)
+                            .putMapping(mapping1)
+                            .settings(
+                                Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
+                            )
+                    );
+                }
+                clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
+                assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(1));
+                writer.writeFullStateAndCommit(0L, clusterState);
+
+                // verify that the on-disk state reflects 1 mapping
+                hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                assertThat(hashes.size(), equalTo(1));
+                assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes));
+
+                previousState = clusterState;
+                metadata = Metadata.builder(previousState.metadata());
+
+                // add a second mapping -- either by adding a new index or changing an existing one
+                if (randomBoolean()) {
+                    // add another index with a different mapping
+                    metadata.put(
+                        IndexMetadata.builder("test-" + 99)
+                            .putMapping(mapping2)
+                            .settings(
+                                Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                                    .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
+                            )
+                    );
+                } else {
+                    // change an existing index to a different mapping
+                    String index = randomFrom(previousState.metadata().getIndices().keySet());
+                    metadata.put(IndexMetadata.builder(metadata.get(index)).putMapping(mapping2));
+                }
+                clusterState = ClusterState.builder(previousState).metadata(metadata).build();
+                assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(2));
+                writer.writeIncrementalStateAndCommit(0L, previousState, clusterState);
+
+                // verify that the on-disk state reflects 2 mappings
+                hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                assertThat(hashes.size(), equalTo(2));
+                assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes));
+
+                previousState = clusterState;
+                metadata = Metadata.builder(previousState.metadata());
+
+                // update all indices to use the second mapping
+                for (String index : previousState.metadata().getIndices().keySet()) {
+                    metadata.put(IndexMetadata.builder(metadata.get(index)).putMapping(mapping2));
+                }
+                clusterState = ClusterState.builder(previousState).metadata(metadata).build();
+                assertThat(clusterState.metadata().getMappingsByHash().size(), equalTo(1));
+                writer.writeIncrementalStateAndCommit(0L, previousState, clusterState);
+
+                // verify that the on-disk state reflects 1 mapping
+                hashes = loadPersistedMappingHashes(dataPath.resolve(METADATA_DIRECTORY_NAME));
+                assertThat(hashes.size(), equalTo(1));
+                assertThat(clusterState.metadata().getMappingsByHash().keySet(), equalTo(hashes));
+            }
+        }
+    }
+
+    /**
+     * Utility method for applying a consumer to each document (of the given types) associated with a DirectoryReader.
+     */
+    private static void forEachDocument(DirectoryReader reader, Set<String> types, Consumer<Document> consumer) throws IOException {
+        final IndexSearcher indexSearcher = new IndexSearcher(reader);
+        indexSearcher.setQueryCache(null);
+        for (String typeName : types) {
+            final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, typeName));
+            final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f);
+            for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) {
+                final Scorer scorer = weight.scorer(leafReaderContext);
+                if (scorer != null) {
+                    final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
+                    final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
+                    final DocIdSetIterator docIdSetIterator = scorer.iterator();
+                    while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                        if (isLiveDoc.test(docIdSetIterator.docID())) {
+                            final Document document = leafReaderContext.reader().document(docIdSetIterator.docID());
+                            document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO));
+                            consumer.accept(document);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Utility method for writing documents back to a directory.
+     */
+    private static void writeDocumentsAndCommit(Path metadataDirectory, Map<String, String> commitUserData, List<Document> documents)
+        throws IOException {
+        try (Directory directory = new NIOFSDirectory(metadataDirectory)) {
+            final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer());
+            indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
+            try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
+                for (Document document : documents) {
+                    indexWriter.addDocument(document);
+                }
+                indexWriter.setLiveCommitData(commitUserData.entrySet());
+                indexWriter.commit();
+            }
+        }
+    }
+
+    /**
+     * Searches the underlying persisted state indices for non-deleted mapping documents that represent the
+     * first page of data, collecting and returning the distinct mapping hashes themselves.
+     */
+    private static Set<String> loadPersistedMappingHashes(Path metadataDirectory) throws IOException {
+        Set<String> hashes = new HashSet<>();
+        try (Directory directory = new NIOFSDirectory(metadataDirectory); DirectoryReader reader = DirectoryReader.open(directory)) {
+            forEachDocument(reader, Set.of(MAPPING_TYPE_NAME), document -> {
+                int page = document.getField("page").numericValue().intValue();
+                if (page == 0) {
+                    String hash = document.getField("mapping_hash").stringValue();
+                    assertTrue(hashes.add(hash));
+                }
+            });
+        }
+        return hashes;
+    }
+
     private boolean findSegmentInDirectory(Path dataPath) throws IOException {
         Directory d = new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME));
@@ -1629,6 +1894,20 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException
         );
     }
 
+    private static MappingMetadata randomMappingMetadata() {
+        int i = randomIntBetween(1, 4);
+        return new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, Map.of("_doc", Map.of("properties", Map.of("field" + i, "text"))));
+    }
+
+    private static MappingMetadata randomMappingMetadataOrNull() {
+        int i = randomIntBetween(0, 4);
+        if (i == 0) {
+            return null;
+        } else {
+            return randomMappingMetadata();
+        }
+    }
+
     private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException {
         final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false);
         return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);