Script: Ingest Metadata and CtxMap #88458
stu-elastic merged 32 commits into elastic:master from stu-elastic:ingest_ctx_map_field_prop on Jul 13, 2022
Changes from 29 commits
bf2085f
WIP
stu-elastic e0ff58a
Split Metadata off off IngestSourceAndMetadata
stu-elastic 4ea1791
Update metadata javadoc
stu-elastic 7244010
Add hashcode and equals to make the WritableIngestDocumentTests pass
stu-elastic 76022a6
Merge branch 'master' of github.com:elastic/elasticsearch into ingest…
stu-elastic 75f85bd
keys() -> getKeys(), don't cache entrySet, check metadata in source, …
stu-elastic aebad9a
Return raw map
stu-elastic 0c2e023
Better javadoc for metadata
stu-elastic 267ebde
Merge branch 'master' of github.com:stu-elastic/elasticsearch into in…
stu-elastic 20d8019
Script: Move Map impl from IngestSourceAndMetadata to CtxMap
stu-elastic 86b35ed
Script: Configuration driven validation for CtxMap
stu-elastic 29dd6ec
Fix hashCode in CtxMap
stu-elastic 472b824
Fix hashCode in CtxMap
stu-elastic bdf6bfd
Check metadata map in testSimulate
stu-elastic 1a0ef93
Merge branch 'ingest_ctx_map' of github.com:stu-elastic/elasticsearch…
stu-elastic 1d74d86
Merge branch 'ingest_ctx_map' of github.com:stu-elastic/elasticsearch…
stu-elastic 1600725
keySet(), Sets.intersection, isAvailable, protected validator & getters
stu-elastic ae733b7
Merge branch 'ingest_ctx_map' of github.com:stu-elastic/elasticsearch…
stu-elastic 48c62de
Merge branch 'ingest_sm_to_ctx_map' of github.com:stu-elastic/elastic…
stu-elastic f9ac7ae
Fix unit tests
stu-elastic 15328b1
FieldProperty javadoc
stu-elastic 824cdc8
Implement getTimestamp() in IngestMetadata
stu-elastic c39bb9d
Use extended validators, add hashcode and equals for Metadata
stu-elastic e81f5fa
_type can be updated
stu-elastic 7276bb0
Fix testExecutePropagateAllMetadataUpdates rather than allow _type to…
stu-elastic 7be1f41
Merge branch 'master' of github.com:stu-elastic/elasticsearch into in…
stu-elastic e27d73a
Merge branch 'master' of github.com:stu-elastic/elasticsearch into in…
stu-elastic 9c7b389
Remove getIngestSourceAndMetadata
stu-elastic 5a45f97
Don't clone properties, add MetadataTests
stu-elastic dc1fc58
IngestMetadata -> IngestDocMetadata, remove splitSourceAndMetadta, fi…
stu-elastic a08afb8
Don't copy source
stu-elastic 8eb42c6
Revert "Don't copy source"
stu-elastic
150 changes: 150 additions & 0 deletions
server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.script.CtxMap;
+import org.elasticsearch.script.Metadata;
+
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Map containing ingest source and metadata.
+ *
+ * The Metadata values in {@link IngestDocument.Metadata} are validated when put in the map.
+ * _index, _id and _routing must be a String or null
+ * _version_type must be a lower case VersionType or null
+ * _version must be representable as a long without loss of precision or null
+ * _dynamic_templates must be a map
+ * _if_seq_no must be a long or null
+ * _if_primary_term must be a long or null
+ *
+ * The map is expected to be used by processors; server code should use the typed getters and setters where possible.
+ */
+class IngestCtxMap extends CtxMap {
+
+    /**
+     * Create an IngestCtxMap with the given metadata, source and default validators
+     */
+    IngestCtxMap(
+        String index,
+        String id,
+        long version,
+        String routing,
+        VersionType versionType,
+        ZonedDateTime timestamp,
+        Map<String, Object> source
+    ) {
+        super(new HashMap<>(source), new IngestMetadata(index, id, version, routing, versionType, timestamp));
+    }
+
+    /**
+     * Create IngestCtxMap from a source and metadata
+     *
+     * @param source the source document map
+     * @param metadata the metadata map
+     */
+    IngestCtxMap(Map<String, Object> source, Metadata metadata) {
+        super(source, metadata);
+    }
+
+    /**
+     * Fetch the timestamp from the ingestMetadata, if it exists
+     * @return the timestamp for the document or null
+     */
+    public static ZonedDateTime getTimestamp(Map<String, Object> ingestMetadata) {
+        if (ingestMetadata == null) {
+            return null;
+        }
+        Object ts = ingestMetadata.get(IngestDocument.TIMESTAMP);
+        if (ts instanceof ZonedDateTime timestamp) {
+            return timestamp;
+        } else if (ts instanceof String str) {
+            return ZonedDateTime.parse(str);
+        }
+        return null;
+    }
+
+    static class IngestMetadata extends Metadata {
+        private static final FieldProperty<String> UPDATABLE_STRING = new FieldProperty<>(String.class, true, true, null);
+        static final Map<String, FieldProperty<?>> PROPERTIES = Map.of(
+            INDEX,
+            UPDATABLE_STRING,
+            ID,
+            UPDATABLE_STRING,
+            ROUTING,
+            UPDATABLE_STRING,
+            VERSION_TYPE,
+            new FieldProperty<>(String.class, true, true, (k, v) -> {
+                try {
+                    VersionType.fromString(v);
+                    return;
+                } catch (IllegalArgumentException ignored) {}
+                throw new IllegalArgumentException(
+                    k
+                        + " must be a null or one of ["
+                        + Arrays.stream(VersionType.values()).map(vt -> VersionType.toString(vt)).collect(Collectors.joining(", "))
+                        + "] but was ["
+                        + v
+                        + "] with type ["
+                        + v.getClass().getName()
+                        + "]"
+                );
+            }),
+            VERSION,
+            new FieldProperty<>(Number.class, false, true, FieldProperty.LONGABLE_NUMBER),
+            TYPE,
+            new FieldProperty<>(String.class, true, false, null),
+            IF_SEQ_NO,
+            new FieldProperty<>(Number.class, true, true, FieldProperty.LONGABLE_NUMBER),
+            IF_PRIMARY_TERM,
+            new FieldProperty<>(Number.class, true, true, FieldProperty.LONGABLE_NUMBER),
+            DYNAMIC_TEMPLATES,
+            new FieldProperty<>(Map.class, true, true, null)
+        );
+
+        protected final ZonedDateTime timestamp;
+
+        IngestMetadata(String index, String id, long version, String routing, VersionType versionType, ZonedDateTime timestamp) {
+            this(metadataMap(index, id, version, routing, versionType), timestamp);
+        }
+
+        IngestMetadata(Map<String, Object> metadata, ZonedDateTime timestamp) {
+            super(metadata, PROPERTIES);
+            this.timestamp = timestamp;
+        }
+
+        /**
+         * Create the backing metadata map with the standard contents assuming default validators.
+         */
+        protected static Map<String, Object> metadataMap(String index, String id, long version, String routing, VersionType versionType) {
+            Map<String, Object> metadata = Maps.newHashMapWithExpectedSize(IngestDocument.Metadata.values().length);
+            metadata.put(IngestDocument.Metadata.INDEX.getFieldName(), index);
+            metadata.put(IngestDocument.Metadata.ID.getFieldName(), id);
+            metadata.put(IngestDocument.Metadata.VERSION.getFieldName(), version);
+            if (routing != null) {
+                metadata.put(IngestDocument.Metadata.ROUTING.getFieldName(), routing);
+            }
+            if (versionType != null) {
+                metadata.put(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), VersionType.toString(versionType));
+            }
+            return metadata;
+        }
+
+        @Override
+        public ZonedDateTime getTimestamp() {
+            return timestamp;
+        }
+    }
+}
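The PROPERTIES table in the file above drives validation from configuration: each metadata key maps to an expected type, two flags, and an optional extra check. The following is a minimal, self-contained sketch of that idea, not the Elasticsearch implementation. The `Property` record, the `put` helper, and the `LONGABLE` check are hypothetical stand-ins, and it assumes the two boolean arguments of `FieldProperty` mean "nullable" and "writable", which the diff itself does not spell out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

class FieldPropertySketch {
    /** Hypothetical stand-in for FieldProperty: expected type, nullable, writable, optional extra check. */
    record Property<T>(Class<T> type, boolean nullable, boolean writable, BiConsumer<String, T> extendedValidator) {
        void check(String key, Object value, boolean isUpdate) {
            if (isUpdate && writable == false) {
                throw new IllegalArgumentException(key + " cannot be updated");
            }
            if (value == null) {
                if (nullable == false) {
                    throw new IllegalArgumentException(key + " cannot be null");
                }
                return; // nothing more to check for an allowed null
            }
            if (type.isInstance(value) == false) {
                throw new IllegalArgumentException(
                    key + " must be " + type.getSimpleName() + " but was " + value.getClass().getSimpleName()
                );
            }
            if (extendedValidator != null) {
                extendedValidator.accept(key, type.cast(value));
            }
        }
    }

    /** Assumed approximation of LONGABLE_NUMBER: the value must survive a round trip through long. */
    static final BiConsumer<String, Number> LONGABLE = (key, value) -> {
        if (value.doubleValue() != value.longValue()) {
            throw new IllegalArgumentException(key + " must be representable as a long but was " + value);
        }
    };

    /** A three-key subset of the table in the diff, using the same argument order. */
    static final Map<String, Property<?>> PROPERTIES = Map.of(
        "_index", new Property<>(String.class, true, true, null),
        "_version", new Property<>(Number.class, false, true, LONGABLE),
        "_type", new Property<>(String.class, true, false, null)
    );

    /** Validate against the table, then store. Treating "key already present" as an update is a simplification. */
    static void put(Map<String, Object> metadata, String key, Object value) {
        Property<?> property = PROPERTIES.get(key);
        if (property == null) {
            throw new IllegalArgumentException("unknown metadata key " + key);
        }
        property.check(key, value, metadata.containsKey(key));
        metadata.put(key, value);
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new HashMap<>();
        put(metadata, "_index", "logs");
        put(metadata, "_version", 3);
        try {
            put(metadata, "_version", 1.5); // rejected: not representable as a long
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the rules in a map rather than in per-key `if` branches means a subclass can swap in a different table without touching the `put` logic, which appears to be the motivation for passing PROPERTIES to the `Metadata` constructor.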
@@ -49,7 +49,7 @@ public final class IngestDocument {
 
     static final String TIMESTAMP = "timestamp";
 
-    private final IngestSourceAndMetadata sourceAndMetadata;
+    private final IngestCtxMap sourceAndMetadata;
     private final Map<String, Object> ingestMetadata;
 
     // Contains all pipelines that have been executed for this document
@@ -58,15 +58,7 @@ public final class IngestDocument {
     private boolean doNoSelfReferencesCheck = false;
 
     public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
-        this.sourceAndMetadata = new IngestSourceAndMetadata(
-            index,
-            id,
-            version,
-            routing,
-            versionType,
-            ZonedDateTime.now(ZoneOffset.UTC),
-            source
-        );
+        this.sourceAndMetadata = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
         this.ingestMetadata = new HashMap<>();
         this.ingestMetadata.put(TIMESTAMP, sourceAndMetadata.getMetadata().getTimestamp());
     }
@@ -76,7 +68,7 @@ public IngestDocument(String index, String id, long version, String routing, Ver
      */
     public IngestDocument(IngestDocument other) {
         this(
-            new IngestSourceAndMetadata(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
+            new IngestCtxMap(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
             deepCopyMap(other.ingestMetadata)
         );
     }
@@ -85,10 +77,13 @@ public IngestDocument(IngestDocument other) {
      * Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
      */
     public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
-        Tuple<Map<String, Object>, Map<String, Object>> sm = IngestSourceAndMetadata.splitSourceAndMetadata(sourceAndMetadata);
-        this.sourceAndMetadata = new IngestSourceAndMetadata(
+        Tuple<Map<String, Object>, Map<String, Object>> sm = IngestCtxMap.splitSourceAndMetadata(
+            sourceAndMetadata,
+            Arrays.stream(IngestDocument.Metadata.values()).map(IngestDocument.Metadata::getFieldName).collect(Collectors.toSet())
+        );
+        this.sourceAndMetadata = new IngestCtxMap(
             sm.v1(),
-            new org.elasticsearch.script.Metadata(sm.v2(), IngestSourceAndMetadata.getTimestamp(ingestMetadata))
+            new IngestCtxMap.IngestMetadata(sm.v2(), IngestCtxMap.getTimestamp(ingestMetadata))
         );
         this.ingestMetadata = new HashMap<>(ingestMetadata);
         this.ingestMetadata.computeIfPresent(TIMESTAMP, (k, v) -> {
@@ -102,7 +97,7 @@ public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object>
     /**
      * Constructor to create an IngestDocument from its constituent maps
      */
-    IngestDocument(IngestSourceAndMetadata sourceAndMetadata, Map<String, Object> ingestMetadata) {
+    IngestDocument(IngestCtxMap sourceAndMetadata, Map<String, Object> ingestMetadata) {
         this.sourceAndMetadata = sourceAndMetadata;
         this.ingestMetadata = ingestMetadata;
     }
@@ -723,13 +718,6 @@ public Map<String, Object> getSourceAndMetadata() {
         return sourceAndMetadata;
     }
 
-    /**
-     * Get source and metadata map as {@link IngestSourceAndMetadata}
-     */
-    public IngestSourceAndMetadata getIngestSourceAndMetadata() {
-        return sourceAndMetadata;
-    }
-
     /**
      * Get the strongly typed metadata
      */
@@ -763,7 +751,7 @@ public static Object deepCopy(Object value) {
             for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
                 copy.put(entry.getKey(), deepCopy(entry.getValue()));
             }
-            // TODO(stu): should this check for IngestSourceAndMetadata in addition to Map?
+            // TODO(stu): should this check for IngestCtxMap in addition to Map?
             return copy;
         } else if (value instanceof List<?> listValue) {
             List<Object> copy = new ArrayList<>(listValue.size());

Review thread on the TODO(stu) line:
Did you mean to leave this as part of the PR?
It shows up because of the rename; it's still a relevant question for now.
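The map-based constructor in the diff above first splits one combined map into source and metadata using the set of metadata field names, then reads the timestamp from the ingest metadata. The sketch below illustrates both steps with plain JDK types. The `Split` record, `split`, and `timestampOf` are hypothetical names; the real `splitSourceAndMetadata` is not shown in this diff and may differ, for example in whether it copies or mutates its input. `timestampOf` mirrors the `getTimestamp` logic shown in IngestCtxMap.

```java
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class SplitSketch {
    /** Hypothetical result holder; the PR uses Tuple with v1() as source and v2() as metadata. */
    record Split(Map<String, Object> source, Map<String, Object> metadata) {}

    /** Move every entry whose key is a metadata key out of a shallow copy of the combined map. */
    static Split split(Map<String, Object> sourceAndMetadata, Set<String> metadataKeys) {
        Map<String, Object> source = new HashMap<>(sourceAndMetadata); // copy, so the caller's map is untouched
        Map<String, Object> metadata = new HashMap<>();
        for (String key : metadataKeys) {
            if (source.containsKey(key)) {
                metadata.put(key, source.remove(key));
            }
        }
        return new Split(source, metadata);
    }

    /** Same shape as getTimestamp in the diff: accept a ZonedDateTime or an ISO-8601 string, else null. */
    static ZonedDateTime timestampOf(Map<String, Object> ingestMetadata) {
        if (ingestMetadata == null) {
            return null;
        }
        Object ts = ingestMetadata.get("timestamp");
        if (ts instanceof ZonedDateTime timestamp) {
            return timestamp;
        } else if (ts instanceof String str) {
            return ZonedDateTime.parse(str);
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> combined = new HashMap<>();
        combined.put("_index", "logs");
        combined.put("_id", "1");
        combined.put("message", "hello");

        Split split = split(combined, Set.of("_index", "_id", "_routing"));
        System.out.println("source   = " + split.source());
        System.out.println("metadata = " + split.metadata());
        System.out.println("ts       = " + timestampOf(Map.of("timestamp", "2022-07-13T00:00:00Z")));
    }
}
```

Passing the key set as a parameter, as the new call site does, keeps the splitting logic independent of which metadata fields a particular context defines.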
Since this is static, let's move it out to a top-level class. It can still be package-private.
There is already an IngestMetadata that "Holds the ingest pipelines that are available in the cluster".
I'm going to rename this to IngestDocMetadata when I make it top-level.