In Historical Retrieval from BQ join between source & entities is performed inside BQ #1110

Merged
Changes from all commits
36 commits
9477574
create bq view with join query
pyalex Oct 29, 2020
5e1f389
import batch_source fixture
pyalex Oct 29, 2020
7d71f33
bq dataset fixture
pyalex Oct 29, 2020
6ad2f00
bq dataset fixture
pyalex Oct 29, 2020
2e6e463
use bq dataset fixture through request
pyalex Oct 29, 2020
15761b0
move bq package to standalone launcher
pyalex Oct 29, 2020
dd97707
revert extra options
pyalex Oct 29, 2020
416906b
e2e fail fast
pyalex Oct 29, 2020
727d3d6
fix dataset id
pyalex Oct 29, 2020
6905b4f
fix bq source
pyalex Oct 29, 2020
1b4ffd7
serving to wait core is running
pyalex Oct 29, 2020
3ef5615
delete contents
pyalex Oct 29, 2020
de1833e
fix staging functions
pyalex Oct 29, 2020
983e862
add bq jar
pyalex Oct 29, 2020
3d70460
add created_timestamp
pyalex Oct 29, 2020
5b68446
test ingestion from bq view
pyalex Oct 29, 2020
23e0aed
make test-integration executable
pyalex Oct 29, 2020
3f66292
add tests timeout
pyalex Oct 29, 2020
23c8aaa
fix bq staging test
pyalex Oct 29, 2020
401ad07
fix test online e2e
pyalex Oct 29, 2020
0ed9a7e
format
pyalex Oct 29, 2020
0ccdf8f
wait until bq table created
pyalex Oct 29, 2020
eb26370
fix verify
pyalex Oct 29, 2020
c79844a
add explicit data sources to historical retrieval via JS
pyalex Oct 29, 2020
5fea06a
debug
pyalex Oct 29, 2020
b0e9eec
correct dataframe ingested
pyalex Oct 29, 2020
59a6512
debug
pyalex Oct 29, 2020
0134758
boundaries from original df
pyalex Oct 30, 2020
a488770
cleanup & online retrieval timeout
pyalex Oct 30, 2020
2627aab
bq replacement moved to launcher
pyalex Nov 2, 2020
43881e9
add passing it test to pass checks
pyalex Nov 2, 2020
cafc468
add timeout to expected arguments
pyalex Nov 2, 2020
52914ff
rerun client fixture on auth switch
pyalex Nov 2, 2020
2d39760
enable_auth tombstone
pyalex Nov 2, 2020
76472fe
update default timeout
pyalex Nov 2, 2020
ef3b8aa
check entities mapping
pyalex Nov 2, 2020
5 changes: 3 additions & 2 deletions infra/scripts/test-end-to-end-gcp.sh
@@ -10,7 +10,8 @@ python -m pip install --upgrade pip setuptools wheel
make install-python
python -m pip install -qr tests/requirements.txt

su -p postgres -c "PATH=$PATH HOME=/tmp pytest tests/e2e/ \
su -p postgres -c "PATH=$PATH HOME=/tmp pytest -v tests/e2e/ \
--feast-version develop --env=gcloud --dataproc-cluster-name feast-e2e \
--dataproc-project kf-feast --dataproc-region us-central1 \
--redis-url 10.128.0.105:6379 --redis-cluster --kafka-brokers 10.128.0.103:9094"
--redis-url 10.128.0.105:6379 --redis-cluster --kafka-brokers 10.128.0.103:9094 \
--bq-project kf-feast"
2 changes: 1 addition & 1 deletion infra/scripts/test-end-to-end.sh
@@ -7,4 +7,4 @@ python -m pip install --upgrade pip setuptools wheel
make install-python
python -m pip install -qr tests/requirements.txt

su -p postgres -c "PATH=$PATH HOME=/tmp pytest tests/e2e/ --feast-version develop"
su -p postgres -c "PATH=$PATH HOME=/tmp pytest -v tests/e2e/ --feast-version develop"
Empty file modified infra/scripts/test-integration.sh
100644 → 100755
Empty file.
3 changes: 3 additions & 0 deletions protos/feast/core/JobService.proto
@@ -128,6 +128,9 @@ message GetHistoricalFeaturesRequest {
// Export to AWS S3 - s3://path/to/features
// Export to GCP GCS - gs://path/to/features
string output_location = 4;

// Specify format name for output, e.g. parquet
string output_format = 5;
}

message GetHistoricalFeaturesResponse {
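For context, the SDK now populates this field alongside output_location when building the request. A minimal sketch using the generated Python stubs — the feature reference, project, and bucket are placeholders, and entity_source is omitted (a real request also carries a serialized DataSource):

```python
from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest

# Illustrative request showing the new output_format field (field 5).
request = GetHistoricalFeaturesRequest(
    feature_refs=["driver_statistics:avg_daily_trips"],  # placeholder feature ref
    project="default",
    output_location="gs://example-bucket/historical/output",  # placeholder path
    output_format="parquet",
)
```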
55 changes: 30 additions & 25 deletions sdk/python/feast/client.py
@@ -15,12 +15,10 @@
import multiprocessing
import os
import shutil
import tempfile
import uuid
from datetime import datetime
from itertools import groupby
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse

import grpc
import pandas as pd
@@ -101,7 +99,11 @@
GetOnlineFeaturesRequestV2,
)
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.staging.storage_client import get_staging_client
from feast.staging.entities import (
stage_entities_to_bq,
stage_entities_to_fs,
table_reference_from_string,
)

_logger = logging.getLogger(__name__)

@@ -855,6 +857,7 @@ def get_online_features(
entity_rows=_infer_online_entity_rows(entity_rows),
project=project if project is not None else self.project,
),
timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY),
metadata=self._get_grpc_metadata(),
)
except grpc.RpcError as e:
@@ -879,8 +882,11 @@ def get_historical_features(
"feature_table:feature" where "feature_table" & "feature" refer to
the feature and feature table names respectively.
entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]): Source for the entity rows.
If entity_source is a Panda DataFrame, the dataframe will be exported to the staging
location as parquet file. It is also assumed that the column event_timestamp is present
If entity_source is a Pandas DataFrame, the dataframe will be staged
so that it is accessible by Spark workers.
If one of the feature tables' sources is in BigQuery, the entities will be uploaded to BQ.
Otherwise they are uploaded to remote file storage (derived from the configured staging location).
It is also assumed that the column event_timestamp is present
in the dataframe, and is of type datetime without timezone information.

The user needs to make sure that the source (or staging location, if entity_source is
@@ -916,25 +922,27 @@ str(uuid.uuid4()),
str(uuid.uuid4()),
)
output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
feature_sources = [
feature_table.batch_source for feature_table in feature_tables
]

if isinstance(entity_source, pd.DataFrame):
staging_location = self._config.get(CONFIG_SPARK_STAGING_LOCATION)
entity_staging_uri = urlparse(
os.path.join(staging_location, str(uuid.uuid4()))
)
staging_client = get_staging_client(entity_staging_uri.scheme)
with tempfile.NamedTemporaryFile() as df_export_path:
entity_source.to_parquet(df_export_path.name)
bucket = (
None
if entity_staging_uri.scheme == "file"
else entity_staging_uri.netloc
if any(isinstance(source, BigQuerySource) for source in feature_sources):
first_bq_source = [
source
for source in feature_sources
if isinstance(source, BigQuerySource)
][0]
source_ref = table_reference_from_string(
first_bq_source.bigquery_options.table_ref
)
staging_client.upload_file(
df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
entity_source = stage_entities_to_bq(
entity_source, source_ref.project, source_ref.dataset_id
)
entity_source = FileSource(
"event_timestamp", ParquetFormat(), entity_staging_uri.geturl(),
else:
entity_source = stage_entities_to_fs(
entity_source,
staging_location=self._config.get(CONFIG_SPARK_STAGING_LOCATION),
)

if self._use_job_service:
@@ -943,6 +951,7 @@ def get_historical_features(
feature_refs=feature_refs,
entity_source=entity_source.to_proto(),
project=project,
output_format=output_format,
output_location=output_location,
),
**self._extra_grpc_params(),
@@ -955,11 +964,7 @@
)
else:
return start_historical_feature_retrieval_job(
self,
entity_source,
feature_tables,
output_format,
os.path.join(output_location, str(uuid.uuid4())),
self, entity_source, feature_tables, output_format, output_location,
)

def get_historical_features_df(
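To see the new staging behaviour from the caller's side, here is a hedged sketch of a historical retrieval with a DataFrame entity source; the endpoints, feature reference, and entity column are illustrative placeholders rather than values from this PR:

```python
import pandas as pd
from feast import Client

# Placeholder endpoints; adjust to your deployment.
client = Client(core_url="localhost:6565", serving_url="localhost:6566")

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        # event_timestamp must be a timezone-naive datetime, per the docstring above.
        "event_timestamp": [pd.Timestamp("2020-10-01"), pd.Timestamp("2020-10-02")],
    }
)

# If any referenced feature table has a BigQuery batch source, the dataframe is
# staged into BigQuery; otherwise it is written to the configured staging location.
job = client.get_historical_features(
    feature_refs=["driver_statistics:avg_daily_trips"],
    entity_source=entity_df,
)
output_uri = job.get_output_file_uri(block=True)  # wait for the Spark job to finish
```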
15 changes: 10 additions & 5 deletions sdk/python/feast/job_service.py
@@ -7,6 +7,7 @@
from feast.core import JobService_pb2_grpc
from feast.core.JobService_pb2 import (
CancelJobResponse,
GetHistoricalFeaturesRequest,
GetHistoricalFeaturesResponse,
GetJobResponse,
)
@@ -20,6 +21,7 @@
SparkJobStatus,
StreamIngestionJob,
)
from feast.pyspark.launcher import start_historical_feature_retrieval_job
from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
from feast.third_party.grpc.health.v1.HealthService_pb2 import (
HealthCheckResponse,
@@ -64,13 +66,16 @@ def StartOfflineToOnlineIngestionJob(self, request, context):
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")

def GetHistoricalFeatures(self, request, context):
def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""
job = self.client.get_historical_features(
request.feature_refs,
job = start_historical_feature_retrieval_job(
client=self.client,
entity_source=DataSource.from_proto(request.entity_source),
project=request.project,
output_location=request.output_location,
feature_tables=self.client._get_feature_tables_from_feature_refs(
list(request.feature_refs), request.project
),
output_format=request.output_format,
output_path=request.output_location,
)

output_file_uri = job.get_output_file_uri(block=False)
30 changes: 18 additions & 12 deletions sdk/python/feast/pyspark/historical_feature_retrieval_job.py
@@ -149,25 +149,31 @@ def spark_format(self) -> str:
def spark_path(self) -> str:
return f"{self.project}:{self.dataset}.{self.table}"

@property
def spark_read_options(self) -> Dict[str, str]:
return {**super().spark_read_options, "viewsEnabled": "true"}


def _source_from_dict(dct: Dict) -> Source:
if "file" in dct.keys():
return FileSource(
FileSource.PROTO_FORMAT_TO_SPARK[dct["file"]["format"]["json_class"]],
dct["file"]["path"],
dct["file"]["event_timestamp_column"],
dct["file"].get("created_timestamp_column"),
dct["file"].get("field_mapping"),
dct["file"].get("options"),
format=FileSource.PROTO_FORMAT_TO_SPARK[
dct["file"]["format"]["json_class"]
],
path=dct["file"]["path"],
event_timestamp_column=dct["file"]["event_timestamp_column"],
created_timestamp_column=dct["file"].get("created_timestamp_column"),
field_mapping=dct["file"].get("field_mapping"),
options=dct["file"].get("options"),
)
else:
return BigQuerySource(
dct["bq"]["project"],
dct["bq"]["dataset"],
dct["bq"]["table"],
dct["bq"].get("field_mapping", {}),
dct["bq"]["event_timestamp_column"],
dct["bq"].get("created_timestamp_column"),
project=dct["bq"]["project"],
dataset=dct["bq"]["dataset"],
table=dct["bq"]["table"],
field_mapping=dct["bq"].get("field_mapping", {}),
event_timestamp_column=dct["bq"]["event_timestamp_column"],
created_timestamp_column=dct["bq"].get("created_timestamp_column"),
)


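As a rough illustration of the dictionary shape _source_from_dict now consumes via keyword arguments — assuming "ParquetFormat" is a valid key of FileSource.PROTO_FORMAT_TO_SPARK, and with all values below as placeholders:

```python
# Illustrative inputs for _source_from_dict above (values are placeholders).
bq_source_dict = {
    "bq": {
        "project": "example-project",
        "dataset": "feast_dataset",
        "table": "driver_statistics",
        "event_timestamp_column": "event_timestamp",
        "created_timestamp_column": "created_timestamp",
    }
}

file_source_dict = {
    "file": {
        # "ParquetFormat" is assumed to be a PROTO_FORMAT_TO_SPARK key.
        "format": {"json_class": "ParquetFormat"},
        "path": "gs://example-bucket/entities/",
        "event_timestamp_column": "event_timestamp",
    }
}
```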
48 changes: 43 additions & 5 deletions sdk/python/feast/pyspark/launcher.py
@@ -35,6 +35,7 @@
StreamIngestionJob,
StreamIngestionJobParameters,
)
from feast.staging.entities import create_bq_view_of_joined_features_and_entities
from feast.staging.storage_client import get_staging_client
from feast.value_type import ValueType

@@ -106,7 +107,11 @@ def _source_to_argument(source: DataSource):
return {"file": properties}

if isinstance(source, BigQuerySource):
properties["table_ref"] = source.bigquery_options.table_ref
project, dataset_and_table = source.bigquery_options.table_ref.split(":")
dataset, table = dataset_and_table.split(".")
properties["project"] = project
properties["dataset"] = dataset
properties["table"] = table
return {"bq": properties}

if isinstance(source, KafkaSource):
@@ -171,13 +176,17 @@ def start_historical_feature_retrieval_job(
output_path: str,
) -> RetrievalJob:
launcher = resolve_launcher(client._config)
feature_sources = [
_source_to_argument(
replace_bq_table_with_joined_view(feature_table, entity_source)
)
for feature_table in feature_tables
]

return launcher.historical_feature_retrieval(
RetrievalJobParameters(
entity_source=_source_to_argument(entity_source),
feature_tables_sources=[
_source_to_argument(feature_table.batch_source)
for feature_table in feature_tables
],
feature_tables_sources=feature_sources,
feature_tables=[
_feature_table_to_argument(client, feature_table)
for feature_table in feature_tables
@@ -188,6 +197,35 @@
)


def replace_bq_table_with_joined_view(
feature_table: FeatureTable, entity_source: Union[FileSource, BigQuerySource],
) -> Union[FileSource, BigQuerySource]:
"""
Applies an optimization to historical retrieval. Instead of pulling all data from the batch source,
this optimization joins feature values & entities on the data warehouse side (improving data locality).
Several conditions must be met to enable this optimization:
* entities are staged to BigQuery
* feature values are in BigQuery
* entity columns are not mapped (ToDo: fix this limitation)
:return: replacement for the feature source
"""
if not isinstance(feature_table.batch_source, BigQuerySource):
return feature_table.batch_source

if not isinstance(entity_source, BigQuerySource):
return feature_table.batch_source

if any(
entity in feature_table.batch_source.field_mapping
for entity in feature_table.entities
):
return feature_table.batch_source

return create_bq_view_of_joined_features_and_entities(
feature_table.batch_source, entity_source, feature_table.entities,
)


def _download_jar(remote_jar: str) -> str:
remote_jar_parts = urlparse(remote_jar)

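A quick sketch of the new table_ref handling in _source_to_argument: a reference of the form "project:dataset.table" is decomposed before being handed to the Spark job. The reference below is made up, and the shared properties set earlier in the function (timestamp columns, field mapping) are omitted:

```python
# Sketch of the table_ref decomposition performed in _source_to_argument.
table_ref = "example-project:example_dataset.driver_statistics"
project, dataset_and_table = table_ref.split(":")
dataset, table = dataset_and_table.split(".")

bq_argument = {"bq": {"project": project, "dataset": dataset, "table": table}}
assert bq_argument == {
    "bq": {
        "project": "example-project",
        "dataset": "example_dataset",
        "table": "driver_statistics",
    }
}
```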
5 changes: 4 additions & 1 deletion sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -104,6 +104,8 @@ class DataprocClusterLauncher(JobLauncher):
addition to the Feast SDK.
"""

EXTERNAL_JARS = ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"]

def __init__(
self, cluster_name: str, staging_location: str, region: str, project_id: str,
):
@@ -157,7 +159,7 @@ def dataproc_submit(self, job_params: SparkJobParameters) -> Operation:
job_config.update(
{
"spark_job": {
"jar_file_uris": [main_file_uri],
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
"main_class": job_params.get_class_name(),
"args": job_params.get_arguments(),
}
@@ -168,6 +170,7 @@
{
"pyspark_job": {
"main_python_file_uri": main_file_uri,
"jar_file_uris": self.EXTERNAL_JARS,
"args": job_params.get_arguments(),
}
}
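For reference, a submitted PySpark job config now carries the connector jar next to the main file. A rough sketch with placeholder URIs — real configs contain more fields than shown:

```python
# Illustrative fragment of the Dataproc job config built in dataproc_submit.
EXTERNAL_JARS = ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"]

job_config = {
    "pyspark_job": {
        "main_python_file_uri": "gs://example-bucket/artifacts/historical_retrieval.py",
        "jar_file_uris": EXTERNAL_JARS,
        "args": ["--job-id", "example-job"],  # placeholder arguments
    }
}
```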
19 changes: 19 additions & 0 deletions sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -148,6 +148,8 @@ class StandaloneClusterLauncher(JobLauncher):
Submits jobs to a standalone Spark cluster in client mode.
"""

BQ_CONNECTOR_VERSION = "2.12:0.17.3"

def __init__(self, master_url: str, spark_home: str = None):
"""
This launcher executes the spark-submit script in a subprocess. The subprocess
@@ -184,6 +186,23 @@ def spark_submit(
if ui_port:
submission_cmd.extend(["--conf", f"spark.ui.port={ui_port}"])

# Workaround for https://github.com/apache/spark/pull/26552
# Fixes running Spark jobs with the (shaded) BigQuery connector on JDK 9+
submission_cmd.extend(
[
"--conf",
"spark.executor.extraJavaOptions="
"-Dcom.google.cloud.spark.bigquery.repackaged.io.netty.tryReflectionSetAccessible=true -Duser.timezone=GMT",
"--conf",
"spark.driver.extraJavaOptions="
"-Dcom.google.cloud.spark.bigquery.repackaged.io.netty.tryReflectionSetAccessible=true -Duser.timezone=GMT",
"--conf",
"spark.sql.session.timeZone=UTC", # ignore local timezone
"--packages",
f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}",
]
)

if job_params.get_extra_options():
submission_cmd.extend(job_params.get_extra_options().split(" "))

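The --packages coordinate assembled above resolves to the shaded BigQuery connector built for Scala 2.12, version 0.17.3. A tiny sketch of the string that ends up on the spark-submit command line:

```python
# The Maven coordinate passed via --packages in spark_submit above.
BQ_CONNECTOR_VERSION = "2.12:0.17.3"
package = f"com.google.cloud.spark:spark-bigquery-with-dependencies_{BQ_CONNECTOR_VERSION}"
assert package == "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.3"
```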