diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 215ac8cc32..0d7d689c5d 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -849,7 +849,9 @@ def ingest(
         try:
             if issubclass(type(feature_table.batch_source), FileSource):
                 file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
-                _upload_to_file_source(file_url, with_partitions, dest_path)
+                _upload_to_file_source(
+                    file_url, with_partitions, dest_path, self._config
+                )
             if issubclass(type(feature_table.batch_source), BigQuerySource):
                 bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
                 feature_table_timestamp_column = (
@@ -1004,6 +1006,7 @@ def get_historical_features(
             entity_source = stage_entities_to_fs(
                 entity_source,
                 staging_location=self._config.get(opt.SPARK_STAGING_LOCATION),
+                config=self._config,
             )
 
         if self._use_job_service:
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index 4d677bc38d..8b1db7b76b 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -125,6 +125,9 @@ class ConfigOptions(metaclass=ConfigMeta):
     #: Time to wait for historical feature requests before timing out.
     BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600"
 
+    #: Endpoint URL for S3 storage_client
+    S3_ENDPOINT_URL: Optional[str] = None
+
     #: Authentication Provider - Google OpenID/OAuth
     #:
     #: Options: "google" / "oauth"
diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py
index b4dc1e4239..67c8f5f726 100644
--- a/sdk/python/feast/loaders/ingest.py
+++ b/sdk/python/feast/loaders/ingest.py
@@ -9,6 +9,7 @@
 import pyarrow as pa
 from pyarrow import parquet as pq
 
+from feast.config import Config
 from feast.staging.storage_client import get_staging_client
 
 
@@ -166,18 +167,21 @@ def _read_table_from_source(
 
 
 def _upload_to_file_source(
-    file_url: str, with_partitions: bool, dest_path: str
+    file_url: str, with_partitions: bool, dest_path: str, config: Config
 ) -> None:
     """
     Uploads data into a FileSource. Currently supports GCS, S3 and Local FS.
 
     Args:
         file_url: file url of FileSource defined for FeatureTable
+        with_partitions: whether to treat dest_path as dir with partitioned table
+        dest_path: path to file or dir to be uploaded
+        config: Config instance to configure FileSource
     """
     from urllib.parse import urlparse
 
     uri = urlparse(file_url)
-    staging_client = get_staging_client(uri.scheme)
+    staging_client = get_staging_client(uri.scheme, config)
 
     if with_partitions:
         for path in glob.glob(os.path.join(dest_path, "**/*")):
diff --git a/sdk/python/feast/staging/entities.py b/sdk/python/feast/staging/entities.py
index 8a4745fe24..665cbcac13 100644
--- a/sdk/python/feast/staging/entities.py
+++ b/sdk/python/feast/staging/entities.py
@@ -7,6 +7,7 @@
 
 import pandas as pd
 
+from feast.config import Config
 from feast.data_format import ParquetFormat
 from feast.data_source import BigQuerySource, FileSource
 from feast.staging.storage_client import get_staging_client
@@ -18,7 +19,7 @@
 
 
 def stage_entities_to_fs(
-    entity_source: pd.DataFrame, staging_location: str
+    entity_source: pd.DataFrame, staging_location: str, config: Config
 ) -> FileSource:
     """
     Dumps given (entities) dataframe as parquet file and stage it to remote file storage (subdirectory of staging_location)
@@ -26,7 +27,7 @@ def stage_entities_to_fs(
     :return: FileSource with remote destination path
     """
     entity_staging_uri = urlparse(os.path.join(staging_location, str(uuid.uuid4())))
-    staging_client = get_staging_client(entity_staging_uri.scheme)
+    staging_client = get_staging_client(entity_staging_uri.scheme, config)
     with tempfile.NamedTemporaryFile() as df_export_path:
         entity_source.to_parquet(df_export_path.name)
         bucket = (
diff --git a/sdk/python/feast/staging/storage_client.py b/sdk/python/feast/staging/storage_client.py
index 1cb250a598..9d6c9f52a4 100644
--- a/sdk/python/feast/staging/storage_client.py
+++ b/sdk/python/feast/staging/storage_client.py
@@ -24,6 +24,9 @@
 
 from google.auth.exceptions import DefaultCredentialsError
 
+from feast.config import Config
+from feast.constants import ConfigOptions as opt
+
 GS = "gs"
 S3 = "s3"
 LOCAL_FILE = "file"
@@ -144,7 +147,7 @@ class S3Client(AbstractStagingClient):
     Implementation of AbstractStagingClient for Aws S3 storage
     """
 
-    def __init__(self):
+    def __init__(self, endpoint_url: str = None):
         try:
             import boto3
         except ImportError:
@@ -152,7 +155,7 @@ def __init__(self):
                 "Install package boto3 for s3 staging support"
                 "run ```pip install boto3```"
             )
-        self.s3_client = boto3.client("s3")
+        self.s3_client = boto3.client("s3", endpoint_url=endpoint_url)
 
     def download_file(self, uri: ParseResult) -> IO[bytes]:
         """
@@ -275,21 +278,38 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str):
         shutil.copy(local_path, dest_fpath)
 
 
-storage_clients = {GS: GCSClient, S3: S3Client, LOCAL_FILE: LocalFSClient}
+def _s3_client(config: Config = None):
+    if config is None:
+        endpoint_url = None
+    else:
+        endpoint_url = config.get(opt.S3_ENDPOINT_URL, None)
+    return S3Client(endpoint_url=endpoint_url)
+
+
+def _gcs_client(config: Config = None):
+    return GCSClient()
+
+
+def _local_fs_client(config: Config = None):
+    return LocalFSClient()
+
+
+storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client}
 
 
-def get_staging_client(scheme):
+def get_staging_client(scheme, config: Config = None):
     """
     Initialization of a specific client object(GCSClient, S3Client etc.)
 
     Args:
         scheme (str): uri scheme: s3, gs or file
+        config (Config): additional configuration
 
     Returns:
         An object of concrete implementation of AbstractStagingClient
     """
     try:
-        return storage_clients[scheme]()
+        return storage_clients[scheme](config)
     except ValueError:
         raise Exception(
             f"Could not identify file scheme {scheme}. Only gs://, file:// and s3:// are supported"
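
Not part of the patch itself, but a minimal sketch of how the new plumbing is meant to be used: a `Config` carrying `S3_ENDPOINT_URL` (for example pointing at MinIO or another S3-compatible server) is handed to `get_staging_client`, which now builds the `S3Client` against that endpoint. The `Config(options=...)` call, the localhost endpoint, and the bucket/key below are illustrative assumptions, not taken from this diff.

```python
from urllib.parse import urlparse

from feast.config import Config
from feast.constants import ConfigOptions as opt
from feast.staging.storage_client import get_staging_client

# Assumption: Config can be built from an in-memory options dict; the
# MinIO-style endpoint is a stand-in for any S3-compatible service.
config = Config(options={opt.S3_ENDPOINT_URL: "http://localhost:9000"})

# With this change, the "s3" scheme resolves to _s3_client(config), which reads
# S3_ENDPOINT_URL from the Config and forwards it to boto3.client("s3", endpoint_url=...).
staging_client = get_staging_client("s3", config)

# Hypothetical object, purely for illustration.
uri = urlparse("s3://my-bucket/staging/entities.parquet")
data = staging_client.download_file(uri).read()

# Omitting the config (or leaving S3_ENDPOINT_URL unset) keeps the old behaviour:
# boto3 falls back to its default AWS endpoint resolution.
default_client = get_staging_client("s3")
```

Because callers such as `_upload_to_file_source` and `stage_entities_to_fs` now thread the client's `Config` through, the same endpoint override applies to ingestion and entity staging without any further changes at the call sites.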