Skip to content

Commit

Permalink
TSDB: Automatically add timestamp mapper (#79136)
Browse files Browse the repository at this point in the history
If tsdb is enabled we need an `@timestamp` field. This automatically
maps the field if it is missing and fails to create indices in
time_series mode that map `@timestamp` as anything other than `date` and
`date_nanos`.
  • Loading branch information
weizijun authored Oct 26, 2021
1 parent 1badb59 commit 39deeb7
Show file tree
Hide file tree
Showing 17 changed files with 410 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@

---
date:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0 to be backported to 7.16.0

- do:
indices.create:
index: test
body:
settings:
index:
mode: time_series
routing_path: [metricset]
number_of_replicas: 0
number_of_shards: 2
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true

- do:
indices.get_mapping:
index: test
- match: { "[email protected]": date }
- match: { 'test.mappings._data_stream_timestamp.enabled': true }

- do:
bulk:
refresh: true
index: test_index
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod"}'

- do:
search:
index: test_index
body:
docvalue_fields: [ '@timestamp' ]
- match: {hits.total.value: 1}
- match: { "hits.hits.0.fields.@timestamp": ["2021-04-28T18:50:04.467Z"] }

---
date_nanos:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0 to be backported to 7.16.0

- do:
indices.create:
index: test
body:
settings:
index:
mode: time_series
routing_path: [metricset]
number_of_replicas: 0
number_of_shards: 2
mappings:
properties:
"@timestamp":
type: date_nanos
metricset:
type: keyword
time_series_dimension: true

- do:
indices.get_mapping:
index: test
- match: { "[email protected]": date_nanos }
- match: { 'test.mappings._data_stream_timestamp.enabled': true }

- do:
bulk:
refresh: true
index: test_index
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod"}'

- do:
search:
index: test_index
body:
docvalue_fields: [ '@timestamp' ]
- match: {hits.total.value: 1}
- match: { "hits.hits.0.fields.@timestamp": ["2021-04-28T18:50:04.467Z"] }

---
automatically add with date:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0 to be backported to 7.16.0

- do:
indices.create:
index: test
body:
settings:
index:
mode: time_series
routing_path: [metricset]
number_of_replicas: 0
number_of_shards: 2
mappings:
properties:
metricset:
type: keyword
time_series_dimension: true

- do:
indices.get_mapping:
index: test
- match: { 'test.mappings.properties.@timestamp': { "type": date } }
- match: { 'test.mappings._data_stream_timestamp.enabled': true }

- do:
bulk:
refresh: true
index: test_index
body:
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod"}'

- do:
search:
index: test_index
body:
docvalue_fields: [ '@timestamp' ]
- match: {hits.total.value: 1}
- match: { "hits.hits.0.fields.@timestamp": ["2021-04-28T18:50:04.467Z"] }

---
reject @timestamp with wrong type:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0 to be backported to 7.16.0

- do:
catch: /data stream timestamp field \[@timestamp\] is of type \[keyword\], but \[date,date_nanos\] is expected/
indices.create:
index: test
body:
settings:
index:
mode: time_series
routing_path: [metricset]
number_of_replicas: 0
number_of_shards: 2
mappings:
properties:
"@timestamp":
type: keyword

---
reject timestamp meta field with wrong type:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0 to be backported to 7.16.0

- do:
catch: /.* time series index \[_data_stream_timestamp\] meta field must be enabled/
indices.create:
index: test
body:
settings:
index:
mode: time_series
routing_path: [metricset]
number_of_replicas: 0
number_of_shards: 2
mappings:
_data_stream_timestamp:
enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,10 @@
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;

public class ComposableTemplateIT extends ESIntegTestCase {

Expand Down Expand Up @@ -80,23 +72,4 @@ public void testComponentTemplatesCanBeUpdatedAfterRestart() throws Exception {
client().execute(PutComposableIndexTemplateAction.INSTANCE,
new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(cit2)).get();
}

public void testUsageOfDataStreamFails() throws IOException {
// Exception that would happen if a unknown field is provided in a composable template:
// The thrown exception will be used to compare against the exception that is thrown when providing
// a composable template with a data stream definition.
String content = "{\"index_patterns\":[\"logs-*-*\"],\"my_field\":\"bla\"}";
XContentParser parser =
XContentHelper.createParser(xContentRegistry(), null, new BytesArray(content), XContentType.JSON);
Exception expectedException = expectThrows(Exception.class, () -> ComposableIndexTemplate.parse(parser));

ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of("logs-*-*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()).build();
Exception e = expectThrows(IllegalArgumentException.class, () -> client().execute(PutComposableIndexTemplateAction.INSTANCE,
new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(template)).actionGet());
Exception actualException = (Exception) e.getCause();
assertThat(actualException.getMessage(),
equalTo(expectedException.getMessage().replace("[1:32] ", "").replace("my_field", "data_stream")));
assertThat(actualException.getMessage(), equalTo("[index_template] unknown field [data_stream]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
Expand Down Expand Up @@ -1231,16 +1229,6 @@ private static void validateCompositeTemplate(final ClusterState state,
String indexName = DataStream.BACKING_INDEX_PREFIX + temporaryIndexName;
// Parse mappings to ensure they are valid after being composed

if (template.getDataStreamTemplate() != null) {
// If there is no _data_stream meta field mapper and a data stream should be created then
// fail as if the data_stream field can't be parsed:
if (tempIndexService.mapperService().isMetadataField(DataStreamTimestampFieldMapper.NAME) == false) {
// Fail like a parsing expection, since we will be moving data_stream template out of server module and
// then we would fail with the same error message, like we do here.
throw new XContentParseException("[index_template] unknown field [data_stream]");
}
}

List<CompressedXContent> mappings = collectMappings(stateWithIndex, templateName, indexName, xContentRegistry);
try {
MapperService mapperService = tempIndexService.mapperService();
Expand Down
57 changes: 57 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.MappingParserContext;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
Expand All @@ -44,6 +52,9 @@ void validateWithOtherSettings(Map<Setting<?>, Object> settings) {

@Override
public void validateAlias(@Nullable String indexRouting, @Nullable String searchRouting) {}

@Override
public void completeMappings(MappingParserContext context, Map<String, Object> mapping, RootObjectMapper.Builder builder) {}
},
TIME_SERIES {
@Override
Expand Down Expand Up @@ -88,6 +99,47 @@ private String routingRequiredBad() {
private String tsdbMode() {
return "[" + IndexSettings.MODE.getKey() + "=time_series]";
}

@Override
public void completeMappings(MappingParserContext context, Map<String, Object> mapping, RootObjectMapper.Builder builder) {
if (false == mapping.containsKey(DataStreamTimestampFieldMapper.NAME)) {
mapping.put(DataStreamTimestampFieldMapper.NAME, new HashMap<>(Map.of("enabled", true)));
} else {
validateTimeStampField(mapping.get(DataStreamTimestampFieldMapper.NAME));
}

Optional<Mapper.Builder> timestamp = builder.getBuilder(DataStreamTimestampFieldMapper.DEFAULT_PATH);
if (timestamp.isEmpty()) {
builder.add(
new DateFieldMapper.Builder(
DataStreamTimestampFieldMapper.DEFAULT_PATH,
DateFieldMapper.Resolution.MILLISECONDS,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
context.scriptCompiler(),
DateFieldMapper.IGNORE_MALFORMED_SETTING.get(context.getSettings()),
context.getIndexSettings().getIndexVersionCreated(),
context.getIndexSettings().getIndex().getName()
)
);
}
}

private void validateTimeStampField(Object timestampFieldValue) {
if (false == (timestampFieldValue instanceof Map)) {
throw new IllegalArgumentException(
"time series index [" + DataStreamTimestampFieldMapper.NAME + "] meta field format error"
);
}

@SuppressWarnings("unchecked")
Map<String, Object> timeStampFieldValueMap = (Map<String, Object>) timestampFieldValue;
if (false == Maps.deepEquals(timeStampFieldValueMap, Map.of("enabled", true))
&& false == Maps.deepEquals(timeStampFieldValueMap, Map.of("enabled", "true"))) {
throw new IllegalArgumentException(
"time series index [" + DataStreamTimestampFieldMapper.NAME + "] meta field must be enabled"
);
}
}
};

private static final List<Setting<?>> TIME_SERIES_UNSUPPORTED = List.of(
Expand Down Expand Up @@ -115,4 +167,9 @@ private String tsdbMode() {
* Validate aliases targeting this index.
*/
public abstract void validateAlias(@Nullable String indexRouting, @Nullable String searchRouting);

/**
* Validate and/or modify the mappings after after they've been parsed.
*/
public abstract void completeMappings(MappingParserContext context, Map<String, Object> mapping, RootObjectMapper.Builder builder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
public class DataStreamTimestampFieldMapper extends MetadataFieldMapper {

public static final String NAME = "_data_stream_timestamp";
private static final String DEFAULT_PATH = "@timestamp";
public static final String DEFAULT_PATH = "@timestamp";

private static final DataStreamTimestampFieldMapper ENABLED_INSTANCE =
public static final DataStreamTimestampFieldMapper ENABLED_INSTANCE =
new DataStreamTimestampFieldMapper(TimestampFieldType.INSTANCE, true);
private static final DataStreamTimestampFieldMapper DISABLED_INSTANCE =
new DataStreamTimestampFieldMapper(TimestampFieldType.INSTANCE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -94,7 +94,8 @@ Mapping parse(@Nullable String type, CompressedXContent source) throws MapperPar

private Mapping parse(String type, Map<String, Object> mapping) throws MapperParsingException {
MappingParserContext parserContext = parserContextSupplier.get();
RootObjectMapper rootObjectMapper = rootObjectTypeParser.parse(type, mapping, parserContext).build(MapperBuilderContext.ROOT);
RootObjectMapper.Builder rootObjectMapperBuilder = rootObjectTypeParser.parse(type, mapping, parserContext);
parserContext.getIndexSettings().getMode().completeMappings(parserContext, mapping, rootObjectMapperBuilder);

Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers = metadataMappersSupplier.get();
Map<String, Object> meta = null;
Expand Down Expand Up @@ -143,7 +144,7 @@ private Mapping parse(String type, Map<String, Object> mapping) throws MapperPar
checkNoRemainingFields(mapping, "Root mapping definition has unsupported parameters: ");

return new Mapping(
rootObjectMapper,
rootObjectMapperBuilder.build(MapperBuilderContext.ROOT),
metadataMappers.values().toArray(new MetadataFieldMapper[0]),
meta);
}
Expand Down
Loading

0 comments on commit 39deeb7

Please sign in to comment.