From feb6cb8518889288d6ddd97e4482db2f6b86eabd Mon Sep 17 00:00:00 2001 From: Stefano Lottini Date: Tue, 9 Aug 2022 18:45:17 +0200 Subject: [PATCH] feat: Add Cassandra/AstraDB online store contribution (#2873) * Cassandra online store * Refactor file-editing to a shared utils module * Use f-strings in the CassandraOnlineStoreCreator * Specify version 2 in serializing to make the entity key * Remove unnecessary empty comment lines * Rename proj to columns in _read_rows_by_entity_key * Introduce Cassandra-specific pytest targets * Adapt roadmaps and docs to cover/index Cassandra online store * Add license notes to code files Signed-off-by: Stefano Lottini * remove from main CI path and update entity key serialization in template Signed-off-by: Danny Chiao * revert makefile change Signed-off-by: Danny Chiao Co-authored-by: Kevin Zhang Co-authored-by: Danny Chiao --- CONTRIBUTING.md | 1 + Makefile | 25 + README.md | 2 +- docs/reference/online-stores/README.md | 4 + docs/reference/online-stores/cassandra.md | 61 ++ docs/roadmap.md | 2 +- docs/specs/online_store_format.md | 80 +++ sdk/python/docs/index.rst | 7 + .../feast.infra.offline_stores.contrib.rst | 20 +- ..._stores.contrib.cassandra_online_store.rst | 21 + .../feast.infra.online_stores.contrib.rst | 9 + sdk/python/docs/source/feast.rst | 8 + sdk/python/docs/source/index.rst | 7 + sdk/python/feast/cli.py | 2 +- sdk/python/feast/file_utils.py | 85 +++ .../contrib/cassandra_online_store/README.md | 131 +++++ .../cassandra_online_store/__init__.py | 0 .../cassandra_online_store.py | 547 ++++++++++++++++++ .../contrib/cassandra_repo_configuration.py | 26 + sdk/python/feast/repo_config.py | 1 + sdk/python/feast/repo_operations.py | 9 +- sdk/python/feast/templates/aws/bootstrap.py | 9 +- .../feast/templates/cassandra/__init__.py | 0 .../feast/templates/cassandra/bootstrap.py | 257 ++++++++ .../feast/templates/cassandra/example.py | 56 ++ .../templates/cassandra/feature_store.yaml | 17 + sdk/python/feast/templates/hbase/bootstrap.py | 11 +- sdk/python/feast/templates/local/bootstrap.py | 11 +- .../feast/templates/postgres/bootstrap.py | 9 +- .../feast/templates/snowflake/bootstrap.py | 9 +- .../universal/online_store/cassandra.py | 56 ++ setup.py | 6 + 32 files changed, 1432 insertions(+), 57 deletions(-) create mode 100644 docs/reference/online-stores/cassandra.md create mode 100644 sdk/python/docs/source/feast.infra.online_stores.contrib.cassandra_online_store.rst create mode 100644 sdk/python/feast/file_utils.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md create mode 100644 sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py create mode 100644 sdk/python/feast/infra/online_stores/contrib/cassandra_repo_configuration.py create mode 100644 sdk/python/feast/templates/cassandra/__init__.py create mode 100644 sdk/python/feast/templates/cassandra/bootstrap.py create mode 100644 sdk/python/feast/templates/cassandra/example.py create mode 100644 sdk/python/feast/templates/cassandra/feature_store.yaml create mode 100644 sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 43ab6a58b8..d16aa23d89 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -317,6 +317,7 @@ The services with containerized replacements currently implemented are: - Trino - HBase - Postgres +- Cassandra You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies. diff --git a/Makefile b/Makefile index 67be3ba248..35becab0cd 100644 --- a/Makefile +++ b/Makefile @@ -156,6 +156,31 @@ test-python-universal-postgres: not test_universal_types" \ sdk/python/tests +test-python-universal-cassandra: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \ + FEAST_USAGE=False \ + IS_TEST=True \ + python -m pytest -x --integration \ + sdk/python/tests + +test-python-universal-cassandra-no-cloud-providers: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \ + FEAST_USAGE=False \ + IS_TEST=True \ + python -m pytest -x --integration \ + -k "not test_lambda_materialization_consistency and \ + not test_apply_entity_integration and \ + not test_apply_feature_view_integration and \ + not test_apply_entity_integration and \ + not test_apply_feature_view_integration and \ + not test_apply_data_source_integration and \ + not test_nullable_online_store and \ + not gcs_registry and \ + not s3_registry" \ + sdk/python/tests + test-python-universal: FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 --integration sdk/python/tests diff --git a/README.md b/README.md index 8ad37a83aa..9616c91e8c 100644 --- a/README.md +++ b/README.md @@ -177,7 +177,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [Azure Cache for Redis (community plugin)](https://github.com/Azure/feast-azure) * [x] [Postgres (contrib plugin)](https://docs.feast.dev/reference/online-stores/postgres) * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store) - * [x] [Cassandra / AstraDB](https://github.com/datastaxdevs/feast-cassandra-online-store) + * [x] [Cassandra / AstraDB](https://docs.feast.dev/reference/online-stores/cassandra) * [ ] Bigtable (in progress) * **Feature Engineering** * [x] On-demand Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#)) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index 8367e2ce74..b7e7d4e7ca 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -25,3 +25,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli {% content-ref url="postgres.md" %} [postgres.md](postgres.md) {% endcontent-ref %} + +{% content-ref url="cassandra.md" %} +[cassandra.md](cassandra.md) +{% endcontent-ref %} diff --git a/docs/reference/online-stores/cassandra.md b/docs/reference/online-stores/cassandra.md new file mode 100644 index 0000000000..7a83f905ed --- /dev/null +++ b/docs/reference/online-stores/cassandra.md @@ -0,0 +1,61 @@ +# Cassandra / Astra DB online store + +## Description + +The [Cassandra / Astra DB] online store provides support for materializing feature values into an Apache Cassandra / Astra DB database for online features. + +* The whole project is contained within a Cassandra keyspace +* Each feature view is mapped one-to-one to a specific Cassandra table +* This implementation inherits all strengths of Cassandra such as high availability, fault-tolerance, and data distribution + +An easy way to get started is the command `feast init REPO_NAME -t cassandra`. + +### Example (Cassandra) + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: cassandra + hosts: + - 192.168.1.1 + - 192.168.1.2 + - 192.168.1.3 + keyspace: KeyspaceName + port: 9042 # optional + username: user # optional + password: secret # optional + protocol_version: 5 # optional + load_balancing: # optional + local_dc: 'datacenter1' # optional + load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional +``` +{% endcode %} + +### Example (Astra DB) + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: cassandra + secure_bundle_path: /path/to/secure/bundle.zip + keyspace: KeyspaceName + username: Client_ID + password: Client_Secret + protocol_version: 4 # optional + load_balancing: # optional + local_dc: 'eu-central-1' # optional + load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional + +``` +{% endcode %} + +For a full explanation of configuration options please look at file +`sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md`. + +Storage specifications can be found at `docs/specs/online_store_format.md`. \ No newline at end of file diff --git a/docs/roadmap.md b/docs/roadmap.md index e481453dff..4e610aa172 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -35,7 +35,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [Azure Cache for Redis (community plugin)](https://github.com/Azure/feast-azure) * [x] [Postgres (contrib plugin)](https://docs.feast.dev/reference/online-stores/postgres) * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store) - * [x] [Cassandra / AstraDB](https://github.com/datastaxdevs/feast-cassandra-online-store) + * [x] [Cassandra / AstraDB](https://docs.feast.dev/reference/online-stores/cassandra) * [ ] Bigtable (in progress) * **Feature Engineering** * [x] On-demand Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#)) diff --git a/docs/specs/online_store_format.md b/docs/specs/online_store_format.md index 9f901ae69c..5c3c545c8d 100644 --- a/docs/specs/online_store_format.md +++ b/docs/specs/online_store_format.md @@ -92,6 +92,86 @@ Other types of entity keys are not supported in this version of the specificatio ![Datastore Online Example](datastore_online_example.png) +## Cassandra/Astra DB Online Store Format + +### Overview + +Apache Cassandra™ is a table-oriented NoSQL distributed database. Astra DB is a managed database-as-a-service +built on Cassandra, and will be assimilated to the former in what follows. + +In Cassandra, tables are grouped in _keyspaces_ (groups of related tables). Each table is comprised of +_rows_, each containing data for a given set of _columns_. Moreover, rows are grouped in _partitions_ according +to a _partition key_ (a portion of the uniqueness-defining _primary key_ set of columns), so that all rows +with the same values for the partition key are guaranteed to be stored on the same Cassandra nodes, next to each other, +which guarantees fast retrieval times. + +This architecture makes Cassandra a good fit for an online feature store in Feast. + +### Cassandra Online Store Format + +Each project (denoted by its name, called "feature store name" elsewhere) may contain an +arbitrary number of `FeatureView`s: these correspond each to a specific table, and +all tables for a project are to be contained in a single keyspace. The keyspace should +have been created by the Feast user preliminarly and is to be specified in the feature store +configuration `yaml`. + +The table for a project `project` and feature view `FeatureView` will have name +`project_FeatureView` (e.g. `feature_repo_driver_hourly_stats`). + +All tables have the same structure. Cassandra is schemaful and the columns are strongly typed. +In the following table schema (which also serves as Chebotko diagram) the Python +and Cassandra data types are both specified: + +|Table: |``_`` | | _(Python type)_ | +|---------------|-----------------------------|--|----------------------| +|`entity_key` |`TEXT` |K | `str` | +|`feature_name` |`TEXT` |C↑| `str` | +|`value` |`BLOB` | | `bytes` | +|`event_ts` |`TIMESTAMP` | | `datetime.datetime` | +|`created_ts` |`TIMESTAMP` | | `datetime.datetime` | + +Each row in the table represents a single value for a feature in a feature view, +thus associated to a specific entity. The choice of partitioning ensures that, +within a given feature view (i.e. a single table), for a given entity any number +of features can be retrieved with a single, best-practice-respecting query +(which is what happens in the `online_read` method implementation). + + +The `entity_key` column is computed as `serialize_entity_key(entityKey).hex()`, +where `entityKey` is of type `feast.protos.feast.types.EntityKey_pb2.EntityKey`. + +The value of `feature_name` is the plain-text name of the feature as defined +in the corresponding `FeatureView`. + +For `value`, the bytes from `[protoValue].SerializeToString()` +are used, where `protoValue` is of type `feast.protos.feast.types.Value_pb2.Value`. + +Column `event_ts` stores the timestamp the feature value refers to, as passed +to the store method. Conversely, column `created_ts`, meant to store the write +time for the entry, is now being deprecated and will be never written by this +online-store implementation. Thanks to the internal storage mechanism of Cassandra, +this does not incur a noticeable performance penalty (hence, for the time being, +the column can be maintained in the schema). + +### Example entry + +For a project `feature_repo` and feature view named `driver_hourly_stats`, +a typical row in table `feature_repo_driver_hourly_stats` might look like: + +|Column |content | notes | +|---------------|-----------------------------------------------------|-------------------------------------------------------------------| +|`entity_key` |`020000006472697665725f69640400000004000000ea030000` | from `"driver_id = 1002"` | +|`feature_name` |`conv_rate` | | +|`value` |`0x35f5696d3f` | from `float_val: 0.9273980259895325`, i.e. `(b'5\xf5im?').hex()` | +|`event_ts` |`2022-07-07 09:00:00.000000+0000` | from `datetime.datetime(2022, 7, 7, 9, 0)` | +|`created_ts` |`null` | not explicitly written to avoid unnecessary tombstones | + +### Known Issues + +If a `FeatureView` ever gets _re-defined_ in a schema-breaking way, the implementation is not able to rearrange the +schema of the underlying table accordingly (neither dropping all data nor, even less so, keeping it somehow). +This should never occur, lest one encounters all sorts of data-retrieval issues anywhere in Feast usage. + # Appendix ##### Appendix A. Value proto format. diff --git a/sdk/python/docs/index.rst b/sdk/python/docs/index.rst index 07b9d9a77e..4d17f2b05b 100644 --- a/sdk/python/docs/index.rst +++ b/sdk/python/docs/index.rst @@ -287,6 +287,13 @@ HBase Online Store :members: :noindex: +Cassandra Online Store +----------------------- + +.. automodule:: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store + :members: + :noindex: + Batch Materialization Engine ============================ diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst index 39902da130..80916046b5 100644 --- a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst @@ -14,18 +14,26 @@ Subpackages Submodules ---------- -feast.infra.offline\_stores.contrib.contrib\_repo\_configuration module ------------------------------------------------------------------------ +feast.infra.offline\_stores.contrib.postgres\_repo\_configuration module +------------------------------------------------------------------------ -.. automodule:: feast.infra.offline_stores.contrib.contrib_repo_configuration +.. automodule:: feast.infra.offline_stores.contrib.postgres_repo_configuration :members: :undoc-members: :show-inheritance: -feast.infra.offline\_stores.contrib.postgres\_repo\_configuration module ------------------------------------------------------------------------- +feast.infra.offline\_stores.contrib.spark\_repo\_configuration module +--------------------------------------------------------------------- -.. automodule:: feast.infra.offline_stores.contrib.postgres_repo_configuration +.. automodule:: feast.infra.offline_stores.contrib.spark_repo_configuration + :members: + :undoc-members: + :show-inheritance: + +feast.infra.offline\_stores.contrib.trino\_repo\_configuration module +--------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.trino_repo_configuration :members: :undoc-members: :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.cassandra_online_store.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.cassandra_online_store.rst new file mode 100644 index 0000000000..3770cc8af7 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.cassandra_online_store.rst @@ -0,0 +1,21 @@ +feast.infra.online\_stores.contrib.cassandra\_online\_store package +=================================================================== + +Submodules +---------- + +feast.infra.online\_stores.contrib.cassandra\_online\_store.cassandra\_online\_store module +------------------------------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.online_stores.contrib.cassandra_online_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index 7315bb741e..ca07b356d6 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -7,11 +7,20 @@ Subpackages .. toctree:: :maxdepth: 4 + feast.infra.online_stores.contrib.cassandra_online_store feast.infra.online_stores.contrib.hbase_online_store Submodules ---------- +feast.infra.online\_stores.contrib.cassandra\_repo\_configuration module +------------------------------------------------------------------------ + +.. automodule:: feast.infra.online_stores.contrib.cassandra_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.contrib.hbase\_repo\_configuration module -------------------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.rst b/sdk/python/docs/source/feast.rst index b1fb70a362..11fbde83e5 100644 --- a/sdk/python/docs/source/feast.rst +++ b/sdk/python/docs/source/feast.rst @@ -169,6 +169,14 @@ feast.field module :undoc-members: :show-inheritance: +feast.file\_utils module +------------------------ + +.. automodule:: feast.file_utils + :members: + :undoc-members: + :show-inheritance: + feast.flags\_helper module -------------------------- diff --git a/sdk/python/docs/source/index.rst b/sdk/python/docs/source/index.rst index 07b9d9a77e..4d17f2b05b 100644 --- a/sdk/python/docs/source/index.rst +++ b/sdk/python/docs/source/index.rst @@ -287,6 +287,13 @@ HBase Online Store :members: :noindex: +Cassandra Online Store +----------------------- + +.. automodule:: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store + :members: + :noindex: + Batch Materialization Engine ============================ diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 8c1f988e36..c6a301e958 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -590,7 +590,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List "--template", "-t", type=click.Choice( - ["local", "gcp", "aws", "snowflake", "spark", "postgres", "hbase"], + ["local", "gcp", "aws", "snowflake", "spark", "postgres", "hbase", "cassandra"], case_sensitive=False, ), help="Specify a template for the created project", diff --git a/sdk/python/feast/file_utils.py b/sdk/python/feast/file_utils.py new file mode 100644 index 0000000000..0a3b614dd4 --- /dev/null +++ b/sdk/python/feast/file_utils.py @@ -0,0 +1,85 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +def replace_str_in_file(file_path, match_str, sub_str): + """ + Replace a string, in-place, in a text file, throughout. + Does not return anything, side-effect only. + Inputs are: + file_path, a string with the path to the ascii file to edit + match_str, the substring to be replaced (as many times as it's found) + sub_str, the string to insert in place of match_str + NOTE: not suitable for very large files (it does all in-memory). + """ + with open(file_path, "r") as f: + contents = f.read() + contents = contents.replace(match_str, sub_str) + with open(file_path, "wt") as f: + f.write(contents) + + +def remove_lines_from_file(file_path, match_str, partial=True): + """ + Edit an ascii file (in-place) by removing all lines that + match a given string (partially or totally). + Does not return anything, side-effect only. + Inputs are: + file_path, a string with the path to the ascii file to edit + match_str, the string to look for in the file lines + partial, a boolean: if True, any line with match_str as substring + will be removed; if False, only lines matching it entirely. + NOTE: not suitable for very large files (it does all in-memory). + """ + + def _line_matcher(line, _m=match_str, _p=partial): + if _p: + return _m in line + else: + return _m == line + + with open(file_path, "r") as f: + file_lines = list(f.readlines()) + + new_file_lines = [line for line in file_lines if not _line_matcher(line)] + + with open(file_path, "wt") as f: + f.write("".join(new_file_lines)) + + +def write_setting_or_remove( + file_path, setting_value, setting_name, setting_placeholder_value +): + """ + Utility to adapt a settings-file template to some provided values. + Assumes the file has lines such as + " username: c_username" + (quotes excluded) where the placeholder might be replaced with actual value + or the line might not be needed altogether. + Then, calling + write_settings_or_remove(file_path, new_username, 'username', 'c_username') + the file is edited in-place in one of two ways: + 1. if new_username is None, the line disappears completely + 2. if e.g. new_username == 'jenny', the line becomes + " username: jenny" + This utility is called repeatedly (a bit inefficiently, admittedly) + to refine the template feature-store yaml config to suit the parameters + supplied during a "feast init" feature store setup. + """ + if setting_value is not None: + replace_str_in_file(file_path, setting_placeholder_value, str(setting_value)) + else: + remove_lines_from_file(file_path, setting_name) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md new file mode 100644 index 0000000000..3dea1917aa --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md @@ -0,0 +1,131 @@ +# Cassandra/Astra DB Online Store + +This contribution makes it possible to use [Apache Cassandra™](https://cassandra.apache.org) / +[Astra DB](https://astra.datastax.com/) as online store for Feast. + +Once the database connection and the keyspace are configured, everything else +is handled as with any other online store: table creation, +read/write from/to table and table destruction. + +## Quick usage + +The following refers to the [Feast quickstart](https://docs.feast.dev/getting-started/quickstart) page. Only +Step 2 ("Create a feature repository") is slightly different, as it involves +a bit of specific configuration about the Astra DB / Cassandra cluster you +are going to use. + +It will be assumed that Feast has been installed in your system. + +### Creating the feature repository + +The easiest way to get started is to use the Feast CLI to initialize a new +feature store. Once Feast is installed, the command + +``` +feast init FEATURE_STORE_NAME -t cassandra +``` + +will interactively help you create the `feature_store.yaml` with the +required configuration details to access your Cassandra / Astra DB instance. + +Alternatively, you can run `feast init -t FEATURE_STORE_NAME`, as described +in the quickstart, and then manually edit the `online_store` key in +the `feature_store.yaml` file as detailed below. + +The following steps (setup of feature definitions, deployment of the store, +generation of training data, materialization, fetching of online/offline +features) proceed exactly as in the general Feast quickstart instructions. + +#### Cassandra setup + +The only required settings are `hosts` and `type`. The port number +is to be provided only if different than the default (9042), +and username/password only if the database requires authentication. + +```yaml +[...] +online_store: + type: cassandra + hosts: + - 192.168.1.1 + - 192.168.1.2 + - 192.168.1.3 + keyspace: KeyspaceName + port: 9042 # optional + username: user # optional + password: secret # optional + protocol_version: 5 # optional + load_balancing: # optional + local_dc: 'datacenter1' # optional + load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional +``` + +#### Astra DB setup: + +To point Feast to using an Astra DB instance as online store, an +[Astra DB token](https://awesome-astra.github.io/docs/pages/astra/create-token/#c-procedure) +with "Database Administrator" role is required: provide the Client ID and +Client Secret in the token as username and password. + +The +["secure connect bundle"](https://awesome-astra.github.io/docs/pages/astra/download-scb/#c-procedure) +for connecting to the database is also needed: +its full path must be given in the configuration below: + +```yaml +[...] +online_store: + type: cassandra + secure_bundle_path: /path/to/secure/bundle.zip + keyspace: KeyspaceName + username: Client_ID + password: Client_Secret + protocol_version: 4 # optional + load_balancing: # optional + local_dc: 'eu-central-1' # optional + load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional +``` + +#### Protocol version and load-balancing settings + +Whether on Astra DB or Cassandra, there are some optional settings in the +store definition yaml: + +```yaml + [...] + protocol_version: 5 # optional + load_balancing: # optional + local_dc: 'datacenter1' # optional + load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional +``` + +If you specify a protocol version (4 for `Astra DB` as of June 2022, 5 for `Cassandra 4.*`), +you avoid the drivers having to negotiate it on their own, thus speeding up initialization +time (and reducing the `INFO` messages being logged). See [this page](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/#cassandra.ProtocolVersion) for a listing +of protocol versions. + +You should provide the load-balancing properties as well (the reference datacenter +to use for the connection and the load-balancing policy to use). In a future version +of the driver, according to the warnings issued in the logs, this will become mandatory. +The former parameter is a region name for Astra DB instances (as can be verified on the Astra DB UI). +See the source code of the online store integration for the allowed values of +the latter parameter. + +### More info + +For a more detailed walkthrough, please see the +[Awesome Astra](https://awesome-astra.github.io/docs/pages/tools/integration/feast/) +page on the Feast integration. + +## Features + +The plugin leverages the architecture of Cassandra for optimal performance: + +- table partitioning tailored to data access pattern; +- prepared statements. + +#### Credits + +The author of this plugin acknowledges prior exploratory work by +[`hamzakpt`](https://github.com/hamzakpt) and Brian Mortimore, +on which this implementation is loosely based. diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py new file mode 100644 index 0000000000..ee0cb19fef --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -0,0 +1,547 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Cassandra/Astra DB online store for Feast. +""" + +import logging +from datetime import datetime +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple + +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import ( + EXEC_PROFILE_DEFAULT, + Cluster, + ExecutionProfile, + ResultSet, + Session, +) +from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy +from cassandra.query import PreparedStatement +from pydantic import StrictInt, StrictStr +from pydantic.typing import Literal + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.usage import log_exceptions_and_usage, tracing_span + +# Error messages +E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS = ( + "Unexpected configuration object (not a CassandraOnlineStoreConfig instance)" +) +E_CASSANDRA_NOT_CONFIGURED = ( + "Inconsistent Cassandra configuration: provide exactly one between " + "'hosts' and 'secure_bundle_path' and a 'keyspace'" +) +E_CASSANDRA_MISCONFIGURED = ( + "Inconsistent Cassandra configuration: provide either 'hosts' or " + "'secure_bundle_path', not both" +) +E_CASSANDRA_INCONSISTENT_AUTH = ( + "Username and password for Cassandra must be provided either both or none" +) +E_CASSANDRA_UNKNOWN_LB_POLICY = ( + "Unknown/unsupported Load Balancing Policy name in Cassandra configuration" +) + +# CQL command templates (that is, before replacing schema names) +INSERT_CQL_4_TEMPLATE = ( + "INSERT INTO {fqtable} (feature_name," + " value, entity_key, event_ts) VALUES" + " (?, ?, ?, ?);" +) + +SELECT_CQL_TEMPLATE = "SELECT {columns} FROM {fqtable} WHERE entity_key = ?;" + +CREATE_TABLE_CQL_TEMPLATE = """ + CREATE TABLE IF NOT EXISTS {fqtable} ( + entity_key TEXT, + feature_name TEXT, + value BLOB, + event_ts TIMESTAMP, + created_ts TIMESTAMP, + PRIMARY KEY ((entity_key), feature_name) + ) WITH CLUSTERING ORDER BY (feature_name ASC); +""" + +DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};" + +# op_name -> (cql template string, prepare boolean) +CQL_TEMPLATE_MAP = { + # Queries/DML, statements to be prepared + "insert4": (INSERT_CQL_4_TEMPLATE, True), + "select": (SELECT_CQL_TEMPLATE, True), + # DDL, do not prepare these + "drop": (DROP_TABLE_CQL_TEMPLATE, False), + "create": (CREATE_TABLE_CQL_TEMPLATE, False), +} + +# Logger +logger = logging.getLogger(__name__) + + +class CassandraInvalidConfig(Exception): + def __init__(self, msg: str): + super().__init__(msg) + + +class CassandraOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for the Cassandra/Astra DB online store. + + Exactly one of `hosts` and `secure_bundle_path` must be provided; + depending on which one, the connection will be to a regular Cassandra + or an Astra DB instance (respectively). + + If connecting to Astra DB, authentication must be provided with username + and password being the Client ID and Client Secret of the database token. + """ + + type: Literal["cassandra"] = "cassandra" + """Online store type selector.""" + + # settings for connection to Cassandra / Astra DB + + hosts: Optional[List[StrictStr]] = None + """List of host addresses to reach the cluster.""" + + secure_bundle_path: Optional[StrictStr] = None + """Path to the secure connect bundle (for Astra DB; replaces hosts).""" + + port: Optional[StrictInt] = None + """Port number for connecting to the cluster (optional).""" + + keyspace: StrictStr = "feast_keyspace" + """Target Cassandra keyspace where all tables will be.""" + + username: Optional[StrictStr] = None + """Username for DB auth, possibly Astra DB token Client ID.""" + + password: Optional[StrictStr] = None + """Password for DB auth, possibly Astra DB token Client Secret.""" + + protocol_version: Optional[StrictInt] = None + """Explicit specification of the CQL protocol version used.""" + + class CassandraLoadBalancingPolicy(FeastConfigBaseModel): + """ + Configuration block related to the Cluster's load-balancing policy. + """ + + load_balancing_policy: StrictStr + """ + A stringy description of the load balancing policy to instantiate + the cluster with. Supported values: + "DCAwareRoundRobinPolicy" + "TokenAwarePolicy(DCAwareRoundRobinPolicy)" + """ + + local_dc: StrictStr = "datacenter1" + """The local datacenter, usually necessary to create the policy.""" + + load_balancing: Optional[CassandraLoadBalancingPolicy] = None + """ + Details on the load-balancing policy: it will be + wrapped into an execution profile if present. + """ + + +class CassandraOnlineStore(OnlineStore): + """ + Cassandra/Astra DB online store implementation for Feast. + + Attributes: + _cluster: Cassandra cluster to connect to. + _session: (DataStax Cassandra drivers) session object + to issue commands. + _keyspace: Cassandra keyspace all tables live in. + _prepared_statements: cache of statements prepared by the driver. + """ + + _cluster: Cluster = None + _session: Session = None + _keyspace: str = "feast_keyspace" + _prepared_statements: Dict[str, PreparedStatement] = {} + + def _get_session(self, config: RepoConfig): + """ + Establish the database connection, if not yet created, + and return it. + + Also perform basic config validation checks. + """ + + online_store_config = config.online_store + if not isinstance(online_store_config, CassandraOnlineStoreConfig): + raise CassandraInvalidConfig(E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS) + + if self._session: + return self._session + if not self._session: + # configuration consistency checks + hosts = online_store_config.hosts + secure_bundle_path = online_store_config.secure_bundle_path + port = online_store_config.port or 9042 + keyspace = online_store_config.keyspace + username = online_store_config.username + password = online_store_config.password + protocol_version = online_store_config.protocol_version + + db_directions = hosts or secure_bundle_path + if not db_directions or not keyspace: + raise CassandraInvalidConfig(E_CASSANDRA_NOT_CONFIGURED) + if hosts and secure_bundle_path: + raise CassandraInvalidConfig(E_CASSANDRA_MISCONFIGURED) + if (username is None) ^ (password is None): + raise CassandraInvalidConfig(E_CASSANDRA_INCONSISTENT_AUTH) + + if username is not None: + auth_provider = PlainTextAuthProvider( + username=username, + password=password, + ) + else: + auth_provider = None + + # handling of load-balancing policy (optional) + if online_store_config.load_balancing: + # construct a proper execution profile embedding + # the configured LB policy + _lbp_name = online_store_config.load_balancing.load_balancing_policy + if _lbp_name == "DCAwareRoundRobinPolicy": + lb_policy = DCAwareRoundRobinPolicy( + local_dc=online_store_config.load_balancing.local_dc, + ) + elif _lbp_name == "TokenAwarePolicy(DCAwareRoundRobinPolicy)": + lb_policy = TokenAwarePolicy( + DCAwareRoundRobinPolicy( + local_dc=online_store_config.load_balancing.local_dc, + ) + ) + else: + raise CassandraInvalidConfig(E_CASSANDRA_UNKNOWN_LB_POLICY) + + # wrap it up in a map of ex.profiles with a default + exe_profile = ExecutionProfile(load_balancing_policy=lb_policy) + execution_profiles = {EXEC_PROFILE_DEFAULT: exe_profile} + else: + execution_profiles = None + + # additional optional keyword args to Cluster + cluster_kwargs = { + k: v + for k, v in { + "protocol_version": protocol_version, + "execution_profiles": execution_profiles, + }.items() + if v is not None + } + + # creation of Cluster (Cassandra vs. Astra) + if hosts: + self._cluster = Cluster( + hosts, port=port, auth_provider=auth_provider, **cluster_kwargs + ) + else: + # we use 'secure_bundle_path' + self._cluster = Cluster( + cloud={"secure_connect_bundle": secure_bundle_path}, + auth_provider=auth_provider, + **cluster_kwargs, + ) + + # creation of Session + self._keyspace = keyspace + self._session = self._cluster.connect(self._keyspace) + + return self._session + + def __del__(self): + """ + One may be tempted to reclaim resources and do, here: + if self._session: + self._session.shutdown() + But *beware*, DON'T DO THIS. + Indeed this could destroy the session object before some internal + tasks runs in other threads (this is handled internally in the + Cassandra driver). + You'd get a RuntimeError "cannot schedule new futures after shutdown". + """ + pass + + @log_exceptions_and_usage(online_store="cassandra") + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of features of several entities to the database. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + data: a list of quadruplets containing Feature data. Each + quadruplet contains an Entity Key, a dict containing feature + values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of + rows is written to the online store. Can be used to + display progress. + """ + project = config.project + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, entity_key_serialization_version=2 + ).hex() + with tracing_span(name="remote_call"): + self._write_rows( + config, + project, + table, + entity_key_bin, + values.items(), + timestamp, + created_ts, + ) + if progress: + progress(1) + + @log_exceptions_and_usage(online_store="cassandra") + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Read feature values pertaining to the requested entities from + the online store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + entity_keys: a list of entity keys that should be read + from the FeatureStore. + """ + project = config.project + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + for entity_key in entity_keys: + entity_key_bin = serialize_entity_key( + entity_key, entity_key_serialization_version=2 + ).hex() + + with tracing_span(name="remote_call"): + feature_rows = self._read_rows_by_entity_key( + config, + project, + table, + entity_key_bin, + columns=["feature_name", "value", "event_ts"], + ) + + res = {} + res_ts = None + for feature_row in feature_rows: + if ( + requested_features is None + or feature_row.feature_name in requested_features + ): + val = ValueProto() + val.ParseFromString(feature_row.value) + res[feature_row.feature_name] = val + res_ts = feature_row.event_ts + if not res: + result.append((None, None)) + else: + result.append((res_ts, res)) + return result + + @log_exceptions_and_usage(online_store="cassandra") + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Update schema on DB, by creating and destroying tables accordingly. + + Args: + config: The RepoConfig for the current FeatureStore. + tables_to_delete: Tables to delete from the Online Store. + tables_to_keep: Tables to keep in the Online Store. + """ + project = config.project + + for table in tables_to_keep: + with tracing_span(name="remote_call"): + self._create_table(config, project, table) + for table in tables_to_delete: + with tracing_span(name="remote_call"): + self._drop_table(config, project, table) + + @log_exceptions_and_usage(online_store="cassandra") + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Delete tables from the database. + + Args: + config: The RepoConfig for the current FeatureStore. + tables: Tables to delete from the feature repo. + """ + project = config.project + + for table in tables: + with tracing_span(name="remote_call"): + self._drop_table(config, project, table) + + @staticmethod + def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: + """ + Generate a fully-qualified table name, + including quotes and keyspace. + """ + return f'"{keyspace}"."{project}_{table.name}"' + + def _write_rows( + self, + config: RepoConfig, + project: str, + table: FeatureView, + entity_key_bin: str, + features_vals: Iterable[Tuple[str, ValueProto]], + timestamp: datetime, + created_ts: Optional[datetime], + ): + """ + Handle the CQL (low-level) insertion of feature values to a table. + + Note: `created_ts` can be None: in that case we avoid explicitly + inserting it to prevent unnecessary tombstone creation on Cassandra. + Note: `created_ts` is being deprecated (July 2022) and the following + reflects this fact. + """ + session: Session = self._get_session(config) + keyspace: str = self._keyspace + fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) + insert_cql = self._get_cql_statement(config, "insert4", fqtable=fqtable) + for feature_name, val in features_vals: + params: Sequence[object] = ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + ) + session.execute( + insert_cql, + params, + ) + + def _read_rows_by_entity_key( + self, + config: RepoConfig, + project: str, + table: FeatureView, + entity_key_bin: str, + columns: Optional[List[str]] = None, + ) -> ResultSet: + """ + Handle the CQL (low-level) reading of feature values from a table. + """ + session: Session = self._get_session(config) + keyspace: str = self._keyspace + fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) + projection_columns = "*" if columns is None else ", ".join(columns) + select_cql = self._get_cql_statement( + config, + "select", + fqtable=fqtable, + columns=projection_columns, + ) + return session.execute(select_cql, [entity_key_bin]) + + def _drop_table( + self, + config: RepoConfig, + project: str, + table: FeatureView, + ): + """Handle the CQL (low-level) deletion of a table.""" + session: Session = self._get_session(config) + keyspace: str = self._keyspace + fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) + drop_cql = self._get_cql_statement(config, "drop", fqtable) + logger.info(f"Deleting table {fqtable}.") + session.execute(drop_cql) + + def _create_table(self, config: RepoConfig, project: str, table: FeatureView): + """Handle the CQL (low-level) creation of a table.""" + session: Session = self._get_session(config) + keyspace: str = self._keyspace + fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) + create_cql = self._get_cql_statement(config, "create", fqtable) + logger.info(f"Creating table {fqtable}.") + session.execute(create_cql) + + def _get_cql_statement( + self, config: RepoConfig, op_name: str, fqtable: str, **kwargs + ): + """ + Resolve an 'op_name' (create, insert4, etc) into a CQL statement + ready to be bound to parameters when executing. + + If the statement is defined to be 'prepared', use an instance-specific + cache of prepared statements. + + This additional layer makes it easy to control whether to use prepared + statements and, if so, on which database operations. + """ + session: Session = self._get_session(config) + template, prepare = CQL_TEMPLATE_MAP[op_name] + statement = template.format( + fqtable=fqtable, + **kwargs, + ) + if prepare: + # using the statement itself as key (no problem with that) + cache_key = statement + if cache_key not in self._prepared_statements: + logger.info(f"Preparing a {op_name} statement on {fqtable}.") + self._prepared_statements[cache_key] = session.prepare(statement) + return self._prepared_statements[cache_key] + else: + return statement diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_repo_configuration.py new file mode 100644 index 0000000000..a1d619646f --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_repo_configuration.py @@ -0,0 +1,26 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.cassandra import ( + CassandraOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig(online_store_creator=CassandraOnlineStoreCreator), +] diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 278bc7da69..5bc25faee5 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -47,6 +47,7 @@ "snowflake.online": "feast.infra.online_stores.snowflake.SnowflakeOnlineStore", "postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore", "hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore", + "cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 9a5e64f8c3..91cab2e992 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -20,6 +20,7 @@ from feast.feature_service import FeatureService from feast.feature_store import FeatureStore from feast.feature_view import DUMMY_ENTITY, FeatureView +from feast.file_utils import replace_str_in_file from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry @@ -406,14 +407,6 @@ def is_valid_name(name: str) -> bool: return not name.startswith("_") and re.compile(r"\W+").search(name) is None -def replace_str_in_file(file_path, match_str, sub_str): - with open(file_path, "r") as f: - contents = f.read() - contents = contents.replace(match_str, sub_str) - with open(file_path, "wt") as f: - f.write(contents) - - def generate_project_name() -> str: """Generates a unique project name""" return f"{random.choice(adjectives)}_{random.choice(animals)}" diff --git a/sdk/python/feast/templates/aws/bootstrap.py b/sdk/python/feast/templates/aws/bootstrap.py index 456c6e9b70..5701941a28 100644 --- a/sdk/python/feast/templates/aws/bootstrap.py +++ b/sdk/python/feast/templates/aws/bootstrap.py @@ -1,5 +1,6 @@ import click +from feast.file_utils import replace_str_in_file from feast.infra.utils import aws_utils @@ -66,13 +67,5 @@ def bootstrap(): replace_str_in_file(config_file, "%REDSHIFT_IAM_ROLE%", iam_role) -def replace_str_in_file(file_path, match_str, sub_str): - with open(file_path, "r") as f: - contents = f.read() - contents = contents.replace(match_str, sub_str) - with open(file_path, "wt") as f: - f.write(contents) - - if __name__ == "__main__": bootstrap() diff --git a/sdk/python/feast/templates/cassandra/__init__.py b/sdk/python/feast/templates/cassandra/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/templates/cassandra/bootstrap.py b/sdk/python/feast/templates/cassandra/bootstrap.py new file mode 100644 index 0000000000..261332a67a --- /dev/null +++ b/sdk/python/feast/templates/cassandra/bootstrap.py @@ -0,0 +1,257 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import pathlib +import sys +from datetime import datetime, timedelta + +import click + +from feast.file_utils import ( + remove_lines_from_file, + replace_str_in_file, + write_setting_or_remove, +) + + +def collect_cassandra_store_settings(): + """ + Interactive CLI collection of settings for the feature store yaml. + Returns a dict with all keys, possibly some are None. + """ + + db_type = click.prompt( + "Regular [C]assandra or [A]stra DB?", + type=click.Choice(["C", "A"]), + show_choices=False, + default="C", + ) + is_astra = db_type == "A" + + if is_astra: + c_secure_bundle_path = click.prompt( + "Enter the full path to your Secure Connect Bundle" + ) + c_hosts = None + c_port = None + c_username = click.prompt("Enter the Client ID from your Astra DB token") + c_password = click.prompt( + "Enter the Client Secret from your Astra DB token", + hide_input=True, + ) + else: + # it's regular Cassandra + c_secure_bundle_path = None + hosts_string = click.prompt( + ("Enter the seed hosts of your cluster " "(comma-separated IP addresses)"), + default="127.0.0.1", + ) + c_hosts = [ + haddr + for haddr in (host.strip() for host in hosts_string.split(",")) + if haddr != "" + ] + if not c_hosts: + print("*Error* : seed host list cannot be empty.") + sys.exit(1) + needs_port = click.confirm("Need to specify port?", default=False) + if needs_port: + c_port = click.prompt("Port to use", default=9042, type=int) + else: + c_port = None + use_auth = click.confirm( + "Do you need username/password?", + default=False, + ) + if use_auth: + c_username = click.prompt("Database username") + c_password = click.prompt("Database password", hide_input=True) + else: + c_username = None + c_password = None + + c_keyspace = click.prompt( + "Specify the keyspace to use", + default="feast_keyspace", + ) + + specify_protocol_version = click.confirm( + "Specify protocol version?", + default=False, + ) + if specify_protocol_version: + c_protocol_version = click.prompt( + "Protocol version", + default={"A": 4, "C": 5}.get(db_type, 5), + type=int, + ) + else: + c_protocol_version = None + + specify_lb = click.confirm("Specify load-balancing?", default=False) + if specify_lb: + c_local_dc = click.prompt( + "Local datacenter (for load-balancing)", + default="datacenter1" if db_type == "C" else None, + ) + c_load_balancing_policy = click.prompt( + "Load-balancing policy", + type=click.Choice( + [ + "TokenAwarePolicy(DCAwareRoundRobinPolicy)", + "DCAwareRoundRobinPolicy", + ] + ), + default="TokenAwarePolicy(DCAwareRoundRobinPolicy)", + ) + else: + c_local_dc = None + c_load_balancing_policy = None + + return { + "c_secure_bundle_path": c_secure_bundle_path, + "c_hosts": c_hosts, + "c_port": c_port, + "c_username": c_username, + "c_password": c_password, + "c_keyspace": c_keyspace, + "c_protocol_version": c_protocol_version, + "c_local_dc": c_local_dc, + "c_load_balancing_policy": c_load_balancing_policy, + } + + +def apply_cassandra_store_settings(config_file, settings): + """ + In-place replacements to `config_file` according to the settings + to make the yaml a proper Cassandra/AstraDB feature-store yaml. + `settings` must have all its keys, possibly the optional ones set to None: + 'c_secure_bundle_path' + 'c_hosts' + 'c_port' + 'c_username' + 'c_password' + 'c_keyspace' + 'c_protocol_version' + 'c_local_dc' + 'c_load_balancing_policy' + """ + write_setting_or_remove( + config_file, + settings["c_secure_bundle_path"], + "secure_bundle_path", + "/path/to/secure/bundle.zip", + ) + # + if settings["c_hosts"]: + replace_str_in_file( + config_file, + " - 127.0.0.1", + os.linesep.join(f" - {c_host}" for c_host in settings["c_hosts"]), + ) + else: + remove_lines_from_file(config_file, "hosts:") + remove_lines_from_file(config_file, "- 127.0.0.1") + # + write_setting_or_remove( + config_file, + settings["c_port"], + "port", + "9042", + ) + # + write_setting_or_remove( + config_file, + settings["c_username"], + "username", + "c_username", + ) + # + write_setting_or_remove( + config_file, + settings["c_password"], + "password", + "c_password", + ) + # + replace_str_in_file( + config_file, + "feast_keyspace", + settings["c_keyspace"], + ) + # + write_setting_or_remove( + config_file, + settings["c_protocol_version"], + "protocol_version", + "c_protocol_version", + ) + # it is assumed that if there's local_dc also there's l.b.p. + if settings["c_local_dc"] is not None: + replace_str_in_file( + config_file, + "c_local_dc", + settings["c_local_dc"], + ) + replace_str_in_file( + config_file, + "c_load_balancing_policy", + settings["c_load_balancing_policy"], + ) + else: + remove_lines_from_file(config_file, "load_balancing:") + remove_lines_from_file(config_file, "local_dc:") + remove_lines_from_file(config_file, "load_balancing_policy:") + + +def bootstrap(): + """ + Bootstrap() will automatically be called + from the init_repo() during `feast init`. + """ + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() + config_file = repo_path / "feature_store.yaml" + + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + # + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df( + driver_entities, + start_date, + end_date, + ) + # + driver_stats_path = data_path / "driver_stats.parquet" + driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True) + + # example.py + example_py_file = repo_path / "example.py" + replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) + + # store config yaml, interact with user and then customize file: + settings = collect_cassandra_store_settings() + apply_cassandra_store_settings(config_file, settings) + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/cassandra/example.py b/sdk/python/feast/templates/cassandra/example.py new file mode 100644 index 0000000000..2d0d630f2d --- /dev/null +++ b/sdk/python/feast/templates/cassandra/example.py @@ -0,0 +1,56 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This is an example feature definition file + +from datetime import timedelta + +from feast import Entity, FeatureService, FeatureView, Field, FileSource +from feast.types import Float32, Int64 + +# Read data from parquet files. Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Define an entity for the driver. You can think of entity as a primary key used to +# fetch features. +driver = Entity(name="driver", join_keys=["driver_id"]) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_hourly_stats, + tags={}, +) + +driver_stats_fs = FeatureService( + name="driver_activity", features=[driver_hourly_stats_view] +) diff --git a/sdk/python/feast/templates/cassandra/feature_store.yaml b/sdk/python/feast/templates/cassandra/feature_store.yaml new file mode 100644 index 0000000000..9c42cf4aeb --- /dev/null +++ b/sdk/python/feast/templates/cassandra/feature_store.yaml @@ -0,0 +1,17 @@ +project: my_project +registry: data/registry.db +provider: local +online_store: + type: cassandra + secure_bundle_path: /path/to/secure/bundle.zip + hosts: + - 127.0.0.1 + port: 9042 + username: c_username + password: c_password + keyspace: feast_keyspace + protocol_version: c_protocol_version + load_balancing: + local_dc: c_local_dc + load_balancing_policy: c_load_balancing_policy +entity_key_serialization_version: 2 \ No newline at end of file diff --git a/sdk/python/feast/templates/hbase/bootstrap.py b/sdk/python/feast/templates/hbase/bootstrap.py index 4013ca5a8d..338db542ce 100644 --- a/sdk/python/feast/templates/hbase/bootstrap.py +++ b/sdk/python/feast/templates/hbase/bootstrap.py @@ -1,3 +1,6 @@ +from feast.file_utils import replace_str_in_file + + def bootstrap(): # Bootstrap() will automatically be called from the init_repo() during `feast init` @@ -23,13 +26,5 @@ def bootstrap(): replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) -def replace_str_in_file(file_path, match_str, sub_str): - with open(file_path, "r") as f: - contents = f.read() - contents = contents.replace(match_str, sub_str) - with open(file_path, "wt") as f: - f.write(contents) - - if __name__ == "__main__": bootstrap() diff --git a/sdk/python/feast/templates/local/bootstrap.py b/sdk/python/feast/templates/local/bootstrap.py index 4013ca5a8d..338db542ce 100644 --- a/sdk/python/feast/templates/local/bootstrap.py +++ b/sdk/python/feast/templates/local/bootstrap.py @@ -1,3 +1,6 @@ +from feast.file_utils import replace_str_in_file + + def bootstrap(): # Bootstrap() will automatically be called from the init_repo() during `feast init` @@ -23,13 +26,5 @@ def bootstrap(): replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) -def replace_str_in_file(file_path, match_str, sub_str): - with open(file_path, "r") as f: - contents = f.read() - contents = contents.replace(match_str, sub_str) - with open(file_path, "wt") as f: - f.write(contents) - - if __name__ == "__main__": bootstrap() diff --git a/sdk/python/feast/templates/postgres/bootstrap.py b/sdk/python/feast/templates/postgres/bootstrap.py index 078d7cdc68..7c004e4c26 100644 --- a/sdk/python/feast/templates/postgres/bootstrap.py +++ b/sdk/python/feast/templates/postgres/bootstrap.py @@ -1,6 +1,7 @@ import click import psycopg2 +from feast.file_utils import replace_str_in_file from feast.infra.utils.postgres.connection_utils import df_to_postgres_table from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig @@ -66,13 +67,5 @@ def bootstrap(): replace_str_in_file(config_file, "DB_PASSWORD", postgres_password) -def replace_str_in_file(file_path, match_str, sub_str): - with open(file_path, "r") as f: - contents = f.read() - contents = contents.replace(match_str, sub_str) - with open(file_path, "wt") as f: - f.write(contents) - - if __name__ == "__main__": bootstrap() diff --git a/sdk/python/feast/templates/snowflake/bootstrap.py b/sdk/python/feast/templates/snowflake/bootstrap.py index 1663a1fb8b..d6c8d6b527 100644 --- a/sdk/python/feast/templates/snowflake/bootstrap.py +++ b/sdk/python/feast/templates/snowflake/bootstrap.py @@ -1,6 +1,7 @@ import click import snowflake.connector +from feast.file_utils import replace_str_in_file from feast.infra.utils.snowflake_utils import write_pandas @@ -76,13 +77,5 @@ def bootstrap(): conn.close() -def replace_str_in_file(file_path, match_str, sub_str): - with open(file_path, "r") as f: - contents = f.read() - contents = contents.replace(match_str, sub_str) - with open(file_path, "wt") as f: - f.write(contents) - - if __name__ == "__main__": bootstrap() diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py b/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py new file mode 100644 index 0000000000..190d94a830 --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py @@ -0,0 +1,56 @@ +# +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time +from typing import Dict + +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class CassandraOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.container = DockerContainer("library/cassandra:4.0.4").with_exposed_ports( + "9042" + ) + + def create_online_store(self) -> Dict[str, object]: + self.container.start() + log_string_to_wait_for = "Startup complete" + # on a modern machine it takes about 45-60 seconds for the container + # to start accepting CQL requests: + wait_for_logs( + container=self.container, predicate=log_string_to_wait_for, timeout=90 + ) + keyspace_name = "feast_keyspace" + keyspace_creation_command = f"create KEYSPACE \"{keyspace_name}\" WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}};" + self.container.exec(f'cqlsh -e "{keyspace_creation_command}"') + time.sleep(2) + exposed_port = int(self.container.get_exposed_port("9042")) + return { + "type": "cassandra", + "hosts": ["127.0.0.1"], + "port": exposed_port, + "keyspace": keyspace_name, + } + + def teardown(self): + self.container.stop() diff --git a/setup.py b/setup.py index f03aeefcf6..b52453d43a 100644 --- a/setup.py +++ b/setup.py @@ -117,6 +117,10 @@ "happybase>=1.2.0,<3", ] +CASSANDRA_REQUIRED = [ + "cassandra-driver>=3.24.0,<4", +] + GE_REQUIRED = ["great_expectations>=0.14.0,<0.15.0"] GO_REQUIRED = [ @@ -177,6 +181,7 @@ + TRINO_REQUIRED + GE_REQUIRED + HBASE_REQUIRED + + CASSANDRA_REQUIRED ) @@ -511,6 +516,7 @@ def copy_extensions_to_source(self): "hbase": HBASE_REQUIRED, "go": GO_REQUIRED, "docs": DOCS_REQUIRED, + "cassandra": CASSANDRA_REQUIRED, }, include_package_data=True, license="Apache",