forked from elastic/elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add data frame feature (elastic#38934)
The data frame plugin allows users to create feature indexes by pivoting a source index. In a nutshell this can be understood as reindex supporting aggregations or similar to the so called entity centric indexing. Full history is provided in: feature/data-frame-transforms
- Loading branch information
Hendrik Muhs
authored
Feb 18, 2019
1 parent
a1fcc62
commit 869de3f
Showing
93 changed files
with
8,671 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
83 changes: 83 additions & 0 deletions
83
...n/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameFeatureSetUsage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.core.dataframe; | ||
|
||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage; | ||
import org.elasticsearch.xpack.core.XPackField; | ||
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.Objects; | ||
|
||
public class DataFrameFeatureSetUsage extends Usage { | ||
|
||
private final Map<String, Long> transformCountByState; | ||
private final DataFrameIndexerTransformStats accumulatedStats; | ||
|
||
public DataFrameFeatureSetUsage(StreamInput in) throws IOException { | ||
super(in); | ||
this.transformCountByState = in.readMap(StreamInput::readString, StreamInput::readLong); | ||
this.accumulatedStats = new DataFrameIndexerTransformStats(in); | ||
} | ||
|
||
public DataFrameFeatureSetUsage(boolean available, boolean enabled, Map<String, Long> transformCountByState, | ||
DataFrameIndexerTransformStats accumulatedStats) { | ||
super(XPackField.DATA_FRAME, available, enabled); | ||
this.transformCountByState = Objects.requireNonNull(transformCountByState); | ||
this.accumulatedStats = Objects.requireNonNull(accumulatedStats); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeMap(transformCountByState, StreamOutput::writeString, StreamOutput::writeLong); | ||
accumulatedStats.writeTo(out); | ||
} | ||
|
||
@Override | ||
protected void innerXContent(XContentBuilder builder, Params params) throws IOException { | ||
super.innerXContent(builder, params); | ||
if (transformCountByState.isEmpty() == false) { | ||
builder.startObject(DataFrameField.TRANSFORMS.getPreferredName()); | ||
long all = 0L; | ||
for (Entry<String, Long> entry : transformCountByState.entrySet()) { | ||
builder.field(entry.getKey(), entry.getValue()); | ||
all+=entry.getValue(); | ||
} | ||
builder.field(MetaData.ALL, all); | ||
builder.endObject(); | ||
|
||
// if there are no transforms, do not show any stats | ||
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), accumulatedStats); | ||
} | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(enabled, available, transformCountByState, accumulatedStats); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (obj == null) { | ||
return false; | ||
} | ||
if (getClass() != obj.getClass()) { | ||
return false; | ||
} | ||
DataFrameFeatureSetUsage other = (DataFrameFeatureSetUsage) obj; | ||
return Objects.equals(name, other.name) && available == other.available && enabled == other.enabled | ||
&& Objects.equals(transformCountByState, other.transformCountByState) | ||
&& Objects.equals(accumulatedStats, other.accumulatedStats); | ||
} | ||
} |
46 changes: 46 additions & 0 deletions
46
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.core.dataframe; | ||
|
||
import org.elasticsearch.common.ParseField; | ||
|
||
/* | ||
* Utility class to hold common fields and strings for data frame. | ||
*/ | ||
public final class DataFrameField { | ||
|
||
// common parse fields | ||
public static final ParseField AGGREGATIONS = new ParseField("aggregations"); | ||
public static final ParseField AGGS = new ParseField("aggs"); | ||
public static final ParseField ID = new ParseField("id"); | ||
public static final ParseField TRANSFORMS = new ParseField("transforms"); | ||
public static final ParseField COUNT = new ParseField("count"); | ||
public static final ParseField GROUP_BY = new ParseField("group_by"); | ||
public static final ParseField TIMEOUT = new ParseField("timeout"); | ||
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion"); | ||
public static final ParseField STATS_FIELD = new ParseField("stats"); | ||
|
||
// common strings | ||
public static final String TASK_NAME = "data_frame/transforms"; | ||
public static final String REST_BASE_PATH = "/_data_frame/"; | ||
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH + "transforms/{id}/"; | ||
|
||
// note: this is used to match tasks | ||
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_"; | ||
|
||
// strings for meta information | ||
public static final String META_FIELDNAME = "_data_frame"; | ||
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis"; | ||
public static final String VERSION = "version"; | ||
public static final String CREATED = "created"; | ||
public static final String CREATED_BY = "created_by"; | ||
public static final String TRANSFORM = "transform"; | ||
public static final String DATA_FRAME_SIGNATURE = "data-frame-transform"; | ||
|
||
private DataFrameField() { | ||
} | ||
} |
73 changes: 73 additions & 0 deletions
73
...k/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.core.dataframe; | ||
|
||
import java.text.MessageFormat; | ||
import java.util.Locale; | ||
|
||
public class DataFrameMessages { | ||
|
||
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT = | ||
"Timed out after [{0}] while waiting for data frame transform [{1}] to stop"; | ||
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT = | ||
"Interrupted while waiting for data frame transform [{0}] to stop"; | ||
public static final String REST_PUT_DATA_FRAME_TRANSFORM_EXISTS = "Transform with id [{0}] already exists"; | ||
public static final String REST_DATA_FRAME_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found"; | ||
public static final String REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION = | ||
"Failed to validate data frame configuration"; | ||
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration"; | ||
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS = "Failed to deduce target mappings"; | ||
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index"; | ||
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK = | ||
"Failed to start persistent task, configuration has been cleaned up: [{0}]"; | ||
public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]"; | ||
|
||
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]"; | ||
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = | ||
"Failed to load data frame transform configuration for transform [{0}]"; | ||
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION = | ||
"Failed to parse transform configuration for data frame transform [{0}]"; | ||
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_NO_TRANSFORM = | ||
"Data frame transform configuration must specify exactly 1 function"; | ||
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY = | ||
"Data frame pivot transform configuration must specify at least 1 group_by"; | ||
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_AGGREGATION = | ||
"Data frame pivot transform configuration must specify at least 1 aggregation"; | ||
public static final String DATA_FRAME_TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION = | ||
"Failed to create composite aggregation from pivot function"; | ||
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID = | ||
"Data frame transform configuration [{0}] has invalid elements"; | ||
|
||
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY = | ||
"Failed to parse query for data frame transform"; | ||
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY = | ||
"Failed to parse group_by for data frame pivot transform"; | ||
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION = | ||
"Failed to parse aggregation for data frame pivot transform"; | ||
|
||
private DataFrameMessages() { | ||
} | ||
|
||
/** | ||
* Returns the message parameter | ||
* | ||
* @param message Should be one of the statics defined in this class | ||
*/ | ||
public static String getMessage(String message) { | ||
return message; | ||
} | ||
|
||
/** | ||
* Format the message with the supplied arguments | ||
* | ||
* @param message Should be one of the statics defined in this class | ||
* @param args MessageFormat arguments. See {@linkplain MessageFormat#format(Object)}] | ||
*/ | ||
public static String getMessage(String message, Object... args) { | ||
return new MessageFormat(message, Locale.ROOT).format(args); | ||
} | ||
} |
Oops, something went wrong.