Skip to content

Commit

Permalink
[ML] Add a file structure determination endpoint (#33471)
Browse files Browse the repository at this point in the history
This endpoint accepts an arbitrary file in the request body and
attempts to determine its structure.  If successful, it also
proposes mappings that could be used when indexing the file's
contents, and calculates simple statistics for each of the fields
that are useful in the data preparation step prior to configuring
machine learning jobs.
  • Loading branch information
droberts195 authored Sep 7, 2018
1 parent 4d23310 commit e42cc5c
Show file tree
Hide file tree
Showing 16 changed files with 596 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
Expand Down Expand Up @@ -265,6 +266,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
GetCalendarEventsAction.INSTANCE,
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
FindFileStructureAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

/**
 * Action definition for the machine learning "find file structure" endpoint.
 * <p>
 * The {@link Request} carries a sample of the file (as raw bytes) plus an
 * optional number of lines to sample; the {@link Response} wraps the
 * {@link FileStructure} that was determined.
 */
public class FindFileStructureAction extends Action<FindFileStructureAction.Response> {

    public static final FindFileStructureAction INSTANCE = new FindFileStructureAction();
    public static final String NAME = "cluster:monitor/xpack/ml/findfilestructure";

    /** Singleton - use {@link #INSTANCE}. */
    private FindFileStructureAction() {
        super(NAME);
    }

    @Override
    public Response newResponse() {
        return new Response();
    }

    /**
     * Request builder that starts from an empty {@link Request}.
     */
    static class RequestBuilder extends ActionRequestBuilder<Request, Response> {

        RequestBuilder(ElasticsearchClient client, FindFileStructureAction action) {
            super(client, action, new Request());
        }
    }

    /**
     * Response holding the determined {@link FileStructure}.  Always reports
     * {@link RestStatus#OK} and renders itself by delegating to the wrapped structure.
     */
    public static class Response extends ActionResponse implements StatusToXContentObject, Writeable {

        private FileStructure fileStructure;

        /**
         * @param fileStructure the structure to return to the caller
         */
        public Response(FileStructure fileStructure) {
            this.fileStructure = fileStructure;
        }

        /**
         * Creates an empty response whose structure is populated by {@link #readFrom(StreamInput)}.
         */
        Response() {
        }

        /**
         * @return the wrapped file structure
         */
        public FileStructure getFileStructure() {
            return this.fileStructure;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.fileStructure = new FileStructure(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.fileStructure.writeTo(out);
        }

        @Override
        public RestStatus status() {
            return RestStatus.OK;
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            // The response has no fields of its own; the structure writes the whole object.
            this.fileStructure.toXContent(builder, params);
            return builder;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.fileStructure);
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other != null && other.getClass() == getClass()) {
                FindFileStructureAction.Response rhs = (FindFileStructureAction.Response) other;
                return Objects.equals(this.fileStructure, rhs.fileStructure);
            }
            return false;
        }
    }

    /**
     * Request carrying the file sample to analyse and, optionally, the number of lines to sample.
     */
    public static class Request extends ActionRequest {

        public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");

        private Integer linesToSample;
        private BytesReference sample;

        public Request() {
        }

        /**
         * @return the requested number of lines to sample, or {@code null} if not set
         */
        public Integer getLinesToSample() {
            return this.linesToSample;
        }

        public void setLinesToSample(Integer linesToSample) {
            this.linesToSample = linesToSample;
        }

        /**
         * @return the raw bytes of the sample, or {@code null} if not set
         */
        public BytesReference getSample() {
            return this.sample;
        }

        public void setSample(BytesReference sample) {
            this.sample = sample;
        }

        /**
         * Checks that {@code lines_to_sample}, when given, is strictly positive and that a
         * non-empty sample has been supplied.  Both problems are reported if both are present.
         *
         * @return the accumulated validation errors, or {@code null} if the request is valid
         */
        @Override
        public ActionRequestValidationException validate() {
            ActionRequestValidationException errors = null;

            final boolean nonPositiveLineCount = this.linesToSample != null && this.linesToSample <= 0;
            if (nonPositiveLineCount) {
                errors = addValidationError(LINES_TO_SAMPLE.getPreferredName() + " must be positive if specified", errors);
            }

            final boolean emptySample = this.sample == null || this.sample.length() == 0;
            if (emptySample) {
                errors = addValidationError("sample must be specified", errors);
            }

            return errors;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            // Field order must match writeTo
            this.linesToSample = in.readOptionalVInt();
            this.sample = in.readBytesReference();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            // Field order must match readFrom
            out.writeOptionalVInt(this.linesToSample);
            out.writeBytesReference(this.sample);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.linesToSample, this.sample);
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other != null && other.getClass() == getClass()) {
                Request rhs = (Request) other;
                return Objects.equals(this.linesToSample, rhs.linesToSample)
                    && Objects.equals(this.sample, rhs.sample);
            }
            return false;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.ml.filestructurefinder;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -16,7 +19,7 @@
import java.util.Map;
import java.util.Objects;

public class FieldStats implements ToXContentObject {
public class FieldStats implements ToXContentObject, Writeable {

static final ParseField COUNT = new ParseField("count");
static final ParseField CARDINALITY = new ParseField("cardinality");
Expand Down Expand Up @@ -64,6 +67,27 @@ public FieldStats(long count, int cardinality, Double minValue, Double maxValue,
this.topHits = (topHits == null) ? Collections.emptyList() : Collections.unmodifiableList(topHits);
}

/**
 * Deserialising constructor: reads the fields in the same order that
 * {@link #writeTo(StreamOutput)} writes them.
 * <p>
 * The top hits list is wrapped in an unmodifiable view so that instances read
 * from the wire behave like instances built via the explicit constructor,
 * which also stores an unmodifiable list.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public FieldStats(StreamInput in) throws IOException {
    count = in.readVLong();
    cardinality = in.readVInt();
    minValue = in.readOptionalDouble();
    maxValue = in.readOptionalDouble();
    meanValue = in.readOptionalDouble();
    medianValue = in.readOptionalDouble();
    topHits = Collections.unmodifiableList(in.readList(StreamInput::readMap));
}

/**
 * Serialises these field statistics to the given stream.
 * <p>
 * The write order here must stay in step with the read order in the
 * {@code FieldStats(StreamInput)} constructor.  The four value statistics are
 * written as optional doubles, so each may be {@code null}.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVInt(cardinality);
out.writeOptionalDouble(minValue);
out.writeOptionalDouble(maxValue);
out.writeOptionalDouble(meanValue);
out.writeOptionalDouble(medianValue);
// Each top hit is a map, written with the generic map serialisation
out.writeCollection(topHits, StreamOutput::writeMap);
}

/**
 * @return the value of the {@code count} field of these statistics
 */
public long getCount() {
return count;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.ml.filestructurefinder;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -24,7 +27,7 @@
/**
* Stores the file format determined by Machine Learning.
*/
public class FileStructure implements ToXContentObject {
public class FileStructure implements ToXContentObject, Writeable {

public enum Format {

Expand Down Expand Up @@ -79,6 +82,8 @@ public String toString() {
}
}

/**
 * Name of the boolean parameter consulted by {@code toXContent}: the
 * explanation is only included in the output when this parameter is
 * {@code true} (it is treated as {@code false} when absent).
 */
public static final String EXPLAIN = "explain";

static final ParseField NUM_LINES_ANALYZED = new ParseField("num_lines_analyzed");
static final ParseField NUM_MESSAGES_ANALYZED = new ParseField("num_messages_analyzed");
static final ParseField SAMPLE_START = new ParseField("sample_start");
Expand Down Expand Up @@ -176,6 +181,66 @@ public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampl
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
}

/**
 * Deserialising constructor: reads the fields in exactly the order that
 * {@link #writeTo(StreamOutput)} writes them.
 * <p>
 * Nullable fields are read either with the stream's "optional" readers or,
 * for {@code inputFields}, {@code delimiter} and {@code timestampFormats},
 * via a leading boolean presence flag.  Lists and maps read here are wrapped
 * in unmodifiable views.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public FileStructure(StreamInput in) throws IOException {
numLinesAnalyzed = in.readVInt();
numMessagesAnalyzed = in.readVInt();
sampleStart = in.readString();
charset = in.readString();
hasByteOrderMarker = in.readOptionalBoolean();
format = in.readEnum(Format.class);
multilineStartPattern = in.readOptionalString();
excludeLinesPattern = in.readOptionalString();
// Presence flag first; null when the flag is false
inputFields = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
hasHeaderRow = in.readOptionalBoolean();
// Presence flag first; the delimiter character itself travels as a vint
delimiter = in.readBoolean() ? (char) in.readVInt() : null;
shouldTrimFields = in.readOptionalBoolean();
grokPattern = in.readOptionalString();
// Presence flag first; null when the flag is false
timestampFormats = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
timestampField = in.readOptionalString();
needClientTimezone = in.readBoolean();
// Copied into TreeMaps so the resulting maps are sorted by key
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
}

/**
 * Serialises this file structure to the given stream.
 * <p>
 * The write order here must stay in step with the read order in the
 * {@code FileStructure(StreamInput)} constructor.  The nullable
 * {@code inputFields}, {@code delimiter} and {@code timestampFormats} fields
 * are each preceded by a boolean that records whether a value follows.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numLinesAnalyzed);
out.writeVInt(numMessagesAnalyzed);
out.writeString(sampleStart);
out.writeString(charset);
out.writeOptionalBoolean(hasByteOrderMarker);
out.writeEnum(format);
out.writeOptionalString(multilineStartPattern);
out.writeOptionalString(excludeLinesPattern);
// Presence flag, then the list only if it is non-null
if (inputFields == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeCollection(inputFields, StreamOutput::writeString);
}
out.writeOptionalBoolean(hasHeaderRow);
// Presence flag, then the delimiter character written as a vint
if (delimiter == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(delimiter);
}
out.writeOptionalBoolean(shouldTrimFields);
out.writeOptionalString(grokPattern);
// Presence flag, then the list only if it is non-null
if (timestampFormats == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeCollection(timestampFormats, StreamOutput::writeString);
}
out.writeOptionalString(timestampField);
out.writeBoolean(needClientTimezone);
out.writeMap(mappings);
// Each FieldStats value serialises itself via its own writeTo
out.writeMap(fieldStats, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
out.writeCollection(explanation, StreamOutput::writeString);
}

/**
 * @return the value of the {@code numLinesAnalyzed} field of this structure
 */
public int getNumLinesAnalyzed() {
return numLinesAnalyzed;
}
Expand Down Expand Up @@ -300,7 +365,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
builder.field(EXPLANATION.getPreferredName(), explanation);
if (params.paramAsBoolean(EXPLAIN, false)) {
builder.field(EXPLANATION.getPreferredName(), explanation);
}
builder.endObject();

return builder;
Expand Down
Loading

0 comments on commit e42cc5c

Please sign in to comment.