Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MVP support for on demand transforms for AWS to_s3 and to_redshift #1856

Merged
merged 1 commit into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ def _to_arrow_internal(self) -> pa.Table:

def to_s3(self) -> str:
adchia marked this conversation as resolved.
Show resolved Hide resolved
""" Export dataset to S3 in Parquet format and return path """
if self.on_demand_feature_views is not None:
transformed_df = self.to_df()
aws_utils.upload_df_to_s3(self._s3_resource, self._s3_path, transformed_df)
return self._s3_path

with self._query_generator() as query:
aws_utils.execute_redshift_query_and_unload_to_s3(
self._redshift_client,
Expand All @@ -279,6 +284,21 @@ def to_s3(self) -> str:

def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
if self.on_demand_feature_views is not None:
transformed_df = self.to_df()
aws_utils.upload_df_to_redshift(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
f"{self._config.offline_store.s3_staging_location}/features_df/{table_name}.parquet",
self._config.offline_store.iam_role,
table_name,
transformed_df,
)
return

with self._query_generator() as query:
query = f'CREATE TABLE "{table_name}" AS ({query});\n'
if self._drop_columns is not None:
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/infra/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,30 @@ def get_redshift_statement_result(redshift_data_client, statement_id: str) -> di
return redshift_data_client.get_statement_result(Id=statement_id)


def upload_df_to_s3(s3_resource, s3_path: str, df: pd.DataFrame,) -> None:
"""Uploads a Pandas DataFrame to S3 as a parquet file
Args:
s3_resource: S3 Resource object
s3_path: S3 path where the Parquet file is temporarily uploaded
df: The Pandas DataFrame to upload
Returns: None
"""
bucket, key = get_bucket_and_key(s3_path)

# Drop the index so that we dont have unnecessary columns
df.reset_index(drop=True, inplace=True)

table = pa.Table.from_pandas(df)
# Write the PyArrow Table on disk in Parquet format and upload it to S3
with tempfile.TemporaryDirectory() as temp_dir:
file_path = f"{temp_dir}/{uuid.uuid4()}.parquet"
pq.write_table(table, file_path)
s3_resource.Object(bucket, key).put(Body=open(file_path, "rb"))


def upload_df_to_redshift(
redshift_data_client,
cluster_id: str,
Expand Down