Skip to content

Commit

Permalink
Coalesce rows (#89)
Browse files Browse the repository at this point in the history
* ptransform to coalesce feature rows in batch and streaming

* add coalesce feature rows transform, to build state of latest row per key in the global window

* coalesce for serving on entity name and key only, preparing for removing history from serving stores

* change to rounding granularities after feature row coalesce

* rename importSpec.options to be importSpec.sourceOptions and add jobOptions

* only write out the latest row for bigtable and remove postgres feature store

* do not write historical values to redis serving store

* formatting and fix test

* core importspec validator test

* fix typo on option name

* use Timestamps.compare

* remove redundant proto message

* default coalescerows to true

* Update go protos and cli tests

* Update python sdk for new import spec
  • Loading branch information
tims authored and feast-ci-bot committed Feb 11, 2019
1 parent ca2ee77 commit 5a6edc4
Show file tree
Hide file tree
Showing 57 changed files with 2,218 additions and 1,100 deletions.
6 changes: 0 additions & 6 deletions cli/feast/pkg/parse/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package parse

import (
"encoding/json"
"fmt"
"time"

"github.com/golang/protobuf/ptypes/timestamp"
Expand Down Expand Up @@ -98,11 +97,6 @@ func YamlToImportSpec(in []byte) (*specs.ImportSpec, error) {
return nil, err
}

// schema must be available for 'file' or 'bigquery'
if (ymlMap["type"] == "file" || ymlMap["type"] == "bigquery") && ymlMap["schema"] == nil {
return nil, fmt.Errorf("Schema must be specified for importing data from file or BigQuery")
}

// either timestampValue or timestampColumn
var timestampValue *timestamp.Timestamp
var timestampColumn string
Expand Down
55 changes: 7 additions & 48 deletions cli/feast/pkg/parse/yaml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func TestYamlToImportSpec(t *testing.T) {
{
name: "valid yaml",
input: []byte(`type: file
options:
jobOptions:
coalesceRows.enabled: "true"
sourceOptions:
format: csv
path: jaeger_last_opportunity_sample.csv
entities:
Expand All @@ -258,7 +260,10 @@ schema:
featureId: driver.none.last_opportunity`),
expected: &specs.ImportSpec{
Type: "file",
Options: map[string]string{
JobOptions: map[string]string{
"coalesceRows.enabled": "true",
},
SourceOptions: map[string]string{
"format": "csv",
"path": "jaeger_last_opportunity_sample.csv",
},
Expand Down Expand Up @@ -299,49 +304,3 @@ schema:
})
}
}

// TestYamlToImportSpecNoSchema verifies that an import spec without a schema
// section (e.g. a pubsub source) parses successfully, since schemas are only
// required for file/bigquery sources.
func TestYamlToImportSpecNoSchema(t *testing.T) {
	tt := []struct {
		name     string
		input    []byte
		expected *specs.ImportSpec
		err      error
	}{
		{
			name: "valid yaml",
			input: []byte(`type: pubsub
options:
  topic: projects/your-gcp-project/topics/feast-test
entities:
- testentity`),
			expected: &specs.ImportSpec{
				Type: "pubsub",
				Options: map[string]string{
					"topic": "projects/your-gcp-project/topics/feast-test",
				},
				Entities: []string{"testentity"},
			},
			err: nil,
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			spec, err := YamlToImportSpec(tc.input)
			if tc.err == nil {
				if err != nil {
					t.Error(err)
				} else if !cmp.Equal(spec, tc.expected) {
					// %v renders proto structs more reliably than %s.
					t.Errorf("Expected %v, got %v", tc.expected, spec)
				}
			} else {
				// We expect an error here.
				if err == nil {
					// Bug fix: previously reported t.Error(err) with a nil err,
					// which produced an empty failure message.
					t.Errorf("Expected error %v, got nil", tc.err)
				} else if err.Error() != tc.err.Error() {
					// Bug fix: "expected"/"got" arguments were swapped.
					t.Errorf("Expected error %v, got %v", tc.err.Error(), err.Error())
				}
			}
		})
	}
}
11 changes: 8 additions & 3 deletions core/src/main/java/feast/core/model/JobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ public class JobInfo extends AbstractTimestampEntity {
private String runner;

// Job options. Stored as a json string as it is specific to the runner.
@Column(name = "options")
private String options;
@Column(name = "source_options")
private String sourceOptions;

// Job options. Stored as a json string as it is specific to the runner.
@Column(name = "job_options")
private String jobOptions;

// Entities populated by the job
@ManyToMany
Expand Down Expand Up @@ -99,7 +103,8 @@ public JobInfo(
this.extId = extId;
this.type = importSpec.getType();
this.runner = runner;
this.options = TypeConversion.convertMapToJsonString(importSpec.getOptionsMap());
this.sourceOptions = TypeConversion.convertMapToJsonString(importSpec.getSourceOptionsMap());
this.jobOptions = TypeConversion.convertMapToJsonString(importSpec.getJobOptionsMap());
this.entities = new ArrayList<>();
for (String entity : importSpec.getEntitiesList()) {
EntityInfo entityInfo = new EntityInfo();
Expand Down
34 changes: 26 additions & 8 deletions core/src/main/java/feast/core/validators/SpecValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static feast.core.validators.Matchers.checkLowerSnakeCase;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import feast.core.dao.EntityInfoRepository;
Expand All @@ -40,6 +41,8 @@
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -256,6 +259,19 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException
entityInfoRepository.existsById(name),
Strings.lenientFormat("Entity %s not registered", name));
}
Map<String, String> jobOptions = spec.getJobOptionsMap();
if (jobOptions.size() > 0) {
List<String> opts = Lists.newArrayList(
"sample.limit",
"coalesceRows.enabled",
"coalesceRows.delaySeconds",
"coalesceRows.timeoutSeconds"
);
for (String key : jobOptions.keySet()) {
Preconditions.checkArgument(opts.contains(key),
Strings.lenientFormat("Option %s is not a valid jobOption", key));
}
}
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Validation for import spec failed: %s", e.getMessage()));
Expand All @@ -264,8 +280,8 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException

private void checkKafkaImportSpecOption(ImportSpec spec) {
try {
String topics = spec.getOptionsOrDefault("topics", "");
String server = spec.getOptionsOrDefault("server", "");
String topics = spec.getSourceOptionsOrDefault("topics", "");
String server = spec.getSourceOptionsOrDefault("server", "");
if (topics.equals("") && server.equals("")) {
throw new IllegalArgumentException(
"Kafka ingestion requires either topics or servers");
Expand All @@ -278,7 +294,8 @@ private void checkKafkaImportSpecOption(ImportSpec spec) {

private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
checkArgument(!spec.getOptionsOrDefault("path", "").equals(""), "File path cannot be empty");
checkArgument(!spec.getSourceOptionsOrDefault("path", "").equals(""),
"File path cannot be empty");
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
Expand All @@ -287,8 +304,8 @@ private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentEx

private void checkPubSubImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
String topic = spec.getOptionsOrDefault("topic", "");
String subscription = spec.getOptionsOrDefault("subscription", "");
String topic = spec.getSourceOptionsOrDefault("topic", "");
String subscription = spec.getSourceOptionsOrDefault("subscription", "");
if (topic.equals("") && subscription.equals("")) {
throw new IllegalArgumentException(
"Pubsub ingestion requires either topic or subscription");
Expand All @@ -301,11 +318,12 @@ private void checkPubSubImportSpecOption(ImportSpec spec) throws IllegalArgument

private void checkBigqueryImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
checkArgument(!spec.getOptionsOrThrow("project").equals(""),
checkArgument(!spec.getSourceOptionsOrThrow("project").equals(""),
"Bigquery project cannot be empty");
checkArgument(!spec.getOptionsOrThrow("dataset").equals(""),
checkArgument(!spec.getSourceOptionsOrThrow("dataset").equals(""),
"Bigquery dataset cannot be empty");
checkArgument(!spec.getOptionsOrThrow("table").equals(""), "Bigquery table cannot be empty");
checkArgument(!spec.getSourceOptionsOrThrow("table").equals(""),
"Bigquery table cannot be empty");
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void getJobStatus_shouldUpdateJobInfoForRunningJob() {
"Streaming",
"DataflowRunner",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down Expand Up @@ -104,6 +105,7 @@ public void getJobMetrics_shouldPushToStatsDMetricPusherAndSaveNewMetricToDb() {
"Streaming",
"DataflowRunner",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down
11 changes: 5 additions & 6 deletions core/src/test/java/feast/core/model/JobInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
.build();

ImportSpecProto.ImportSpec importSpec = ImportSpecProto.ImportSpec.newBuilder()
.setType("file")
.putOptions("format", "csv")
.putOptions("path", "gs://some/path")
.setType("file.csv")
.putSourceOptions("path", "gs://some/path")
.addEntities("entity")
.setSchema(schema)
.build();
Expand All @@ -52,9 +51,9 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
JobInfo expected = new JobInfo();
expected.setId("fake-job-id");
expected.setExtId("fake-ext-id");
expected.setType("file");
expected.setType("file.csv");
expected.setRunner("DataflowRunner");
expected.setOptions(TypeConversion.convertMapToJsonString(importSpec.getOptionsMap()));
expected.setSourceOptions(TypeConversion.convertMapToJsonString(importSpec.getSourceOptionsMap()));

List<EntityInfo> entities = new ArrayList<>();
EntityInfo entityInfo = new EntityInfo();
Expand All @@ -75,7 +74,7 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
assertThat(actual.getRunner(), equalTo(expected.getRunner()));
assertThat(actual.getEntities(), equalTo(expected.getEntities()));
assertThat(actual.getFeatures(), equalTo(expected.getFeatures()));
assertThat(actual.getOptions(), equalTo(expected.getOptions()));
assertThat(actual.getSourceOptions(), equalTo(expected.getSourceOptions()));
assertThat(actual.getRaw(), equalTo(expected.getRaw()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void shouldListAllJobDetails() {
"",
"",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand All @@ -84,6 +85,7 @@ public void shouldListAllJobDetails() {
"",
"",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down Expand Up @@ -121,6 +123,7 @@ public void shouldReturnDetailOfRequestedJobId() {
"",
"",
"",
"",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down
71 changes: 63 additions & 8 deletions core/src/test/java/feast/core/validators/SpecValidatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ public void fileImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArgumen
ImportSpec input =
ImportSpec.newBuilder()
.setType("file.csv")
.putOptions("path", "gs://asdasd")
.putSourceOptions("path", "gs://asdasd")
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
Expand All @@ -728,9 +728,9 @@ public void bigQueryImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArg
ImportSpec input =
ImportSpec.newBuilder()
.setType("bigquery")
.putOptions("project", "my-google-project")
.putOptions("dataset", "feast")
.putOptions("table", "feast")
.putSourceOptions("project", "my-google-project")
.putSourceOptions("dataset", "feast")
.putSourceOptions("table", "feast")
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
Expand All @@ -750,7 +750,7 @@ public void importSpecWithoutValidEntityShouldThrowIllegalArgumentException() {
ImportSpec input =
ImportSpec.newBuilder()
.setType("pubsub")
.putOptions("topic", "my/pubsub/topic")
.putSourceOptions("topic", "my/pubsub/topic")
.addEntities("someEntity")
.build();
exception.expect(IllegalArgumentException.class);
Expand All @@ -775,7 +775,7 @@ public void importSpecWithUnregisteredFeaturesShouldThrowIllegalArgumentExceptio
ImportSpec input =
ImportSpec.newBuilder()
.setType("pubsub")
.putOptions("topic", "my/pubsub/topic")
.putSourceOptions("topic", "my/pubsub/topic")
.setSchema(schema)
.addEntities("someEntity")
.build();
Expand All @@ -802,8 +802,63 @@ public void importSpecWithKafkaSourceAndCorrectOptionsShouldPassValidation() {
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putOptions("topics", "my-kafka-topic")
.putOptions("server", "localhost:54321")
.putSourceOptions("topics", "my-kafka-topic")
.putSourceOptions("server", "localhost:54321")
.setSchema(schema)
.addEntities("someEntity")
.build();
validator.validateImportSpec(input);
}

@Test
public void importSpecWithCoalesceJobOptionsShouldPassValidation() {
  // A kafka import spec carrying every recognised jobOption key
  // (sample.limit and the three coalesceRows.* keys) must validate cleanly.
  when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
  when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);

  SpecValidator specValidator =
      new SpecValidator(
          storageInfoRepository,
          entityInfoRepository,
          featureGroupInfoRepository,
          featureInfoRepository);

  Schema featureSchema =
      Schema.newBuilder()
          .addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
          .build();

  ImportSpec importSpec =
      ImportSpec.newBuilder()
          .setType("kafka")
          .setSchema(featureSchema)
          .addEntities("someEntity")
          .putSourceOptions("topics", "my-kafka-topic")
          .putSourceOptions("server", "localhost:54321")
          .putJobOptions("coalesceRows.enabled", "true")
          .putJobOptions("coalesceRows.delaySeconds", "10000")
          .putJobOptions("coalesceRows.timeoutSeconds", "20000")
          .putJobOptions("sample.limit", "1000")
          .build();

  // Should not throw IllegalArgumentException.
  specValidator.validateImportSpec(importSpec);
}

@Test
public void importSpecWithLimitJobOptionsShouldPassValidation() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putSourceOptions("topics", "my-kafka-topic")
.putSourceOptions("server", "localhost:54321")
.putJobOptions("sample.limit", "1000")
.setSchema(schema)
.addEntities("someEntity")
.build();
Expand Down
Loading

0 comments on commit 5a6edc4

Please sign in to comment.