Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coalesce rows #89

Merged
merged 15 commits into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions cli/feast/pkg/parse/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package parse

import (
"encoding/json"
"fmt"
"time"

"github.com/golang/protobuf/ptypes/timestamp"
Expand Down Expand Up @@ -98,11 +97,6 @@ func YamlToImportSpec(in []byte) (*specs.ImportSpec, error) {
return nil, err
}

// schema must be available for 'file' or 'bigquery'
if (ymlMap["type"] == "file" || ymlMap["type"] == "bigquery") && ymlMap["schema"] == nil {
return nil, fmt.Errorf("Schema must be specified for importing data from file or BigQuery")
}

// either timestampValue or timestampColumn
var timestampValue *timestamp.Timestamp
var timestampColumn string
Expand Down
55 changes: 7 additions & 48 deletions cli/feast/pkg/parse/yaml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func TestYamlToImportSpec(t *testing.T) {
{
name: "valid yaml",
input: []byte(`type: file
options:
jobOptions:
coalesceRows.enabled: "true"
sourceOptions:
format: csv
path: jaeger_last_opportunity_sample.csv
entities:
Expand All @@ -258,7 +260,10 @@ schema:
featureId: driver.none.last_opportunity`),
expected: &specs.ImportSpec{
Type: "file",
Options: map[string]string{
JobOptions: map[string]string{
"coalesceRows.enabled": "true",
},
SourceOptions: map[string]string{
"format": "csv",
"path": "jaeger_last_opportunity_sample.csv",
},
Expand Down Expand Up @@ -299,49 +304,3 @@ schema:
})
}
}

func TestYamlToImportSpecNoSchema(t *testing.T) {
tt := []struct {
name string
input []byte
expected *specs.ImportSpec
err error
}{
{
name: "valid yaml",
input: []byte(`type: pubsub
options:
topic: projects/your-gcp-project/topics/feast-test
entities:
- testentity`),
expected: &specs.ImportSpec{
Type: "pubsub",
Options: map[string]string{
"topic": "projects/your-gcp-project/topics/feast-test",
},
Entities: []string{"testentity"},
},
err: nil,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
spec, err := YamlToImportSpec(tc.input)
if tc.err == nil {
if err != nil {
t.Error(err)
} else if !cmp.Equal(spec, tc.expected) {
t.Errorf("Expected %s, got %s", tc.expected, spec)
}
} else {
// we expect an error
if err == nil {
t.Error(err)
} else if err.Error() != tc.err.Error() {
t.Errorf("Expected error %v, got %v", err.Error(), tc.err.Error())
}
}
})
}
}
11 changes: 8 additions & 3 deletions core/src/main/java/feast/core/model/JobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ public class JobInfo extends AbstractTimestampEntity {
private String runner;

// Job options. Stored as a json string as it is specific to the runner.
@Column(name = "options")
private String options;
@Column(name = "source_options")
private String sourceOptions;

// Job options. Stored as a json string as it is specific to the runner.
@Column(name = "job_options")
private String jobOptions;

// Entities populated by the job
@ManyToMany
Expand Down Expand Up @@ -99,7 +103,8 @@ public JobInfo(
this.extId = extId;
this.type = importSpec.getType();
this.runner = runner;
this.options = TypeConversion.convertMapToJsonString(importSpec.getOptionsMap());
this.sourceOptions = TypeConversion.convertMapToJsonString(importSpec.getSourceOptionsMap());
this.jobOptions = TypeConversion.convertMapToJsonString(importSpec.getJobOptionsMap());
this.entities = new ArrayList<>();
for (String entity : importSpec.getEntitiesList()) {
EntityInfo entityInfo = new EntityInfo();
Expand Down
34 changes: 26 additions & 8 deletions core/src/main/java/feast/core/validators/SpecValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static feast.core.validators.Matchers.checkLowerSnakeCase;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import feast.core.dao.EntityInfoRepository;
Expand All @@ -40,6 +41,8 @@
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -256,6 +259,19 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException
entityInfoRepository.existsById(name),
Strings.lenientFormat("Entity %s not registered", name));
}
Map<String, String> jobOptions = spec.getJobOptionsMap();
if (jobOptions.size() > 0) {
List<String> opts = Lists.newArrayList(
"sample.limit",
"coalesceRows.enabled",
"coalesceRows.delaySeconds",
"coalesceRows.timeoutSeconds"
);
for (String key : jobOptions.keySet()) {
Preconditions.checkArgument(opts.contains(key),
Strings.lenientFormat("Option %s is not a valid jobOption", key));
}
}
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Validation for import spec failed: %s", e.getMessage()));
Expand All @@ -264,8 +280,8 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException

private void checkKafkaImportSpecOption(ImportSpec spec) {
try {
String topics = spec.getOptionsOrDefault("topics", "");
String server = spec.getOptionsOrDefault("server", "");
String topics = spec.getSourceOptionsOrDefault("topics", "");
String server = spec.getSourceOptionsOrDefault("server", "");
if (topics.equals("") && server.equals("")) {
throw new IllegalArgumentException(
"Kafka ingestion requires either topics or servers");
Expand All @@ -278,7 +294,8 @@ private void checkKafkaImportSpecOption(ImportSpec spec) {

private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
checkArgument(!spec.getOptionsOrDefault("path", "").equals(""), "File path cannot be empty");
checkArgument(!spec.getSourceOptionsOrDefault("path", "").equals(""),
"File path cannot be empty");
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
Expand All @@ -287,8 +304,8 @@ private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentEx

private void checkPubSubImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
String topic = spec.getOptionsOrDefault("topic", "");
String subscription = spec.getOptionsOrDefault("subscription", "");
String topic = spec.getSourceOptionsOrDefault("topic", "");
String subscription = spec.getSourceOptionsOrDefault("subscription", "");
if (topic.equals("") && subscription.equals("")) {
throw new IllegalArgumentException(
"Pubsub ingestion requires either topic or subscription");
Expand All @@ -301,11 +318,12 @@ private void checkPubSubImportSpecOption(ImportSpec spec) throws IllegalArgument

private void checkBigqueryImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
checkArgument(!spec.getOptionsOrThrow("project").equals(""),
checkArgument(!spec.getSourceOptionsOrThrow("project").equals(""),
"Bigquery project cannot be empty");
checkArgument(!spec.getOptionsOrThrow("dataset").equals(""),
checkArgument(!spec.getSourceOptionsOrThrow("dataset").equals(""),
"Bigquery dataset cannot be empty");
checkArgument(!spec.getOptionsOrThrow("table").equals(""), "Bigquery table cannot be empty");
checkArgument(!spec.getSourceOptionsOrThrow("table").equals(""),
"Bigquery table cannot be empty");
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void getJobStatus_shouldUpdateJobInfoForRunningJob() {
"Streaming",
"DataflowRunner",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down Expand Up @@ -104,6 +105,7 @@ public void getJobMetrics_shouldPushToStatsDMetricPusherAndSaveNewMetricToDb() {
"Streaming",
"DataflowRunner",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down
11 changes: 5 additions & 6 deletions core/src/test/java/feast/core/model/JobInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
.build();

ImportSpecProto.ImportSpec importSpec = ImportSpecProto.ImportSpec.newBuilder()
.setType("file")
.putOptions("format", "csv")
.putOptions("path", "gs://some/path")
.setType("file.csv")
.putSourceOptions("path", "gs://some/path")
.addEntities("entity")
.setSchema(schema)
.build();
Expand All @@ -52,9 +51,9 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
JobInfo expected = new JobInfo();
expected.setId("fake-job-id");
expected.setExtId("fake-ext-id");
expected.setType("file");
expected.setType("file.csv");
expected.setRunner("DataflowRunner");
expected.setOptions(TypeConversion.convertMapToJsonString(importSpec.getOptionsMap()));
expected.setSourceOptions(TypeConversion.convertMapToJsonString(importSpec.getSourceOptionsMap()));

List<EntityInfo> entities = new ArrayList<>();
EntityInfo entityInfo = new EntityInfo();
Expand All @@ -75,7 +74,7 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
assertThat(actual.getRunner(), equalTo(expected.getRunner()));
assertThat(actual.getEntities(), equalTo(expected.getEntities()));
assertThat(actual.getFeatures(), equalTo(expected.getFeatures()));
assertThat(actual.getOptions(), equalTo(expected.getOptions()));
assertThat(actual.getSourceOptions(), equalTo(expected.getSourceOptions()));
assertThat(actual.getRaw(), equalTo(expected.getRaw()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void shouldListAllJobDetails() {
"",
"",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand All @@ -84,6 +85,7 @@ public void shouldListAllJobDetails() {
"",
"",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down Expand Up @@ -121,6 +123,7 @@ public void shouldReturnDetailOfRequestedJobId() {
"",
"",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down
71 changes: 63 additions & 8 deletions core/src/test/java/feast/core/validators/SpecValidatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ public void fileImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArgumen
ImportSpec input =
ImportSpec.newBuilder()
.setType("file.csv")
.putOptions("path", "gs://asdasd")
.putSourceOptions("path", "gs://asdasd")
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
Expand All @@ -728,9 +728,9 @@ public void bigQueryImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArg
ImportSpec input =
ImportSpec.newBuilder()
.setType("bigquery")
.putOptions("project", "my-google-project")
.putOptions("dataset", "feast")
.putOptions("table", "feast")
.putSourceOptions("project", "my-google-project")
.putSourceOptions("dataset", "feast")
.putSourceOptions("table", "feast")
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
Expand All @@ -750,7 +750,7 @@ public void importSpecWithoutValidEntityShouldThrowIllegalArgumentException() {
ImportSpec input =
ImportSpec.newBuilder()
.setType("pubsub")
.putOptions("topic", "my/pubsub/topic")
.putSourceOptions("topic", "my/pubsub/topic")
.addEntities("someEntity")
.build();
exception.expect(IllegalArgumentException.class);
Expand All @@ -775,7 +775,7 @@ public void importSpecWithUnregisteredFeaturesShouldThrowIllegalArgumentExceptio
ImportSpec input =
ImportSpec.newBuilder()
.setType("pubsub")
.putOptions("topic", "my/pubsub/topic")
.putSourceOptions("topic", "my/pubsub/topic")
.setSchema(schema)
.addEntities("someEntity")
.build();
Expand All @@ -802,8 +802,63 @@ public void importSpecWithKafkaSourceAndCorrectOptionsShouldPassValidation() {
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putOptions("topics", "my-kafka-topic")
.putOptions("server", "localhost:54321")
.putSourceOptions("topics", "my-kafka-topic")
.putSourceOptions("server", "localhost:54321")
.setSchema(schema)
.addEntities("someEntity")
.build();
validator.validateImportSpec(input);
}

@Test
public void importSpecWithCoalesceJobOptionsShouldPassValidation() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putSourceOptions("topics", "my-kafka-topic")
.putSourceOptions("server", "localhost:54321")
.putJobOptions("coalesceRows.enabled", "true")
.putJobOptions("coalesceRows.delaySeconds", "10000")
.putJobOptions("coalesceRows.timeoutSeconds", "20000")
.putJobOptions("sample.limit", "1000")
.setSchema(schema)
.addEntities("someEntity")
.build();
validator.validateImportSpec(input);
}

@Test
public void importSpecWithLimitJobOptionsShouldPassValidation() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putSourceOptions("topics", "my-kafka-topic")
.putSourceOptions("server", "localhost:54321")
.putJobOptions("sample.limit", "1000")
.setSchema(schema)
.addEntities("someEntity")
.build();
Expand Down
Loading