Add support for multi-value dimensions
Closes elastic#110387

Having this in now means we won't have to introduce version checks in the ES exporter later.
felixbarny committed Sep 9, 2024
1 parent 602e60b commit d9f5151
Showing 9 changed files with 244 additions and 38 deletions.
1 change: 0 additions & 1 deletion docs/reference/mapping/types/keyword.asciidoc
@@ -163,7 +163,6 @@ index setting limits the number of dimensions in an index.
Dimension fields have the following constraints:

* The `doc_values` and `index` mapping parameters must be `true`.
* Field values cannot be an <<array,array or multi-value>>.
// end::dimension[]
* Dimension values are used to identify a document’s time series. If dimension values are altered in any way during indexing, the document will be stored as belonging to a different time series than the one intended. As a result there are additional constraints:
** The field cannot use a <<normalizer,`normalizer`>>.
@@ -1230,3 +1230,62 @@ non string dimension fields:
- match: { .$idx0name.mappings.properties.attributes.properties.double.time_series_dimension: true }
- match: { .$idx0name.mappings.properties.attributes.properties.host\.ip.type: 'ip' }
- match: { .$idx0name.mappings.properties.attributes.properties.host\.ip.time_series_dimension: true }

---
multi value dimensions:
- requires:
cluster_features: ["mapper.pass_through_priority"]
reason: support for priority in passthrough objects

- do:
allowed_warnings:
- "index template [my-dynamic-template] has index patterns [k9s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-dynamic-template] will take precedence during new index creation"
indices.put_index_template:
name: my-dynamic-template
body:
index_patterns: [k9s*]
data_stream: {}
template:
settings:
index:
number_of_shards: 1
mode: time_series
time_series:
start_time: 2023-08-31T13:03:08.138Z

mappings:
properties:
attributes:
type: passthrough
dynamic: true
time_series_dimension: true
priority: 1
dynamic_templates:
- counter_metric:
mapping:
type: integer
time_series_metric: counter

- do:
bulk:
index: k9s
refresh: true
body:
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:08.138Z","data": "10", "attributes": { "dim1": ["a" , "b"], "dim2": [1, 2] } }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- '{ "@timestamp": "2023-09-01T13:03:08.138Z","data": "20", "attributes": { "dim1": ["b" , "a"], "dim2": [1, 2] } }'
- '{ "create": { "dynamic_templates": { "data": "counter_metric" } } }'
- is_false: errors

- do:
search:
index: k9s
body:
size: 0
aggs:
tsids:
terms:
field: _tsid

- length: { aggregations.tsids.buckets: 2 } # only the order of the dim1 attribute is different, yet we expect to have two distinct time series
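
The assertion above expects two buckets because the values of a multi-value dimension contribute to the _tsid in the order they appear in the document, so ["a", "b"] and ["b", "a"] produce different time series. What follows is a minimal, JDK-only sketch of that order sensitivity (String values and hashCode stand in for the BytesRef values and Murmur3 hashing used by the real code):

import java.util.List;

class ValueOrderSketch {
    // Fold one contribution per (field, value) pair, in document order, the same
    // shape as the per-value hashing introduced in this commit.
    static int combine(String field, List<String> values) {
        int hash = 0;
        for (String value : values) {
            hash = 31 * hash + (field.hashCode() ^ value.hashCode());
        }
        return hash;
    }

    public static void main(String[] args) {
        System.out.println(combine("dim1", List.of("a", "b")));
        System.out.println(combine("dim1", List.of("b", "a"))); // different result, hence a distinct time series
    }
}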
@@ -34,7 +34,6 @@
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +43,7 @@
import java.util.function.Predicate;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.expectValueToken;

/**
* Generates the shard id for {@code (id, routing)} pairs.
@@ -300,12 +300,20 @@ public String createId(Map<String, Object> flat, byte[] suffix) {
Builder b = builder();
for (Map.Entry<String, Object> e : flat.entrySet()) {
if (isRoutingPath.test(e.getKey())) {
b.hashes.add(new NameAndHash(new BytesRef(e.getKey()), hash(new BytesRef(e.getValue().toString()))));
if (e.getValue() instanceof List<?> listValue) {
listValue.forEach(v -> hashValue(b, e.getKey(), v));
} else {
hashValue(b, e.getKey(), e.getValue());
}
}
}
return b.createId(suffix, IndexRouting.ExtractFromSource::defaultOnEmpty);
}

private static void hashValue(Builder b, String key, Object value) {
b.hashes.add(new NameAndHash(new BytesRef(key), hash(new BytesRef(value.toString()))));
}

private static int defaultOnEmpty() {
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
}
@@ -356,6 +364,13 @@ private void extractObject(@Nullable String path, XContentParser source) throws IOException {
}
}

private void extractArray(@Nullable String path, XContentParser source) throws IOException {
while (source.currentToken() != Token.END_ARRAY) {
expectValueToken(source.currentToken(), source);
extractItem(path, source);
}
}

private void extractItem(String path, XContentParser source) throws IOException {
switch (source.currentToken()) {
case START_OBJECT:
@@ -369,6 +384,11 @@ private void extractItem(String path, XContentParser source) throws IOException
hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
source.nextToken();
break;
case START_ARRAY:
source.nextToken();
extractArray(path, source);
source.nextToken();
break;
case VALUE_NULL:
source.nextToken();
break;
@@ -382,21 +402,13 @@ private void extractItem(String path, XContentParser source) throws IOException
}

private int buildHash(IntSupplier onEmpty) {
Collections.sort(hashes);
Iterator<NameAndHash> itr = hashes.iterator();
if (itr.hasNext() == false) {
if (hashes.isEmpty()) {
return onEmpty.getAsInt();
}
NameAndHash prev = itr.next();
int hash = hash(prev.name) ^ prev.hash;
while (itr.hasNext()) {
NameAndHash next = itr.next();
if (prev.name.equals(next.name)) {
throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
}
int thisHash = hash(next.name) ^ next.hash;
hash = 31 * hash + thisHash;
prev = next;
Collections.sort(hashes);
int hash = 0;
for (NameAndHash nah : hashes) {
hash = 31 * hash + (hash(nah.name) ^ nah.hash);
}
return hash;
}
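
For reference, a condensed, self-contained sketch of the simplified buildHash above (plain JDK types; a String name and an int stand in for NameAndHash, and hashCode stands in for the production hash function): entries are sorted by field name and folded into one hash, and several entries may now share the same name (one per element of a multi-valued routing field) rather than being rejected as duplicate dimensions.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

record NameAndValueHash(String name, int valueHash) {}

class RoutingHashSketch {
    static int buildHash(List<NameAndValueHash> hashes) {
        // List.sort is stable, so entries with the same name keep their document order.
        hashes.sort(Comparator.comparing(NameAndValueHash::name));
        int hash = 0;
        for (NameAndValueHash nah : hashes) {
            hash = 31 * hash + (nah.name().hashCode() ^ nah.valueHash());
        }
        return hash;
    }

    public static void main(String[] args) {
        List<NameAndValueHash> hashes = new ArrayList<>();
        // dim1: ["a", "b"] contributes two entries with the same name.
        hashes.add(new NameAndValueHash("dim1", "a".hashCode()));
        hashes.add(new NameAndValueHash("dim1", "b".hashCode()));
        hashes.add(new NameAndValueHash("dim2", "1".hashCode()));
        System.out.println(buildHash(hashes));
    }
}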
@@ -71,6 +71,20 @@ public static void ensureExpectedToken(Token expected, Token actual, XContentPar
}
}

/**
* Makes sure the provided token {@linkplain Token#isValue() is a value type}
*
* @throws ParsingException if the token is not a value type
*/
public static void expectValueToken(Token actual, XContentParser parser) {
if (actual.isValue() == false) {
throw new ParsingException(
parser.getTokenLocation(),
String.format(Locale.ROOT, "Failed to parse object: expecting value token but found [%s]", actual)
);
}
}

private static ParsingException parsingException(XContentParser parser, Token expected, Token actual) {
return new ParsingException(
parser.getTokenLocation(),
@@ -40,13 +40,14 @@
import java.io.IOException;
import java.net.InetAddress;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* Mapper for {@code _tsid} field included generated when the index is
@@ -180,16 +181,14 @@ public static class TimeSeriesIdBuilder implements DocumentDimensions {

public static final int MAX_DIMENSIONS = 512;

private record Dimension(BytesRef name, BytesReference value) {}

private final Murmur3Hasher tsidHasher = new Murmur3Hasher(0);

/**
* A sorted map from the name of each dimension field to its serialized values that will be used
* for generating the _tsid field. The map will be used by {@link TimeSeriesIdFieldMapper}
* to build the _tsid field for the document.
*/
private final SortedSet<Dimension> dimensions = new TreeSet<>(Comparator.comparing(o -> o.name));
private final SortedMap<BytesRef, List<BytesReference>> dimensions = new TreeMap<>();
/**
* Builds the routing. Used for building {@code _id}. If null then skipped.
*/
@@ -207,9 +206,16 @@ public BytesReference buildLegacyTsid() throws IOException {

try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVInt(dimensions.size());
for (Dimension entry : dimensions) {
out.writeBytesRef(entry.name);
entry.value.writeTo(out);
for (Map.Entry<BytesRef, List<BytesReference>> entry : dimensions.entrySet()) {
out.writeBytesRef(entry.getKey());
List<BytesReference> value = entry.getValue();
if (value.size() > 1) {
throw new IllegalArgumentException(
"Dimension field [" + entry.getKey().utf8ToString() + "] cannot be a multi-valued field."
);
}
assert value.isEmpty() == false : "dimension value is empty";
value.get(0).writeTo(out);
}
return out.bytes();
}
@@ -241,18 +247,19 @@ public BytesReference buildTsidHash() {
int tsidHashIndex = StreamOutput.putVInt(tsidHash, len, 0);

tsidHasher.reset();
for (final Dimension dimension : dimensions) {
tsidHasher.update(dimension.name.bytes);
for (final BytesRef name : dimensions.keySet()) {
tsidHasher.update(name.bytes);
}
tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

// NOTE: concatenate all dimension value hashes up to a certain number of dimensions
int tsidHashStartIndex = tsidHashIndex;
for (final Dimension dimension : dimensions) {
for (final List<BytesReference> values : dimensions.values()) {
if ((tsidHashIndex - tsidHashStartIndex) >= 4 * numberOfDimensions) {
break;
}
final BytesRef dimensionValueBytesRef = dimension.value.toBytesRef();
assert values.isEmpty() == false : "dimension values are empty";
final BytesRef dimensionValueBytesRef = values.get(0).toBytesRef();
ByteUtils.writeIntLE(
StringHelper.murmurhash3_x86_32(
dimensionValueBytesRef.bytes,
Expand All @@ -268,8 +275,8 @@ public BytesReference buildTsidHash() {

// NOTE: hash all dimension field values
tsidHasher.reset();
for (final Dimension dimension : dimensions) {
tsidHasher.update(dimension.value.toBytesRef().bytes);
for (final List<BytesReference> values : dimensions.values()) {
values.forEach(v -> tsidHasher.update(v.toBytesRef().bytes));
}
tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

@@ -372,8 +379,15 @@ public DocumentDimensions validate(final IndexSettings settings) {
}

private void add(String fieldName, BytesReference encoded) throws IOException {
if (dimensions.add(new Dimension(new BytesRef(fieldName), encoded)) == false) {
throw new IllegalArgumentException("Dimension field [" + fieldName + "] cannot be a multi-valued field.");
BytesRef name = new BytesRef(fieldName);
List<BytesReference> values = dimensions.get(name);
if (values == null) {
// optimize for the common case where dimensions are not multi-valued
values = new ArrayList<>(1);
values.add(encoded);
dimensions.put(name, values);
} else {
values.add(encoded);
}
}
}
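
A small, JDK-only sketch of the container change above (String stands in for BytesRef/BytesReference, and computeIfAbsent is shorthand for the explicit get/put in the real add method): keys are kept sorted, so the order in which dimension fields appear in a document does not affect the _tsid, while each per-field list preserves document order, which is why the order of values inside an array does. Note that, as the diff shows, the legacy _tsid format still rejects dimensions with more than one value.

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class TsidDimensionsSketch {
    final SortedMap<String, List<String>> dimensions = new TreeMap<>();

    void add(String field, String value) {
        // The first value creates a one-element list (the common, single-valued case);
        // further values for the same field are appended in document order.
        dimensions.computeIfAbsent(field, k -> new ArrayList<>(1)).add(value);
    }

    public static void main(String[] args) {
        TsidDimensionsSketch first = new TsidDimensionsSketch();
        first.add("dim2", "1");
        first.add("dim1", "a");
        first.add("dim1", "b");

        TsidDimensionsSketch second = new TsidDimensionsSketch();
        second.add("dim1", "b");
        second.add("dim1", "a");
        second.add("dim2", "1");

        // Same fields and values; only dim1's value order differs.
        System.out.println(first.dimensions);  // {dim1=[a, b], dim2=[1]}
        System.out.println(second.dimensions); // {dim1=[b, a], dim2=[1]}
    }
}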
@@ -594,7 +594,31 @@ public void testRoutingPathBooleansInSource() throws IOException {
IndexRouting routing = indexRoutingForPath(shards, "foo");
assertIndexShard(routing, Map.of("foo", true), Math.floorMod(hash(List.of("foo", "true")), shards));
assertIndexShard(routing, Map.of("foo", false), Math.floorMod(hash(List.of("foo", "false")), shards));
}

public void testRoutingPathArraysInSource() throws IOException {
int shards = between(2, 1000);
IndexRouting routing = indexRoutingForPath(shards, "a,b,c,d");
assertIndexShard(
routing,
Map.of("a", List.of("foo", "bar", "foo"), "b", List.of(21, 42), "c", List.of(true), "d", List.of()),
Math.floorMod(hash(List.of("a", "foo", "a", "bar", "a", "foo", "b", "21", "b", "42", "c", "true")), shards)
);
}

public void testRoutingPathObjectArraysInSource() throws IOException {
int shards = between(2, 1000);
IndexRouting routing = indexRoutingForPath(shards, "a");

BytesReference source = source(Map.of("a", List.of("foo", Map.of("foo", "bar"))));
Exception e = expectThrows(
IllegalArgumentException.class,
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source, s -> {})
);
assertThat(
e.getMessage(),
equalTo("Error extracting routing: Failed to parse object: expecting value token but found [START_OBJECT]")
);
}

public void testRoutingPathBwc() throws IOException {
@@ -667,7 +691,11 @@ private void assertIndexShard(IndexRouting routing, Map<String, Object> source,

IndexRouting.ExtractFromSource.Builder b = r.builder();
for (Map.Entry<String, Object> e : flattened.entrySet()) {
b.addMatching(e.getKey(), new BytesRef(e.getValue().toString()));
if (e.getValue() instanceof List<?> listValue) {
listValue.forEach(v -> b.addMatching(e.getKey(), new BytesRef(v.toString())));
} else {
b.addMatching(e.getKey(), new BytesRef(e.getValue().toString()));
}
}
String idFromBuilder = b.createId(suffix, () -> { throw new AssertionError(); });
assertThat(idFromBuilder, equalTo(idFromSource));