Skip to content

Commit

Permalink
Validate tsdb's routing_path (#79384)
Browse files Browse the repository at this point in the history
`routing_path`'s job is to put the same time series on the same node. It
does that by extracting a String directly from the xcontent, hashing it,
and feeding the hash into the shard selection algorithm. I'm happy with
it! But it won't work properly if `routing_path` matches non-dimension
fields or if it matches non-keyword dimensions. This change prevents us
from mapping any fields that "line up" with `routing_path` unless they
are keyword dimensions.

Let's talk about why `routing_path` can't do its job if it matches any
non-dimension fields. Well, imagine `routing_path` is `[dim, foo]` and
only `dim` is a dimension. It'll *still* hash `foo`'s values into the
routing key. So, say you get documents like:
```
{"dim": "a", "foo": "1"}
{"dim": "a", "foo": "1"}
{"dim": "a", "foo": "2"}
```

The third document could be routed to a different shard than the first
two! Which would be a disaster because it'd cut the time series
identified by `"dim":"a"` into pieces!

Now let's talk about when `routing_path` matches a non-keyword
dimension. Imagine `routing_path` is `[kwd, int]` and we send these
documents:
```
{"kwd": "a", "int": "1"}
{"kwd": "a", "int": "01"}
```

Both of these documents belong to the time series identified by
`"kwd":"a","int":1` but the `routing_path` code just reads strings so
it'll route them into separate shards. Also bad! Also forbidden by this
change.
  • Loading branch information
nik9000 authored Oct 19, 2021
1 parent f60bda5 commit 7a32158
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,45 @@ routing required:
mappings:
_routing:
required: true

---
# Index creation must be rejected when routing_path names a field that is not a
# keyword time_series_dimension. Here "@timestamp" is listed in routing_path but
# is mapped as a date.
bad routing_path:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0

# The catch regex expects the error message to single out "@timestamp" and
# report its mapped type, [date].
- do:
catch: /All fields that match routing_path must be keyword time_series_dimensions but \[@timestamp\] was \[date\]/
indices.create:
index: test_index
body:
settings:
index:
mode: time_series
routing_path: [metricset, k8s.pod.uid, "@timestamp"]
number_of_replicas: 0
number_of_shards: 2
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true
k8s:
properties:
pod:
properties:
uid:
type: keyword
time_series_dimension: true
name:
type: keyword
ip:
type: ip
network:
properties:
tx:
type: long
rx:
type: long
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add time series mappings:
ecs style:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0
Expand Down Expand Up @@ -49,3 +49,58 @@ add time series mappings:
latency:
type: double
time_series_metric: gauge

---
# Creates a time_series index whose routing_path is the wildcard "dim.*".
# The fields declared under "dim" (metricset and uid) are both keyword
# time_series_dimensions. There is no "catch", so creation is expected to succeed.
top level dim object:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0

- do:
indices.create:
index: tsdb_index
body:
settings:
index:
mode: time_series
routing_path: [dim.*]
number_of_replicas: 0
number_of_shards: 2
mappings:
properties:
"@timestamp":
type: date
dim:
properties:
metricset:
type: keyword
time_series_dimension: true
uid:
type: keyword
time_series_dimension: true
# The k8s fields below include non-keyword dimensions (short, ip); their paths
# start with "k8s", not "dim".
k8s:
properties:
pod:
properties:
availability_zone:
type: short
time_series_dimension: true
name:
type: keyword
ip:
type: ip
time_series_dimension: true
network:
properties:
tx:
type: long
time_series_metric: counter
rx:
type: integer
time_series_metric: gauge
packets_dropped:
type: long
time_series_metric: gauge
latency:
type: double
time_series_metric: gauge
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
}
}

/**
 * Intentionally empty: this implementation performs no mapping-level validation,
 * so any {@link MappingLookup} is accepted.
 *
 * @param lookup the mappings to validate; unused here
 */
@Override
public void validateMapping(MappingLookup lookup) {}

@Override
Expand Down Expand Up @@ -66,6 +67,7 @@ private String error(Setting<?> unsupported) {
return tsdbMode() + " is incompatible with [" + unsupported.getKey() + "]";
}

@Override
public void validateMapping(MappingLookup lookup) {
if (((RoutingFieldMapper) lookup.getMapper(RoutingFieldMapper.NAME)).required()) {
throw new IllegalArgumentException(routingRequiredBad());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.index.IndexSettings;

import java.util.List;

public class DocumentMapper {
private final String type;
private final CompressedXContent mappingSource;
Expand Down Expand Up @@ -87,6 +89,10 @@ public void validate(IndexSettings settings, boolean checkLimits) {
if (settings.getIndexSortConfig().hasIndexSort() && mappers().hasNested()) {
throw new IllegalArgumentException("cannot have nested fields when index sort is activated");
}
List<String> routingPaths = settings.getIndexMetadata().getRoutingPaths();
if (false == routingPaths.isEmpty()) {
mappingLookup.getMapping().getRoot().validateRoutingPath(routingPaths);
}
if (checkLimits) {
this.mappingLookup.checkLimits(settings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,4 +584,14 @@ public FieldMapper.Builder getMergeBuilder() {
return new Builder(simpleName(), indexAnalyzers, scriptCompiler).dimension(dimension).init(this);
}

/**
 * Accepts this field as a {@code routing_path} target only when it is configured
 * as a dimension.
 *
 * @throws IllegalArgumentException if the field is not a dimension
 */
@Override
protected void validateMatchedRoutingPath() {
    if (fieldType().isDimension()) {
        // A dimension is a valid routing_path target; nothing to report.
        return;
    }
    final String message = "All fields that match routing_path must be keyword time_series_dimensions but ["
        + name()
        + "] was not a time_series_dimension";
    throw new IllegalArgumentException(message);
}
}
38 changes: 38 additions & 0 deletions server/src/main/java/org/elasticsearch/index/mapper/Mapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@

package org.elasticsearch.index.mapper;

import com.fasterxml.jackson.core.filter.TokenFilter;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.support.filtering.FilterPathBasedFilter;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public abstract class Mapper implements ToXContentFragment, Iterable<Mapper> {

Expand Down Expand Up @@ -66,4 +72,36 @@ public final String simpleName() {
*/
public abstract void validate(MappingLookup mappers);

/**
 * Validate this mapper, and the mappers it iterates over, against the patterns
 * configured in {@link IndexMetadata#INDEX_ROUTING_PATH}. The patterns are compiled
 * into a {@link TokenFilter} and the mapper tree is then walked with it.
 *
 * @param routingPaths the configured routing path patterns
 */
public final void validateRoutingPath(List<String> routingPaths) {
    final Set<String> patterns = Set.copyOf(routingPaths);
    // NOTE(review): the `true` flag presumably selects inclusive matching - confirm against FilterPathBasedFilter.
    final TokenFilter filter = new FilterPathBasedFilter(patterns, true);
    validateRoutingPath(filter);
}

/**
 * Walk this mapper and the mappers it iterates over with a {@link TokenFilter} made
 * from {@link IndexMetadata#INDEX_ROUTING_PATH}, validating every mapper the filter
 * fully matches.
 *
 * @param filter the filter state that applies at this mapper
 */
private void validateRoutingPath(TokenFilter filter) {
    if (TokenFilter.INCLUDE_ALL == filter) {
        // The filter has fully matched this mapper, so it is a routing_path target.
        validateMatchedRoutingPath();
    }
    for (Mapper child : this) {
        final TokenFilter childFilter = filter.includeProperty(child.simpleName());
        // A null filter means "do not include": skip that child entirely.
        if (childFilter != null) {
            child.validateRoutingPath(childFilter);
        }
    }
}

/**
 * Validate that this field can be the target of {@link IndexMetadata#INDEX_ROUTING_PATH}.
 * This base implementation always rejects the mapper, reporting its name and type;
 * subclasses that can be valid targets override it.
 *
 * @throws IllegalArgumentException always, in this base implementation
 */
protected void validateMatchedRoutingPath() {
    final String field = name();
    final String type = typeName();
    throw new IllegalArgumentException(
        "All fields that match routing_path must be keyword time_series_dimensions but [" + field + "] was [" + type + "]"
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MapperServiceTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class TimeSeriesModeTests extends MapperServiceTestCase {
Expand Down Expand Up @@ -103,4 +105,76 @@ public void testValidateAliasWithSearchRouting() {
assertThat(e.getMessage(), equalTo("routing is forbidden on CRUD operations that target indices in [index.mode=time_series]"));
}

/**
 * A {@code routing_path} that selects an object field, by exact name or by wildcard,
 * must be rejected when the mapper service is created.
 */
public void testRoutingPathMatchesObject() {
    final String routingPath = randomBoolean() ? "dim.o" : "dim.*";
    final Settings settings = Settings.builder()
        .put(IndexSettings.MODE.getKey(), "time_series")
        .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), routingPath)
        .build();
    Exception failure = expectThrows(IllegalArgumentException.class, () -> createMapperService(settings, mapping(builder -> {
        builder.startObject("dim").startObject("properties");
        {
            // dim.o is an object holding a dimension; the object itself is what routing_path matches.
            builder.startObject("o").startObject("properties");
            builder.startObject("inner_dim").field("type", "keyword").field("time_series_dimension", true).endObject();
            builder.endObject().endObject();
        }
        builder.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject();
        builder.endObject().endObject();
    })));
    final String expected = "All fields that match routing_path must be keyword time_series_dimensions but [dim.o] was [object]";
    assertThat(failure.getMessage(), equalTo(expected));
}

/**
 * A {@code routing_path} that selects a keyword field which is not a
 * time_series_dimension must be rejected when the mapper service is created.
 */
public void testRoutingPathMatchesNonDimensionKeyword() {
    final String routingPath = randomBoolean() ? "dim.non_dim" : "dim.*";
    final Settings settings = Settings.builder()
        .put(IndexSettings.MODE.getKey(), "time_series")
        .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), routingPath)
        .build();
    Exception failure = expectThrows(IllegalArgumentException.class, () -> createMapperService(settings, mapping(builder -> {
        builder.startObject("dim").startObject("properties");
        // Keyword, but without time_series_dimension: this is the offending field.
        builder.startObject("non_dim").field("type", "keyword").endObject();
        builder.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject();
        builder.endObject().endObject();
    })));
    final String expected = "All fields that match routing_path must be keyword time_series_dimensions but "
        + "[dim.non_dim] was not a time_series_dimension";
    assertThat(failure.getMessage(), equalTo(expected));
}

/**
 * A {@code routing_path} that selects a dimension which is not a keyword (an integer
 * here) must be rejected when the mapper service is created.
 */
public void testRoutingPathMatchesNonKeyword() {
    final String routingPath = randomBoolean() ? "dim.non_kwd" : "dim.*";
    final Settings settings = Settings.builder()
        .put(IndexSettings.MODE.getKey(), "time_series")
        .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), routingPath)
        .build();
    Exception failure = expectThrows(IllegalArgumentException.class, () -> createMapperService(settings, mapping(builder -> {
        builder.startObject("dim").startObject("properties");
        // A dimension, but mapped as integer rather than keyword: this is the offending field.
        builder.startObject("non_kwd").field("type", "integer").field("time_series_dimension", true).endObject();
        builder.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject();
        builder.endObject().endObject();
    })));
    final String expected = "All fields that match routing_path must be keyword time_series_dimensions but [dim.non_kwd] was [integer]";
    assertThat(failure.getMessage(), equalTo(expected));
}

/**
 * When every field selected by {@code routing_path}, listed explicitly or via
 * wildcard, is a keyword dimension, creating the mapper service must succeed.
 */
public void testRoutingPathMatchesOnlyKeywordDimensions() throws IOException {
    final String routingPath = randomBoolean() ? "dim.metric_type,dim.server,dim.species,dim.uuid" : "dim.*";
    final Settings settings = Settings.builder()
        .put(IndexSettings.MODE.getKey(), "time_series")
        .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), routingPath)
        .build();
    createMapperService(settings, mapping(builder -> {
        builder.startObject("dim").startObject("properties");
        // Every field under dim is declared the same way: a keyword dimension.
        for (String dimension : new String[] { "metric_type", "server", "species", "uuid" }) {
            builder.startObject(dimension).field("type", "keyword").field("time_series_dimension", true).endObject();
        }
        builder.endObject().endObject();
    })); // doesn't throw
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexableFieldType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.CharFilterFactory;
Expand All @@ -36,6 +38,7 @@
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -579,4 +582,21 @@ protected Object generateRandomInputValue(MappedFieldType ft) {
protected boolean dedupAfterFetch() {
return true;
}

/**
 * Error message expected when the minimal mapping for this field type is matched by
 * {@code routing_path}. The {@code mapper} argument is not used by this override.
 */
@Override
protected String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) {
    final String prefix = "All fields that match routing_path must be keyword time_series_dimensions but ";
    return prefix + "[field] was not a time_series_dimension";
}

/**
 * A keyword field flagged as a time_series_dimension is a valid {@code routing_path}
 * target, so validating the document mapper must not throw.
 */
public void testDimensionInRoutingPath() throws IOException {
    MapperService mapperService = createMapperService(
        fieldMapping(builder -> builder.field("type", "keyword").field("time_series_dimension", true))
    );
    Settings timeSeriesSettings = Settings.builder()
        .put(IndexSettings.MODE.getKey(), "time_series")
        .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "field")
        .build();
    IndexSettings indexSettings = createIndexSettings(Version.CURRENT, timeSeriesSettings);
    mapperService.documentMapper().validate(indexSettings, false); // Doesn't throw
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.query.SearchExecutionContext;
Expand All @@ -35,6 +35,9 @@
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -750,4 +753,25 @@ public final void testNullInput() throws Exception {
protected boolean allowsNullValues() {
return true;
}

/**
 * The minimal mapping for the field type under test must be rejected when
 * {@code routing_path} points at it, with the message supplied by
 * {@link #minimalIsInvalidRoutingPathErrorMessage(Mapper)}.
 */
public final void testMinimalIsInvalidInRoutingPath() throws IOException {
    MapperService mapperService = createMapperService(fieldMapping(this::minimalMapping));
    try {
        Settings timeSeriesSettings = Settings.builder()
            .put(IndexSettings.MODE.getKey(), "time_series")
            .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "field")
            .build();
        IndexSettings indexSettings = createIndexSettings(Version.CURRENT, timeSeriesSettings);
        Exception failure = expectThrows(
            IllegalArgumentException.class,
            () -> mapperService.documentMapper().validate(indexSettings, false)
        );
        Mapper fieldMapper = mapperService.mappingLookup().getMapper("field");
        assertThat(failure.getMessage(), equalTo(minimalIsInvalidRoutingPathErrorMessage(fieldMapper)));
    } finally {
        // Runs whether or not the assertions above passed.
        assertParseMinimalWarnings();
    }
}

/**
 * Error message expected when the minimal mapping is matched by {@code routing_path}.
 * By default it reports the mapper's type name; subclasses may override it to expect
 * a different message.
 *
 * @param mapper the mapper registered for the field named {@code field}
 * @return the exact exception message expected
 */
protected String minimalIsInvalidRoutingPathErrorMessage(Mapper mapper) {
    final String type = mapper.typeName();
    return "All fields that match routing_path must be keyword time_series_dimensions but [field] was [" + type + "]";
}
}

0 comments on commit 7a32158

Please sign in to comment.