S3 endpoint configuration (feast-dev#1169)
Signed-off-by: mike0sv <[email protected]>
mike0sv committed Nov 17, 2020
1 parent aafa0a7 commit 39cac68
Showing 5 changed files with 37 additions and 10 deletions.
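
This change threads a new s3_endpoint_url option from the client Config down to the S3 staging client, so staged files (ingestion sources, historical-retrieval entities) can go to an S3-compatible store such as MinIO instead of AWS S3 only. A minimal usage sketch, not part of the commit: the MinIO address is a placeholder, and passing the option as a Client keyword argument assumes the Client forwards extra options to its Config, as it does for settings such as core_url.

from feast import Client

# Hypothetical wiring: "s3_endpoint_url" matches CONFIG_S3_ENDPOINT_URL added below;
# the endpoint address is only an illustration.
client = Client(
    core_url="localhost:6565",
    serving_url="localhost:6566",
    s3_endpoint_url="http://localhost:9000",
)

# With the option set, ingest() and get_historical_features() stage files through
# an S3 client built against the configured endpoint whenever the target is s3://.
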
5 changes: 4 additions & 1 deletion sdk/python/feast/client.py
@@ -824,7 +824,9 @@ def ingest(
         try:
             if issubclass(type(feature_table.batch_source), FileSource):
                 file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
-                _upload_to_file_source(file_url, with_partitions, dest_path)
+                _upload_to_file_source(
+                    file_url, with_partitions, dest_path, self._config
+                )
             if issubclass(type(feature_table.batch_source), BigQuerySource):
                 bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
                 feature_table_timestamp_column = (
@@ -979,6 +981,7 @@ def get_historical_features(
             entity_source = stage_entities_to_fs(
                 entity_source,
                 staging_location=self._config.get(CONFIG_SPARK_STAGING_LOCATION),
+                config=self._config,
             )
 
         if self._use_job_service:
4 changes: 4 additions & 0 deletions sdk/python/feast/constants.py
@@ -68,6 +68,8 @@ class AuthProvider(Enum):
 CONFIG_TIMEOUT_KEY = "timeout"
 CONFIG_MAX_WAIT_INTERVAL_KEY = "max_wait_interval"
 
+CONFIG_S3_ENDPOINT_URL = "s3_endpoint_url"
+
 # Spark Job Config
 CONFIG_SPARK_LAUNCHER = "spark_launcher"  # standalone, dataproc, emr
 
@@ -130,6 +132,8 @@ class AuthProvider(Enum):
     CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: "600",
     CONFIG_TIMEOUT_KEY: "21600",
     CONFIG_MAX_WAIT_INTERVAL_KEY: "60",
+    # Endpoint URL for S3 storage_client
+    CONFIG_S3_ENDPOINT_URL: None,
     # Authentication Provider - Google OpenID/OAuth
     CONFIG_AUTH_PROVIDER: "google",
     CONFIG_SPARK_LAUNCHER: "dataproc",
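
The None default means the endpoint is simply not overridden: boto3 treats endpoint_url=None the same as omitting the argument and resolves the regular AWS S3 endpoint, so existing deployments are unaffected; a configured value redirects staging traffic to an S3-compatible service. A small illustration (the MinIO address is a placeholder, not part of the commit):

import boto3

# endpoint_url=None (the new default) keeps boto3's normal AWS endpoint resolution;
# region_name is given only to make the snippet self-contained.
default_client = boto3.client("s3", region_name="us-east-1", endpoint_url=None)

# A configured value points the same client at an S3-compatible store such as MinIO.
minio_client = boto3.client("s3", region_name="us-east-1", endpoint_url="http://localhost:9000")
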
5 changes: 3 additions & 2 deletions sdk/python/feast/loaders/ingest.py
@@ -9,6 +9,7 @@
 import pyarrow as pa
 from pyarrow import parquet as pq
 
+from feast.config import Config
 from feast.staging.storage_client import get_staging_client
 
 GRPC_CONNECTION_TIMEOUT_DEFAULT = 3  # type: int
@@ -173,7 +174,7 @@ def _read_table_from_source(
 
 
 def _upload_to_file_source(
-    file_url: str, with_partitions: bool, dest_path: str
+    file_url: str, with_partitions: bool, dest_path: str, config: Config
 ) -> None:
     """
     Uploads data into a FileSource. Currently supports GCS, S3 and Local FS.
@@ -184,7 +185,7 @@ def _upload_to_file_source(
     from urllib.parse import urlparse
 
     uri = urlparse(file_url)
-    staging_client = get_staging_client(uri.scheme)
+    staging_client = get_staging_client(uri.scheme, config)
 
     if with_partitions:
         for path in glob.glob(os.path.join(dest_path, "**/*")):
5 changes: 3 additions & 2 deletions sdk/python/feast/staging/entities.py
@@ -7,6 +7,7 @@
 
 import pandas as pd
 
+from feast.config import Config
 from feast.data_format import ParquetFormat
 from feast.data_source import BigQuerySource, FileSource
 from feast.staging.storage_client import get_staging_client
@@ -18,15 +19,15 @@
 
 
 def stage_entities_to_fs(
-    entity_source: pd.DataFrame, staging_location: str
+    entity_source: pd.DataFrame, staging_location: str, config: Config
 ) -> FileSource:
     """
     Dumps given (entities) dataframe as parquet file and stage it to remote file storage (subdirectory of staging_location)
     :return: FileSource with remote destination path
     """
     entity_staging_uri = urlparse(os.path.join(staging_location, str(uuid.uuid4())))
-    staging_client = get_staging_client(entity_staging_uri.scheme)
+    staging_client = get_staging_client(entity_staging_uri.scheme, config)
     with tempfile.NamedTemporaryFile() as df_export_path:
         entity_source.to_parquet(df_export_path.name)
         bucket = (
28 changes: 23 additions & 5 deletions sdk/python/feast/staging/storage_client.py
@@ -24,6 +24,9 @@
 
 from google.auth.exceptions import DefaultCredentialsError
 
+from feast.config import Config
+from feast.constants import CONFIG_S3_ENDPOINT_URL
+
 GS = "gs"
 S3 = "s3"
 LOCAL_FILE = "file"
@@ -144,15 +147,15 @@ class S3Client(AbstractStagingClient):
     Implementation of AbstractStagingClient for Aws S3 storage
     """
 
-    def __init__(self):
+    def __init__(self, endpoint_url: str = None):
         try:
             import boto3
         except ImportError:
             raise ImportError(
                 "Install package boto3 for s3 staging support"
                 "run ```pip install boto3```"
             )
-        self.s3_client = boto3.client("s3")
+        self.s3_client = boto3.client("s3", endpoint_url=endpoint_url)
 
     def download_file(self, uri: ParseResult) -> IO[bytes]:
         """
@@ -275,21 +278,36 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str):
         shutil.copy(local_path, dest_fpath)
 
 
-storage_clients = {GS: GCSClient, S3: S3Client, LOCAL_FILE: LocalFSClient}
+def _s3_client(config: Config = None):
+    return S3Client(
+        endpoint_url=config.get(CONFIG_S3_ENDPOINT_URL) if config is not None else None
+    )
+
+
+def _gcs_client(config: Config = None):
+    return GCSClient()
+
+
+def _local_fs_client(config: Config = None):
+    return LocalFSClient()
+
+
+storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client}
 
 
-def get_staging_client(scheme):
+def get_staging_client(scheme, config: Config = None):
     """
     Initialization of a specific client object(GCSClient, S3Client etc.)
 
     Args:
         scheme (str): uri scheme: s3, gs or file
+        config (Config): additional configuration
 
     Returns:
         An object of concrete implementation of AbstractStagingClient
     """
     try:
-        return storage_clients[scheme]()
+        return storage_clients[scheme](config)
     except ValueError:
         raise Exception(
             f"Could not identify file scheme {scheme}. Only gs://, file:// and s3:// are supported"
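
storage_clients now maps each scheme to a factory callable rather than a class, so only the S3 path consumes the config while GCS and local-file staging ignore it, and config stays optional. A usage sketch under the same assumptions as above (client construction only; actual transfers still need credentials, and boto3 may additionally require an AWS region in the environment):

from feast.config import Config
from feast.constants import CONFIG_S3_ENDPOINT_URL
from feast.staging.storage_client import get_staging_client

# Assumed Config(options=...) form; the endpoint address is a placeholder.
config = Config(options={CONFIG_S3_ENDPOINT_URL: "http://localhost:9000"})

s3_client = get_staging_client("s3", config)       # S3Client built with the custom endpoint
local_client = get_staging_client("file", config)  # LocalFSClient; config is accepted but unused
default_s3 = get_staging_client("s3")              # config stays optional and defaults to None
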
