Add EMR streaming job launcher #1065

Merged (1 commit) on Oct 19, 2020

3 changes: 1 addition & 2 deletions sdk/python/feast/cli.py
@@ -387,11 +387,10 @@ def start_stream_to_online(feature_table: str, jar: str):
    """
    Start stream to online sync job.
    """
-    import feast.pyspark.aws.jobs

    client = Client()
    table = client.get_feature_table(feature_table)
-    feast.pyspark.aws.jobs.start_stream_to_online(client, table, [jar] if jar else [])
+    client.start_stream_to_online_ingestion(table, [jar] if jar else [])


@cli.command()

8 changes: 7 additions & 1 deletion sdk/python/feast/client.py
@@ -81,6 +81,7 @@
    start_historical_feature_retrieval_job,
    start_historical_feature_retrieval_spark_session,
    start_offline_to_online_ingestion,
+    start_stream_to_online_ingestion,
)
from feast.serving.ServingService_pb2 import (
    GetFeastServingInfoRequest,

@@ -886,10 +887,15 @@ def _get_feature_tables_from_feature_refs(
        return feature_tables

    def start_offline_to_online_ingestion(
-        self, feature_table: Union[FeatureTable, str], start: datetime, end: datetime,
+        self, feature_table: FeatureTable, start: datetime, end: datetime,
    ) -> SparkJob:
        return start_offline_to_online_ingestion(feature_table, start, end, self)  # type: ignore

+    def start_stream_to_online_ingestion(
+        self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
+    ) -> SparkJob:
+        return start_stream_to_online_ingestion(feature_table, extra_jars or [], self)
+
    def stage_dataframe(
        self,
        df: pd.DataFrame,

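For context, a minimal usage sketch of the new client method (not part of the diff); the table name and jar path are hypothetical, and `Client()` is assumed to pick up the Spark launcher and Redis settings from the local Feast config:

```python
from feast import Client

client = Client()  # assumes spark launcher + redis host/port/ssl are configured

# Hypothetical table name; any registered feature table with a stream source works.
table = client.get_feature_table("driver_trips")

# extra_jars is optional and defaults to an empty list.
job = client.start_stream_to_online_ingestion(
    table, extra_jars=["s3://my-bucket/my-udfs.jar"]  # hypothetical extra jar
)
```
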
82 changes: 77 additions & 5 deletions sdk/python/feast/pyspark/abc.py
@@ -254,7 +254,7 @@ def get_output_file_uri(self, timeout_sec=None):
        raise NotImplementedError


-class IngestionJobParameters(SparkJobParameters):
+class BatchIngestionJobParameters(SparkJobParameters):
    def __init__(
        self,
        feature_table: Dict,

@@ -310,12 +310,68 @@ def get_arguments(self) -> List[str]:
        ]


-class IngestionJob(SparkJob):
+class StreamIngestionJobParameters(SparkJobParameters):
+    def __init__(
+        self,
+        feature_table: Dict,
+        source: Dict,
+        jar: str,
+        extra_jars: List[str],
+        redis_host: str,
+        redis_port: int,
+        redis_ssl: bool,
+    ):
+        self._feature_table = feature_table
+        self._source = source
+        self._jar = jar
+        self._extra_jars = extra_jars
+        self._redis_host = redis_host
+        self._redis_port = redis_port
+        self._redis_ssl = redis_ssl
+
+    def get_name(self) -> str:
+        return f"StreamIngestion-{self.get_feature_table_name()}"
+
+    def _get_redis_config(self):
+        return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)
+
+    def get_feature_table_name(self) -> str:
+        return self._feature_table["name"]
+
+    def get_main_file_path(self) -> str:
+        return self._jar
+
+    def get_extra_jar_paths(self) -> List[str]:
+        return self._extra_jars
+
+    def get_class_name(self) -> Optional[str]:
+        return "feast.ingestion.IngestionJob"
+
+    def get_arguments(self) -> List[str]:
+        return [
+            "--mode",
+            "online",
+            "--feature-table",
+            json.dumps(self._feature_table),
+            "--source",
+            json.dumps(self._source),
+            "--redis",
+            json.dumps(self._get_redis_config()),
+        ]
+
+
+class BatchIngestionJob(SparkJob):
    """
    Container for the ingestion job result
    """


+class StreamIngestionJob(SparkJob):
+    """
+    Container for the streaming ingestion job result
+    """
+
+
class JobLauncher(abc.ABC):
    """
    Submits spark jobs to a spark cluster. Currently supports only historical feature retrieval jobs.

@@ -339,8 +395,8 @@ def historical_feature_retrieval(

    @abc.abstractmethod
    def offline_to_online_ingestion(
-        self, ingestion_job_params: IngestionJobParameters
-    ) -> IngestionJob:
+        self, ingestion_job_params: BatchIngestionJobParameters
+    ) -> BatchIngestionJob:
        """
        Submits a batch ingestion job to a Spark cluster.

@@ -349,7 +405,23 @@ def offline_to_online_ingestion(
        during execution, or timeout.

        Returns:
-            IngestionJob: wrapper around remote job that can be used to check when job completed.
+            BatchIngestionJob: wrapper around remote job that can be used to check when job completed.
        """
        raise NotImplementedError

+    @abc.abstractmethod
+    def start_stream_to_online_ingestion(
+        self, ingestion_job_params: StreamIngestionJobParameters
+    ) -> StreamIngestionJob:
+        """
+        Starts a stream ingestion job to a Spark cluster.
+
+        Raises:
+            SparkJobFailure: The spark job submission failed, encountered error
+            during execution, or timeout.
+
+        Returns:
+            StreamIngestionJob: wrapper around remote job.
+        """
+        raise NotImplementedError

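To make the new parameter class concrete, here is a sketch of what it generates for a spark-submit invocation; every value below (table spec, source dict, Redis endpoint, jar path) is made up for illustration:

```python
from feast.pyspark.abc import StreamIngestionJobParameters

params = StreamIngestionJobParameters(
    feature_table={"name": "driver_trips"},          # hypothetical feature table spec
    source={"kafka": {"topic": "driver_trips"}},     # hypothetical source argument
    jar="s3://my-bucket/feast-ingestion-spark.jar",  # hypothetical ingestion jar
    extra_jars=[],
    redis_host="localhost",
    redis_port=6379,
    redis_ssl=False,
)

params.get_name()        # 'StreamIngestion-driver_trips'
params.get_class_name()  # 'feast.ingestion.IngestionJob'
params.get_arguments()
# ['--mode', 'online',
#  '--feature-table', '{"name": "driver_trips"}',
#  '--source', '{"kafka": {"topic": "driver_trips"}}',
#  '--redis', '{"host": "localhost", "port": 6379, "ssl": false}']
```
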
58 changes: 40 additions & 18 deletions sdk/python/feast/pyspark/launcher.py
@@ -1,7 +1,7 @@
import shutil
import tempfile
from datetime import datetime
-from typing import TYPE_CHECKING, List, Union, cast
+from typing import TYPE_CHECKING, List, Union
from urllib.parse import urlparse

from feast.config import Config

@@ -23,14 +23,16 @@
    CONFIG_SPARK_LAUNCHER,
    CONFIG_SPARK_STANDALONE_MASTER,
)
-from feast.data_source import BigQuerySource, DataSource, FileSource
+from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource
from feast.feature_table import FeatureTable
from feast.pyspark.abc import (
-    IngestionJob,
-    IngestionJobParameters,
+    BatchIngestionJob,
+    BatchIngestionJobParameters,
    JobLauncher,
    RetrievalJob,
    RetrievalJobParameters,
+    StreamIngestionJob,
+    StreamIngestionJobParameters,
)
from feast.staging.storage_client import get_staging_client
from feast.value_type import ValueType

@@ -85,10 +87,7 @@ def resolve_launcher(config: Config) -> JobLauncher:
    return _launchers[config.get(CONFIG_SPARK_LAUNCHER)](config)


-_SOURCES = {
-    FileSource: ("file", "file_options"),
-    BigQuerySource: ("bq", "bigquery_options"),
-}
+_SOURCES = {FileSource: "file", BigQuerySource: "bq", KafkaSource: "kafka"}


def _source_to_argument(source: DataSource):

@@ -99,16 +98,19 @@ def _source_to_argument(source: DataSource):
        "date_partition_column": source.date_partition_column,
    }

-    kind, option_field = _SOURCES[type(source)]
+    kind = _SOURCES[type(source)]

[Review comment, oavdeev (author): "option_field here was unused anyway after #1049"]

    properties = {**common_properties}
-    if type(source) == FileSource:
-        file_source = cast(FileSource, source)
-        properties["path"] = file_source.file_options.file_url
-        properties["format"] = str(file_source.file_options.file_format)
-    if type(source) == BigQuerySource:
-        bq_source = cast(BigQuerySource, source)
-        properties["table_ref"] = bq_source.bigquery_options.table_ref
-    return {kind: properties}
+    if isinstance(source, FileSource):
+        properties["path"] = source.file_options.file_url
+        properties["format"] = str(source.file_options.file_format)
+        return {kind: properties}
+    if isinstance(source, BigQuerySource):
+        properties["table_ref"] = source.bigquery_options.table_ref
+        return {kind: properties}
+    if isinstance(source, KafkaSource):
+        properties["topic"] = source.kafka_options.topic
+        properties["classpath"] = source.kafka_options.class_path
+        properties["bootstrap_servers"] = source.kafka_options.bootstrap_servers
+        return {kind: properties}
+    raise NotImplementedError(f"Unsupported Datasource: {type(source)}")

@@ -194,13 +196,13 @@ def _download_jar(remote_jar: str) -> str:

def start_offline_to_online_ingestion(
    feature_table: FeatureTable, start: datetime, end: datetime, client: "Client"
-) -> IngestionJob:
+) -> BatchIngestionJob:

    launcher = resolve_launcher(client._config)
    local_jar_path = _download_jar(client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR))

    return launcher.offline_to_online_ingestion(
-        IngestionJobParameters(
+        BatchIngestionJobParameters(
            jar=local_jar_path,
            source=_source_to_argument(feature_table.batch_source),
            feature_table=_feature_table_to_argument(client, feature_table),

@@ -213,6 +215,26 @@ def start_offline_to_online_ingestion(
    )


+def start_stream_to_online_ingestion(
+    feature_table: FeatureTable, extra_jars: List[str], client: "Client"
+) -> StreamIngestionJob:
+
+    launcher = resolve_launcher(client._config)
+    local_jar_path = _download_jar(client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR))
+
+    return launcher.start_stream_to_online_ingestion(
+        StreamIngestionJobParameters(
+            jar=local_jar_path,
+            extra_jars=extra_jars,
+            source=_source_to_argument(feature_table.stream_source),
+            feature_table=_feature_table_to_argument(client, feature_table),
+            redis_host=client._config.get(CONFIG_REDIS_HOST),
+            redis_port=client._config.getint(CONFIG_REDIS_PORT),
+            redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
+        )
+    )
+
+
def stage_dataframe(
    df, event_timestamp_column: str, created_timestamp_column: str, client: "Client"
) -> FileSource:

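As an illustration of the new `KafkaSource` branch in `_source_to_argument`, a stream source would be serialized roughly as below before being handed to the ingestion job via `--source`; all field values are hypothetical, and the first four keys come from the `DataSource` base properties:

```python
# Sketch of _source_to_argument(feature_table.stream_source) for a Kafka source
{
    "kafka": {
        "event_timestamp_column": "event_timestamp",
        "created_timestamp_column": "created_timestamp",
        "field_mapping": {},
        "date_partition_column": "",
        "topic": "driver_trips",                      # source.kafka_options.topic
        "classpath": "com.example.DriverTripsProto",  # source.kafka_options.class_path
        "bootstrap_servers": "kafka-broker:9092",     # source.kafka_options.bootstrap_servers
    }
}
```
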
14 changes: 12 additions & 2 deletions sdk/python/feast/pyspark/launchers/aws/__init__.py
@@ -1,3 +1,13 @@
-from .emr import EmrClusterLauncher, EmrIngestionJob, EmrRetrievalJob
+from .emr import (
+    EmrBatchIngestionJob,
+    EmrClusterLauncher,
+    EmrRetrievalJob,
+    EmrStreamIngestionJob,
+)

-__all__ = ["EmrRetrievalJob", "EmrIngestionJob", "EmrClusterLauncher"]
+__all__ = [
+    "EmrRetrievalJob",
+    "EmrBatchIngestionJob",
+    "EmrStreamIngestionJob",
+    "EmrClusterLauncher",
+]

59 changes: 51 additions & 8 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -1,20 +1,22 @@
import os
import tempfile
from io import BytesIO
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional

import boto3
import pandas

from feast.data_format import ParquetFormat
from feast.data_source import FileSource
from feast.pyspark.abc import (
-    IngestionJob,
-    IngestionJobParameters,
+    BatchIngestionJob,
+    BatchIngestionJobParameters,
    JobLauncher,
    RetrievalJob,
    RetrievalJobParameters,
    SparkJobStatus,
+    StreamIngestionJob,
+    StreamIngestionJobParameters,
)

from .emr_utils import (

@@ -28,6 +30,7 @@
    _load_new_cluster_template,
    _random_string,
    _s3_upload,
+    _stream_ingestion_step,
    _sync_offline_to_online_step,
    _upload_jar,
    _wait_for_job_state,

@@ -82,7 +85,7 @@ def get_output_file_uri(self, timeout_sec=None):
        return self._output_file_uri


-class EmrIngestionJob(EmrJobMixin, IngestionJob):
+class EmrBatchIngestionJob(EmrJobMixin, BatchIngestionJob):
    """
    Ingestion job result for a EMR cluster
    """

@@ -91,6 +94,15 @@ def __init__(self, emr_client, job_ref: EmrJobRef):
        super().__init__(emr_client, job_ref)


+class EmrStreamIngestionJob(EmrJobMixin, StreamIngestionJob):
+    """
+    Ingestion streaming job for a EMR cluster
+    """
+
+    def __init__(self, emr_client, job_ref: EmrJobRef):
+        super().__init__(emr_client, job_ref)
+
+
class EmrClusterLauncher(JobLauncher):
    """
    Submits jobs to an existing or new EMR cluster. Requires boto3 as an additional dependency.

@@ -203,8 +215,8 @@ def historical_feature_retrieval(
        )

    def offline_to_online_ingestion(
-        self, ingestion_job_params: IngestionJobParameters
-    ) -> IngestionJob:
+        self, ingestion_job_params: BatchIngestionJobParameters
+    ) -> BatchIngestionJob:
        """
        Submits a batch ingestion job to a Spark cluster.

@@ -213,7 +225,7 @@ def offline_to_online_ingestion(
        during execution, or timeout.

        Returns:
-            IngestionJob: wrapper around remote job that can be used to check when job completed.
+            BatchIngestionJob: wrapper around remote job that can be used to check when job completed.
        """

        jar_s3_path = _upload_jar(

@@ -227,7 +239,38 @@

        job_ref = self._submit_emr_job(step)

-        return EmrIngestionJob(self._emr_client(), job_ref)
+        return EmrBatchIngestionJob(self._emr_client(), job_ref)

+    def start_stream_to_online_ingestion(
+        self, ingestion_job_params: StreamIngestionJobParameters
+    ) -> StreamIngestionJob:
+        """
+        Starts a stream ingestion job on a Spark cluster.
+
+        Returns:
+            StreamIngestionJob: wrapper around remote job that can be used to check on the job.
+        """
+        jar_s3_path = _upload_jar(
+            self._staging_location, ingestion_job_params.get_main_file_path()
+        )
+
+        extra_jar_paths: List[str] = []
+        for extra_jar in ingestion_job_params.get_extra_jar_paths():
+            if extra_jar.startswith("s3://"):
+                extra_jar_paths.append(extra_jar)
+            else:
+                extra_jar_paths.append(_upload_jar(self._staging_location, extra_jar))
+
+        step = _stream_ingestion_step(
+            jar_s3_path,
+            extra_jar_paths,
+            ingestion_job_params.get_feature_table_name(),
+            args=ingestion_job_params.get_arguments(),
+        )
+
+        job_ref = self._submit_emr_job(step)
+
+        return EmrStreamIngestionJob(self._emr_client(), job_ref)

    def stage_dataframe(
        self, df: pandas.DataFrame, event_timestamp: str, created_timestamp_column: str

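One detail worth noting in the EMR launcher: extra jars that are already on S3 are passed through untouched, while local paths are uploaded to the staging location first. A condensed sketch of that rule, with hypothetical paths and assuming `_upload_jar` returns the staged S3 URI:

```python
extra_jars = ["s3://my-bucket/already-staged.jar", "/tmp/local-udfs.jar"]

extra_jar_paths = [
    jar if jar.startswith("s3://") else _upload_jar(staging_location, jar)
    for jar in extra_jars
]
# -> ["s3://my-bucket/already-staged.jar", "s3://<staging-location>/.../local-udfs.jar"]
```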