Skip to content

Commit

Permalink
add timestamp metafield to time_series index
Browse files Browse the repository at this point in the history
  • Loading branch information
weizijun committed Oct 14, 2021
1 parent c552bc8 commit 4d38bdb
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ reject @timestamp with wrong type:
reason: introduced in 8.0.0 to be backported to 7.16.0

- do:
catch: /@timestamp must be \[date\] or \[date_nanos\]/
catch: /data stream timestamp field \[@timestamp\] is of type keyword, but \[date,date_nanos\] is expected/
indices.create:
index: test
body:
Expand Down
35 changes: 26 additions & 9 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MappingParserContext;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.RootObjectMapper;

import java.util.List;
Expand All @@ -33,7 +35,11 @@ public enum IndexMode {
void validateWithOtherSettings(Map<Setting<?>, Object> settings) {}

@Override
public void completeMappings(MappingParserContext context, RootObjectMapper.Builder builder) {}
public void completeMappings(
MappingParserContext context,
Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers,
RootObjectMapper.Builder builder
) {}
},
TIME_SERIES {
@Override
Expand All @@ -53,23 +59,30 @@ private String error(Setting<?> unsupported) {
}

@Override
public void completeMappings(MappingParserContext context, RootObjectMapper.Builder builder) {
Optional<Mapper.Builder> timestamp = builder.getBuilder("@timestamp");
public void completeMappings(
MappingParserContext context,
Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers,
RootObjectMapper.Builder builder
) {
DataStreamTimestampFieldMapper timestampFieldMapper = (DataStreamTimestampFieldMapper) metadataMappers.get(
DataStreamTimestampFieldMapper.class
);
if (timestampFieldMapper == null || false == timestampFieldMapper.isEnabled()) {
metadataMappers.put(DataStreamTimestampFieldMapper.class, DataStreamTimestampFieldMapper.ENABLED_INSTANCE);
}

Optional<Mapper.Builder> timestamp = builder.getBuilder(DataStreamTimestampFieldMapper.DEFAULT_PATH);
if (timestamp.isEmpty()) {
builder.add(
new DateFieldMapper.Builder(
"@timestamp",
DataStreamTimestampFieldMapper.DEFAULT_PATH,
DateFieldMapper.Resolution.MILLISECONDS,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
context.scriptCompiler(),
DateFieldMapper.IGNORE_MALFORMED_SETTING.get(context.getSettings()),
context.getIndexSettings().getIndexVersionCreated()
)
);
return;
}
if (false == timestamp.get() instanceof DateFieldMapper.Builder) {
throw new IllegalArgumentException("@timestamp must be [date] or [date_nanos]");
}
}
};
Expand All @@ -90,5 +103,9 @@ public void completeMappings(MappingParserContext context, RootObjectMapper.Buil
/**
* Validate and/or modify the mappings after after they've been parsed.
*/
public abstract void completeMappings(MappingParserContext context, RootObjectMapper.Builder builder);
public abstract void completeMappings(
MappingParserContext context,
Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers,
RootObjectMapper.Builder builder
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
public class DataStreamTimestampFieldMapper extends MetadataFieldMapper {

public static final String NAME = "_data_stream_timestamp";
private static final String DEFAULT_PATH = "@timestamp";
public static final String DEFAULT_PATH = "@timestamp";

private static final DataStreamTimestampFieldMapper ENABLED_INSTANCE =
public static final DataStreamTimestampFieldMapper ENABLED_INSTANCE =
new DataStreamTimestampFieldMapper(TimestampFieldType.INSTANCE, true);
private static final DataStreamTimestampFieldMapper DISABLED_INSTANCE =
new DataStreamTimestampFieldMapper(TimestampFieldType.INSTANCE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -94,8 +94,11 @@ Mapping parse(@Nullable String type, CompressedXContent source) throws MapperPar

private Mapping parse(String type, Map<String, Object> mapping) throws MapperParsingException {
MappingParserContext parserContext = parserContextSupplier.get();
RootObjectMapper rootObjectMapper
= rootObjectTypeParser.parse(type, mapping, parserContext).build(MapperBuilderContext.ROOT);

// parserContext.getIndexSettings().getMode().completeMappings(parserContext, mapping);

RootObjectMapper.Builder rootObjectMapperBuilder
= rootObjectTypeParser.parse(type, mapping, parserContext);

Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers = metadataMappersSupplier.get();
Map<String, Object> meta = null;
Expand Down Expand Up @@ -143,8 +146,10 @@ private Mapping parse(String type, Map<String, Object> mapping) throws MapperPar
}
checkNoRemainingFields(mapping, "Root mapping definition has unsupported parameters: ");

parserContext.getIndexSettings().getMode().completeMappings(parserContext, metadataMappers, rootObjectMapperBuilder);

return new Mapping(
rootObjectMapper,
rootObjectMapperBuilder.build(MapperBuilderContext.ROOT),
metadataMappers.values().toArray(new MetadataFieldMapper[0]),
meta);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public RootObjectMapper.Builder parse(String name, Map<String, Object> node, Map
iterator.remove();
}
}
parserContext.getIndexSettings().getMode().completeMappings(parserContext, builder);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardSizeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -183,6 +184,7 @@ private static Map<String, MetadataFieldMapper.TypeParser> initBuiltInMetadataMa
builtInMetadataMappers.put(VersionFieldMapper.NAME, VersionFieldMapper.PARSER);
builtInMetadataMappers.put(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PARSER);
builtInMetadataMappers.put(DocCountFieldMapper.NAME, DocCountFieldMapper.PARSER);
builtInMetadataMappers.put(DataStreamTimestampFieldMapper.NAME, DataStreamTimestampFieldMapper.PARSER);
//_field_names must be added last so that it has a chance to see all the other mappers
builtInMetadataMappers.put(FieldNamesFieldMapper.NAME, FieldNamesFieldMapper.PARSER);
return Collections.unmodifiableMap(builtInMetadataMappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperServiceTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -62,9 +63,13 @@ public void testSortOrder() {
public void testAddsTimestamp() throws IOException {
Settings s = Settings.builder().put(IndexSettings.MODE.getKey(), "time_series").build();
DocumentMapper mapper = createMapperService(s, mapping(b -> {})).documentMapper();
MappedFieldType timestamp = mapper.mappers().getFieldType("@timestamp");
MappedFieldType timestamp = mapper.mappers().getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH);
assertThat(timestamp, instanceOf(DateFieldType.class));
assertThat(((DateFieldType) timestamp).resolution(), equalTo(DateFieldMapper.Resolution.MILLISECONDS));

Mapper timestampField = mapper.mappers().getMapper(DataStreamTimestampFieldMapper.NAME);
assertThat(timestampField, instanceOf(DataStreamTimestampFieldMapper.class));
assertTrue(((DataStreamTimestampFieldMapper)timestampField).isEnabled());
}

public void testTimestampMillis() throws IOException {
Expand All @@ -74,6 +79,10 @@ public void testTimestampMillis() throws IOException {
MappedFieldType timestamp = mapper.mappers().getFieldType("@timestamp");
assertThat(timestamp, instanceOf(DateFieldType.class));
assertThat(((DateFieldType) timestamp).resolution(), equalTo(DateFieldMapper.Resolution.MILLISECONDS));

Mapper timestampField = mapper.mappers().getMapper(DataStreamTimestampFieldMapper.NAME);
assertThat(timestampField, instanceOf(DataStreamTimestampFieldMapper.class));
assertTrue(((DataStreamTimestampFieldMapper)timestampField).isEnabled());
}

public void testTimestampNanos() throws IOException {
Expand All @@ -83,17 +92,25 @@ public void testTimestampNanos() throws IOException {
MappedFieldType timestamp = mapper.mappers().getFieldType("@timestamp");
assertThat(timestamp, instanceOf(DateFieldType.class));
assertThat(((DateFieldType) timestamp).resolution(), equalTo(DateFieldMapper.Resolution.NANOSECONDS));

Mapper timestampField = mapper.mappers().getMapper(DataStreamTimestampFieldMapper.NAME);
assertThat(timestampField, instanceOf(DataStreamTimestampFieldMapper.class));
assertTrue(((DataStreamTimestampFieldMapper)timestampField).isEnabled());
}

public void testBadTimestamp() throws IOException {
Settings s = Settings.builder().put(IndexSettings.MODE.getKey(), "time_series").build();
String type = randomFrom("keyword", "integer", "long", "double", "text");
Exception e = expectThrows(
MapperParsingException.class,
IllegalArgumentException.class,
() -> createMapperService(
s,
mapping(b -> b.startObject("@timestamp").field("type", randomFrom("keyword", "int", "long", "double", "text")).endObject())
mapping(b -> b.startObject("@timestamp").field("type", type).endObject())
)
);
assertThat(e.getMessage(), equalTo("Failed to parse mapping: @timestamp must be [date] or [date_nanos]"));
assertThat(
e.getMessage(),
equalTo("data stream timestamp field [@timestamp] is of type [" + type + "], but [date,date_nanos] is expected")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -45,15 +42,9 @@
import org.elasticsearch.xpack.datastreams.rest.RestPromoteDataStreamAction;

import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class DataStreamsPlugin extends Plugin implements ActionPlugin, MapperPlugin {

@Override
public Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers() {
return Map.of(DataStreamTimestampFieldMapper.NAME, DataStreamTimestampFieldMapper.PARSER);
}
public class DataStreamsPlugin extends Plugin implements ActionPlugin {

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand Down

0 comments on commit 4d38bdb

Please sign in to comment.