Release 0.4.5 #477

Merged 15 commits on Feb 14, 2020
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog

## [v0.4.5](https://github.com/gojek/feast/tree/v0.4.5) (2020-02-14)

[Full Changelog](https://github.com/gojek/feast/compare/v0.4.4...v0.4.5)

**Merged pull requests:**
- Use bzip2 compressed feature set json as pipeline option [\#466](https://github.com/gojek/feast/pull/466) ([khorshuheng](https://github.com/khorshuheng))
- Make redis key creation more deterministic [\#471](https://github.com/gojek/feast/pull/471) ([zhilingc](https://github.com/zhilingc))
- Helm Chart Upgrades [\#458](https://github.com/gojek/feast/pull/458) ([Yanson](https://github.com/Yanson))
- Exclude version from grouping [\#441](https://github.com/gojek/feast/pull/441) ([khorshuheng](https://github.com/khorshuheng))
- Use concrete class for AvroCoder compatibility [\#465](https://github.com/gojek/feast/pull/465) ([zhilingc](https://github.com/zhilingc))
- Fix typo in split string length check [\#464](https://github.com/gojek/feast/pull/464) ([zhilingc](https://github.com/zhilingc))
- Update README.md and remove versions from Helm Charts [\#457](https://github.com/gojek/feast/pull/457) ([woop](https://github.com/woop))
- Deduplicate example notebooks [\#456](https://github.com/gojek/feast/pull/456) ([woop](https://github.com/woop))
- Allow users not to set max age for batch retrieval [\#446](https://github.com/gojek/feast/pull/446) ([zhilingc](https://github.com/zhilingc))

## [v0.4.4](https://github.com/gojek/feast/tree/v0.4.4) (2020-01-28)

[Full Changelog](https://github.com/gojek/feast/compare/v0.4.3...v0.4.4)
26 changes: 22 additions & 4 deletions README.md
@@ -28,11 +28,29 @@ my_model = ml.fit(data)
prediction = my_model.predict(fs.get_online_features(customer_features, customer_entities))
```

## Getting Started with Docker Compose
The following commands will start Feast in online-only mode.
```
git clone https://github.com/gojek/feast.git
cd feast/infra/docker-compose
cp .env.sample .env
docker-compose up -d
```

A [Jupyter Notebook](http://localhost:8888/tree/feast/examples) is now available to start using Feast.

Please see the links below to set up Feast for batch/historical serving with BigQuery.

## Important resources
* [Why Feast?](docs/why-feast.md)
* [Concepts](docs/concepts.md)
* [Installation](docs/getting-started/installing-feast.md)
* [Getting Help](docs/community.md)

Please refer to the official documentation at <https://docs.feast.dev>.

* [Why Feast?](https://docs.feast.dev/why-feast)
* [Concepts](https://docs.feast.dev/concepts)
* [Installation](https://docs.feast.dev/installing-feast/overview)
* [Examples](https://github.com/gojek/feast/blob/master/examples/)
* [Change Log](https://github.com/gojek/feast/blob/master/CHANGELOG.md)
* [Slack (#Feast)](https://join.slack.com/t/kubeflow/shared_invite/enQtNDg5MTM4NTQyNjczLTdkNTVhMjg1ZTExOWI0N2QyYTQ2MTIzNTJjMWRiOTFjOGRlZWEzODc1NzMwNTMwM2EzNjY1MTFhODczNjk4MTk)

## Notice

@@ -22,23 +22,20 @@
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.core.StoreProto;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Project;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -88,7 +85,12 @@ public Job startJob(Job job) {
job.getStore().toProto(),
false);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format(
"DataflowJobManager failed to START job with id '%s' because the job "
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

@@ -103,12 +105,15 @@ public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());

return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);

} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to update job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format(
"DataflowJobManager failed to UPDATE job with id '%s' because the job "
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

@@ -210,13 +215,12 @@ private ImportOptions getPipelineOptions(
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
Printer printer = JsonFormat.printer();
List<String> featureSetsJson = new ArrayList<>();
for (FeatureSetProto.FeatureSet featureSet : featureSets) {
featureSetsJson.add(printer.print(featureSet.getSpec()));
}
pipelineOptions.setFeatureSetJson(featureSetsJson);
pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setProject(projectId);
pipelineOptions.setUpdate(update);
pipelineOptions.setRunner(DataflowRunner.class);
@@ -17,22 +17,23 @@
package feast.core.job.direct;

import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.StoreProto;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -92,17 +93,15 @@ public Job startJob(Job job) {
}

private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
throws InvalidProtocolBufferException {
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
Printer printer = JsonFormat.printer();
List<String> featureSetsJson = new ArrayList<>();
for (FeatureSetProto.FeatureSet featureSet : featureSets) {
featureSetsJson.add(printer.print(featureSet.getSpec()));
}
pipelineOptions.setFeatureSetJson(featureSetsJson);
pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setProject(""); // set to default value to satisfy validation
if (metrics.isEnabled()) {
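The compress-then-set pattern that both `getPipelineOptions` implementations adopt above can be sketched generically. This is a hedged stand-in, not the PR's code: Feast's `BZip2Compressor` needs a BZip2 codec that is not in the JDK, so the sketch below uses the JDK's GZIP streams to show the same converter-plus-compressor shape; the `ByteConverter`, `GzipCompressor`, and `compressLines` names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;

// Hypothetical mirror of an OptionByteConverter / OptionCompressor pair,
// using GZIP (JDK built-in) in place of BZip2 to illustrate the same shape.
public class CompressSketch {

    interface ByteConverter<T> {
        byte[] toByte(T option) throws IOException;
    }

    static class GzipCompressor<T> {
        private final ByteConverter<T> converter;

        GzipCompressor(ByteConverter<T> converter) {
            this.converter = converter;
        }

        // Serialize the option to bytes, then compress it before it is
        // handed to the pipeline options.
        byte[] compress(T option) {
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
                    gzip.write(converter.toByte(option));
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    static byte[] compressLines(List<String> jsonDocs) {
        GzipCompressor<List<String>> compressor =
                new GzipCompressor<>(docs -> String.join("\n", docs).getBytes());
        return compressor.compress(jsonDocs);
    }

    public static void main(String[] args) {
        byte[] compressed = compressLines(Arrays.asList("{\"a\":1}", "{\"b\":2}"));
        // GZIP output always starts with the magic bytes 0x1f 0x8b.
        assert (compressed[0] & 0xff) == 0x1f && (compressed[1] & 0xff) == 0x8b;
    }
}
```

The diff wires the analogous BZip2 path in with `pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets))`, keeping large feature set specs within pipeline-option size limits.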
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job.option;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.FeatureSetProto;
import feast.ingestion.options.OptionByteConverter;
import java.util.ArrayList;
import java.util.List;

public class FeatureSetJsonByteConverter
implements OptionByteConverter<List<FeatureSetProto.FeatureSet>> {

  /**
   * Converts a list of feature sets to JSON strings joined by newlines, represented as a byte
   * array.
   *
   * @param featureSets List of feature set protobufs
   * @return Byte array representation of the newline-joined JSON strings
   * @throws InvalidProtocolBufferException if a feature set spec cannot be printed as JSON
   */
@Override
public byte[] toByte(List<FeatureSetProto.FeatureSet> featureSets)
throws InvalidProtocolBufferException {
JsonFormat.Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts();
List<String> featureSetsJson = new ArrayList<>();
for (FeatureSetProto.FeatureSet featureSet : featureSets) {
featureSetsJson.add(printer.print(featureSet.getSpec()));
}
return String.join("\n", featureSetsJson).getBytes();
}
}
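As a usage sketch of the converter's encoding (plain-string stand-ins; the real class prints `FeatureSet` protobuf specs), the newline-joined byte format and the split a consumer would use to recover the documents round-trip like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class NewlineJoinSketch {

    // Mirror of the converter's join step: one JSON document per line.
    static byte[] toBytes(List<String> jsonDocs) {
        return String.join("\n", jsonDocs).getBytes(StandardCharsets.UTF_8);
    }

    // The consuming side splits on newlines to recover the individual documents.
    static List<String> fromBytes(byte[] bytes) {
        return Arrays.asList(new String(bytes, StandardCharsets.UTF_8).split("\n"));
    }

    public static void main(String[] args) {
        List<String> specs = Arrays.asList("{\"name\":\"a\"}", "{\"name\":\"b\"}");
        assert fromBytes(toBytes(specs)).equals(specs);
    }
}
```

Note that this format assumes no raw newlines inside each JSON document, which holds here because the printer is configured with `omittingInsignificantWhitespace()`.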
Expand Up @@ -19,11 +19,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.services.dataflow.Dataflow;
@@ -44,14 +40,15 @@
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.Runner;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -131,8 +128,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
expectedPipelineOptions.setAppName("DataflowJobManager");
expectedPipelineOptions.setJobName(jobName);
expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
expectedPipelineOptions.setFeatureSetJson(
Lists.newArrayList(printer.print(featureSet.getSpec())));
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

ArgumentCaptor<ImportOptions> captor = ArgumentCaptor.forClass(ImportOptions.class);

@@ -170,7 +170,19 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
// Assume the files that are staged are correct
expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage());

assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString()));
assertThat(
actualPipelineOptions.getFeatureSetJson(),
equalTo(expectedPipelineOptions.getFeatureSetJson()));
assertThat(
actualPipelineOptions.getDeadLetterTableSpec(),
equalTo(expectedPipelineOptions.getDeadLetterTableSpec()));
assertThat(
actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost()));
assertThat(
actualPipelineOptions.getMetricsExporterType(),
equalTo(expectedPipelineOptions.getMetricsExporterType()));
assertThat(
actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));
assertThat(actual.getExtId(), equalTo(expectedExtJobId));
}

@@ -40,14 +40,19 @@
import feast.core.StoreProto.Store.Subscription;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
@@ -121,8 +126,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
expectedPipelineOptions.setProject("");
expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
expectedPipelineOptions.setProject("");

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
expectedPipelineOptions.setFeatureSetJson(
Lists.newArrayList(printer.print(featureSet.getSpec())));
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

String expectedJobId = "feast-job-0";
ArgumentCaptor<ImportOptions> pipelineOptionsCaptor =
@@ -150,7 +158,20 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
expectedPipelineOptions.setOptionsId(
actualPipelineOptions.getOptionsId()); // avoid comparing this value

assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString()));
assertThat(
actualPipelineOptions.getFeatureSetJson(),
equalTo(expectedPipelineOptions.getFeatureSetJson()));
assertThat(
actualPipelineOptions.getDeadLetterTableSpec(),
equalTo(expectedPipelineOptions.getDeadLetterTableSpec()));
assertThat(
actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost()));
assertThat(
actualPipelineOptions.getMetricsExporterType(),
equalTo(expectedPipelineOptions.getMetricsExporterType()));
assertThat(
actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));

assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult));
assertThat(jobStarted.getJobId(), equalTo(expectedJobId));
assertThat(actual.getExtId(), equalTo(expectedJobId));
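The test changes above replace a single `toString()` comparison with field-by-field assertions. The motivation can be sketched with a hypothetical options class (names assumed, not Feast's API): per-instance, auto-generated values such as `optionsId` make whole-object comparisons brittle, while comparing only the fields under test stays stable.

```java
public class FieldAssertSketch {

    // Hypothetical stand-in for pipeline options; optionsId differs per instance.
    static class Options {
        final String storeJson;
        final String statsdHost;
        final String optionsId;

        Options(String storeJson, String statsdHost, String optionsId) {
            this.storeJson = storeJson;
            this.statsdHost = statsdHost;
            this.optionsId = optionsId;
        }

        @Override
        public String toString() {
            return storeJson + "|" + statsdHost + "|" + optionsId;
        }
    }

    // Compare only the fields the test actually cares about.
    static boolean sameRelevantFields(Options a, Options b) {
        return a.storeJson.equals(b.storeJson) && a.statsdHost.equals(b.statsdHost);
    }

    public static void main(String[] args) {
        Options actual = new Options("{\"name\":\"redis\"}", "localhost", "id-1");
        Options expected = new Options("{\"name\":\"redis\"}", "localhost", "id-2");
        assert sameRelevantFields(actual, expected);
        // A whole-object comparison would fail on the generated id alone.
        assert !actual.toString().equals(expected.toString());
    }
}
```

This is why the tests also copy unstable values across before asserting (e.g. `expectedPipelineOptions.setOptionsId(actualPipelineOptions.getOptionsId())`): anything not under test is excluded from the comparison.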