Skip to content

Commit

Permalink
Implement materialization for RedshiftOfflineStore & RedshiftRetrieva…
Browse files Browse the repository at this point in the history
…lJob (#1680)

* Implement materialization for RedshiftOfflineStore

Signed-off-by: Tsotne Tabidze <[email protected]>

* Address Willem's comments

Signed-off-by: Tsotne Tabidze <[email protected]>

* Rename method

Signed-off-by: Tsotne Tabidze <[email protected]>
  • Loading branch information
Tsotne Tabidze authored Jul 15, 2021
1 parent 6d07767 commit 8287b56
Show file tree
Hide file tree
Showing 11 changed files with 616 additions and 300 deletions.
Binary file added dump.rdb
Binary file not shown.
74 changes: 25 additions & 49 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile
from tenacity import retry, retry_unless_exception_type, wait_exponential

from feast import type_map
from feast.data_format import FileFormat, StreamFormat
from feast.errors import (
DataSourceNotFoundException,
RedshiftCredentialsError,
RedshiftQueryError,
)
from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig
from feast.value_type import ValueType
Expand Down Expand Up @@ -1062,7 +1057,7 @@ def validate(self, config: RepoConfig):
def get_table_query_string(self) -> str:
"""Returns a string that can directly be used to reference this table in SQL"""
if self.table:
return f"`{self.table}`"
return f'"{self.table}"'
else:
return f"({self.query})"

Expand All @@ -1073,62 +1068,43 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
from feast.infra.utils import aws_utils

assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

client = boto3.client(
"redshift-data", config=Config(region_name=config.offline_store.region)
)
client = aws_utils.get_redshift_data_client(config.offline_store.region)

try:
if self.table is not None:
if self.table is not None:
try:
table = client.describe_table(
ClusterIdentifier=config.offline_store.cluster_id,
Database=config.offline_store.database,
DbUser=config.offline_store.user,
Table=self.table,
)
# The API returns valid JSON with empty column list when the table doesn't exist
if len(table["ColumnList"]) == 0:
raise DataSourceNotFoundException(self.table)
except ClientError as e:
if e.response["Error"]["Code"] == "ValidationException":
raise RedshiftCredentialsError() from e
raise

columns = table["ColumnList"]
else:
statement = client.execute_statement(
ClusterIdentifier=config.offline_store.cluster_id,
Database=config.offline_store.database,
DbUser=config.offline_store.user,
Sql=f"SELECT * FROM ({self.query}) LIMIT 1",
)
# The API returns valid JSON with empty column list when the table doesn't exist
if len(table["ColumnList"]) == 0:
raise DataSourceNotFoundException(self.table)

# Need to retry client.describe_statement(...) until the task is finished. We don't want to bombard
# Redshift with queries, and neither do we want to wait for a long time on the initial call.
# The solution is exponential backoff. The backoff starts with 0.1 seconds and doubles exponentially
# until reaching 30 seconds, at which point the backoff is fixed.
@retry(
wait=wait_exponential(multiplier=0.1, max=30),
retry=retry_unless_exception_type(RedshiftQueryError),
)
def wait_for_statement():
desc = client.describe_statement(Id=statement["Id"])
if desc["Status"] in ("SUBMITTED", "STARTED", "PICKED"):
raise Exception # Retry
if desc["Status"] != "FINISHED":
raise RedshiftQueryError(desc) # Don't retry. Raise exception.

wait_for_statement()

result = client.get_statement_result(Id=statement["Id"])

columns = result["ColumnMetadata"]
except ClientError as e:
if e.response["Error"]["Code"] == "ValidationException":
raise RedshiftCredentialsError() from e
raise
columns = table["ColumnList"]
else:
statement_id = aws_utils.execute_redshift_statement(
client,
config.offline_store.cluster_id,
config.offline_store.database,
config.offline_store.user,
f"SELECT * FROM ({self.query}) LIMIT 1",
)
columns = aws_utils.get_redshift_statement_result(client, statement_id)[
"ColumnMetadata"
]

return [(column["name"], column["typeName"].upper()) for column in columns]
118 changes: 115 additions & 3 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import uuid
from datetime import datetime
from typing import List, Optional, Union

import pandas as pd
import pyarrow as pa
from pydantic import StrictStr
from pydantic.typing import Literal

from feast.data_source import DataSource
from feast.data_source import DataSource, RedshiftSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

Expand All @@ -30,9 +33,12 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
database: StrictStr
""" Redshift database name """

s3_path: StrictStr
s3_staging_location: StrictStr
""" S3 path for importing & exporting data to Redshift """

iam_role: StrictStr
""" IAM Role for Redshift, granting it access to S3 """


class RedshiftOfflineStore(OfflineStore):
@staticmethod
Expand All @@ -46,7 +52,45 @@ def pull_latest_from_table_or_query(
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
pass
assert isinstance(data_source, RedshiftSource)
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

from_expression = data_source.get_table_query_string()

partition_by_join_key_string = ", ".join(join_key_columns)
if partition_by_join_key_string != "":
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamp_columns = [event_timestamp_column]
if created_timestamp_column:
timestamp_columns.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamp_columns) + " DESC"
field_string = ", ".join(
join_key_columns + feature_name_columns + timestamp_columns
)

redshift_client = aws_utils.get_redshift_data_client(
config.offline_store.region
)
s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

query = f"""
SELECT {field_string}
FROM (
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
)
WHERE _feast_row = 1
"""
return RedshiftRetrievalJob(
query=query,
redshift_client=redshift_client,
s3_resource=s3_resource,
config=config,
)

@staticmethod
def get_historical_features(
Expand All @@ -59,3 +103,71 @@ def get_historical_features(
full_feature_names: bool = False,
) -> RetrievalJob:
pass


class RedshiftRetrievalJob(RetrievalJob):
def __init__(self, query: str, redshift_client, s3_resource, config: RepoConfig):
"""Initialize RedshiftRetrievalJob object.
Args:
query: Redshift SQL query to execute.
redshift_client: boto3 redshift-data client
s3_resource: boto3 s3 resource object
config: Feast repo config
"""
self.query = query
self._redshift_client = redshift_client
self._s3_resource = s3_resource
self._config = config
self._s3_path = (
self._config.offline_store.s3_staging_location
+ "/unload/"
+ str(uuid.uuid4())
)

def to_df(self) -> pd.DataFrame:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
)

def to_arrow(self) -> pa.Table:
return aws_utils.unload_redshift_query_to_pa(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
)

def to_s3(self) -> str:
""" Export dataset to S3 in Parquet format and return path """
aws_utils.execute_redshift_query_and_unload_to_s3(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
)
return self._s3_path

def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
aws_utils.execute_redshift_statement(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
f'CREATE TABLE "{table_name}" AS ({self.query})',
)
Empty file.
Loading

0 comments on commit 8287b56

Please sign in to comment.