Skip to content

Commit

Permalink
Script: Ingest Metadata and CtxMap (#88458)
Browse files Browse the repository at this point in the history
Create a `Metadata` superclass for ingest and update contexts.

Create a `CtxMap` superclass for `ctx` backwards compatibility in ingest and update contexts.  `script.CtxMap` was moved from `ingest.IngestSourceAndMetadata`

`CtxMap` takes a `Metadata` subclass and validates update via the `FieldProperty`s passed in.

`Metadata` provides typed getters and setters and implements a `Map`-like interface, making it easy for a class containing `CtxMap` to implement the full `Map` interface.

The `FieldProperty` record that configures how to validate fields. Fields have a `type`, are `writeable` or read-only, and `nullable` or not and may have an additional validation useful for Set/Enum validation.
  • Loading branch information
stu-elastic authored Jul 13, 2022
1 parent d7d9ff2 commit 85b8d3d
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestIngestDocument;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.Metadata;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
Expand Down Expand Up @@ -140,11 +141,14 @@ public void testRenameAtomicOperationSetFails() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("list", Collections.singletonList("item"));

IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("new_field", (o, k, v) -> {
if (v != null) {
throw new UnsupportedOperationException();
}
}, "list", (o, k, v) -> {}));
IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(
metadata,
Map.of("new_field", new Metadata.FieldProperty<>(Object.class, true, true, (k, v) -> {
if (v != null) {
throw new UnsupportedOperationException();
}
}), "list", new Metadata.FieldProperty<>(Object.class, true, true, null))
);
Processor processor = createRenameProcessor("list", "new_field", false);
try {
processor.execute(ingestDocument);
Expand All @@ -160,16 +164,15 @@ public void testRenameAtomicOperationRemoveFails() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("list", Collections.singletonList("item"));

IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("list", (o, k, v) -> {
if (v == null) {
throw new UnsupportedOperationException();
}
}));
IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(
metadata,
Map.of("list", new Metadata.FieldProperty<>(Object.class, false, true, null))
);
Processor processor = createRenameProcessor("list", "new_field", false);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch (UnsupportedOperationException e) {
} catch (IllegalArgumentException e) {
// the set failed, the old field has not been removed
assertThat(ingestDocument.getSourceAndMetadata().containsKey("list"), equalTo(true));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("new_field"), equalTo(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testInlineIsCompiled() throws Exception {
assertThat(processor.getScript().getParams(), equalTo(Collections.emptyMap()));
assertNotNull(processor.getPrecompiledIngestScriptFactory());
IngestDocument doc = TestIngestDocument.emptyIngestDocument();
Map<String, Object> ctx = TestIngestDocument.emptyIngestDocument().getIngestSourceAndMetadata();
Map<String, Object> ctx = TestIngestDocument.emptyIngestDocument().getSourceAndMetadata();
processor.getPrecompiledIngestScriptFactory().newInstance(null, doc.getMetadata(), ctx).execute();
assertThat(ctx.get("foo"), equalTo("bar"));
}
Expand Down
76 changes: 76 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.CtxMap;
import org.elasticsearch.script.Metadata;

import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;

/**
* Map containing ingest source and metadata.
*
* The Metadata values in {@link IngestDocument.Metadata} are validated when put in the map.
* _index, _id and _routing must be a String or null
* _version_type must be a lower case VersionType or null
* _version must be representable as a long without loss of precision or null
* _dyanmic_templates must be a map
* _if_seq_no must be a long or null
* _if_primary_term must be a long or null
*
* The map is expected to be used by processors, server code should the typed getter and setters where possible.
*/
class IngestCtxMap extends CtxMap {

/**
* Create an IngestCtxMap with the given metadata, source and default validators
*/
IngestCtxMap(
String index,
String id,
long version,
String routing,
VersionType versionType,
ZonedDateTime timestamp,
Map<String, Object> source
) {
super(new HashMap<>(source), new IngestDocMetadata(index, id, version, routing, versionType, timestamp));
}

/**
* Create IngestCtxMap from a source and metadata
*
* @param source the source document map
* @param metadata the metadata map
*/
IngestCtxMap(Map<String, Object> source, Metadata metadata) {
super(source, metadata);
}

/**
* Fetch the timestamp from the ingestMetadata, if it exists
* @return the timestamp for the document or null
*/
public static ZonedDateTime getTimestamp(Map<String, Object> ingestMetadata) {
if (ingestMetadata == null) {
return null;
}
Object ts = ingestMetadata.get(IngestDocument.TIMESTAMP);
if (ts instanceof ZonedDateTime timestamp) {
return timestamp;
} else if (ts instanceof String str) {
return ZonedDateTime.parse(str);
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Metadata;

import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

class IngestDocMetadata extends Metadata {
private static final FieldProperty<String> UPDATABLE_STRING = new FieldProperty<>(String.class, true, true, null);
static final Map<String, FieldProperty<?>> PROPERTIES = Map.of(
INDEX,
UPDATABLE_STRING,
ID,
UPDATABLE_STRING,
ROUTING,
UPDATABLE_STRING,
VERSION_TYPE,
new FieldProperty<>(String.class, true, true, (k, v) -> {
try {
VersionType.fromString(v);
return;
} catch (IllegalArgumentException ignored) {}
throw new IllegalArgumentException(
k
+ " must be a null or one of ["
+ Arrays.stream(VersionType.values()).map(vt -> VersionType.toString(vt)).collect(Collectors.joining(", "))
+ "] but was ["
+ v
+ "] with type ["
+ v.getClass().getName()
+ "]"
);
}),
VERSION,
new FieldProperty<>(Number.class, false, true, FieldProperty.LONGABLE_NUMBER),
TYPE,
new FieldProperty<>(String.class, true, false, null),
IF_SEQ_NO,
new FieldProperty<>(Number.class, true, true, FieldProperty.LONGABLE_NUMBER),
IF_PRIMARY_TERM,
new FieldProperty<>(Number.class, true, true, FieldProperty.LONGABLE_NUMBER),
DYNAMIC_TEMPLATES,
new FieldProperty<>(Map.class, true, true, null)
);

protected final ZonedDateTime timestamp;

IngestDocMetadata(String index, String id, long version, String routing, VersionType versionType, ZonedDateTime timestamp) {
this(metadataMap(index, id, version, routing, versionType), timestamp);
}

IngestDocMetadata(Map<String, Object> metadata, ZonedDateTime timestamp) {
super(metadata, PROPERTIES);
this.timestamp = timestamp;
}

/**
* Create the backing metadata map with the standard contents assuming default validators.
*/
protected static Map<String, Object> metadataMap(String index, String id, long version, String routing, VersionType versionType) {
Map<String, Object> metadata = Maps.newHashMapWithExpectedSize(IngestDocument.Metadata.values().length);
metadata.put(IngestDocument.Metadata.INDEX.getFieldName(), index);
metadata.put(IngestDocument.Metadata.ID.getFieldName(), id);
metadata.put(IngestDocument.Metadata.VERSION.getFieldName(), version);
if (routing != null) {
metadata.put(IngestDocument.Metadata.ROUTING.getFieldName(), routing);
}
if (versionType != null) {
metadata.put(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), VersionType.toString(versionType));
}
return metadata;
}

@Override
public ZonedDateTime getTimestamp() {
return timestamp;
}
}
52 changes: 20 additions & 32 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.common.util.LazyMap;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
Expand Down Expand Up @@ -49,7 +48,7 @@ public final class IngestDocument {

static final String TIMESTAMP = "timestamp";

private final IngestSourceAndMetadata sourceAndMetadata;
private final IngestCtxMap sourceAndMetadata;
private final Map<String, Object> ingestMetadata;

// Contains all pipelines that have been executed for this document
Expand All @@ -58,15 +57,7 @@ public final class IngestDocument {
private boolean doNoSelfReferencesCheck = false;

public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
this.sourceAndMetadata = new IngestSourceAndMetadata(
index,
id,
version,
routing,
versionType,
ZonedDateTime.now(ZoneOffset.UTC),
source
);
this.sourceAndMetadata = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, sourceAndMetadata.getMetadata().getTimestamp());
}
Expand All @@ -76,7 +67,7 @@ public IngestDocument(String index, String id, long version, String routing, Ver
*/
public IngestDocument(IngestDocument other) {
this(
new IngestSourceAndMetadata(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
new IngestCtxMap(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
}
Expand All @@ -85,24 +76,28 @@ public IngestDocument(IngestDocument other) {
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
*/
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
Tuple<Map<String, Object>, Map<String, Object>> sm = IngestSourceAndMetadata.splitSourceAndMetadata(sourceAndMetadata);
this.sourceAndMetadata = new IngestSourceAndMetadata(
sm.v1(),
new org.elasticsearch.script.Metadata(sm.v2(), IngestSourceAndMetadata.getTimestamp(ingestMetadata))
);
this.ingestMetadata = new HashMap<>(ingestMetadata);
this.ingestMetadata.computeIfPresent(TIMESTAMP, (k, v) -> {
if (v instanceof String) {
return this.sourceAndMetadata.getMetadata().getTimestamp();
Map<String, Object> source;
Map<String, Object> metadata;
if (sourceAndMetadata instanceof IngestCtxMap ingestCtxMap) {
source = new HashMap<>(ingestCtxMap.getSource());
metadata = new HashMap<>(ingestCtxMap.getMetadata().getMap());
} else {
metadata = Maps.newHashMapWithExpectedSize(Metadata.METADATA_NAMES.size());
source = new HashMap<>(sourceAndMetadata);
for (String key : Metadata.METADATA_NAMES) {
if (sourceAndMetadata.containsKey(key)) {
metadata.put(key, source.remove(key));
}
}
return v;
});
}
this.ingestMetadata = new HashMap<>(ingestMetadata);
this.sourceAndMetadata = new IngestCtxMap(source, new IngestDocMetadata(metadata, IngestCtxMap.getTimestamp(ingestMetadata)));
}

/**
* Constructor to create an IngestDocument from its constituent maps
*/
IngestDocument(IngestSourceAndMetadata sourceAndMetadata, Map<String, Object> ingestMetadata) {
IngestDocument(IngestCtxMap sourceAndMetadata, Map<String, Object> ingestMetadata) {
this.sourceAndMetadata = sourceAndMetadata;
this.ingestMetadata = ingestMetadata;
}
Expand Down Expand Up @@ -723,13 +718,6 @@ public Map<String, Object> getSourceAndMetadata() {
return sourceAndMetadata;
}

/**
* Get source and metadata map as {@link IngestSourceAndMetadata}
*/
public IngestSourceAndMetadata getIngestSourceAndMetadata() {
return sourceAndMetadata;
}

/**
* Get the strongly typed metadata
*/
Expand Down Expand Up @@ -763,7 +751,7 @@ public static Object deepCopy(Object value) {
for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
copy.put(entry.getKey(), deepCopy(entry.getValue()));
}
// TODO(stu): should this check for IngestSourceAndMetadata in addition to Map?
// TODO(stu): should this check for IngestCtxMap in addition to Map?
return copy;
} else if (value instanceof List<?> listValue) {
List<Object> copy = new ArrayList<>(listValue.size());
Expand Down
Loading

0 comments on commit 85b8d3d

Please sign in to comment.