Skip to content

Commit

Permalink
Merge branch 'master' into feat/dynamo_db_online_write_read
Browse files Browse the repository at this point in the history
  • Loading branch information
TremaMiguel committed Mar 23, 2022
2 parents 5bc54d3 + 45db6dc commit c7ab086
Show file tree
Hide file tree
Showing 96 changed files with 3,129 additions and 2,072 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/java_master_only.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: 'true'
- uses: google-github-actions/setup-gcloud@master
- uses: google-github-actions/setup-gcloud@v0
with:
version: '290.0.1'
export_default_credentials: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/master_only.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@master
uses: google-github-actions/setup-gcloud@v0
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@master
uses: google-github-actions/setup-gcloud@v0
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
Expand Down Expand Up @@ -107,7 +107,7 @@ jobs:
VERSION_WITHOUT_PREFIX: ${{ needs.get-version.outputs.version_without_prefix }}
steps:
- uses: actions/checkout@v2
- uses: google-github-actions/setup-gcloud@master
- uses: google-github-actions/setup-gcloud@v0
with:
version: '290.0.1'
export_default_credentials: true
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ package-protos:
cp -r ${ROOT_DIR}/protos ${ROOT_DIR}/sdk/python/feast/protos

compile-protos-python:
python sdk/python/setup.py build_python_protos
cd sdk/python && python setup.py build_python_protos

install-python:
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-requirements.txt
Expand Down Expand Up @@ -92,7 +92,7 @@ format-python:
cd ${ROOT_DIR}/sdk/python; python -m black --target-version py37 feast tests

lint-python:
cd ${ROOT_DIR}/sdk/python; python -m mypy feast/ tests/
cd ${ROOT_DIR}/sdk/python; python -m mypy
cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
Expand Down Expand Up @@ -133,7 +133,7 @@ install-protoc-dependencies:
pip install grpcio-tools==1.34.0

compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
python sdk/python/setup.py build_go_protos
cd sdk/python && python setup.py build_go_protos

compile-go-feature-server: compile-protos-go
go mod tidy
Expand Down
5 changes: 3 additions & 2 deletions docs/how-to-guides/adding-a-new-offline-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ The OfflineStore class contains a couple of methods to read features from the of
There are two methods that deal with reading data from the offline stores`get_historical_features`and `pull_latest_from_table_or_query`.

* `pull_latest_from_table_or_query` is invoked when running materialization (using the `feast materialize` or `feast materialize-incremental` commands, or the corresponding `FeatureStore.materialize()` method. This method pull data from the offline store, and the `FeatureStore` class takes care of writing this data into the online store.
* `get_historical_features `is invoked when reading values from the offline store using the `FeatureStore.get_historica_features()` method. Typically, this method is used to retrieve features when training ML models.
* `get_historical_features` is invoked when reading values from the offline store using the `FeatureStore.get_historical_features()` method. Typically, this method is used to retrieve features when training ML models.
* `pull_all_from_table_or_query` is a method that pulls all the data from an offline store from a specified start date to a specified end date.

{% code title="feast_custom_offline_store/file.py" %}
```python
Expand Down Expand Up @@ -223,7 +224,7 @@ To use our custom file offline store, we can use the following `feature_store.ya
project: test_custom
registry: data/registry.db
provider: local
offline_store:
offline_store:
type: feast_custom_offline_store.file.CustomFileOfflineStore
```
{% endcode %}
Expand Down
3 changes: 2 additions & 1 deletion infra/scripts/release/files_to_bump.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ infra/charts/feast/charts/feature-server/values.yaml
infra/charts/feast/README.md
infra/charts/feast-python-server/Chart.yaml
infra/charts/feast-python-server/README.md
java/pom.xml
java/pom.xml
ui/package.json
19 changes: 16 additions & 3 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used source Feature data
// Next available id: 22
// Next available id: 23
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
reserved 6 to 10;

// Type of Data Source.
// Next available id: 9
// Next available id: 10
enum SourceType {
INVALID = 0;
BATCH_FILE = 1;
Expand All @@ -44,7 +44,7 @@ message DataSource {
STREAM_KINESIS = 4;
CUSTOM_SOURCE = 6;
REQUEST_SOURCE = 7;

PUSH_SOURCE = 9;
}

// Unique name of data source within the project
Expand All @@ -71,6 +71,8 @@ message DataSource {

// This is an internal field that is represents the python class for the data source object a proto object represents.
// This should be set by feast, and not by users.
// The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for
// first party sources as well.
string data_source_class_type = 17;

// Defines options for DataSource that sources features from a file
Expand Down Expand Up @@ -169,6 +171,16 @@ message DataSource {
map<string, feast.types.ValueType.Enum> schema = 2;
}

// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
// the online store on-demand, such as by stream consumers.
message PushOptions {
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 1;
// Optional batch source for the push source for historical features and materialization.
DataSource batch_source = 2;
}


// DataSource options.
oneof options {
FileOptions file_options = 11;
Expand All @@ -179,5 +191,6 @@ message DataSource {
RequestDataOptions request_data_options = 18;
CustomSourceOptions custom_options = 16;
SnowflakeOptions snowflake_options = 19;
PushOptions push_options = 22;
}
}
1 change: 1 addition & 0 deletions sdk/python/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
recursive-include feast/protos/ *.py
include feast/binaries/*
recursive-include feast py.typed *.pyi
134 changes: 95 additions & 39 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
return kinesis_options_proto


_DATA_SOURCE_OPTIONS = {
DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.file_source.FileSource",
DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery_source.BigQuerySource",
DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift_source.RedshiftSource",
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestDataSource",
DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}


class DataSource(ABC):
"""
DataSource that can be used to source features.
Expand Down Expand Up @@ -210,48 +222,20 @@ def from_proto(data_source: DataSourceProto) -> Any:
Raises:
ValueError: The type of DataSource could not be identified.
"""
if data_source.data_source_class_type:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
return cls.from_proto(data_source)

if data_source.request_data_options and data_source.request_data_options.schema:
data_source_obj = RequestDataSource.from_proto(data_source)
elif data_source.file_options.file_format and data_source.file_options.file_url:
from feast.infra.offline_stores.file_source import FileSource

data_source_obj = FileSource.from_proto(data_source)
elif (
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
data_source_type = data_source.type
if not data_source_type or (
data_source_type
not in list(_DATA_SOURCE_OPTIONS.keys())
+ [DataSourceProto.SourceType.CUSTOM_SOURCE]
):
from feast.infra.offline_stores.bigquery_source import BigQuerySource

data_source_obj = BigQuerySource.from_proto(data_source)
elif data_source.redshift_options.table or data_source.redshift_options.query:
from feast.infra.offline_stores.redshift_source import RedshiftSource

data_source_obj = RedshiftSource.from_proto(data_source)

elif data_source.snowflake_options.table or data_source.snowflake_options.query:
from feast.infra.offline_stores.snowflake_source import SnowflakeSource

data_source_obj = SnowflakeSource.from_proto(data_source)

elif (
data_source.kafka_options.bootstrap_servers
and data_source.kafka_options.topic
and data_source.kafka_options.message_format
):
data_source_obj = KafkaSource.from_proto(data_source)
elif (
data_source.kinesis_options.record_format
and data_source.kinesis_options.region
and data_source.kinesis_options.stream_name
):
data_source_obj = KinesisSource.from_proto(data_source)
else:
raise ValueError("Could not identify the source type being added.")

return data_source_obj
if data_source_type == DataSourceProto.SourceType.CUSTOM_SOURCE:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
return cls.from_proto(data_source)

cls = get_data_source_class_from_type(_DATA_SOURCE_OPTIONS[data_source_type])
return cls.from_proto(data_source)

@abstractmethod
def to_proto(self) -> DataSourceProto:
Expand Down Expand Up @@ -522,3 +506,75 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.date_partition_column = self.date_partition_column

return data_source_proto


class PushSource(DataSource):
"""
PushSource that can be used to ingest features on request
Args:
name: Name of the push source
schema: Schema mapping from the input feature name to a ValueType
"""

name: str
schema: Dict[str, ValueType]
batch_source: Optional[DataSource]

def __init__(
self,
name: str,
schema: Dict[str, ValueType],
batch_source: Optional[DataSource] = None,
):
"""Creates a PushSource object."""
super().__init__(name)
self.schema = schema
self.batch_source = batch_source

def validate(self, config: RepoConfig):
pass

def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
pass

@staticmethod
def from_proto(data_source: DataSourceProto):
schema_pb = data_source.push_options.schema
schema = {}
for key, value in schema_pb.items():
schema[key] = value

batch_source = None
if data_source.push_options.HasField("batch_source"):
batch_source = DataSource.from_proto(data_source.push_options.batch_source)

return PushSource(
name=data_source.name, schema=schema, batch_source=batch_source
)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self.schema.items():
schema_pb[key] = value
batch_source_proto = None
if self.batch_source:
batch_source_proto = self.batch_source.to_proto()

options = DataSourceProto.PushOptions(
schema=schema_pb, batch_source=batch_source_proto
)
data_source_proto = DataSourceProto(
name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options,
)

return data_source_proto

def get_table_query_string(self) -> str:
raise NotImplementedError

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError
26 changes: 26 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,24 @@ def pull_latest_from_table_or_query(
end_date: datetime,
) -> RetrievalJob:
"""
This method pulls data from the offline store, and the FeatureStore class is used to write
this data into the online store. This method is invoked when running materialization (using
the `feast materialize` or `feast materialize-incremental` commands, or the corresponding
FeatureStore.materialize() method. This method pulls data from the offline store, and the FeatureStore
class is used to write this data into the online store.
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
have all already been mapped to column names of the source table and those column names are the values passed
into this function.
Args:
config: Repo configuration object
data_source: Data source to pull all of the columns from
join_key_columns: Columns of the join keys
feature_name_columns: Columns of the feature names needed
event_timestamp_column: Timestamp column
start_date: Starting date of query
end_date: Ending date of query
"""
pass

Expand Down Expand Up @@ -210,8 +225,19 @@ def pull_all_from_table_or_query(
end_date: datetime,
) -> RetrievalJob:
"""
Returns a Retrieval Job for all join key columns, feature name columns, and the event timestamp columns that occur between the start_date and end_date.
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
have all already been mapped to column names of the source table and those column names are the values passed
into this function.
Args:
config: Repo configuration object
data_source: Data source to pull all of the columns from
join_key_columns: Columns of the join keys
feature_name_columns: Columns of the feature names needed
event_timestamp_column: Timestamp column
start_date: Starting date of query
end_date: Ending date of query
"""
pass
2 changes: 2 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ def _initialize_dynamodb_resource(region: str):
return boto3.resource("dynamodb", region_name=region)


# TODO(achals): This form of user-facing templating is experimental.
# Please refer to https://github.com/feast-dev/feast/issues/2438 before building on top of it,
def _get_table_name(
online_config: DynamoDBOnlineStoreConfig, config: RepoConfig, table: FeatureView
) -> str:
Expand Down
Empty file added sdk/python/feast/py.typed
Empty file.
3 changes: 3 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ def apply_data_source(
del registry.data_sources[idx]
data_source_proto = data_source.to_proto()
data_source_proto.project = project
data_source_proto.data_source_class_type = (
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
)
registry.data_sources.append(data_source_proto)
if commit:
self.commit()
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/setup.cfg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[isort]
src_paths = feast,tests
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
Expand All @@ -16,5 +17,5 @@ select = B,C,E,F,W,T4
exclude = .git,__pycache__,docs/conf.py,dist,feast/protos

[mypy]
files=feast,test
files=feast,tests
ignore_missing_imports=true
5 changes: 5 additions & 0 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ def run(self):
with open(f"{self.python_folder}/feast/{sub_folder}/__init__.py", 'w'):
pass

with open(f"{self.python_folder}/__init__.py", 'w'):
pass
with open(f"{self.python_folder}/feast/__init__.py", 'w'):
pass

for path in Path("feast/protos").rglob("*.py"):
for folder in self.sub_folders:
# Read in the file
Expand Down
Loading

0 comments on commit c7ab086

Please sign in to comment.