From 0ae87c538ad078773fc66cc07abce9dae57f1656 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 30 Mar 2020 16:06:24 -0500 Subject: [PATCH 1/9] create first backing index --- .../test/indices.data_stream/10_basic.yml | 25 +++++++++- .../datastream/CreateDataStreamAction.java | 24 ++++++++-- .../cluster/metadata/IndexAbstraction.java | 48 +++++++++++++++++-- .../cluster/metadata/Metadata.java | 21 +++++++- .../CreateDataStreamRequestTests.java | 33 +++++++++++-- 5 files changed, 134 insertions(+), 17 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 44e8fdf72004a..2fb7fe2a9fa9b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -22,10 +22,31 @@ indices.get_data_streams: {} - match: { 0.name: simple-data-stream1 } - match: { 0.timestamp_field: '@timestamp' } - - match: { 0.indices: [] } + - length: { 0.indices: 1 } - match: { 1.name: simple-data-stream2 } - match: { 1.timestamp_field: '@timestamp2' } - - match: { 1.indices: [] } + - length: { 1.indices: 1 } + + - do: + index: + index: simple-data-stream1 + body: { foo: bar } + + - do: + indices.refresh: {} + + - do: + search: + index: simple-data-stream1 + body: { query: { match_all: {} } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._index: simple-data-stream1-000000 } + - match: { hits.hits.0._source.foo: 'bar' } + + - do: + indices.delete_data_stream: + name: simple-data-stream1 + - is_true: acknowledged - do: indices.delete_data_stream: diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index 883560086e654..aca6494773bfd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; @@ -48,7 +50,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; +import java.util.List; import java.util.Objects; public class CreateDataStreamAction extends ActionType { @@ -117,10 +119,14 @@ public int hashCode() { public static class TransportAction extends TransportMasterNodeAction { + private final MetadataCreateIndexService metadataCreateIndexService; + @Inject public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateIndexService metaDataCreateIndexService) { super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.metadataCreateIndexService = metaDataCreateIndexService; } @Override @@ -151,7 +157,7 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return createDataStream(currentState, request); + return createDataStream(metadataCreateIndexService, currentState, request); } @Override @@ -161,7 +167,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - static ClusterState createDataStream(ClusterState currentState, Request request) { + static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, + ClusterState currentState, + Request request) throws Exception { if (currentState.metadata().dataStreams().containsKey(request.name)) { throw new IllegalArgumentException("data_stream [" + request.name + "] already exists"); } @@ -169,8 +177,14 @@ static ClusterState createDataStream(ClusterState currentState, Request request) MetadataCreateIndexService.validateIndexOrAliasName(request.name, (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2)); + String firstBackingIndexName = request.name + "-000000"; + CreateIndexClusterStateUpdateRequest createIndexRequest = + new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName); + currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); + IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName); + Metadata.Builder builder = Metadata.builder(currentState.metadata()).put( - new DataStream(request.name, request.timestampFieldName, Collections.emptyList())); + new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex()))); logger.info("adding data stream [{}]", request.name); return ClusterState.builder(currentState).metadata(builder).build(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index b7aecabd525e1..72ba82ea55711 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -56,7 +56,7 @@ public interface IndexAbstraction { /** * A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction. - * + *

* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned * by {@link #getIndices()}. An index abstraction may also not have a dedicated write index. * @@ -87,7 +87,9 @@ enum Type { * An alias typically refers to many concrete indices and * may have a write index. */ - ALIAS("alias"); + ALIAS("alias"), + + DATA_STREAM("data_stream"); private final String displayName; @@ -181,7 +183,7 @@ public boolean isHidden() { /** * Returns the unique alias metadata per concrete index. - * + *

* (note that although alias can point to the same concrete indices, each alias reference may have its own routing * and filters) */ @@ -233,7 +235,7 @@ public void computeAndValidateAliasProperties() { // Validate hidden status final Map> groupedByHiddenStatus = referenceIndexMetadatas.stream() - .collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden()))); + .collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden()))); if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) { List hiddenOn = groupedByHiddenStatus.get(true).stream() .map(idx -> idx.getIndex().getName()).collect(Collectors.toList()); @@ -250,4 +252,42 @@ private boolean isNonEmpty(List idxMetas) { return (Objects.isNull(idxMetas) || idxMetas.isEmpty()) == false; } } + + class DataStream implements IndexAbstraction { + + private final org.elasticsearch.cluster.metadata.DataStream dataStream; + private final List dataStreamIndices; + private final IndexMetaData writeIndex; + + public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream, List dataStreamIndices, + IndexMetaData writeIndex) { + this.dataStream = dataStream; + this.dataStreamIndices = dataStreamIndices; + this.writeIndex = writeIndex; + } + + @Override + public String getName() { + return dataStream.getName(); + } + + @Override + public Type getType() { + return Type.DATA_STREAM; + } + + @Override + public List getIndices() { + return dataStreamIndices; + } + + public IndexMetaData getWriteIndex() { + return writeIndex; + } + + @Override + public boolean isHidden() { + return false; + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 5984d886d9ee3..f899f3b221939 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -69,6 +69,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -1367,6 +1368,7 @@ private SortedMap buildIndicesLookup() { }); } } + aliasAndIndexLookup.values().stream() .filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS) .forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties()); @@ -1377,12 +1379,27 @@ private void validateDataStreams(SortedMap indicesLook DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE); if (dsMetadata != null) { for (DataStream ds : dsMetadata.dataStreams().values()) { - if (indicesLookup.containsKey(ds.getName())) { + IndexAbstraction existing = indicesLookup.get(ds.getName()); + if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) { throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias"); } - SortedMap map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-' + SortedMap map = + indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-' if (map.size() != 0) { + if (map.size() == ds.getIndices().size()) { + int numValidIndices = 0; + for (int i = 0; i < map.size(); i++) { + IndexAbstraction space = map.get(String.format(Locale.ROOT, "%s-%06d", ds.getName(), i)); + if (space != null && space.getType() == IndexAbstraction.Type.CONCRETE_INDEX) { + numValidIndices++; + } + } + if (numValidIndices == map.size()) { + continue; + } + } + throw new IllegalStateException("data stream [" + ds.getName() + "] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" + " including '" + map.firstKey() + "'"); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index 089018c52db6b..4b4c3bc51a3c7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -18,12 +18,16 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; @@ -35,6 +39,8 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { + private final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); + @Override protected Writeable.Reader instanceReader() { return Request::new; @@ -62,11 +68,11 @@ public void testValidateRequestWithoutTimestampField() { assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing")); } - public void testCreateDataStream() { + public void testCreateDataStream() throws Exception { final String dataStreamName = "my-data-stream"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); - ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req); + ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(1)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); } @@ -79,7 +85,7 @@ public void testCreateDuplicateDataStream() { CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> CreateDataStreamAction.TransportAction.createDataStream(cs, req)); + () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req)); assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists")); } @@ -88,7 +94,26 @@ public void testCreateDataStreamWithInvalidName() { ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> CreateDataStreamAction.TransportAction.createDataStream(cs, req)); + () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req)); assertThat(e.getMessage(), containsString("must not contain the following characters")); } + + private static class MockMetadataCreateIndexService extends MetadataCreateIndexService { + + MockMetadataCreateIndexService() { + super(null, null, null, null, null, null, null, null, null, null, false); + } + + @Override + public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request, + boolean silent) throws Exception { + Metadata.Builder b = Metadata.builder(currentState.metadata()) + .put(IndexMetadata.builder(request.index()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(), false); + return ClusterState.builder(currentState).metadata(b.build()).build(); + } + } } From 806acc45f286f744e3fe6bd46e5385235649e2ff Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 30 Mar 2020 16:19:23 -0500 Subject: [PATCH 2/9] update unit test --- .../admin/indices/datastream/CreateDataStreamRequestTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index 4b4c3bc51a3c7..d58363e9d684b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { @@ -75,6 +76,7 @@ public void testCreateDataStream() throws Exception { ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(1)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().index(dataStreamName + "-000000"), notNullValue()); } public void testCreateDuplicateDataStream() { From 078dae15dac3c8bcfb3d3ddee14d9ff5f481d0c5 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 30 Mar 2020 16:51:59 -0500 Subject: [PATCH 3/9] temporarily restrict YML test --- .../rest-api-spec/test/indices.data_stream/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 2fb7fe2a9fa9b..951ec0066622f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -1,8 +1,8 @@ --- "Create data stream": - skip: - version: " - 7.6.99" - reason: available only in 7.7+ + version: " - 7.99.99" + reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467" - do: indices.create_data_stream: From 1284b2b14ed867d6087cdf66beccab8099ea98c1 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Tue, 31 Mar 2020 08:20:21 -0500 Subject: [PATCH 4/9] wip on review comments --- .../test/indices.data_stream/10_basic.yml | 2 +- .../datastream/CreateDataStreamAction.java | 3 +- .../cluster/metadata/IndexAbstraction.java | 38 ------------------- .../cluster/metadata/MetadataTests.java | 2 +- 4 files changed, 4 insertions(+), 41 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 951ec0066622f..082bf0ec4282f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -40,7 +40,7 @@ index: simple-data-stream1 body: { query: { match_all: {} } } - length: { hits.hits: 1 } - - match: { hits.hits.0._index: simple-data-stream1-000000 } + - match: { hits.hits.0._index: simple-data-stream1-000001 } - match: { hits.hits.0._source.foo: 'bar' } - do: diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index aca6494773bfd..3a15568fc9ee0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -177,11 +177,12 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn MetadataCreateIndexService.validateIndexOrAliasName(request.name, (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2)); - String firstBackingIndexName = request.name + "-000000"; + String firstBackingIndexName = request.name + "-000001"; CreateIndexClusterStateUpdateRequest createIndexRequest = new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName); currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName); + assert firstBackingIndex != null; Metadata.Builder builder = Metadata.builder(currentState.metadata()).put( new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex()))); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 72ba82ea55711..646dbc35e2160 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -252,42 +252,4 @@ private boolean isNonEmpty(List idxMetas) { return (Objects.isNull(idxMetas) || idxMetas.isEmpty()) == false; } } - - class DataStream implements IndexAbstraction { - - private final org.elasticsearch.cluster.metadata.DataStream dataStream; - private final List dataStreamIndices; - private final IndexMetaData writeIndex; - - public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream, List dataStreamIndices, - IndexMetaData writeIndex) { - this.dataStream = dataStream; - this.dataStreamIndices = dataStreamIndices; - this.writeIndex = writeIndex; - } - - @Override - public String getName() { - return dataStream.getName(); - } - - @Override - public Type getType() { - return Type.DATA_STREAM; - } - - @Override - public List getIndices() { - return dataStreamIndices; - } - - public IndexMetaData getWriteIndex() { - return writeIndex; - } - - @Override - public boolean isHidden() { - return false; - } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 01ba7715ecffa..3a04306a1125c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -939,7 +939,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() { public void testBuilderRejectsDataStreamWithConflictingBackingIndices() { final String dataStreamName = "my-data-stream"; - final String conflictingIndex = dataStreamName + "-00001"; + final String conflictingIndex = dataStreamName + "-000001"; Metadata.Builder b = Metadata.builder() .put(IndexMetadata.builder(conflictingIndex) .settings(settings(Version.CURRENT)) From df1bf0542d1c3486ed0bc8e0b84797a062aee9cf Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Tue, 31 Mar 2020 18:26:40 -0500 Subject: [PATCH 5/9] finish review comments and tests --- .../datastream/CreateDataStreamAction.java | 4 ++- .../cluster/metadata/Metadata.java | 30 +++++++++---------- .../CreateDataStreamRequestTests.java | 14 ++++++--- .../cluster/metadata/MetadataTests.java | 25 ++++++++++++++++ 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index 3a15568fc9ee0..b2fa2f694b818 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -179,7 +180,8 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn String firstBackingIndexName = request.name + "-000001"; CreateIndexClusterStateUpdateRequest createIndexRequest = - new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName); + new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) + .settings(Settings.builder().put("index.hidden", true).build()); currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName); assert firstBackingIndex != null; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index f899f3b221939..a09f552974ce2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -69,7 +69,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -78,6 +77,7 @@ import java.util.TreeMap; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.common.settings.Settings.readSettingsFromStream; @@ -1384,25 +1384,23 @@ private void validateDataStreams(SortedMap indicesLook throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias"); } - SortedMap map = + SortedMap potentialConflicts = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-' - if (map.size() != 0) { - if (map.size() == ds.getIndices().size()) { - int numValidIndices = 0; - for (int i = 0; i < map.size(); i++) { - IndexAbstraction space = map.get(String.format(Locale.ROOT, "%s-%06d", ds.getName(), i)); - if (space != null && space.getType() == IndexAbstraction.Type.CONCRETE_INDEX) { - numValidIndices++; - } - } - if (numValidIndices == map.size()) { - continue; + if (potentialConflicts.size() != 0) { + List indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList()); + List conflicts = new ArrayList<>(); + for (Map.Entry entry : potentialConflicts.entrySet()) { + if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX || + indexNames.contains(entry.getKey()) == false) { + conflicts.add(entry.getKey()); } } - throw new IllegalStateException("data stream [" + ds.getName() + - "] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" + - " including '" + map.firstKey() + "'"); + if (conflicts.size() > 0) { + throw new IllegalStateException("data stream [" + ds.getName() + + "] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" + + " including '" + conflicts.get(0) + "'"); + } } } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index d58363e9d684b..457d54fd7e371 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.Collections; @@ -40,8 +41,6 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { - private final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); - @Override protected Writeable.Reader instanceReader() { return Request::new; @@ -70,16 +69,19 @@ public void testValidateRequestWithoutTimestampField() { } public void testCreateDataStream() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); final String dataStreamName = "my-data-stream"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(1)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); - assertThat(newState.metadata().index(dataStreamName + "-000000"), notNullValue()); + assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue()); + assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true")); } public void testCreateDuplicateDataStream() { + final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); final String dataStreamName = "my-data-stream"; DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -92,6 +94,7 @@ public void testCreateDuplicateDataStream() { } public void testCreateDataStreamWithInvalidName() { + final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); final String dataStreamName = "_My-da#ta- ,stream-"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); @@ -111,7 +114,10 @@ public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateInd boolean silent) throws Exception { Metadata.Builder b = Metadata.builder(currentState.metadata()) .put(IndexMetadata.builder(request.index()) - .settings(settings(Version.CURRENT)) + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(request.settings()) + .build()) .numberOfShards(1) .numberOfReplicas(1) .build(), false); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 3a04306a1125c..9db53b856eaa6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -43,11 +43,13 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; @@ -953,6 +955,29 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() { "] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'")); } + public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() { + final String dataStreamName = "my-data-stream"; + final List backingIndices = new ArrayList<>(); + final int numBackingIndices = randomIntBetween(2, 5); + int lastBackingIndexNum = randomIntBetween(9, 50); + Metadata.Builder b = Metadata.builder(); + for (int k = 1; k <= numBackingIndices; k++) { + IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum)) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + b.put(im, false); + backingIndices.add(im.getIndex()); + lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50); + } + + b.put(new DataStream(dataStreamName, "ts", backingIndices)); + Metadata metadata = b.build(); + assertThat(metadata.dataStreams().size(), equalTo(1)); + assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + } + public void testSerialization() throws IOException { final Metadata orig = randomMetadata(); final BytesStreamOutput out = new BytesStreamOutput(); From 47e6b8802525f5aec6b1fbb6b2c8ce90c55e668f Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Tue, 31 Mar 2020 18:53:29 -0500 Subject: [PATCH 6/9] fix test --- .../test/indices.data_stream/10_basic.yml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 082bf0ec4282f..8afadc6a00931 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -27,22 +27,6 @@ - match: { 1.timestamp_field: '@timestamp2' } - length: { 1.indices: 1 } - - do: - index: - index: simple-data-stream1 - body: { foo: bar } - - - do: - indices.refresh: {} - - - do: - search: - index: simple-data-stream1 - body: { query: { match_all: {} } } - - length: { hits.hits: 1 } - - match: { hits.hits.0._index: simple-data-stream1-000001 } - - match: { hits.hits.0._source.foo: 'bar' } - - do: indices.delete_data_stream: name: simple-data-stream1 From 0bd4bd4df3e9a8fedfc181fe9a814053952b91d0 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 1 Apr 2020 12:20:13 -0500 Subject: [PATCH 7/9] wip on review comments --- .../rest-api-spec/test/indices.data_stream/10_basic.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 8afadc6a00931..24aa842f76584 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -23,9 +23,11 @@ - match: { 0.name: simple-data-stream1 } - match: { 0.timestamp_field: '@timestamp' } - length: { 0.indices: 1 } + - match: { 0.indices: ['simple-data-stream1-000001'] } - match: { 1.name: simple-data-stream2 } - match: { 1.timestamp_field: '@timestamp2' } - length: { 1.indices: 1 } + - match: { 1.indices: ['simple-data-stream2-000001'] } - do: indices.delete_data_stream: From 6c1856feb7e4aa13f2544347b339e89217da2b29 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 1 Apr 2020 13:28:29 -0500 Subject: [PATCH 8/9] review comments --- .../cluster/metadata/IndexAbstraction.java | 5 ++ .../CreateDataStreamRequestTests.java | 50 ++++++++++--------- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 646dbc35e2160..98c3dd31ffe1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -89,6 +89,11 @@ enum Type { */ ALIAS("alias"), + /** + * An index abstraction that refers to a data stream. + * A data stream typically has multiple backing indices, the latest of which + * is the target for index requests. + */ DATA_STREAM("data_stream"); private final String displayName; diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index 457d54fd7e371..40e1f81307284 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -38,6 +38,10 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { @@ -69,7 +73,7 @@ public void testValidateRequestWithoutTimestampField() { } public void testCreateDataStream() throws Exception { - final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); final String dataStreamName = "my-data-stream"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); @@ -80,8 +84,8 @@ public void testCreateDataStream() throws Exception { assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true")); } - public void testCreateDuplicateDataStream() { - final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); + public void testCreateDuplicateDataStream() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); final String dataStreamName = "my-data-stream"; DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -93,8 +97,8 @@ public void testCreateDuplicateDataStream() { assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists")); } - public void testCreateDataStreamWithInvalidName() { - final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService(); + public void testCreateDataStreamWithInvalidName() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); final String dataStreamName = "_My-da#ta- ,stream-"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); @@ -103,25 +107,25 @@ public void testCreateDataStreamWithInvalidName() { assertThat(e.getMessage(), containsString("must not contain the following characters")); } - private static class MockMetadataCreateIndexService extends MetadataCreateIndexService { + private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception { + MetadataCreateIndexService s = mock(MetadataCreateIndexService.class); + when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean())) + .thenAnswer(mockInvocation -> { + ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; + CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1]; - MockMetadataCreateIndexService() { - super(null, null, null, null, null, null, null, null, null, null, false); - } + Metadata.Builder b = Metadata.builder(currentState.metadata()) + .put(IndexMetadata.builder(request.index()) + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(request.settings()) + .build()) + .numberOfShards(1) + .numberOfReplicas(1) + .build(), false); + return ClusterState.builder(currentState).metadata(b.build()).build(); + }); - @Override - public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request, - boolean silent) throws Exception { - Metadata.Builder b = Metadata.builder(currentState.metadata()) - .put(IndexMetadata.builder(request.index()) - .settings(Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(request.settings()) - .build()) - .numberOfShards(1) - .numberOfReplicas(1) - .build(), false); - return ClusterState.builder(currentState).metadata(b.build()).build(); - } + return s; } } From 9c9a42c0f8bed34165119b1fa2d7dce8f55a7d53 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 1 Apr 2020 14:10:16 -0500 Subject: [PATCH 9/9] fix test --- .../rest-api-spec/test/indices.data_stream/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 24aa842f76584..52e56003fb166 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -23,11 +23,11 @@ - match: { 0.name: simple-data-stream1 } - match: { 0.timestamp_field: '@timestamp' } - length: { 0.indices: 1 } - - match: { 0.indices: ['simple-data-stream1-000001'] } + - match: { 0.indices.0.index_name: 'simple-data-stream1-000001' } - match: { 1.name: simple-data-stream2 } - match: { 1.timestamp_field: '@timestamp2' } - length: { 1.indices: 1 } - - match: { 1.indices: ['simple-data-stream2-000001'] } + - match: { 1.indices.0.index_name: 'simple-data-stream2-000001' } - do: indices.delete_data_stream: