Skip to content

Commit

Permalink
Add MVP support for on demand transforms for AWS to_s3 and to_redshift
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Chiao <[email protected]>
  • Loading branch information
adchia committed Sep 13, 2021
1 parent 0d0bdf2 commit b25df05
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
20 changes: 20 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ def _to_arrow_internal(self) -> pa.Table:

def to_s3(self) -> str:
""" Export dataset to S3 in Parquet format and return path """
if self.on_demand_feature_views is not None:
transformed_df = self.to_df()
aws_utils.upload_df_to_s3(self._s3_resource, self._s3_path, transformed_df)
return self._s3_path

with self._query_generator() as query:
aws_utils.execute_redshift_query_and_unload_to_s3(
self._redshift_client,
Expand All @@ -279,6 +284,21 @@ def to_s3(self) -> str:

def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
if self.on_demand_feature_views is not None:
transformed_df = self.to_df()
aws_utils.upload_df_to_redshift(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
f"{self._config.offline_store.s3_staging_location}/features_df/{table_name}.parquet",
self._config.offline_store.iam_role,
table_name,
transformed_df,
)
return

with self._query_generator() as query:
query = f'CREATE TABLE "{table_name}" AS ({query});\n'
if self._drop_columns is not None:
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/infra/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,30 @@ def get_redshift_statement_result(redshift_data_client, statement_id: str) -> di
return redshift_data_client.get_statement_result(Id=statement_id)


def upload_df_to_s3(s3_resource, s3_path: str, df: pd.DataFrame,) -> None:
    """Uploads a Pandas DataFrame to S3 as a parquet file

    The DataFrame is converted to a PyArrow Table, written to a temporary
    local Parquet file, and that file is then put to the bucket/key parsed
    from ``s3_path``. The caller's DataFrame is left unmodified.

    Args:
        s3_resource: S3 Resource object
        s3_path: S3 path the Parquet file is uploaded to
        df: The Pandas DataFrame to upload

    Returns: None
    """
    bucket, key = get_bucket_and_key(s3_path)

    # Drop the index so that we don't have unnecessary columns. Use the
    # non-inplace form so the caller's DataFrame keeps its original index.
    table = pa.Table.from_pandas(df.reset_index(drop=True))

    # Write the PyArrow Table on disk in Parquet format and upload it to S3
    with tempfile.TemporaryDirectory() as temp_dir:
        file_path = f"{temp_dir}/{uuid.uuid4()}.parquet"
        pq.write_table(table, file_path)
        # Close the handle before the temporary directory is removed; an
        # unclosed handle leaks a descriptor and can block cleanup on Windows.
        with open(file_path, "rb") as parquet_file:
            s3_resource.Object(bucket, key).put(Body=parquet_file)


def upload_df_to_redshift(
redshift_data_client,
cluster_id: str,
Expand Down

0 comments on commit b25df05

Please sign in to comment.