HLRC: ML start data feed API (#33898)
* HLRC: ML start data feed API
benwtrent authored and kcm committed Oct 30, 2018
1 parent dbc0529 commit 3583453
Showing 11 changed files with 647 additions and 0 deletions.
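The new client methods ultimately issue an ML start-datafeed call over HTTP. As a rough sketch of the low-level request shape that the converter added below produces (the datafeed id and body values here are purely illustrative):

Request request = new Request("POST", "/_xpack/ml/datafeeds/my-datafeed/_start"); // hypothetical datafeed id
request.setJsonEntity("{\"start\": \"2018-09-01T00:00:00Z\", \"timeout\": \"30s\"}"); // body fields match StartDatafeedRequest below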
@@ -48,6 +48,7 @@
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
@@ -231,6 +232,19 @@ static Request deleteDatafeed(DeleteDatafeedRequest deleteDatafeedRequest) {
return request;
}

static Request startDatafeed(StartDatafeedRequest startDatafeedRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("datafeeds")
.addPathPart(startDatafeedRequest.getDatafeedId())
.addPathPartAsIs("_start")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(startDatafeedRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
@@ -58,6 +58,8 @@
import org.elasticsearch.client.ml.PutDatafeedResponse;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.PutJobResponse;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.job.stats.JobStats;

@@ -565,6 +567,46 @@ public void deleteDatafeedAsync(DeleteDatafeedRequest request, RequestOptions op
Collections.emptySet());
}

/**
* Starts the given Machine Learning Datafeed
* <p>
* For additional info
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
* ML Start Datafeed documentation</a>
*
* @param request The request to start the datafeed
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return a StartDatafeedResponse indicating whether the datafeed was started
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public StartDatafeedResponse startDatafeed(StartDatafeedRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::startDatafeed,
options,
StartDatafeedResponse::fromXContent,
Collections.emptySet());
}

/**
* Starts the given Machine Learning Datafeed asynchronously and notifies the listener on completion
* <p>
* For additional info
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
* ML Start Datafeed documentation</a>
*
* @param request The request to start the datafeed
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void startDatafeedAsync(StartDatafeedRequest request, RequestOptions options, ActionListener<StartDatafeedResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::startDatafeed,
options,
StartDatafeedResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job}
* <p>
@@ -0,0 +1,160 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Request to start a Datafeed
*/
public class StartDatafeedRequest extends ActionRequest implements ToXContentObject {

public static final ParseField START = new ParseField("start");
public static final ParseField END = new ParseField("end");
public static final ParseField TIMEOUT = new ParseField("timeout");

public static ConstructingObjectParser<StartDatafeedRequest, Void> PARSER =
new ConstructingObjectParser<>("start_datafeed_request", a -> new StartDatafeedRequest((String)a[0]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
PARSER.declareString(StartDatafeedRequest::setStart, START);
PARSER.declareString(StartDatafeedRequest::setEnd, END);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}

private final String datafeedId;
private String start;
private String end;
private TimeValue timeout;

/**
* Create a new StartDatafeedRequest for the given DatafeedId
*
* @param datafeedId non-null existing Datafeed ID
*/
public StartDatafeedRequest(String datafeedId) {
this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null");
}

public String getDatafeedId() {
return datafeedId;
}

public String getStart() {
return start;
}

/**
* The time that the datafeed should begin. This value is inclusive.
*
* If you specify a start value that is earlier than the timestamp of the latest processed record,
* the datafeed continues from 1 millisecond after the timestamp of the latest processed record.
*
* If you do not specify a start time and the datafeed is associated with a new job,
* the analysis starts from the earliest time for which data is available.
*
* @param start String representation of a timestamp; may be epoch seconds, epoch milliseconds, or an ISO 8601 string
*/
public void setStart(String start) {
this.start = start;
}

public String getEnd() {
return end;
}

/**
* The time that the datafeed should end. This value is exclusive.
* If you do not specify an end time, the datafeed runs continuously.
*
* @param end String representation of a timestamp; may be epoch seconds, epoch milliseconds, or an ISO 8601 string
*/
public void setEnd(String end) {
this.end = end;
}

public TimeValue getTimeout() {
return timeout;
}

/**
* Indicates how long to wait for the cluster to respond to the request.
*
* @param timeout TimeValue for how long to wait for a response from the cluster
*/
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public int hashCode() {
return Objects.hash(datafeedId, start, end, timeout);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || obj.getClass() != getClass()) {
return false;
}

StartDatafeedRequest other = (StartDatafeedRequest) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(timeout, other.timeout);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
if (start != null) {
builder.field(START.getPreferredName(), start);
}
if (end != null) {
builder.field(END.getPreferredName(), end);
}
if (timeout != null) {
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
}
builder.endObject();
return builder;
}
}
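A minimal sketch of constructing the request defined above (imports as in the file above; the datafeed id and time bounds are hypothetical):

StartDatafeedRequest request = new StartDatafeedRequest("my-datafeed"); // hypothetical datafeed id
request.setStart("2018-09-01T00:00:00Z");                               // inclusive start, ISO 8601
request.setEnd("2018-10-01T00:00:00Z");                                 // exclusive end; omit to run continuously
request.setTimeout(TimeValue.timeValueSeconds(30));                     // how long to wait for the cluster to respond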
@@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
* Response indicating whether the Machine Learning Datafeed was started
*/
public class StartDatafeedResponse extends ActionResponse implements ToXContentObject {

private static final ParseField STARTED = new ParseField("started");

public static final ConstructingObjectParser<StartDatafeedResponse, Void> PARSER =
new ConstructingObjectParser<>(
"start_datafeed_response",
true,
(a) -> new StartDatafeedResponse((Boolean)a[0]));

static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED);
}

private final boolean started;

public StartDatafeedResponse(boolean started) {
this.started = started;
}

public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

/**
* Whether the Datafeed was started
*
* @return true if the Datafeed was started, otherwise false
*/
public boolean isStarted() {
return started;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

StartDatafeedResponse that = (StartDatafeedResponse) other;
return isStarted() == that.isStarted();
}

@Override
public int hashCode() {
return Objects.hash(isStarted());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(STARTED.getPreferredName(), started);
builder.endObject();
return builder;
}
}
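A minimal usage sketch tying the request, client methods, and response together, assuming a RestHighLevelClient named client and its machineLearning() accessor (the datafeed id is hypothetical; imports as in the files above):

StartDatafeedRequest request = new StartDatafeedRequest("my-datafeed"); // hypothetical datafeed id

// Synchronous call: blocks until the response arrives or an exception is thrown.
StartDatafeedResponse response = client.machineLearning().startDatafeed(request, RequestOptions.DEFAULT);
boolean started = response.isStarted();

// Asynchronous call: the listener is notified on completion or failure.
client.machineLearning().startDatafeedAsync(request, RequestOptions.DEFAULT,
    new ActionListener<StartDatafeedResponse>() {
        @Override
        public void onResponse(StartDatafeedResponse response) {
            // react to response.isStarted()
        }

        @Override
        public void onFailure(Exception e) {
            // handle the failure
        }
    });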
@@ -44,6 +44,8 @@
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedRequestTests;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.calendars.Calendar;
import org.elasticsearch.client.ml.calendars.CalendarTests;
@@ -261,6 +263,19 @@ public void testDeleteDatafeed() {
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
}

public void testStartDatafeed() throws Exception {
String datafeedId = DatafeedConfigTests.randomValidDatafeedId();
StartDatafeedRequest datafeedRequest = StartDatafeedRequestTests.createRandomInstance(datafeedId);

Request request = MLRequestConverters.startDatafeed(datafeedRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/datafeeds/" + datafeedId + "/_start", request.getEndpoint());
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
StartDatafeedRequest parsedDatafeedRequest = StartDatafeedRequest.PARSER.apply(parser, null);
assertThat(parsedDatafeedRequest, equalTo(datafeedRequest));
}
}

public void testDeleteForecast() {
String jobId = randomAlphaOfLength(10);
DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);