Skip to content

Commit

Permalink
Python-centric feast deploy CLI (#1362)
Browse files Browse the repository at this point in the history
* Python-centric feast deploy CLI

Signed-off-by: Oleg Avdeev <[email protected]>

* sqlite provider

Signed-off-by: Oleg Avdeev <[email protected]>

* add a proper test for local

Signed-off-by: Oleg Avdeev <[email protected]>

* gcp test

Signed-off-by: Oleg Avdeev <[email protected]>

* add missing files and mark integration test

Signed-off-by: Oleg Avdeev <[email protected]>

* reconcile configs

Signed-off-by: Oleg Avdeev <[email protected]>

* add missing file

Signed-off-by: Oleg Avdeev <[email protected]>

* add missing dep

Signed-off-by: Oleg Avdeev <[email protected]>

* add missing dep

Signed-off-by: Oleg Avdeev <[email protected]>

* fix deps again

Signed-off-by: Oleg Avdeev <[email protected]>

* use datastore not firestore

Signed-off-by: Oleg Avdeev <[email protected]>

* fix tests

Signed-off-by: Oleg Avdeev <[email protected]>

* comments

Signed-off-by: Oleg Avdeev <[email protected]>
  • Loading branch information
oavdeev authored Mar 10, 2021
1 parent 321894d commit 53ebe47
Show file tree
Hide file tree
Showing 14 changed files with 480 additions and 58 deletions.
36 changes: 36 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import logging
import sys
from pathlib import Path
from typing import Dict

import click
Expand All @@ -26,6 +27,8 @@
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
from feast.repo_operations import apply_total, registry_dump, teardown

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -353,5 +356,38 @@ def project_list():
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.command("apply")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def apply_total_command(repo_path: str):
    """
    Applies a feature repo
    """
    # Build the Path once: the config loader receives it as given, while
    # apply_total receives the resolved form.
    repo_dir = Path(repo_path)
    config = load_repo_config(repo_dir)
    apply_total(config, repo_dir.resolve())


@cli.command("teardown")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def teardown_command(repo_path: str):
    """
    Tear down infra for a feature repo
    """
    # Build the Path once: the config loader receives it as given, while
    # teardown receives the resolved form.
    repo_dir = Path(repo_path)
    config = load_repo_config(repo_dir)
    teardown(config, repo_dir.resolve())


@cli.command("registry-dump")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def registry_dump_command(repo_path: str):
    """
    Prints contents of the metadata registry
    """
    # Only the parsed repo config is handed to registry_dump; the repo path
    # itself is not passed on.
    config = load_repo_config(Path(repo_path))
    registry_dump(config)


# Invoke the click command group when this module is executed as a script.
if __name__ == "__main__":
    cli()
8 changes: 4 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
from typing import Optional

from feast.feature_store_config import Config
from feast.repo_config import RepoConfig, load_repo_config


class FeatureStore:
Expand All @@ -22,13 +22,13 @@ class FeatureStore:
"""

def __init__(
    self, config_path: Optional[str], config: Optional[RepoConfig],
):
    """
    Set ``self.config`` from an explicit config object or from a repo path.

    Args:
        config_path: Path handed to ``load_repo_config`` when ``config`` is
            not given.
        config: Ready-made ``RepoConfig``; used as-is when provided.

    Raises:
        Exception: If both ``config_path`` and ``config`` are provided.
    """
    # The two sources are mutually exclusive. The previous check was
    # inverted: it raised when either argument was missing and let the
    # "both given" case through.
    if config_path is not None and config is not None:
        raise Exception("You cannot specify both config_path and config")

    if config is not None:
        self.config = config
    elif config_path is not None:
        # load_repo_config joins its argument with "/" and therefore needs
        # a Path rather than the plain string accepted here.
        from pathlib import Path

        self.config = load_repo_config(Path(config_path))
    else:
        # NOTE(review): RepoConfig is built without arguments here; confirm
        # that it can actually be constructed with no field values.
        self.config = RepoConfig()
53 changes: 0 additions & 53 deletions sdk/python/feast/feature_store_config.py

This file was deleted.

1 change: 1 addition & 0 deletions sdk/python/feast/infra/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# from .provider import Provider
76 changes: 76 additions & 0 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from datetime import datetime
from typing import List, Optional

from feast import FeatureTable
from feast.infra.provider import Provider
from feast.repo_config import DatastoreOnlineStoreConfig


def _delete_all_values(client, key) -> None:
"""
Delete all data under the key path in datastore.
"""
while True:
query = client.query(kind="Value", ancestor=key)
entities = list(query.fetch(limit=1000))
if not entities:
return

for entity in entities:
print("Deleting: {}".format(entity))
client.delete(entity.key)


class Gcp(Provider):
    """
    Provider implementation that manages per-table entities in Google Cloud
    Datastore.

    Each feature table is represented by an entity keyed as
    ``FeastProject/<project>/FeatureTable/<table name>``.
    """

    # Datastore project to connect to; None lets the client library choose.
    _project_id: Optional[str]

    def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
        """
        Args:
            config: Datastore settings; when falsy, no explicit project id
                is passed to the Datastore client.
        """
        self._project_id = config.project_id if config else None

    def _initialize_client(self):
        """Create a Datastore client, bound to the configured project if any."""
        # Imported lazily so the GCP dependency is only needed when used.
        from google.cloud import datastore

        if self._project_id is not None:
            # Previously read ``self.project_id``, an attribute that is never
            # set, which raised AttributeError whenever a project id was
            # configured.
            return datastore.Client(project=self._project_id)
        return datastore.Client()

    @staticmethod
    def _delete_table(client, project: str, table: FeatureTable) -> None:
        """Delete all values stored under a table's key, then the table entity."""
        key = client.key("FeastProject", project, "FeatureTable", table.name)
        _delete_all_values(client, key)

        # Delete the table metadata datastore entity
        client.delete(key)

    def update_infra(
        self,
        project: str,
        tables_to_delete: List[FeatureTable],
        tables_to_keep: List[FeatureTable],
    ):
        """
        Write a metadata entity for every kept table and remove deleted ones.

        Args:
            project: Name used as the ``FeastProject`` component of each key.
            tables_to_delete: Tables whose values and metadata entity are
                removed.
            tables_to_keep: Tables for which a metadata entity carrying a
                ``created_at`` timestamp is put.
        """
        from google.cloud import datastore

        client = self._initialize_client()

        for table in tables_to_keep:
            key = client.key("FeastProject", project, "FeatureTable", table.name)
            entity = datastore.Entity(key=key)
            entity.update({"created_at": datetime.utcnow()})
            client.put(entity)

        for table in tables_to_delete:
            self._delete_table(client, project, table)

    def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
        """
        Remove the stored values and the metadata entity of every given table.

        Args:
            project: Name used as the ``FeastProject`` component of each key.
            tables: Tables to remove.
        """
        client = self._initialize_client()

        for table in tables:
            self._delete_table(client, project, table)
36 changes: 36 additions & 0 deletions sdk/python/feast/infra/local_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
import sqlite3
from typing import List

from feast import FeatureTable
from feast.infra.provider import Provider
from feast.repo_config import LocalOnlineStoreConfig


def _table_id(project: str, table: FeatureTable) -> str:
    """Build the identifier for ``table`` as ``<project>_<table name>``."""
    return "{}_{}".format(project, table.name)


class LocalSqlite(Provider):
    """
    Provider implementation backed by a single SQLite database file, with one
    SQLite table per feature table.
    """

    # Filesystem path of the SQLite database file.
    _db_path: str

    def __init__(self, config: LocalOnlineStoreConfig):
        """
        Args:
            config: Local store settings; ``config.path`` is the database
                file path.
        """
        self._db_path = config.path

    def update_infra(
        self,
        project: str,
        tables_to_delete: List[FeatureTable],
        tables_to_keep: List[FeatureTable],
    ):
        """
        Create the tables that should exist and drop the ones that should not.

        Args:
            project: Prefix used when deriving each SQLite table name.
            tables_to_delete: Tables to drop if they exist.
            tables_to_keep: Tables to create (``key BLOB, value BLOB``) if
                they do not exist yet.
        """
        conn = sqlite3.connect(self._db_path)
        try:
            for table in tables_to_keep:
                conn.execute(
                    f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (key BLOB, value BLOB)"
                )

            for table in tables_to_delete:
                conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

            conn.commit()
        finally:
            # The connection used to be left open; close it even when a
            # statement fails so the file handle is released promptly.
            conn.close()

    def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
        """
        Remove the whole database file.

        ``project`` and ``tables`` are not consulted; the file is deleted as
        a unit.
        """
        os.unlink(self._db_path)
49 changes: 49 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import abc
from typing import List

from feast import FeatureTable
from feast.repo_config import RepoConfig


class Provider(abc.ABC):
    """Abstract interface that infrastructure backends implement."""

    @abc.abstractmethod
    def update_infra(
        self,
        project: str,
        tables_to_delete: List[FeatureTable],
        tables_to_keep: List[FeatureTable],
    ):
        """
        Bring cloud resources in line with what the feature repo declares.

        Args:
            project: Project name supplied by the caller.
            tables_to_delete: Tables removed from the feature repo; their
                cloud resources should be cleaned up.
            tables_to_keep: Tables still present in the feature repo; an
                implementation may or may not need to touch their resources.
        """

    @abc.abstractmethod
    def teardown_infra(self, project: str, tables: List[FeatureTable]):
        """
        Remove every cloud resource belonging to a repo.

        Args:
            project: Project name supplied by the caller.
            tables: Tables declared in the feature repo.
        """


def get_provider(config: RepoConfig) -> Provider:
    """
    Instantiate the provider selected by ``config.provider``.

    Args:
        config: Repo configuration; ``provider`` must be ``"gcp"`` or
            ``"local"``.

    Returns:
        A ``Gcp`` provider built from ``config.online_store.datastore`` or a
        ``LocalSqlite`` provider built from ``config.online_store.local``.

    Raises:
        ValueError: If the provider name is not recognised, or if the local
            provider is selected without a local online store section.
    """
    # Provider modules are imported lazily so that only the selected
    # backend's dependencies need to be importable.
    if config.provider == "gcp":
        from feast.infra.gcp import Gcp

        return Gcp(config.online_store.datastore)

    if config.provider == "local":
        from feast.infra.local_sqlite import LocalSqlite

        local_config = config.online_store.local
        # Explicit check instead of ``assert``: assertions are removed under
        # ``python -O`` and the failure would surface later and less clearly.
        if local_config is None:
            raise ValueError(
                "provider 'local' requires an 'online_store.local' section in the repo config"
            )
        return LocalSqlite(local_config)

    raise ValueError(
        f"Unknown provider {config.provider!r}; expected 'gcp' or 'local' (config: {config})"
    )
31 changes: 31 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from pathlib import Path
from typing import NamedTuple, Optional

import yaml
from bindr import bind


class LocalOnlineStoreConfig(NamedTuple):
    """Settings for the local online store."""

    # Location of the store; the LocalSqlite provider opens it with sqlite3.
    path: str


class DatastoreOnlineStoreConfig(NamedTuple):
    """Settings for the Datastore online store."""

    # Read by the Gcp provider when it is constructed.
    project_id: str


class OnlineStoreConfig(NamedTuple):
    """Online store settings; each backend section is optional."""

    # Section handed to the "gcp" provider; may be absent.
    datastore: Optional[DatastoreOnlineStoreConfig] = None
    # Section handed to the "local" provider.
    local: Optional[LocalOnlineStoreConfig] = None


class RepoConfig(NamedTuple):
    """Top-level configuration bound from a repo's ``feature_store.yaml``."""

    # NOTE(review): presumably the location of the metadata registry; confirm.
    metadata_store: str
    # Project name.
    project: str
    # Backend selector; get_provider recognises "gcp" and "local".
    provider: str
    # Per-backend online store settings.
    online_store: OnlineStoreConfig


def load_repo_config(repo_path: Path) -> RepoConfig:
    """
    Read ``feature_store.yaml`` from a repo directory and bind it to RepoConfig.

    Args:
        repo_path: Directory containing ``feature_store.yaml``. A plain
            string is accepted as well and converted to a ``Path``.

    Returns:
        The ``RepoConfig`` produced by binding the parsed YAML document.
    """
    # Coerce to Path so callers holding a str do not fail on the "/" join.
    config_file = Path(repo_path) / "feature_store.yaml"
    # Decode explicitly as UTF-8 rather than relying on the platform locale.
    with open(config_file, encoding="utf-8") as f:
        raw_config = yaml.safe_load(f)
    return bind(RepoConfig, raw_config)
Loading

0 comments on commit 53ebe47

Please sign in to comment.