Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add PushSource proto and Python class #2428

Merged
merged 6 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used to source Feature data
// Next available id: 22
// Next available id: 23
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
reserved 6 to 10;

// Type of Data Source.
// Next available id: 9
// Next available id: 10
enum SourceType {
INVALID = 0;
BATCH_FILE = 1;
Expand All @@ -44,7 +44,7 @@ message DataSource {
STREAM_KINESIS = 4;
CUSTOM_SOURCE = 6;
REQUEST_SOURCE = 7;

PUSH_SOURCE = 9;
}

// Unique name of data source within the project
Expand Down Expand Up @@ -169,6 +169,16 @@ message DataSource {
map<string, feast.types.ValueType.Enum> schema = 2;
}

// Defines options for a DataSource that supports having data pushed to it. This allows data to be
// pushed to the online store on demand, such as by stream consumers.
message PushOptions {
// Mapping of feature name to its value type (feast.types.ValueType.Enum).
map<string, feast.types.ValueType.Enum> schema = 1;
// Optional batch source for this push source, used for historical features and materialization.
DataSource batch_source = 2;
}


// DataSource options.
oneof options {
FileOptions file_options = 11;
Expand All @@ -179,5 +189,6 @@ message DataSource {
RequestDataOptions request_data_options = 18;
CustomSourceOptions custom_options = 16;
SnowflakeOptions snowflake_options = 19;
PushOptions push_options = 22;
}
}
72 changes: 72 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,75 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.date_partition_column = self.date_partition_column

return data_source_proto


class PushSource(DataSource):
    """
    PushSource that can be used to ingest features on request.

    Args:
        name: Name of the push source.
        schema: Schema mapping from the input feature name to a ValueType.
        batch_source: Optional data source attached to this push source. When
            set, it is serialized into ``push_options.batch_source`` of the
            DataSource proto.
    """

    name: str
    schema: Dict[str, ValueType]
    batch_source: Optional[DataSource]

    def __init__(
        self,
        name: str,
        schema: Dict[str, ValueType],
        batch_source: Optional[DataSource] = None,
    ):
        """Creates a PushSource object."""
        super().__init__(name)
        self.schema = schema
        self.batch_source = batch_source

    def validate(self, config: RepoConfig):
        """Perform no validation; this source currently has nothing to check."""
        pass

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        """Placeholder implementation with an empty body.

        NOTE(review): this implicitly returns ``None`` even though the
        annotation promises an iterable -- confirm whether callers rely on it.
        """
        pass

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        """Build a PushSource from a DataSource proto carrying ``push_options``.

        Args:
            data_source: Proto whose ``name`` and ``push_options`` are read.

        Returns:
            A PushSource with the proto's name, a plain-dict copy of its schema
            map, and the deserialized batch source (``None`` when the proto
            has no ``batch_source`` set).
        """
        push_options = data_source.push_options
        # Copy the proto map into a regular dict so the result does not alias
        # the message. NOTE(review): values are stored exactly as the proto
        # yields them -- confirm they match the ValueType the annotation names.
        schema = dict(push_options.schema)

        batch_source = None
        if push_options.HasField("batch_source"):
            batch_source = DataSource.from_proto(push_options.batch_source)

        return PushSource(
            name=data_source.name, schema=schema, batch_source=batch_source
        )

    def to_proto(self) -> DataSourceProto:
        """Serialize this source into a DataSource proto of type PUSH_SOURCE.

        Returns:
            A DataSourceProto with ``name``, ``type`` and ``push_options`` set;
            ``push_options.batch_source`` is populated only when this source
            has a batch source.
        """
        # Explicit None check: "no batch source" is represented by None, not by
        # whatever truthiness a DataSource subclass might define.
        batch_source_proto = None
        if self.batch_source is not None:
            batch_source_proto = self.batch_source.to_proto()

        options = DataSourceProto.PushOptions(
            schema=dict(self.schema), batch_source=batch_source_proto
        )
        return DataSourceProto(
            name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options,
        )

    def get_table_query_string(self) -> str:
        """Not supported for push sources; always raises NotImplementedError."""
        raise NotImplementedError

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        """Not supported for push sources; always raises NotImplementedError."""
        raise NotImplementedError
34 changes: 34 additions & 0 deletions sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from feast.data_source import PushSource
from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.protos.feast.types.Value_pb2 import ValueType


def test_push_no_batch():
    """Round-trip a PushSource without a batch source through its proto."""
    push_source = PushSource(
        name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}
    )
    push_source_proto = push_source.to_proto()
    # Reading a message field never yields None, so an `is not None` check is
    # vacuous; assert field presence explicitly instead.
    assert push_source_proto.HasField("push_options")
    assert not push_source_proto.push_options.HasField("batch_source")
    push_source_unproto = PushSource.from_proto(push_source_proto)

    assert push_source.name == push_source_unproto.name
    assert push_source.schema == push_source_unproto.schema
    assert push_source.batch_source == push_source_unproto.batch_source


def test_push_with_batch():
    """Round-trip a PushSource that has a batch source through its proto."""
    push_source = PushSource(
        name="test",
        schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64},
        batch_source=BigQuerySource(table="test.test"),
    )
    push_source_proto = push_source.to_proto()
    # Reading a message field never yields None, so an `is not None` check is
    # vacuous; assert field presence explicitly instead.
    assert push_source_proto.HasField("push_options")
    assert push_source_proto.push_options.HasField("batch_source")

    push_source_unproto = PushSource.from_proto(push_source_proto)

    assert push_source.name == push_source_unproto.name
    assert push_source.schema == push_source_unproto.schema
    assert push_source.batch_source.name == push_source_unproto.batch_source.name