
[ML] Add a file structure determination endpoint #33471

Merged · 6 commits · Sep 7, 2018
Changes from 1 commit
@@ -55,6 +55,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FileStructureAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
@@ -265,6 +266,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
GetCalendarEventsAction.INSTANCE,
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
FileStructureAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,
FileStructureAction.java (new file)
@@ -0,0 +1,182 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class FileStructureAction extends Action<FileStructureAction.Response> {
Contributor:

Just a note that this is different in 6.x so make sure to fix compilation problems before backporting.


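For context on the backport note above, a rough, hedged sketch of the difference: on the 6.x branch the Action base class still carries request and request-builder type parameters, so the declaration would look roughly like the snippet below instead of Action<FileStructureAction.Response>. The exact 6.x signature is an assumption and needs checking against that branch:

// Assumed 6.x-style declaration (verify against the actual 6.x Action base class before backporting):
public class FileStructureAction
        extends Action<FileStructureAction.Request, FileStructureAction.Response, FileStructureAction.RequestBuilder> {
    // ... same body as below, plus whatever request-builder overrides 6.x requires ...
}
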
public static final FileStructureAction INSTANCE = new FileStructureAction();
public static final String NAME = "cluster:monitor/xpack/ml/filestructure";

private FileStructureAction() {
super(NAME);
}

@Override
public Response newResponse() {
return new Response();
}

static class RequestBuilder extends ActionRequestBuilder<Request, Response> {

RequestBuilder(ElasticsearchClient client, FileStructureAction action) {
super(client, action, new Request());
}
}

public static class Response extends ActionResponse implements StatusToXContentObject, Writeable {

private FileStructure fileStructure;

public Response(FileStructure fileStructure) {
this.fileStructure = fileStructure;
}

public Response() {
Contributor:

If we need this at all, it should not be public

}

public FileStructure getFileStructure() {
return fileStructure;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
fileStructure = new FileStructure(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
fileStructure.writeTo(out);
}

@Override
public RestStatus status() {
return RestStatus.OK;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
fileStructure.toXContent(builder, params);
return builder;
}

@Override
public int hashCode() {
return Objects.hash(fileStructure);
}

@Override
public boolean equals(Object other) {

if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

FileStructureAction.Response that = (FileStructureAction.Response) other;
return Objects.equals(fileStructure, that.fileStructure);
}
}

public static class Request extends ActionRequest {

public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");

private Integer linesToSample;
private BytesReference sample;

public Request() {
}

public Integer getLinesToSample() {
return linesToSample;
}

public void setLinesToSample(Integer linesToSample) {
this.linesToSample = linesToSample;
}

public BytesReference getSample() {
return sample;
}

public void setSample(BytesReference sample) {
this.sample = sample;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (linesToSample != null && linesToSample <= 0) {
Contributor:

Should we also check against an upper bound?

Author:

If the uploaded data contains fewer lines, then we just sample all of what's been uploaded. Therefore the number of lines that can be sampled is limited by the size of the ES HTTP receive buffer.

One problem with sampling many lines is the time taken to do the analysis. At some point I should probably add the option to limit the maximum time allowed; some other functionality, such as the Grok ingest processor, already has this facility. But I'd rather do that in a follow-up PR.

validationException = addValidationError("lines_to_sample must be positive if specified", validationException);
}
if (sample == null) {
Member:

may want to verify that sample.length() > 0 as well.

validationException = addValidationError("sample must be specified", validationException);
}
return validationException;
}
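
A hedged sketch of how the two review suggestions above could be folded into validate(): an upper bound on lines_to_sample and a non-empty check on sample. MAX_LINES_TO_SAMPLE is a hypothetical constant, not something this PR defines:

// Hypothetical upper bound on lines_to_sample; the constant and message are illustrative only.
if (linesToSample != null && linesToSample > MAX_LINES_TO_SAMPLE) {
    validationException = addValidationError("lines_to_sample must not exceed [" + MAX_LINES_TO_SAMPLE + "] if specified", validationException);
}
// Reject an empty sample as well as a missing one, per the review comment above.
if (sample == null || sample.length() == 0) {
    validationException = addValidationError("sample must be specified and non-empty", validationException);
}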

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
linesToSample = in.readOptionalVInt();
sample = in.readBytesReference();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalVInt(linesToSample);
out.writeBytesReference(sample);
}

@Override
public int hashCode() {
return Objects.hash(linesToSample, sample);
}

@Override
public boolean equals(Object other) {

if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

Request that = (Request) other;
return Objects.equals(this.linesToSample, that.linesToSample) &&
Objects.equals(this.sample, that.sample);
}
}
}
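
To show how the action, request and response above fit together, a hedged sketch of invoking the endpoint through a client, assuming the standard client.execute pattern; the client and logger variables and the sample text are illustrative only:

FileStructureAction.Request request = new FileStructureAction.Request();
request.setLinesToSample(1000);    // optional
request.setSample(new BytesArray("time,message\n2018-09-07T00:00:00Z,hello\n"));

client.execute(FileStructureAction.INSTANCE, request, ActionListener.wrap(
    response -> logger.info("structure: {}", Strings.toString(response.getFileStructure())),
    e -> logger.error("file structure determination failed", e)));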
FieldStats.java
@@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.ml.filestructurefinder;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -16,7 +19,7 @@
import java.util.Map;
import java.util.Objects;

public class FieldStats implements ToXContentObject {
public class FieldStats implements ToXContentObject, Writeable {

static final ParseField COUNT = new ParseField("count");
static final ParseField CARDINALITY = new ParseField("cardinality");
@@ -64,6 +67,27 @@ public FieldStats(long count, int cardinality, Double minValue, Double maxValue,
this.topHits = (topHits == null) ? Collections.emptyList() : Collections.unmodifiableList(topHits);
}

public FieldStats(StreamInput in) throws IOException {
count = in.readVLong();
cardinality = in.readVInt();
minValue = in.readOptionalDouble();
maxValue = in.readOptionalDouble();
meanValue = in.readOptionalDouble();
medianValue = in.readOptionalDouble();
topHits = in.readList(StreamInput::readMap);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVInt(cardinality);
out.writeOptionalDouble(minValue);
out.writeOptionalDouble(maxValue);
out.writeOptionalDouble(meanValue);
out.writeOptionalDouble(medianValue);
out.writeCollection(topHits, StreamOutput::writeMap);
}

public long getCount() {
return count;
}
FileStructure.java
@@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.ml.filestructurefinder;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -24,7 +27,7 @@
/**
* Stores the file format determined by Machine Learning.
*/
public class FileStructure implements ToXContentObject {
public class FileStructure implements ToXContentObject, Writeable {

public enum Format {

@@ -79,6 +82,8 @@ public String toString() {
}
}

public static final String EXPLAIN = "explain";

static final ParseField NUM_LINES_ANALYZED = new ParseField("num_lines_analyzed");
static final ParseField NUM_MESSAGES_ANALYZED = new ParseField("num_messages_analyzed");
static final ParseField SAMPLE_START = new ParseField("sample_start");
@@ -176,6 +181,66 @@ public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampl
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
}

public FileStructure(StreamInput in) throws IOException {
numLinesAnalyzed = in.readVInt();
numMessagesAnalyzed = in.readVInt();
sampleStart = in.readString();
charset = in.readString();
hasByteOrderMarker = in.readOptionalBoolean();
format = in.readEnum(Format.class);
multilineStartPattern = in.readOptionalString();
excludeLinesPattern = in.readOptionalString();
inputFields = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
hasHeaderRow = in.readOptionalBoolean();
delimiter = in.readBoolean() ? (char) in.readVInt() : null;
shouldTrimFields = in.readOptionalBoolean();
grokPattern = in.readOptionalString();
timestampFormats = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
timestampField = in.readOptionalString();
needClientTimezone = in.readBoolean();
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numLinesAnalyzed);
out.writeVInt(numMessagesAnalyzed);
out.writeString(sampleStart);
out.writeString(charset);
out.writeOptionalBoolean(hasByteOrderMarker);
out.writeEnum(format);
out.writeOptionalString(multilineStartPattern);
out.writeOptionalString(excludeLinesPattern);
if (inputFields == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeCollection(inputFields, StreamOutput::writeString);
}
out.writeOptionalBoolean(hasHeaderRow);
if (delimiter == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(delimiter);
}
out.writeOptionalBoolean(shouldTrimFields);
out.writeOptionalString(grokPattern);
if (timestampFormats == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeCollection(timestampFormats, StreamOutput::writeString);
}
out.writeOptionalString(timestampField);
out.writeBoolean(needClientTimezone);
out.writeMap(mappings);
out.writeMap(fieldStats, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
out.writeCollection(explanation, StreamOutput::writeString);
}

public int getNumLinesAnalyzed() {
return numLinesAnalyzed;
}
@@ -300,7 +365,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
builder.field(EXPLANATION.getPreferredName(), explanation);
if (params.paramAsBoolean(EXPLAIN, false)) {
builder.field(EXPLANATION.getPreferredName(), explanation);
}
builder.endObject();

return builder;
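
A hedged sketch of switching on the new explain behaviour when rendering a FileStructure, assuming the standard ToXContent.MapParams helper; the builder setup is illustrative only:

// The explanation list is included only when the explain param is true.
ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap(FileStructure.EXPLAIN, "true"));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
    fileStructure.toXContent(builder, params);
}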
FileStructureActionRequestTests.java (new file)
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.AbstractStreamableTestCase;

public class FileStructureActionRequestTests extends AbstractStreamableTestCase<FileStructureAction.Request> {

@Override
protected FileStructureAction.Request createTestInstance() {

FileStructureAction.Request request = new FileStructureAction.Request();

if (randomBoolean()) {
request.setLinesToSample(randomIntBetween(10, 2000));
}
request.setSample(new BytesArray(randomByteArrayOfLength(randomIntBetween(1000, 20000))));

return request;
}

@Override
protected FileStructureAction.Request createBlankInstance() {
return new FileStructureAction.Request();
}
}