Streaming Ingestion Job supports AVRO format as input #1072

Merged 8 commits on Oct 20, 2020
10 changes: 9 additions & 1 deletion .github/workflows/master_only.yml
@@ -79,7 +79,15 @@ jobs:
- uses: stCarolas/setup-maven@v3
with:
maven-version: 3.6.3
- name: build-jar
- name: Publish develop version of ingestion job
run: |
if [ ${GITHUB_REF#refs/*/} == "master" ]; then
make build-java-no-tests REVISION=develop
gsutil cp ./spark/ingestion/target/feast-ingestion-spark-develop.jar gs://${PUBLISH_BUCKET}/spark/ingestion/
fi
- name: Get version
run: echo ::set-env name=RELEASE_VERSION::${GITHUB_REF#refs/*/}
- name: Publish tagged version of ingestion job
run: |
SEMVER_REGEX='^v[0-9]+\.[0-9]+\.[0-9]+(-([0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*))?$'
if echo "${RELEASE_VERSION}" | grep -P "$SEMVER_REGEX" &>/dev/null ; then
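A note on the publishing logic above: the develop jar is uploaded on every push to master, while a versioned jar is only published when the stripped `GITHUB_REF` matches the semver-style tag pattern. A minimal Python sketch of that tag check, purely for illustration (the workflow itself uses `grep -P`):

```python
import re

# Same pattern as SEMVER_REGEX in the workflow step above.
SEMVER_REGEX = r"^v[0-9]+\.[0-9]+\.[0-9]+(-([0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*))?$"


def is_release_tag(ref: str) -> bool:
    """Return True when a stripped GITHUB_REF (e.g. 'v0.8.0') looks like a release tag."""
    return re.match(SEMVER_REGEX, ref) is not None


assert is_release_tag("v0.8.0")
assert is_release_tag("v0.8.0-rc.1")
assert not is_release_tag("master")  # master pushes only publish the develop jar
```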
@@ -51,6 +51,8 @@ public static void validate(DataSource spec) {
spec.getKafkaOptions().getMessageFormat().getProtoFormat().getClassPath(),
"FeatureTable");
break;
case AVRO_FORMAT:
break;
default:
throw new UnsupportedOperationException(
String.format(
@@ -68,6 +70,8 @@ public static void validate(DataSource spec) {
spec.getKinesisOptions().getRecordFormat().getProtoFormat().getClassPath(),
"FeatureTable");
break;
case AVRO_FORMAT:
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported Stream Format for Kafka Source Type: %s", recordFormat));
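The validator change above simply accepts `AVRO_FORMAT` for Kafka and Kinesis stream sources without further checks (unlike proto, Avro needs no class path validation). On the SDK side, this is what lets a feature table register a stream source in Avro; a hedged sketch, assuming the Feast 0.8-era Python module paths and keyword arguments for `KafkaSource`/`AvroFormat`:

```python
from feast.data_format import AvroFormat
from feast.data_source import KafkaSource

# Hypothetical Avro schema for the stream payload.
avro_schema_json = (
    '{"type": "record", "name": "DriverTrips", '
    '"fields": [{"name": "driver_id", "type": "long"}]}'
)

stream_source = KafkaSource(
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created_timestamp",
    bootstrap_servers="kafka:9092",
    topic="driver_trips",
    message_format=AvroFormat(avro_schema_json),  # previously only ProtoFormat passed validation
)
```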
2 changes: 1 addition & 1 deletion infra/charts/feast/values.yaml
@@ -4,7 +4,7 @@ feast-core:

feast-jobcontroller:
# feast-jobcontroller.enabled -- Flag to install Feast Job Controller
enabled: true
enabled: false

feast-online-serving:
# feast-online-serving.enabled -- Flag to install Feast Online Serving
2 changes: 1 addition & 1 deletion sdk/python/feast/constants.py
@@ -123,7 +123,7 @@ class AuthProvider(Enum):
# Authentication Provider - Google OpenID/OAuth
CONFIG_AUTH_PROVIDER: "google",
CONFIG_SPARK_LAUNCHER: "dataproc",
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-0.8-SNAPSHOT.jar",
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-develop.jar",
CONFIG_REDIS_HOST: "localhost",
CONFIG_REDIS_PORT: "6379",
CONFIG_REDIS_SSL: "False",
8 changes: 8 additions & 0 deletions sdk/python/feast/pyspark/abc.py
@@ -19,6 +19,7 @@ class SparkJobFailure(Exception):


class SparkJobStatus(Enum):
STARTING = 0
IN_PROGRESS = 1
FAILED = 2
COMPLETED = 3
@@ -48,6 +49,13 @@ def get_status(self) -> SparkJobStatus:
"""
raise NotImplementedError

@abc.abstractmethod
def cancel(self):
"""
Manually terminate job
"""
raise NotImplementedError


class SparkJobParameters(abc.ABC):
@abc.abstractmethod
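Together, the new `STARTING` status and the `cancel()` contract let callers poll a job until it has actually begun and tear it down if it never does. A small illustrative sketch against the abstract interface (the polling helper and timeout are hypothetical, not part of this PR):

```python
import time

from feast.pyspark.abc import SparkJob, SparkJobStatus


def wait_until_started(job: SparkJob, timeout_sec: int = 60) -> bool:
    """Poll a SparkJob until it leaves STARTING; cancel it if the timeout expires."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        status = job.get_status()
        if status != SparkJobStatus.STARTING:
            return status == SparkJobStatus.IN_PROGRESS
        time.sleep(5)
    job.cancel()  # the new abstract method: manually terminate the job
    return False
```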
14 changes: 10 additions & 4 deletions sdk/python/feast/pyspark/historical_feature_retrieval_job.py
@@ -75,6 +75,12 @@ class FileSource(Source):
options (Optional[Dict[str, str]]): Options to be passed to spark while reading the file source.
"""

PROTO_FORMAT_TO_SPARK = {
"ParquetFormat": "parquet",
"AvroFormat": "avro",
"CSVFormat": "csv",
}

def __init__(
self,
format: str,
@@ -147,7 +153,7 @@ def spark_path(self) -> str:
def _source_from_dict(dct: Dict) -> Source:
if "file" in dct.keys():
return FileSource(
dct["file"]["format"],
FileSource.PROTO_FORMAT_TO_SPARK[dct["file"]["format"]["json_class"]],
dct["file"]["path"],
dct["file"]["event_timestamp_column"],
dct["file"].get("created_timestamp_column"),
@@ -635,20 +641,20 @@ def retrieve_historical_features(

Example:
>>> entity_source_conf = {
"format": "csv",
"format": {"jsonClass": "ParquetFormat"},
"path": "file:///some_dir/customer_driver_pairs.csv"),
"options": {"inferSchema": "true", "header": "true"},
"field_mapping": {"id": "driver_id"}
}

>>> feature_tables_sources_conf = [
{
"format": "parquet",
"format": {"json_class": "ParquetFormat"},
"path": "gs://some_bucket/bookings.parquet"),
"field_mapping": {"id": "driver_id"}
},
{
"format": "avro",
"format": {"json_class": "AvroFormat", schema_json: "..avro schema.."},
"path": "s3://some_bucket/transactions.avro"),
}
]
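To summarize the convention introduced above: a source's `format` is now a dict keyed by the proto format class name (plus any extra attributes, such as an Avro schema), and `_source_from_dict` resolves that class name to the Spark reader format through `PROTO_FORMAT_TO_SPARK`. A short sketch of the translation, with an illustrative source dict:

```python
# Mapping mirrored from the FileSource change above.
PROTO_FORMAT_TO_SPARK = {
    "ParquetFormat": "parquet",
    "AvroFormat": "avro",
    "CSVFormat": "csv",
}

# Illustrative source configuration in the new dict-based "format" shape.
source_conf = {
    "file": {
        "format": {"json_class": "AvroFormat", "schema_json": "..avro schema.."},
        "path": "s3://some_bucket/transactions.avro",
        "event_timestamp_column": "event_timestamp",
    }
}

spark_format = PROTO_FORMAT_TO_SPARK[source_conf["file"]["format"]["json_class"]]
assert spark_format == "avro"  # value ultimately passed to spark.read.format(...)
```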
25 changes: 15 additions & 10 deletions sdk/python/feast/pyspark/launcher.py
@@ -87,9 +87,6 @@ def resolve_launcher(config: Config) -> JobLauncher:
return _launchers[config.get(CONFIG_SPARK_LAUNCHER)](config)


_SOURCES = {FileSource: "file", BigQuerySource: "bq", KafkaSource: "kafka"}


def _source_to_argument(source: DataSource):
common_properties = {
"field_mapping": dict(source.field_mapping),
@@ -98,20 +95,28 @@ def _source_to_argument(source: DataSource):
"date_partition_column": source.date_partition_column,
}

kind = _SOURCES[type(source)]
properties = {**common_properties}

if isinstance(source, FileSource):
properties["path"] = source.file_options.file_url
properties["format"] = str(source.file_options.file_format)
return {kind: properties}
properties["format"] = dict(
json_class=source.file_options.file_format.__class__.__name__
)
return {"file": properties}

if isinstance(source, BigQuerySource):
properties["table_ref"] = source.bigquery_options.table_ref
return {kind: properties}
return {"bq": properties}

if isinstance(source, KafkaSource):
properties["topic"] = source.kafka_options.topic
properties["classpath"] = source.kafka_options.class_path
properties["bootstrap_servers"] = source.kafka_options.bootstrap_servers
return {kind: properties}
properties["topic"] = source.kafka_options.topic
properties["format"] = {
**source.kafka_options.message_format.__dict__,
"json_class": source.kafka_options.message_format.__class__.__name__,
}
return {"kafka": properties}

raise NotImplementedError(f"Unsupported Datasource: {type(source)}")


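With the `_SOURCES` lookup gone, `_source_to_argument` names the source kind explicitly and serializes the format object itself, so the Spark job receives the format's class name (and, for Avro, its schema) rather than a stringified Python object. A hedged sketch of the resulting argument for a Kafka source with an Avro message format (all field values are illustrative):

```python
# Roughly the dict the launcher would now pass to the Spark job for a
# Kafka + Avro stream source; the values here are hypothetical.
kafka_source_argument = {
    "kafka": {
        "field_mapping": {"id": "driver_id"},
        "event_timestamp_column": "event_timestamp",
        "created_timestamp_column": "created_timestamp",
        "date_partition_column": "",
        "bootstrap_servers": "kafka:9092",
        "topic": "driver_trips",
        "format": {
            # AvroFormat.__dict__ contributes the schema; json_class carries the class name
            "schema_json": "..avro schema..",
            "json_class": "AvroFormat",
        },
    }
}
```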
3 changes: 3 additions & 0 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
@@ -62,6 +62,9 @@ def get_status(self) -> SparkJobStatus:
# we should never get here
raise Exception("Invalid EMR state")

def cancel(self):
raise NotImplementedError


class EmrRetrievalJob(EmrJobMixin, RetrievalJob):
"""
17 changes: 13 additions & 4 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -45,6 +45,9 @@ def get_status(self) -> SparkJobStatus:

return SparkJobStatus.FAILED

def cancel(self):
self._operation.cancel()


class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob):
"""
@@ -71,7 +74,13 @@ def get_output_file_uri(self, timeout_sec=None):

class DataprocBatchIngestionJob(DataprocJobMixin, BatchIngestionJob):
"""
Ingestion job result for a Dataproc cluster
Batch Ingestion job result for a Dataproc cluster
"""


class DataprocStreamingIngestionJob(DataprocJobMixin, StreamIngestionJob):
"""
Streaming Ingestion job result for a Dataproc cluster
"""


@@ -151,14 +160,14 @@ def historical_feature_retrieval(
)

def offline_to_online_ingestion(
self, job_params: BatchIngestionJobParameters
self, ingestion_job_params: BatchIngestionJobParameters
) -> BatchIngestionJob:
return DataprocBatchIngestionJob(self.dataproc_submit(job_params))
return DataprocBatchIngestionJob(self.dataproc_submit(ingestion_job_params))

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
) -> StreamIngestionJob:
raise NotImplementedError
return DataprocStreamingIngestionJob(self.dataproc_submit(ingestion_job_params))

def stage_dataframe(
self, df, event_timestamp_column: str, created_timestamp_column: str,
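Streaming ingestion on Dataproc is now submitted through the same `dataproc_submit` path as batch ingestion, and the returned `DataprocStreamingIngestionJob` supports `cancel()` via the underlying operation. An illustrative usage sketch, assuming a `JobLauncher` and `StreamIngestionJobParameters` have already been configured elsewhere (and that both live in `feast.pyspark.abc`):

```python
from feast.pyspark.abc import (
    JobLauncher,
    StreamIngestionJob,
    StreamIngestionJobParameters,
)


def start_streaming_ingestion(
    launcher: JobLauncher, params: StreamIngestionJobParameters
) -> StreamIngestionJob:
    """Kick off stream-to-online ingestion; works for Dataproc and standalone launchers alike."""
    job = launcher.start_stream_to_online_ingestion(params)
    # Streaming jobs run until cancelled, so callers keep the handle to call job.cancel() later.
    return job
```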
99 changes: 90 additions & 9 deletions sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -1,6 +1,11 @@
import os
import socket
import subprocess
import uuid
from contextlib import closing

import requests
from requests.exceptions import RequestException

from feast.pyspark.abc import (
BatchIngestionJob,
@@ -16,28 +21,77 @@
)


def _find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]


class StandaloneClusterJobMixin:
def __init__(self, job_id: str, process: subprocess.Popen):
def __init__(
self, job_id: str, job_name: str, process: subprocess.Popen, ui_port: int = None
):
self._job_id = job_id
self._job_name = job_name
self._process = process
self._ui_port = ui_port

def get_id(self) -> str:
return self._job_id

def check_if_started(self):
if not self._ui_port:
return True

try:
applications = requests.get(
f"http://localhost:{self._ui_port}/api/v1/applications"
).json()
except RequestException:
return False

app = next(
iter(app for app in applications if app["name"] == self._job_name), None
)
if not app:
return False

stages = requests.get(
f"http://localhost:{self._ui_port}/api/v1/applications/{app['id']}/stages"
).json()
return bool(stages)

def get_status(self) -> SparkJobStatus:
code = self._process.poll()
if code is None:
if not self.check_if_started():
return SparkJobStatus.STARTING

return SparkJobStatus.IN_PROGRESS

if code != 0:
return SparkJobStatus.FAILED

return SparkJobStatus.COMPLETED

def cancel(self):
self._process.terminate()


class StandaloneClusterBatchIngestionJob(StandaloneClusterJobMixin, BatchIngestionJob):
"""
Ingestion job result for a standalone spark cluster
Batch Ingestion job result for a standalone spark cluster
"""

pass


class StandaloneClusterStreamingIngestionJob(
StandaloneClusterJobMixin, StreamIngestionJob
):
"""
Streaming Ingestion job result for a standalone spark cluster
"""

pass
@@ -48,7 +102,13 @@ class StandaloneClusterRetrievalJob(StandaloneClusterJobMixin, RetrievalJob):
Historical feature retrieval job result for a standalone spark cluster
"""

def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str):
def __init__(
self,
job_id: str,
job_name: str,
process: subprocess.Popen,
output_file_uri: str,
):
"""
This is the returned historical feature retrieval job result for StandaloneClusterLauncher.
@@ -57,7 +117,7 @@ def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str)
process (subprocess.Popen): Pyspark driver process, spawned by the launcher.
output_file_uri (str): Uri to the historical feature retrieval job output file.
"""
super().__init__(job_id, process)
super().__init__(job_id, job_name, process)
self._output_file_uri = output_file_uri

def get_output_file_uri(self, timeout_sec: int = None):
@@ -100,7 +160,9 @@ def __init__(self, master_url: str, spark_home: str = None):
def spark_submit_script_path(self):
return os.path.join(self.spark_home, "bin/spark-submit")

def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
def spark_submit(
self, job_params: SparkJobParameters, ui_port: int = None
) -> subprocess.Popen:
submission_cmd = [
self.spark_submit_script_path,
"--master",
@@ -112,6 +174,9 @@ def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
if job_params.get_class_name():
submission_cmd.extend(["--class", job_params.get_class_name()])

if ui_port:
submission_cmd.extend(["--conf", f"spark.ui.port={ui_port}"])

submission_cmd.append(job_params.get_main_file_path())
submission_cmd.extend(job_params.get_arguments())

@@ -122,19 +187,35 @@ def historical_feature_retrieval(
) -> RetrievalJob:
job_id = str(uuid.uuid4())
return StandaloneClusterRetrievalJob(
job_id, self.spark_submit(job_params), job_params.get_destination_path()
job_id,
job_params.get_name(),
self.spark_submit(job_params),
job_params.get_destination_path(),
)

def offline_to_online_ingestion(
self, job_params: BatchIngestionJobParameters
self, ingestion_job_params: BatchIngestionJobParameters
) -> BatchIngestionJob:
job_id = str(uuid.uuid4())
return StandaloneClusterBatchIngestionJob(job_id, self.spark_submit(job_params))
ui_port = _find_free_port()
return StandaloneClusterBatchIngestionJob(
job_id,
ingestion_job_params.get_name(),
self.spark_submit(ingestion_job_params, ui_port),
ui_port,
)

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
) -> StreamIngestionJob:
raise NotImplementedError
job_id = str(uuid.uuid4())
ui_port = _find_free_port()
return StandaloneClusterStreamingIngestionJob(
job_id,
ingestion_job_params.get_name(),
self.spark_submit(ingestion_job_params, ui_port),
ui_port,
)

def stage_dataframe(
self, df, event_timestamp_column: str, created_timestamp_column: str,
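Because a standalone `spark-submit` driver process can be alive well before the application is actually running, the mixin above pins the driver UI to a free local port and polls Spark's REST API to tell `STARTING` apart from `IN_PROGRESS`. A hedged sketch of the same probe as a standalone function (the port and job name are illustrative):

```python
import requests
from requests.exceptions import RequestException


def spark_app_has_stages(ui_port: int, job_name: str) -> bool:
    """Return True once the named application reports at least one stage on the driver UI."""
    base = f"http://localhost:{ui_port}/api/v1"
    try:
        applications = requests.get(f"{base}/applications").json()
    except RequestException:
        return False  # driver UI not reachable yet -> the job is still STARTING

    app = next((a for a in applications if a["name"] == job_name), None)
    if app is None:
        return False

    stages = requests.get(f"{base}/applications/{app['id']}/stages").json()
    return bool(stages)


# Example call with illustrative values: spark_app_has_stages(4041, "feast-streaming-ingestion")
```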