S3 endpoint configuration #1169 (#1172)
* S3 endpoint configuration #1169

Signed-off-by: mike0sv <[email protected]>

* Add allow_no_value=True to ConfigParser

Signed-off-by: mike0sv <[email protected]>

* New constants API
defaults extraction

Signed-off-by: mike0sv <[email protected]>

* fix for other types of get

Signed-off-by: mike0sv <[email protected]>

* return to the old logic and some testing

Signed-off-by: mike0sv <[email protected]>

* oooopsie

Signed-off-by: mike0sv <[email protected]>

* remove DEFAULTS logic changes

Signed-off-by: mike0sv <[email protected]>

* reformat

Signed-off-by: mike0sv <[email protected]>

* _upload_to_file_source docs

Signed-off-by: mike0sv <[email protected]>
mike0sv authored Nov 22, 2020
1 parent faa08fe commit aed366b
Showing 5 changed files with 41 additions and 10 deletions.
5 changes: 4 additions & 1 deletion sdk/python/feast/client.py
@@ -849,7 +849,9 @@ def ingest(
try:
if issubclass(type(feature_table.batch_source), FileSource):
file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
_upload_to_file_source(file_url, with_partitions, dest_path)
_upload_to_file_source(
file_url, with_partitions, dest_path, self._config
)
if issubclass(type(feature_table.batch_source), BigQuerySource):
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
feature_table_timestamp_column = (
@@ -1004,6 +1006,7 @@ def get_historical_features(
entity_source = stage_entities_to_fs(
entity_source,
staging_location=self._config.get(opt.SPARK_STAGING_LOCATION),
config=self._config,
)

if self._use_job_service:
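
For context, a rough sketch of what this plumbing enables at the Client level. The forwarding of self._config is taken from this diff; the Client constructor options and the endpoint value are assumptions for illustration.

# Hypothetical usage: a Client configured with a custom S3 endpoint (e.g. a local MinIO)
# now forwards that configuration into the staging/upload path via self._config.
from feast import Client

client = Client(options={"s3_endpoint_url": "http://localhost:9000"})  # assumed constructor kwargs
# client.ingest(feature_table, dataframe) will now end up calling
# _upload_to_file_source(file_url, with_partitions, dest_path, self._config)
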
3 changes: 3 additions & 0 deletions sdk/python/feast/constants.py
@@ -125,6 +125,9 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Time to wait for historical feature requests before timing out.
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600"

#: Endpoint URL for S3 storage_client
S3_ENDPOINT_URL: Optional[str] = None

#: Authentication Provider - Google OpenID/OAuth
#:
#: Options: "google" / "oauth"
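
For context, a minimal sketch of how the new option could be set and read. The option name and the config.get lookup come from this diff; the Config constructor arguments and the FEAST_-prefixed environment-variable form are assumptions about this SDK version.

# Hypothetical configuration sketch: point the SDK at a non-AWS, S3-compatible endpoint such as MinIO.
from feast.config import Config
from feast.constants import ConfigOptions as opt

config = Config(options={"s3_endpoint_url": "http://localhost:9000"})  # assumed constructor
# Assumed environment-variable equivalent: FEAST_S3_ENDPOINT_URL=http://localhost:9000

endpoint_url = config.get(opt.S3_ENDPOINT_URL, None)  # same lookup the storage client performs
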
8 changes: 6 additions & 2 deletions sdk/python/feast/loaders/ingest.py
@@ -9,6 +9,7 @@
import pyarrow as pa
from pyarrow import parquet as pq

from feast.config import Config
from feast.staging.storage_client import get_staging_client


@@ -166,18 +167,21 @@ def _read_table_from_source(


def _upload_to_file_source(
file_url: str, with_partitions: bool, dest_path: str
file_url: str, with_partitions: bool, dest_path: str, config: Config
) -> None:
"""
Uploads data into a FileSource. Currently supports GCS, S3 and Local FS.
Args:
file_url: file url of FileSource defined for FeatureTable
with_partitions: whether to treat dest_path as dir with partitioned table
dest_path: path to file or dir to be uploaded
config: Config instance to configure FileSource
"""
from urllib.parse import urlparse

uri = urlparse(file_url)
staging_client = get_staging_client(uri.scheme)
staging_client = get_staging_client(uri.scheme, config)

if with_partitions:
for path in glob.glob(os.path.join(dest_path, "**/*")):
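
For context, an illustrative call of the updated helper. The four-argument signature matches this diff; the bucket, local path, and Config constructor are assumptions for illustration.

# Hypothetical call sketch for the updated _upload_to_file_source helper.
from feast.config import Config
from feast.loaders.ingest import _upload_to_file_source

config = Config(options={"s3_endpoint_url": "http://localhost:9000"})  # assumed constructor
_upload_to_file_source(
    "s3://my-bucket/driver-features/",  # FileSource file_url (trailing "*" already stripped)
    False,                              # dest_path is a single parquet file, not a partitioned dir
    "/tmp/ingest.parquet",              # local file to stage
    config,                             # forwarded into get_staging_client(uri.scheme, config)
)
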
5 changes: 3 additions & 2 deletions sdk/python/feast/staging/entities.py
@@ -7,6 +7,7 @@

import pandas as pd

from feast.config import Config
from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource
from feast.staging.storage_client import get_staging_client
@@ -18,15 +19,15 @@


def stage_entities_to_fs(
entity_source: pd.DataFrame, staging_location: str
entity_source: pd.DataFrame, staging_location: str, config: Config
) -> FileSource:
"""
Dumps given (entities) dataframe as parquet file and stage it to remote file storage (subdirectory of staging_location)
:return: FileSource with remote destination path
"""
entity_staging_uri = urlparse(os.path.join(staging_location, str(uuid.uuid4())))
staging_client = get_staging_client(entity_staging_uri.scheme)
staging_client = get_staging_client(entity_staging_uri.scheme, config)
with tempfile.NamedTemporaryFile() as df_export_path:
entity_source.to_parquet(df_export_path.name)
bucket = (
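
For context, a sketch of the updated entity-staging call. The signature is from this diff; the DataFrame, staging bucket, and Config constructor are assumptions for illustration.

# Hypothetical sketch: stage an entity DataFrame into an S3-compatible staging location.
import pandas as pd

from feast.config import Config
from feast.staging.entities import stage_entities_to_fs

entities = pd.DataFrame({"driver_id": [1001, 1002]})
config = Config(options={"s3_endpoint_url": "http://localhost:9000"})  # assumed constructor
file_source = stage_entities_to_fs(
    entities, staging_location="s3://my-bucket/staging", config=config
)  # returns a FileSource pointing at the uploaded parquet file
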
30 changes: 25 additions & 5 deletions sdk/python/feast/staging/storage_client.py
@@ -24,6 +24,9 @@

from google.auth.exceptions import DefaultCredentialsError

from feast.config import Config
from feast.constants import ConfigOptions as opt

GS = "gs"
S3 = "s3"
LOCAL_FILE = "file"
@@ -144,15 +147,15 @@ class S3Client(AbstractStagingClient):
Implementation of AbstractStagingClient for Aws S3 storage
"""

def __init__(self):
def __init__(self, endpoint_url: str = None):
try:
import boto3
except ImportError:
raise ImportError(
"Install package boto3 for s3 staging support"
"run ```pip install boto3```"
)
self.s3_client = boto3.client("s3")
self.s3_client = boto3.client("s3", endpoint_url=endpoint_url)

def download_file(self, uri: ParseResult) -> IO[bytes]:
"""
@@ -275,21 +278,38 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str):
shutil.copy(local_path, dest_fpath)


storage_clients = {GS: GCSClient, S3: S3Client, LOCAL_FILE: LocalFSClient}
def _s3_client(config: Config = None):
if config is None:
endpoint_url = None
else:
endpoint_url = config.get(opt.S3_ENDPOINT_URL, None)
return S3Client(endpoint_url=endpoint_url)


def _gcs_client(config: Config = None):
return GCSClient()


def _local_fs_client(config: Config = None):
return LocalFSClient()


storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client}


def get_staging_client(scheme):
def get_staging_client(scheme, config: Config = None):
"""
Initialization of a specific client object(GCSClient, S3Client etc.)
Args:
scheme (str): uri scheme: s3, gs or file
config (Config): additional configuration
Returns:
An object of concrete implementation of AbstractStagingClient
"""
try:
return storage_clients[scheme]()
return storage_clients[scheme](config)
except ValueError:
raise Exception(
f"Could not identify file scheme {scheme}. Only gs://, file:// and s3:// are supported"
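
For context, a sketch of the new factory path from URI scheme to client. The factory functions and the boto3 call come from this diff; the endpoint value and the Config constructor are assumptions for illustration.

# Hypothetical dispatch sketch:
#   get_staging_client("s3", config) -> _s3_client(config)
#     -> S3Client(endpoint_url=config.get(opt.S3_ENDPOINT_URL, None))
#     -> boto3.client("s3", endpoint_url="http://localhost:9000")
from feast.config import Config
from feast.staging.storage_client import get_staging_client

config = Config(options={"s3_endpoint_url": "http://localhost:9000"})  # assumed constructor
s3_staging_client = get_staging_client("s3", config)
local_staging_client = get_staging_client("file")  # config remains optional; LocalFSClient ignores it
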
