Skip to content

Commit

Permalink
[7.x] [ML][Data Frame] adds new pipeline field to dest config (#43124) (#43388)
Browse files Browse the repository at this point in the history

* [ML][Data Frame] adds new pipeline field to dest config (#43124)

* [ML][Data Frame] adds new pipeline field to dest config

* Adding pipeline support to _preview

* removing unused import

* moving towards extracting _source from pipeline simulation

* fixing permission requirement, adding _index entry to doc

* adjusting for java 8 compatibility

* adjusting bwc serialization version to 7.3.0
  • Loading branch information
benwtrent authored Jun 19, 2019
1 parent b957aa4 commit b333ced
Show file tree
Hide file tree
Showing 18 changed files with 412 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,48 @@
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Configuration containing the destination index for the {@link DataFrameTransformConfig}
*/
public class DestConfig implements ToXContentObject {

public static final ParseField INDEX = new ParseField("index");
public static final ParseField PIPELINE = new ParseField("pipeline");

public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_dest",
true,
args -> new DestConfig((String)args[0]));
args -> new DestConfig((String)args[0], (String)args[1]));

// Constructor arguments are declared in the order the PARSER lambda consumes them:
// the required index first (args[0]), then the optional pipeline (args[1]).
static {
PARSER.declareString(constructorArg(), INDEX);
PARSER.declareString(optionalConstructorArg(), PIPELINE);
}

private final String index;
private final String pipeline;

/**
* Creates a destination configuration.
* @param index the destination index; must not be null
* @param pipeline the pipeline ID; may be null, in which case it is left out of the XContent output
*/
public DestConfig(String index) {
DestConfig(String index, String pipeline) {
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
this.pipeline = pipeline;
}

/**
 * @return the destination index; never null
 */
public String getIndex() {
    return this.index;
}

/**
 * @return the pipeline ID, or null if none was configured
 */
public String getPipeline() {
    return this.pipeline;
}

/**
 * Writes this config as an object holding the index and, only when one is set, the pipeline.
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject().field(INDEX.getPreferredName(), index);
    // A null pipeline is omitted entirely rather than written as an explicit null.
    if (pipeline != null) {
        builder.field(PIPELINE.getPreferredName(), pipeline);
    }
    return builder.endObject();
}
Expand All @@ -72,11 +84,45 @@ public boolean equals(Object other) {
}

DestConfig that = (DestConfig) other;
return Objects.equals(index, that.index);
return Objects.equals(index, that.index) &&
Objects.equals(pipeline, that.pipeline);
}

/**
* Hash code derived from the fields passed to Objects.hash below.
*/
@Override
public int hashCode(){
return Objects.hash(index);
return Objects.hash(index, pipeline);
}

/**
* @return a new {@link Builder} with neither index nor pipeline set
*/
public static Builder builder() {
return new Builder();
}

/**
 * Fluent builder for {@link DestConfig}.
 */
public static class Builder {
    private String index;
    private String pipeline;

    /**
     * Sets the index to which the data is written.
     * @param index where to write the data; must not be null
     * @return this {@link Builder} with the index set
     */
    public Builder setIndex(String index) {
        Objects.requireNonNull(index, INDEX.getPreferredName());
        this.index = index;
        return this;
    }

    /**
     * Sets the pipeline through which the indexed documents should be processed.
     * @param pipeline the pipeline ID
     * @return this {@link Builder} with the pipeline set
     */
    public Builder setPipeline(String pipeline) {
        this.pipeline = pipeline;
        return this;
    }

    /**
     * Creates the {@link DestConfig} from the index and pipeline set so far.
     * @return the new {@link DestConfig}
     */
    public DestConfig build() {
        return new DestConfig(this.index, this.pipeline);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private DataFrameTransformConfig validDataFrameTransformConfig(String id, String
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregations(aggBuilder).build();

DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null;
DestConfig destConfig = (destination != null) ? DestConfig.builder().setIndex(destination).build() : null;

return DataFrameTransformConfig.builder()
.setId(id)
Expand All @@ -334,7 +334,7 @@ public void testGetStats() throws Exception {
DataFrameTransformConfig transform = DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndex).setQuery(new MatchAllQueryBuilder()).build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.setDescription("transform for testing stats")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
public class DestConfigTests extends AbstractXContentTestCase<DestConfig> {

/**
* @return a {@link DestConfig} with a random index and a pipeline that is randomly either null or a random string
*/
public static DestConfig randomDestConfig() {
return new DestConfig(randomAlphaOfLength(10));
return new DestConfig(randomAlphaOfLength(10),
randomBoolean() ? null : randomAlphaOfLength(10));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
.setIndex("source-index")
.setQueryConfig(queryConfig).build();
// end::put-data-frame-transform-source-config
// tag::put-data-frame-transform-dest-config
DestConfig destConfig = DestConfig.builder()
.setIndex("pivot-destination")
.setPipeline("my-pipeline").build();
// end::put-data-frame-transform-dest-config
// tag::put-data-frame-transform-group-config
GroupConfig groupConfig = GroupConfig.builder()
.groupBy("reviewer", // <1>
Expand All @@ -149,7 +154,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
.builder()
.setId("reviewer-avg-rating") // <1>
.setSource(sourceConfig) // <2>
.setDest(new DestConfig("pivot-destination")) // <3>
.setDest(destConfig) // <3>
.setPivotConfig(pivotConfig) // <4>
.setDescription("This is my test transform") // <5>
.build();
Expand Down Expand Up @@ -222,7 +227,7 @@ public void testStartStop() throws IOException, InterruptedException {
DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder()
.setId("mega-transform")
.setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();

Expand Down Expand Up @@ -344,7 +349,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
DataFrameTransformConfig transformConfig2 = DataFrameTransformConfig.builder()
Expand All @@ -353,7 +358,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest2"))
.setDest(DestConfig.builder().setIndex("pivot-dest2").build())
.setPivotConfig(pivotConfig)
.build();

Expand Down Expand Up @@ -488,7 +493,7 @@ public void testGetStats() throws IOException, InterruptedException {
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -574,7 +579,7 @@ public void testGetDataFrameTransform() throws IOException, InterruptedException
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();

Expand Down
12 changes: 11 additions & 1 deletion docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ include-tagged::{doc-tests-file}[{api}-config]
--------------------------------------------------
<1> The {dataframe-transform} ID
<2> The source indices and query from which to gather data
<3> The destination index
<3> The destination index and optional pipeline
<4> The PivotConfig
<5> Optional free text description of the transform

Expand All @@ -49,6 +49,16 @@ If query is not set, a `match_all` query is used by default.
include-tagged::{doc-tests-file}[{api}-source-config]
--------------------------------------------------

==== DestConfig

The index to which the data is written and the optional pipeline
through which the documents are indexed.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-dest-config]
--------------------------------------------------

===== QueryConfig

The query with which to select data from the source.
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/data-frames/apis/put-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
`source` (required):: (object) The source configuration, consisting of `index` and optionally
a `query`.

`dest` (required):: (object) The destination configuration, consisting of `index`.
`dest` (required):: (object) The destination configuration, consisting of `index` and optionally a
`pipeline` id.

`pivot`:: (object) Defines the pivot function `group by` fields and the aggregation to
reduce the data. See <<data-frame-transform-pivot, data frame transform pivot objects>>.
Expand Down Expand Up @@ -76,7 +77,8 @@ PUT _data_frame/transforms/ecommerce_transform
}
},
"dest": {
"index": "kibana_sample_data_ecommerce_transform"
"index": "kibana_sample_data_ecommerce_transform",
"pipeline": "add_timestamp_pipeline"
},
"pivot": {
"group_by": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -66,8 +67,20 @@ public Request(StreamInput in) throws IOException {

public static Request fromXContent(final XContentParser parser) throws IOException {
Map<String, Object> content = parser.map();
// Destination and ID are not required for Preview, so we just supply our own
content.put(DataFrameField.DESTINATION.getPreferredName(), Collections.singletonMap("index", "unused-transform-preview-index"));
// dest.index and ID are not required for Preview, so we just supply our own
Map<String, String> tempDestination = new HashMap<>();
tempDestination.put(DestConfig.INDEX.getPreferredName(), "unused-transform-preview-index");
// Users can still provide just dest.pipeline to preview what their data would look like given the pipeline ID
Object providedDestination = content.get(DataFrameField.DESTINATION.getPreferredName());
if (providedDestination instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> destMap = (Map<String, String>)providedDestination;
String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName());
if (pipeline != null) {
tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline);
}
}
content.put(DataFrameField.DESTINATION.getPreferredName(), tempDestination);
content.put(DataFrameField.ID.getPreferredName(), "transform-preview");
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
XContentParser newParser = XContentType.JSON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -20,49 +21,69 @@
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DestConfig implements Writeable, ToXContentObject {

public static final ParseField INDEX = new ParseField("index");
public static final ParseField PIPELINE = new ParseField("pipeline");

public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);

/**
* Builds a parser for {@link DestConfig} with index as a required and pipeline as an optional constructor argument.
* @param lenient forwarded to the ConstructingObjectParser constructor; STRICT_PARSER passes false, LENIENT_PARSER passes true
*                (presumably controls whether unknown fields are ignored - confirm against ConstructingObjectParser)
* @return the configured parser
*/
private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_dest",
lenient,
args -> new DestConfig((String)args[0]));
args -> new DestConfig((String)args[0], (String) args[1]));
parser.declareString(constructorArg(), INDEX);
parser.declareString(optionalConstructorArg(), PIPELINE);
return parser;
}

private final String index;
private final String pipeline;

/**
* Creates a destination configuration.
* @param index the destination index; checked with ExceptionsHelper.requireNonNull
* @param pipeline the pipeline ID; may be null, in which case it is left out of the XContent output
*/
public DestConfig(String index) {
public DestConfig(String index, String pipeline) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
this.pipeline = pipeline;
}

/**
 * Reads a config from the stream.
 * The pipeline is only read when the stream version is 7.3.0 or later; for older
 * streams it is set to null.
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public DestConfig(final StreamInput in) throws IOException {
    this.index = in.readString();
    this.pipeline = in.getVersion().onOrAfter(Version.V_7_3_0) ? in.readOptionalString() : null;
}

/**
 * @return the destination index
 */
public String getIndex() {
    return this.index;
}

/**
 * @return the pipeline ID, or null if none was configured
 */
public String getPipeline() {
    return this.pipeline;
}

/**
 * @return true when the index name is not the empty string; the pipeline is not checked
 */
public boolean isValid() {
    final boolean indexIsEmpty = index.isEmpty();
    return indexIsEmpty == false;
}

/**
* Writes this config to the stream: the index always, the optional pipeline only when the
* stream version is 7.3.0 or later (the same check the StreamInput constructor applies when reading).
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalString(pipeline);
}
}

/**
 * Writes this config as an object holding the index and, only when one is set, the pipeline.
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject().field(INDEX.getPreferredName(), index);
    // A null pipeline is omitted entirely rather than written as an explicit null.
    if (pipeline != null) {
        builder.field(PIPELINE.getPreferredName(), pipeline);
    }
    return builder.endObject();
}
Expand All @@ -77,12 +98,13 @@ public boolean equals(Object other) {
}

DestConfig that = (DestConfig) other;
return Objects.equals(index, that.index);
return Objects.equals(index, that.index) &&
Objects.equals(pipeline, that.pipeline);
}

/**
* Hash code derived from the fields passed to Objects.hash below.
*/
@Override
public int hashCode(){
return Objects.hash(index);
return Objects.hash(index, pipeline);
}

public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected boolean supportsUnknownFields() {
// Builds a preview request around a config with the fixed id "transform-preview" and the
// destination index "unused-transform-preview-index"; no pipeline is set on the destination.
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(),
new DestConfig("unused-transform-preview-index"),
new DestConfig("unused-transform-preview-index", null),
null, PivotConfigTests.randomPivotConfig(), null);
return new Request(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public class DestConfigTests extends AbstractSerializingDataFrameTestCase<DestCo
private boolean lenient;

/**
* @return a {@link DestConfig} with a random index and a pipeline that is randomly either null or a random string
*/
public static DestConfig randomDestConfig() {
return new DestConfig(randomAlphaOfLength(10));
return new DestConfig(randomAlphaOfLength(10),
randomBoolean() ? null : randomAlphaOfLength(10));
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ protected DataFrameTransformConfig createTransformConfig(String id,
return DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
.setDest(new DestConfig(destinationIndex))
.setDest(DestConfig.builder().setIndex(destinationIndex).build())
.setPivotConfig(createPivotConfig(groups, aggregations))
.setDescription("Test data frame transform config id: " + id)
.build();
Expand Down
Loading

0 comments on commit b333ced

Please sign in to comment.