diff --git a/docs/specs/datastore_online_example.monopic b/docs/specs/datastore_online_example.monopic
new file mode 100644
index 0000000000..c01354983e
Binary files /dev/null and b/docs/specs/datastore_online_example.monopic differ
diff --git a/docs/specs/datastore_online_example.png b/docs/specs/datastore_online_example.png
new file mode 100644
index 0000000000..9c9b644d66
Binary files /dev/null and b/docs/specs/datastore_online_example.png differ
diff --git a/docs/specs/online_store_format.md b/docs/specs/online_store_format.md
index 8f27503e6c..97d508fb6e 100644
--- a/docs/specs/online_store_format.md
+++ b/docs/specs/online_store_format.md
@@ -59,20 +59,20 @@ Here's an example of how the entire thing looks like:
 
 However, we'll address this issue in future versions of the protocol.
 
-## Cloud Firestore Online Store Format
+## Google Datastore Online Store Format
 
-[Firebase data model](https://firebase.google.com/docs/firestore/data-model) is a hierarchy of documents that can contain (sub)-collections. This structure can be multiple levels deep; documents and subcollections are alternating in this hierarchy.
+The [Datastore data model](https://cloud.google.com/datastore/docs/concepts/entities) is a collection of documents called Entities (not to be confused with Feast Entities). Documents can be organized in a hierarchy using Kinds.
 
-We use the following structure to store feature data in the Firestore:
-* at the first level, there is a collection for each Feast project
-* second level, in each project-collection, there is a Firebase document for each Feature Table
-* third level, in the document for the Feature Table, there is a subcollection called `values` that contain a document per feature row. That document contains the following fields:
-  * `key` contains entity key as serialized `feast.types.EntityKey` proto
-  * `values` contains feature name to value map, values serialized as `feast.types.Value` proto
-  * `event_ts` contains event timestamp (in the native firestore timestamp format)
-  * `created_ts` contains write timestamp (in the native firestore timestamp format)
+We use the following structure to store feature data in Datastore:
+* there is a Datastore Entity for each Feast Project, with Kind `Project`
+* under that Datastore Entity, there is a Datastore Entity for each Feast Feature Table or View, with Kind `Table`. It contains one additional field, `created_ts`, holding the timestamp when this Datastore Entity was created.
+* under that Datastore Entity, there is a Datastore Entity for each Feast Entity Key, with Kind `Row`. It contains the following fields:
+  * `key` contains the entity key as a serialized `feast.types.EntityKey` proto
+  * `values` contains a feature name to value map, with values serialized as `feast.types.Value` protos
+  * `event_ts` contains the event timestamp (in the native Datastore timestamp format)
+  * `created_ts` contains the write timestamp (in the native Datastore timestamp format)
 
-Document id for the feature document is computed by hashing entity key using murmurhash3_128 algorithm as follows:
+The id for the `Row` Datastore Entity is computed by hashing the entity key with the murmurhash3_128 algorithm, as follows:
 
 1. hash entity names, sorted in alphanumeric order, by serializing them to bytes using the Value Serialization steps below
 2. hash the entity values in the same order as corresponding entity names, by serializing them to bytes using the Value Serialization steps below
@@ -90,7 +90,7 @@ Other types of entity keys are not supported in this version of the specification
 
 **Example:**
 
-![Firestore Online Example](firebase_online_example.png)
+![Datastore Online Example](datastore_online_example.png)
 
 # Appendix
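Editor's note: as a minimal sketch of the id computation the spec above describes, using the `mmh3` package and the `serialize_entity_key` helper introduced later in this diff (the entity name and value here are made up for illustration):

```python
import mmh3

from feast.infra.key_encoding_utils import serialize_entity_key
from feast.types.EntityKey_pb2 import EntityKey
from feast.types.Value_pb2 import Value

# A made-up entity key: a single "driver" entity with an int64 value.
entity_key = EntityKey(entity_names=["driver"], entity_values=[Value(int64_val=1)])

# murmurhash3_128 over the stable serialization, rendered as a hex string,
# becomes the id of the `Row` Datastore Entity.
row_id = mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
print(row_id)  # a 32-character hex id
```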
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 0f76c6eeef..62aed28b99 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -11,9 +11,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from pathlib import Path
 from typing import Optional
 
-from feast.repo_config import RepoConfig, load_repo_config
+from feast.infra.provider import Provider, get_provider
+from feast.registry import Registry
+from feast.repo_config import (
+    LocalOnlineStoreConfig,
+    OnlineStoreConfig,
+    RepoConfig,
+    load_repo_config,
+)
 
 
 class FeatureStore:
@@ -21,14 +29,29 @@ class FeatureStore:
     A FeatureStore object is used to define, create, and retrieve features.
     """
 
+    config: RepoConfig
+
     def __init__(
-        self, config_path: Optional[str], config: Optional[RepoConfig],
+        self, repo_path: Optional[str], config: Optional[RepoConfig],
     ):
-        if config_path is None or config is None:
-            raise Exception("You cannot specify both config_path and config")
+        if repo_path is not None and config is not None:
+            raise Exception("You cannot specify both repo_path and config")
         if config is not None:
             self.config = config
-        elif config_path is not None:
-            self.config = load_repo_config(config_path)
+        elif repo_path is not None:
+            self.config = load_repo_config(Path(repo_path))
        else:
-            self.config = RepoConfig()
+            self.config = RepoConfig(
+                metadata_store="./metadata.db",
+                project="default",
+                provider="local",
+                online_store=OnlineStoreConfig(
+                    local=LocalOnlineStoreConfig("online_store.db")
+                ),
+            )
+
+    def _get_provider(self) -> Provider:
+        return get_provider(self.config)
+
+    def _get_registry(self) -> Registry:
+        return Registry(self.config.metadata_store)
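Editor's note: a usage sketch of the constructor above (the repo path is hypothetical); a store can be built from a repo directory, from an explicit config, or with neither argument to fall back to the local sqlite defaults:

```python
from feast.feature_store import FeatureStore

# Load configuration from a feature repo directory (hypothetical path)...
store = FeatureStore(repo_path="my_feature_repo", config=None)

# ...or pass neither argument to get the default local sqlite-backed config.
default_store = FeatureStore(repo_path=None, config=None)
```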
diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py
index 8c3882bb1e..70a78ee814 100644
--- a/sdk/python/feast/infra/gcp.py
+++ b/sdk/python/feast/infra/gcp.py
@@ -1,9 +1,16 @@
 from datetime import datetime
-from typing import List, Optional
+from typing import Dict, List, Optional, Tuple
+
+import mmh3
+from pytz import utc
 
 from feast import FeatureTable
 from feast.infra.provider import Provider
 from feast.repo_config import DatastoreOnlineStoreConfig
+from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.types.Value_pb2 import Value as ValueProto
+
+from .key_encoding_utils import serialize_entity_key
 
 
 def _delete_all_values(client, key) -> None:
@@ -11,7 +18,7 @@ def _delete_all_values(client, key) -> None:
     """
     Delete all data under the key path in datastore.
     """
     while True:
-        query = client.query(kind="Value", ancestor=key)
+        query = client.query(kind="Row", ancestor=key)
         entities = list(query.fetch(limit=1000))
         if not entities:
             return
@@ -21,19 +28,37 @@
             client.delete(entity.key)
 
 
+def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
+    """
+    Compute the Datastore Entity id for a given Feast Entity Key.
+
+    Remember that a Datastore Entity is a concept from the Datastore data model, which has
+    nothing to do with the Entity concept in Feast.
+    """
+    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
+
+
+def _make_tzaware(t: datetime):
+    """ We assume tz-naive datetimes are UTC """
+    if t.tzinfo is None:
+        return t.replace(tzinfo=utc)
+    else:
+        return t
+
+
 class Gcp(Provider):
-    _project_id: Optional[str]
+    _gcp_project_id: Optional[str]
 
     def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
         if config:
-            self._project_id = config.project_id
+            self._gcp_project_id = config.project_id
         else:
-            self._project_id = None
+            self._gcp_project_id = None
 
     def _initialize_client(self):
         from google.cloud import datastore
 
-        if self._project_id is not None:
-            return datastore.Client(self.project_id)
+        if self._gcp_project_id is not None:
+            return datastore.Client(self._gcp_project_id)
         else:
             return datastore.Client()
@@ -49,18 +74,18 @@ def update_infra(
         client = self._initialize_client()
 
         for table in tables_to_keep:
-            key = client.key("FeastProject", project, "FeatureTable", table.name)
+            key = client.key("Project", project, "Table", table.name)
             entity = datastore.Entity(key=key)
-            entity.update({"created_at": datetime.utcnow()})
+            entity.update({"created_ts": datetime.utcnow()})
             client.put(entity)
 
         for table in tables_to_delete:
             _delete_all_values(
-                client, client.key("FeastProject", project, "FeatureTable", table.name)
+                client, client.key("Project", project, "Table", table.name)
             )
 
             # Delete the table metadata datastore entity
-            key = client.key("FeastProject", project, "FeatureTable", table.name)
+            key = client.key("Project", project, "Table", table.name)
             client.delete(key)
 
     def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
@@ -68,9 +93,69 @@ def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
 
         for table in tables:
             _delete_all_values(
-                client, client.key("FeastProject", project, "FeatureTable", table.name)
+                client, client.key("Project", project, "Table", table.name)
             )
 
             # Delete the table metadata datastore entity
-            key = client.key("FeastProject", project, "FeatureTable", table.name)
+            key = client.key("Project", project, "Table", table.name)
             client.delete(key)
+
+    def online_write_batch(
+        self,
+        project: str,
+        table: FeatureTable,
+        data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
+        created_ts: datetime,
+    ) -> None:
+        from google.cloud import datastore
+
+        client = self._initialize_client()
+
+        for entity_key, features, timestamp in data:
+            document_id = compute_datastore_entity_id(entity_key)
+
+            key = client.key(
+                "Project", project, "Table", table.name, "Row", document_id,
+            )
+            with client.transaction():
+                entity = client.get(key)
+                if entity is not None:
+                    if entity["event_ts"] > _make_tzaware(timestamp):
+                        # Do not overwrite feature values computed from fresher data
+                        continue
+                    elif entity["event_ts"] == _make_tzaware(timestamp) and entity[
+                        "created_ts"
+                    ] > _make_tzaware(created_ts):
+                        # Do not overwrite feature values computed from the same data, but
+                        # computed later than this one
+                        continue
+                else:
+                    entity = datastore.Entity(key=key)
+
+                entity.update(
+                    dict(
+                        key=entity_key.SerializeToString(),
+                        values={k: v.SerializeToString() for k, v in features.items()},
+                        event_ts=_make_tzaware(timestamp),
+                        created_ts=_make_tzaware(created_ts),
+                    )
+                )
+                client.put(entity)
+
+    def online_read(
+        self, project: str, table: FeatureTable, entity_key: EntityKeyProto
+    ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
+        client = self._initialize_client()
+
+        document_id = compute_datastore_entity_id(entity_key)
+        key = client.key("Project", project, "Table", table.name, "Row", document_id)
+        value = client.get(key)
+        if value is not None:
+            res = {}
+            for feature_name, value_bin in value["values"].items():
+                val = ValueProto()
+                val.ParseFromString(value_bin)
+                res[feature_name] = val
+            return value["event_ts"], res
+        else:
+            return None, None
diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py
new file mode 100644
index 0000000000..5fc9f118df
--- /dev/null
+++ b/sdk/python/feast/infra/key_encoding_utils.py
@@ -0,0 +1,48 @@
+import struct
+from typing import List, Tuple
+
+from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.types.Value_pb2 import Value as ValueProto
+from feast.types.Value_pb2 import ValueType
+
+
+def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
+    if value_type == "string_val":
+        return v.string_val.encode("utf8"), ValueType.STRING
+    elif value_type == "bytes_val":
+        return v.bytes_val, ValueType.BYTES
+    elif value_type == "int32_val":
+        return struct.pack("<i", v.int32_val), ValueType.INT32
+    elif value_type == "int64_val":
+        return struct.pack("<l", v.int64_val), ValueType.INT64
+    else:
+        raise ValueError(f"Value type not supported: {value_type}")
+
+
+def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
+    """
+    Serialize entity key to a bytestring so it can be used as a lookup key in a hash table.
+
+    We need this encoding to be stable; therefore we cannot just use protobuf serialization
+    here since it does not guarantee that two proto messages containing the same data will
+    serialize to the same byte string[1].
+
+    [1] https://developers.google.com/protocol-buffers/docs/encoding
+    """
+    sorted_keys, sorted_values = zip(
+        *sorted(zip(entity_key.entity_names, entity_key.entity_values))
+    )
+
+    output: List[bytes] = []
+    for k in sorted_keys:
+        output.append(struct.pack("<I", ValueType.STRING))
+        output.append(k.encode("utf8"))
+    for v in sorted_values:
+        val_bytes, value_type = _serialize_val(v.WhichOneof("val"), v)
+
+        output.append(struct.pack("<I", value_type))
+
+        output.append(struct.pack("<I", len(val_bytes)))
+        output.append(val_bytes)
+
+    return b"".join(output)
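Editor's note: a quick illustration of the stability property the docstring above describes — (name, value) pairs are sorted before encoding, so field order in the proto does not affect the result (the names and values here are made up):

```python
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.types.EntityKey_pb2 import EntityKey
from feast.types.Value_pb2 import Value

a = EntityKey(
    entity_names=["driver", "region"],
    entity_values=[Value(int64_val=1), Value(string_val="us")],
)
b = EntityKey(
    entity_names=["region", "driver"],
    entity_values=[Value(string_val="us"), Value(int64_val=1)],
)

# Both keys carry the same (name, value) pairs, just in a different order;
# the sorted encoding maps them to the same byte string.
assert serialize_entity_key(a) == serialize_entity_key(b)
```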
+ """, + ( + # INSERT + entity_key_bin, + feature_name, + val.SerializeToString(), + timestamp, + created_ts, + # SET + val.SerializeToString(), + timestamp, + created_ts, + # WHERE + timestamp, + timestamp, + created_ts, + ), + ) + + def online_read( + self, project: str, table: FeatureTable, entity_key: EntityKeyProto + ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: + entity_key_bin = serialize_entity_key(entity_key) + + conn = self._get_conn() + cur = conn.cursor() + cur.execute( + f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?", + (entity_key_bin,), + ) + + res = {} + res_ts = None + for feature_name, val_bin, ts in cur.fetchall(): + val = ValueProto() + val.ParseFromString(val_bin) + res[feature_name] = val + res_ts = ts + + if not res: + return None, None + else: + return res_ts, res diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 52689e9543..4bb2cc383b 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,8 +1,11 @@ import abc -from typing import List +from datetime import datetime +from typing import Dict, List, Optional, Tuple from feast import FeatureTable from feast.repo_config import RepoConfig +from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.types.Value_pb2 import Value as ValueProto class Provider(abc.ABC): @@ -34,6 +37,44 @@ def teardown_infra(self, project: str, tables: List[FeatureTable]): """ ... + @abc.abstractmethod + def online_write_batch( + self, + project: str, + table: FeatureTable, + data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]], + created_ts: datetime, + ) -> None: + """ + Write a batch of feature rows to the online store. This is a low level interface, not + expected to be used by the users directly. + + If a tz-naive timestamp is passed to this method, it is assumed to be UTC. + + Args: + project: Feast project name + table: Feast FeatureTable + data: a list of triplets containing Feature data. Each triplet contains an Entity Key, + a dict containing feature values, and event timestamp for the row. + created_ts: the created timestamp (typically set to current time), same value used for + all rows. + """ + ... + + @abc.abstractmethod + def online_read( + self, project: str, table: FeatureTable, entity_key: EntityKeyProto + ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: + """ + Read feature values given an Entity Key. This is a low level interface, not + expected to be used by the users directly. + + Returns: + A tuple of event_ts for the row, and the feature data as a dict from feature names + to values. Values are returned as Value proto message. + """ + ... 
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 52689e9543..4bb2cc383b 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -1,8 +1,11 @@
 import abc
-from typing import List
+from datetime import datetime
+from typing import Dict, List, Optional, Tuple
 
 from feast import FeatureTable
 from feast.repo_config import RepoConfig
+from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.types.Value_pb2 import Value as ValueProto
 
 
 class Provider(abc.ABC):
@@ -34,6 +37,44 @@ class Provider(abc.ABC):
         """
         ...
 
+    @abc.abstractmethod
+    def online_write_batch(
+        self,
+        project: str,
+        table: FeatureTable,
+        data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
+        created_ts: datetime,
+    ) -> None:
+        """
+        Write a batch of feature rows to the online store. This is a low-level interface, not
+        expected to be used by users directly.
+
+        If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
+
+        Args:
+            project: Feast project name
+            table: Feast FeatureTable
+            data: a list of triplets containing Feature data. Each triplet contains an Entity Key,
+                a dict containing feature values, and the event timestamp for the row.
+            created_ts: the created timestamp (typically set to current time), same value used for
+                all rows.
+        """
+        ...
+
+    @abc.abstractmethod
+    def online_read(
+        self, project: str, table: FeatureTable, entity_key: EntityKeyProto
+    ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
+        """
+        Read feature values given an Entity Key. This is a low-level interface, not
+        expected to be used by users directly.
+
+        Returns:
+            A tuple of the event_ts for the row, and the feature data as a dict from feature
+            names to values. Values are returned as Value proto messages.
+        """
+        ...
+
 
 def get_provider(config: RepoConfig) -> Provider:
     if config.provider == "gcp":
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 7660a7bcdd..a58bc4b8b7 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -46,6 +46,7 @@
     "google",
     "kubernetes==12.0.*",
     "bindr",
+    "mmh3",
 ]
 
 # README file from Feast repo root directory
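Editor's note: a small sketch of how a provider is obtained via `get_provider`. The config values mirror the local defaults constructed in feature_store.py above; only the `"gcp"` dispatch is visible in this diff, so the assumption that `"local"` maps to `LocalSqlite` is mine:

```python
from feast.infra.provider import get_provider
from feast.repo_config import (
    LocalOnlineStoreConfig,
    OnlineStoreConfig,
    RepoConfig,
)

# get_provider dispatches on config.provider ("gcp" -> Gcp; presumably
# "local" -> LocalSqlite). Values mirror the defaults in feature_store.py.
config = RepoConfig(
    metadata_store="./metadata.db",
    project="default",
    provider="local",
    online_store=OnlineStoreConfig(local=LocalOnlineStoreConfig("online_store.db")),
)
provider = get_provider(config)
```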
diff --git a/sdk/python/tests/cli/online_read_write_test.py b/sdk/python/tests/cli/online_read_write_test.py
new file mode 100644
index 0000000000..a78dc331e1
--- /dev/null
+++ b/sdk/python/tests/cli/online_read_write_test.py
@@ -0,0 +1,89 @@
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from feast.feature_store import FeatureStore
+from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.types.Value_pb2 import Value as ValueProto
+
+
+def basic_rw_test(repo_path: Path, project_name: str) -> None:
+    """
+    This is a provider-independent test suite for reading and writing from the online store, to
+    be used by provider-specific tests.
+    """
+    store = FeatureStore(repo_path=repo_path, config=None)
+    registry = store._get_registry()
+    table = registry.get_feature_table(project=project_name, name="driver_locations")
+
+    provider = store._get_provider()
+
+    entity_key = EntityKeyProto(
+        entity_names=["driver"], entity_values=[ValueProto(int64_val=1)]
+    )
+
+    def _driver_rw_test(event_ts, created_ts, write, expect_read):
+        """ A helper function to write values and read them back """
+        write_lat, write_lon = write
+        expect_lat, expect_lon = expect_read
+        provider.online_write_batch(
+            project=project_name,
+            table=table,
+            data=[
+                (
+                    entity_key,
+                    {
+                        "lat": ValueProto(double_val=write_lat),
+                        "lon": ValueProto(string_val=write_lon),
+                    },
+                    event_ts,
+                )
+            ],
+            created_ts=created_ts,
+        )
+
+        _, val = provider.online_read(
+            project=project_name, table=table, entity_key=entity_key
+        )
+        assert val["lon"].string_val == expect_lon
+        assert abs(val["lat"].double_val - expect_lat) < 1e-6
+
+    """ Basic test: write a value, read it back """
+
+    time_1 = datetime.utcnow()
+    _driver_rw_test(
+        event_ts=time_1, created_ts=time_1, write=(1.1, "3.1"), expect_read=(1.1, "3.1")
+    )
+
+    """ Values with an older event_ts should not overwrite newer ones """
+    time_2 = datetime.utcnow()
+    _driver_rw_test(
+        event_ts=time_1 - timedelta(hours=1),
+        created_ts=time_2,
+        write=(-1000, "OLD"),
+        expect_read=(1.1, "3.1"),
+    )
+
+    """ Values with a newer event_ts should overwrite older ones """
+    time_3 = datetime.utcnow()
+    _driver_rw_test(
+        event_ts=time_1 + timedelta(hours=1),
+        created_ts=time_3,
+        write=(1123, "NEWER"),
+        expect_read=(1123, "NEWER"),
+    )
+
+    """ created_ts is used as a tie breaker; using an older created_ts here, so no overwrite """
+    _driver_rw_test(
+        event_ts=time_1 + timedelta(hours=1),
+        created_ts=time_3 - timedelta(hours=1),
+        write=(54321, "I HAVE AN OLDER created_ts SO I LOSE"),
+        expect_read=(1123, "NEWER"),
+    )
+
+    """ created_ts is used as a tie breaker; using a newer created_ts here, so the write wins """
+    _driver_rw_test(
+        event_ts=time_1 + timedelta(hours=1),
+        created_ts=time_3 + timedelta(hours=1),
+        write=(96864, "I HAVE A NEWER created_ts SO I WIN"),
+        expect_read=(96864, "I HAVE A NEWER created_ts SO I WIN"),
+    )
diff --git a/sdk/python/tests/cli/test_cli_local.py b/sdk/python/tests/cli/test_cli_local.py
index cb3628b953..ede04bf0b2 100644
--- a/sdk/python/tests/cli/test_cli_local.py
+++ b/sdk/python/tests/cli/test_cli_local.py
@@ -6,6 +6,7 @@
 from typing import List
 
 from feast import cli
+from tests.cli.online_read_write_test import basic_rw_test
 
 
 class CliRunner:
@@ -50,5 +51,7 @@ def test_basic(self) -> None:
         result = runner.run(["apply", str(repo_path)], cwd=repo_path)
         assert result.returncode == 0
 
+        basic_rw_test(repo_path, "foo")
+
         result = runner.run(["teardown", str(repo_path)], cwd=repo_path)
         assert result.returncode == 0
diff --git a/sdk/python/tests/cli/test_datastore.py b/sdk/python/tests/cli/test_datastore.py
index c2ef7d350e..4a64f670dc 100644
--- a/sdk/python/tests/cli/test_datastore.py
+++ b/sdk/python/tests/cli/test_datastore.py
@@ -10,6 +10,7 @@
 import pytest
 
 from feast import cli
+from tests.cli.online_read_write_test import basic_rw_test
 
 
 class CliRunner:
@@ -59,5 +60,7 @@ def test_basic(self) -> None:
         result = runner.run(["apply", str(repo_path)], cwd=repo_path)
         assert result.returncode == 0
 
+        basic_rw_test(repo_path, project_name=self._project_id)
+
         result = runner.run(["teardown", str(repo_path)], cwd=repo_path)
         assert result.returncode == 0