Streaming Ingestion Job supports AVRO format as input #1072

Merged
merged 8 commits on Oct 20, 2020

Changes from 1 commit

better check if started
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Oct 20, 2020

Verified

This commit was signed with the committer’s verified signature.
commit b1c1822edaee8229ac7319f48da81ca7eb359ae2
17 changes: 12 additions & 5 deletions sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -40,25 +40,32 @@ def __init__(
     def get_id(self) -> str:
         return self._job_id

-    def get_internal_state(self):
+    def check_if_started(self):
         if not self._ui_port:
-            return
+            return True

         try:
             applications = requests.get(
                 f"http://localhost:{self._ui_port}/api/v1/applications"
             ).json()
         except RequestException:
-            return
+            return False

-        return next(
+        app = next(
             iter(app for app in applications if app["name"] == self._job_name), None
         )
+        if not app:
+            return False
+
+        stages = requests.get(
+            f"http://localhost:{self._ui_port}/api/v1/applications/{app['id']}/stages"
+        ).json()
+        return bool(stages)

     def get_status(self) -> SparkJobStatus:
         code = self._process.poll()
         if code is None:
-            if self.get_internal_state() is None:
+            if not self.check_if_started():
                 return SparkJobStatus.STARTING

             return SparkJobStatus.IN_PROGRESS
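
With this change, a standalone job reports STARTING until its application shows up in the Spark UI REST API with at least one submitted stage, and IN_PROGRESS afterwards. Note that the no-UI-port case flips: previously such a job stayed STARTING; now it is assumed to have started. A minimal caller-side sketch of waiting for the transition (wait_until_running is a hypothetical helper, and the SparkJobStatus import path is an assumption based on this SDK's layout):

import time

from feast.pyspark.abc import SparkJobStatus


def wait_until_running(job, timeout_secs=60):
    # Poll get_status(); it stays STARTING until check_if_started() has seen
    # the application and at least one stage in the Spark UI REST API.
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        status = job.get_status()
        if status == SparkJobStatus.IN_PROGRESS:
            return
        if status != SparkJobStatus.STARTING:
            raise RuntimeError(f"job exited before starting: {status}")
        time.sleep(1)
    raise TimeoutError("Spark job did not start within the timeout")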
2 changes: 1 addition & 1 deletion tests/e2e/requirements.txt
@@ -13,4 +13,4 @@ deepdiff==4.3.2
 tensorflow==2.1.0
 tfx-bsl==0.21.* # lock to 0.21
 confluent_kafka
-avro
+avro==1.10.0
47 changes: 24 additions & 23 deletions tests/e2e/test_online_features.py
@@ -11,6 +11,7 @@
 import pandas as pd
 import pyspark
 import pytest
+import pytz
 from avro.io import BinaryEncoder, DatumWriter
 from confluent_kafka import Producer

@@ -170,29 +171,29 @@ def test_streaming_ingestion(feast_client: Client, staging_path: str, pytestconf
         lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 60
     )

-    time.sleep(5)
-
-    original = generate_data()[["s2id", "unique_drivers", "event_timestamp"]]
-    for record in original.to_dict("records"):
-        record["event_timestamp"] = int(record["event_timestamp"].timestamp() * 1e6)
-
-        send_avro_record_to_kafka(
-            "avro",
-            record,
-            bootstrap_servers=pytestconfig.getoption("kafka_brokers"),
-            avro_schema_json=avro_schema(),
-        )
-
-    def get_online_features():
-        features = feast_client.get_online_features(
-            ["drivers_stream:unique_drivers"],
-            entity_rows=[{"s2id": s2_id} for s2_id in original["s2id"].tolist()],
-        ).to_dict()
-        df = pd.DataFrame.from_dict(features)
-        return df, not df["drivers_stream:unique_drivers"].isna().any()
-
-    ingested = wait_retry_backoff(get_online_features, 60)
-    job.cancel()
+    try:
+        original = generate_data()[["s2id", "unique_drivers", "event_timestamp"]]
+        for record in original.to_dict("records"):
+            record["event_timestamp"] = record["event_timestamp"].to_pydatetime().replace(tzinfo=pytz.utc)
+
+            send_avro_record_to_kafka(
+                "avro",
+                record,
+                bootstrap_servers=pytestconfig.getoption("kafka_brokers"),
+                avro_schema_json=avro_schema(),
+            )
+
+        def get_online_features():
+            features = feast_client.get_online_features(
+                ["drivers_stream:unique_drivers"],
+                entity_rows=[{"s2id": s2_id} for s2_id in original["s2id"].tolist()],
+            ).to_dict()
+            df = pd.DataFrame.from_dict(features)
+            return df, not df["drivers_stream:unique_drivers"].isna().any()
+
+        ingested = wait_retry_backoff(get_online_features, 60)
+    finally:
+        job.cancel()

     pd.testing.assert_frame_equal(
         ingested[["s2id", "drivers_stream:unique_drivers"]],
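
For context, send_avro_record_to_kafka is a helper defined elsewhere in the e2e suite (not part of this diff) that encodes each record with the given Avro schema and publishes it to the brokers. A minimal sketch of such a helper, assuming only the avro==1.10.0 and confluent_kafka APIs already imported above; the name and argument order are taken from the call site in this test:

import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from confluent_kafka import Producer


def send_avro_record_to_kafka(topic, record, bootstrap_servers, avro_schema_json):
    # Encode the record as Avro binary with the supplied schema.
    schema = avro.schema.parse(avro_schema_json)
    buffer = io.BytesIO()
    DatumWriter(schema).write(record, BinaryEncoder(buffer))

    # Publish the payload and block until the broker acknowledges delivery.
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce(topic=topic, value=buffer.getvalue())
    producer.flush()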