Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HLRC: ML start data feed API #33898

Merged
merged 3 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -231,6 +232,19 @@ static Request deleteDatafeed(DeleteDatafeedRequest deleteDatafeedRequest) {
return request;
}

static Request startDatafeed(StartDatafeedRequest startDatafeedRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("datafeeds")
.addPathPart(startDatafeedRequest.getDatafeedId())
.addPathPartAsIs("_start")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(startDatafeedRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.elasticsearch.client.ml.PutDatafeedResponse;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.PutJobResponse;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.job.stats.JobStats;

Expand Down Expand Up @@ -565,6 +567,46 @@ public void deleteDatafeedAsync(DeleteDatafeedRequest request, RequestOptions op
Collections.emptySet());
}

/**
* Starts the given Machine Learning Datafeed
* <p>
* For additional info
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
* ML Start Datafeed documentation</a>
*
* @param request The request to start the datafeed
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return action acknowledgement
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public StartDatafeedResponse startDatafeed(StartDatafeedRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::startDatafeed,
options,
StartDatafeedResponse::fromXContent,
Collections.emptySet());
}

/**
* Starts the given Machine Learning Datafeed asynchronously and notifies the listener on completion
* <p>
* For additional info
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
* ML Start Datafeed documentation</a>
*
* @param request The request to start the datafeed
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void startDatafeedAsync(StartDatafeedRequest request, RequestOptions options, ActionListener<StartDatafeedResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::startDatafeed,
options,
StartDatafeedResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job}
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Request to start a Datafeed
*/
public class StartDatafeedRequest extends ActionRequest implements ToXContentObject {

public static final ParseField START = new ParseField("start");
public static final ParseField END = new ParseField("end");
public static final ParseField TIMEOUT = new ParseField("timeout");

public static ConstructingObjectParser<StartDatafeedRequest, Void> PARSER =
new ConstructingObjectParser<>("start_datafeed_request", a -> new StartDatafeedRequest((String)a[0]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
PARSER.declareString(StartDatafeedRequest::setStart, START);
PARSER.declareString(StartDatafeedRequest::setEnd, END);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}

private final String datafeedId;
private String start;
private String end;
private TimeValue timeout;

/**
* Create a new StartDatafeedRequest for the given DatafeedId
*
* @param datafeedId non-null existing Datafeed ID
*/
public StartDatafeedRequest(String datafeedId) {
this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null");
}

public String getDatafeedId() {
return datafeedId;
}

public String getStart() {
return start;
}

/**
* The time that the datafeed should begin. This value is inclusive.
*
* If you specify a start value that is earlier than the timestamp of the latest processed record,
* the datafeed continues from 1 millisecond after the timestamp of the latest processed record.
*
* If you do not specify a start time and the datafeed is associated with a new job,
* the analysis starts from the earliest time for which data is available.
*
* @param start String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
*/
public void setStart(String start) {
this.start = start;
}

public String getEnd() {
return end;
}

/**
* The time that the datafeed should end. This value is exclusive.
* If you do not specify an end time, the datafeed runs continuously.
*
* @param end String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
*/
public void setEnd(String end) {
this.end = end;
}

public TimeValue getTimeout() {
return timeout;
}

/**
* Indicates how long to wait for the cluster to respond to the request.
*
* @param timeout TimeValue for how long to wait for a response from the cluster
*/
public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public int hashCode() {
return Objects.hash(datafeedId, start, end, timeout);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || obj.getClass() != getClass()) {
return false;
}

StartDatafeedRequest other = (StartDatafeedRequest) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(timeout, other.timeout);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
if (start != null) {
builder.field(START.getPreferredName(), start);
}
if (end != null) {
builder.field(END.getPreferredName(), end);
}
if (timeout != null) {
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
* Response indicating if the Machine Learning Datafeed is now started or not
*/
public class StartDatafeedResponse extends ActionResponse implements ToXContentObject {

private static final ParseField STARTED = new ParseField("started");

public static final ConstructingObjectParser<StartDatafeedResponse, Void> PARSER =
new ConstructingObjectParser<>(
"start_datafeed_response",
true,
(a) -> new StartDatafeedResponse((Boolean)a[0]));

static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED);
}

private final boolean started;

public StartDatafeedResponse(boolean started) {
this.started = started;
}

public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

/**
* Has the Datafeed started or not
*
* @return boolean value indicating the Datafeed started status
*/
public boolean isStarted() {
return started;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

StartDatafeedResponse that = (StartDatafeedResponse) other;
return isStarted() == that.isStarted();
}

@Override
public int hashCode() {
return Objects.hash(isStarted());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(STARTED.getPreferredName(), started);
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedRequestTests;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.calendars.Calendar;
import org.elasticsearch.client.ml.calendars.CalendarTests;
Expand Down Expand Up @@ -261,6 +263,19 @@ public void testDeleteDatafeed() {
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
}

public void testStartDatafeed() throws Exception {
String datafeedId = DatafeedConfigTests.randomValidDatafeedId();
StartDatafeedRequest datafeedRequest = StartDatafeedRequestTests.createRandomInstance(datafeedId);

Request request = MLRequestConverters.startDatafeed(datafeedRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/datafeeds/" + datafeedId + "/_start", request.getEndpoint());
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
StartDatafeedRequest parsedDatafeedRequest = StartDatafeedRequest.PARSER.apply(parser, null);
assertThat(parsedDatafeedRequest, equalTo(datafeedRequest));
}
}

public void testDeleteForecast() {
String jobId = randomAlphaOfLength(10);
DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);
Expand Down
Loading