Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create first backing index when creating data stream #54467

Merged
merged 10 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
"Create data stream":
- skip:
version: " - 7.6.99"
reason: available only in 7.7+
version: " - 7.99.99"
reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467"

- do:
indices.create_data_stream:
Expand All @@ -22,10 +22,15 @@
indices.get_data_streams: {}
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: [] }
- length: { 0.indices: 1 }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can also the index name be checked here inside the indices array field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martijnvg, it looks like the format of the backing indices response changed when the data stream's indices member was changed from List<String> to List<Index>. It now looks like this:

       {
         "name" : "simple-data-stream1",
         "timestamp_field" : "@timestamp",
         "indices" : [
           {
             "index_name" : "simple-data-stream1-000001",
             "index_uuid" : "V3ONJviCQB68b1BBzMJwMw"
           }
         ]
       }

Is that ok?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intended. We can add this assert:

- match: {0.indices.0.index_name: 'simple-data-stream1-000001')

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind my comment, you already made this change :)

- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.indices: [] }
- length: { 1.indices: 1 }

- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged

- do:
indices.delete_data_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
Expand All @@ -42,13 +44,14 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
Expand Down Expand Up @@ -117,10 +120,14 @@ public int hashCode() {

public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private final MetadataCreateIndexService metadataCreateIndexService;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService metaDataCreateIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.metadataCreateIndexService = metaDataCreateIndexService;
}

@Override
Expand Down Expand Up @@ -151,7 +158,7 @@ public void onFailure(String source, Exception e) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return createDataStream(currentState, request);
return createDataStream(metadataCreateIndexService, currentState, request);
}

@Override
Expand All @@ -161,16 +168,26 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

static ClusterState createDataStream(ClusterState currentState, Request request) {
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
Request request) throws Exception {
if (currentState.metadata().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}

MetadataCreateIndexService.validateIndexOrAliasName(request.name,
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));

String firstBackingIndexName = request.name + "-000001";
CreateIndexClusterStateUpdateRequest createIndexRequest =
martijnvg marked this conversation as resolved.
Show resolved Hide resolved
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
.settings(Settings.builder().put("index.hidden", true).build());
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
assert firstBackingIndex != null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add:

Suggested change
assert firstBackingIndex != null;

since that is guaranteed to fail tests (the NPE occurring further down could be swallowed).

Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface IndexAbstraction {

/**
* A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction.
*
* <p>
* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned
* by {@link #getIndices()}. An index abstraction may also not have a dedicated write index.
*
Expand Down Expand Up @@ -87,7 +87,9 @@ enum Type {
* An alias typically refers to many concrete indices and
* may have a write index.
*/
ALIAS("alias");
ALIAS("alias"),

DATA_STREAM("data_stream");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add some java doc here?


private final String displayName;

Expand Down Expand Up @@ -181,7 +183,7 @@ public boolean isHidden() {

/**
* Returns the unique alias metadata per concrete index.
*
* <p>
* (note that although alias can point to the same concrete indices, each alias reference may have its own routing
* and filters)
*/
Expand Down Expand Up @@ -233,7 +235,7 @@ public void computeAndValidateAliasProperties() {

// Validate hidden status
final Map<Boolean, List<IndexMetadata>> groupedByHiddenStatus = referenceIndexMetadatas.stream()
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
.collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden())));
if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) {
List<String> hiddenOn = groupedByHiddenStatus.get(true).stream()
.map(idx -> idx.getIndex().getName()).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
Expand Down Expand Up @@ -1367,6 +1368,7 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
});
}
}

aliasAndIndexLookup.values().stream()
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
Expand All @@ -1377,15 +1379,28 @@ private void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLook
DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
if (dsMetadata != null) {
for (DataStream ds : dsMetadata.dataStreams().values()) {
if (indicesLookup.containsKey(ds.getName())) {
IndexAbstraction existing = indicesLookup.get(ds.getName());
if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) {
throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias");
}

SortedMap<?, ?> map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (map.size() != 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" +
" including '" + map.firstKey() + "'");
SortedMap<String, IndexAbstraction> potentialConflicts =
indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (potentialConflicts.size() != 0) {
List<String> indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList());
List<String> conflicts = new ArrayList<>();
for (Map.Entry<String, IndexAbstraction> entry : potentialConflicts.entrySet()) {
if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX ||
indexNames.contains(entry.getKey()) == false) {
conflicts.add(entry.getKey());
}
}

if (conflicts.size() > 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
" including '" + conflicts.get(0) + "'");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.Collections;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {

Expand Down Expand Up @@ -62,33 +68,60 @@ public void testValidateRequestWithoutTimestampField() {
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
}

public void testCreateDataStream() {
public void testCreateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
}

public void testCreateDuplicateDataStream() {
final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
}

public void testCreateDataStreamWithInvalidName() {
final MetadataCreateIndexService metadataCreateIndexService = new MockMetadataCreateIndexService();
final String dataStreamName = "_My-da#ta- ,stream-";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("must not contain the following characters"));
}

private static class MockMetadataCreateIndexService extends MetadataCreateIndexService {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe instead of mock class use: Mockito#spy(...) and attach expected behaviour?


MockMetadataCreateIndexService() {
super(null, null, null, null, null, null, null, null, null, null, false);
}

@Override
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request,
boolean silent) throws Exception {
Metadata.Builder b = Metadata.builder(currentState.metadata())
.put(IndexMetadata.builder(request.index())
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(request.settings())
.build())
.numberOfShards(1)
.numberOfReplicas(1)
.build(), false);
return ClusterState.builder(currentState).metadata(b.build()).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -939,7 +941,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() {

public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
final String dataStreamName = "my-data-stream";
final String conflictingIndex = dataStreamName + "-00001";
final String conflictingIndex = dataStreamName + "-000001";
Metadata.Builder b = Metadata.builder()
.put(IndexMetadata.builder(conflictingIndex)
.settings(settings(Version.CURRENT))
Expand All @@ -953,6 +955,29 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
"] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'"));
}

public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
final String dataStreamName = "my-data-stream";
final List<Index> backingIndices = new ArrayList<>();
final int numBackingIndices = randomIntBetween(2, 5);
int lastBackingIndexNum = randomIntBetween(9, 50);
Metadata.Builder b = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
b.put(im, false);
backingIndices.add(im.getIndex());
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
}

b.put(new DataStream(dataStreamName, "ts", backingIndices));
Metadata metadata = b.build();
assertThat(metadata.dataStreams().size(), equalTo(1));
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
}

public void testSerialization() throws IOException {
final Metadata orig = randomMetadata();
final BytesStreamOutput out = new BytesStreamOutput();
Expand Down