e2e test fixes to make them work on AWS #1132

Merged (1 commit) on Nov 5, 2020
@@ -71,11 +71,6 @@ spec:
{{- end }}
{{- end }}

command:
- python
- "-m"
- "feast.cli"
- server
ports:
- name: http
containerPort: {{ .Values.service.http.targetPort }}
191 changes: 191 additions & 0 deletions infra/scripts/codebuild_runner.py
@@ -0,0 +1,191 @@
#!/usr/bin/env python

# This is a thin wrapper around the AWS CodeBuild API that kicks off a build,
# waits for it to finish, and tails the build logs while it is running.
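#
# Example invocation (flags as defined in main() below):
#
#   python infra/scripts/codebuild_runner.py \
#       --project-name feast-ci-project \
#       --source-location https://github.com/feast/feast.git \
#       --source-version master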

import os
import json
from typing import Dict, Any, List, Optional, AsyncGenerator
from datetime import datetime
import asyncio
import sys
import argparse
import boto3


class LogTailer:
""" A simple cloudwatch log tailer. """

_next_token: Optional[str]

def __init__(self, client, log_group: str, log_stream: str):
self._client = client
self._next_token = None
self._log_group = log_group
self._log_stream = log_stream

def _get_log_events_args(self) -> Dict[str, Any]:
res = dict(
logGroupName=self._log_group,
logStreamName=self._log_stream,
limit=100,
startFromHead=True,
)
if self._next_token:
res["nextToken"] = self._next_token
return res

    async def tail_chunk(self) -> List[Dict[str, str]]:
        """Return the next batch of log events, polling for up to ~5 seconds
        before giving up and returning an empty list."""
        max_sleep = 5.0
        SLEEP_TIME = 0.5

        while max_sleep > 0:
            resp = self._client.get_log_events(**self._get_log_events_args())
            events = resp["events"]
            self._next_token = resp.get("nextForwardToken")
            if events:
                return events
            max_sleep -= SLEEP_TIME
            await asyncio.sleep(SLEEP_TIME)
        return []

    async def read_all_chunks(self) -> AsyncGenerator[List[Dict[str, str]], None]:
        """Yield batches of log events until the stream is exhausted."""
        while True:
resp = self._client.get_log_events(**self._get_log_events_args())
events = resp["events"]
self._next_token = resp.get("nextForwardToken")
if events:
yield events
else:
return


async def _wait_build_state(
client, build_id, desired_phase: Optional[str], desired_states: List[str]
) -> Dict[str, Any]:
""" Wait until the build is in one of the desired states, or in the desired phase. """
while True:
resp = client.batch_get_builds(ids=[build_id])
assert len(resp["builds"]) == 1
build = resp["builds"][0]
if build["buildStatus"] in desired_states:
return build
for phase in build["phases"]:
if desired_phase and (phase["phaseType"] == desired_phase):
return build

await asyncio.sleep(2)


def print_log_event(event) -> None:
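    # CloudWatch log messages keep their trailing newline, hence end="".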
print(
str(datetime.fromtimestamp(event["timestamp"] / 1000.0)),
event["message"],
end="",
)


async def main() -> None:
    parser = argparse.ArgumentParser(
        description="Kick off an AWS CodeBuild build and tail its logs."
    )
parser.add_argument(
"--project-name", default="feast-ci-project", type=str, help="Project name"
)
parser.add_argument(
"--source-location",
type=str,
help="Source location, e.g. https://github.com/feast/feast.git",
)
parser.add_argument(
"--source-version", type=str, help="Source version, e.g. master"
)
    parser.add_argument(
        "--location-from-prow",
        action="store_true",
        help="Infer source location and version from prow environment variables",
    )
args = parser.parse_args()

if args.location_from_prow:
        job_spec = json.loads(os.environ["JOB_SPEC"])
        source_location = job_spec["refs"]["repo_link"]
source_version = source_version_from_prow_job_spec(job_spec)
else:
source_location = args.source_location
source_version = args.source_version

await run_build(
project_name=args.project_name,
source_location=source_location,
source_version=source_version,
)

def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str:
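    # Builds a git ref like "refs/pull/123/head^{<sha>}": the PR head ref,
    # pinned to the exact commit recorded in the prow job spec.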
pull = job_spec['refs']['pulls'][0]
return f'refs/pull/{pull["number"]}/head^{{{pull["sha"]}}}'

async def run_build(project_name: str, source_version: str, source_location: str):
print(f"Building {project_name} at {source_version}", file=sys.stderr)
logs_client = boto3.client("logs", region_name="us-west-2")
codebuild_client = boto3.client("codebuild", region_name="us-west-2")

print("Submitting the build..", file=sys.stderr)
build_resp = codebuild_client.start_build(
projectName=project_name,
sourceLocationOverride=source_location,
sourceVersion=source_version,
)

build_id = build_resp["build"]["id"]

try:
print(
"Waiting for the INSTALL phase to start before tailing the log",
file=sys.stderr,
)
build = await _wait_build_state(
codebuild_client,
build_id,
desired_phase="INSTALL",
desired_states=["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"],
)

if build["buildStatus"] != "IN_PROGRESS":
print(
f"Build failed before install phase: {build['buildStatus']}",
file=sys.stderr,
)
sys.exit(1)

log_tailer = LogTailer(
logs_client,
log_stream=build["logs"]["streamName"],
log_group=build["logs"]["groupName"],
)

waiter_task = asyncio.create_task(
_wait_build_state(
codebuild_client,
build_id,
desired_phase=None,
desired_states=["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"],
)
)

while not waiter_task.done():
events = await log_tailer.tail_chunk()
for event in events:
print_log_event(event)
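
        # Drain anything still buffered in CloudWatch once the build finishes, so
        # the tail does not cut off the last log lines. This is an assumed use of
        # read_all_chunks() above, which is otherwise unused.
        async for chunk in log_tailer.read_all_chunks():
            for event in chunk:
                print_log_event(event)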

        build_status = waiter_task.result()["buildStatus"]
        print(f"Build {build_status}", file=sys.stderr)
        if build_status != "SUCCEEDED":
            sys.exit(1)
except KeyboardInterrupt:
print(f"Stopping build {build_id}", file=sys.stderr)
codebuild_client.stop_build(id=build_id)


if __name__ == "__main__":
asyncio.run(main())
14 changes: 14 additions & 0 deletions infra/scripts/setup-e2e-env-aws.sh
@@ -0,0 +1,14 @@
#!/bin/bash

make compile-protos-python

python -m pip install --upgrade pip setuptools wheel

python -m pip install -qr sdk/python/requirements-dev.txt
python -m pip install -qr tests/requirements.txt

# Using mvn -q to make it less verbose. This step happens after the docker containers
# were successfully built, so it should be unlikely to fail.
echo "########## Building ingestion jar"
TIMEFORMAT='########## took %R seconds'
time mvn -q --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package
18 changes: 17 additions & 1 deletion infra/scripts/test-end-to-end-aws.sh
@@ -1,3 +1,19 @@
#!/usr/bin/env bash

set -euo pipefail

# Print the AWS identity the tests will run as; useful when debugging credentials.
aws sts get-caller-identity

pip install "s3fs" "boto3" "urllib3>=1.25.4"

export DISABLE_FEAST_SERVICE_FIXTURES=1
export DISABLE_SERVICE_FIXTURES=1
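
# The custom flags below (--emr-cluster-id, --staging-path, --redis-url, ...) are
# registered in tests/e2e/conftest.py via pytest_addoption.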

PYTHONPATH=sdk/python pytest tests/e2e/ \
    --core-url cicd-feast-core:6565 \
    --serving-url cicd-feast-online-serving:6566 \
    --env aws \
    --emr-cluster-id "$CLUSTER_ID" \
    --staging-path "$STAGING_PATH" \
    --redis-url "$NODE_IP:32379" \
    --emr-region us-west-2 \
    --kafka-brokers "$NODE_IP:30092" \
    -m "not bq"
6 changes: 5 additions & 1 deletion sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -321,7 +321,11 @@ def _cancel_job(emr_client, job: EmrJobRef):
else:
step_id = job.step_id

emr_client.cancel_steps(ClusterId=job.cluster_id, StepIds=[step_id])
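    # TERMINATE_PROCESS (rather than the default SEND_INTERRUPT) makes EMR kill the
    # step's whole process tree, so cancellation works even if the driver ignores SIGINT.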
emr_client.cancel_steps(
ClusterId=job.cluster_id,
StepIds=[step_id],
StepCancellationOption="TERMINATE_PROCESS",
)

_wait_for_job_state(
emr_client, EmrJobRef(job.cluster_id, step_id), TERMINAL_STEP_STATES, 180
4 changes: 4 additions & 0 deletions spark/ingestion/pom.xml
@@ -299,6 +299,10 @@
<pattern>com.google.protobuf</pattern>
<shadedPattern>com.google.protobuf.vendor</shadedPattern>
</relocation>
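            <!-- Relocating org.apache.kafka keeps the job's bundled Kafka client
                 from clashing with whatever kafka-clients version the Spark
                 runtime ships on its classpath. -->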
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.kafka.vendor</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
2 changes: 2 additions & 0 deletions tests/e2e/conftest.py
@@ -14,6 +14,8 @@ def pytest_addoption(parser):
parser.addoption("--staging-path", action="store")
parser.addoption("--dataproc-cluster-name", action="store")
parser.addoption("--dataproc-region", action="store")
parser.addoption("--emr-cluster-id", action="store")
parser.addoption("--emr-region", action="store")
parser.addoption("--dataproc-project", action="store")
parser.addoption("--ingestion-jar", action="store")
parser.addoption("--redis-url", action="store", default="localhost:6379")
21 changes: 19 additions & 2 deletions tests/e2e/fixtures/client.py
@@ -3,7 +3,6 @@
import uuid
from typing import Optional, Tuple

import pyspark
import pytest
from pytest_redis.executor import RedisExecutor

@@ -29,7 +28,9 @@ def feast_client(
)

if pytestconfig.getoption("env") == "local":
c = Client(
import pyspark

return Client(
core_url=f"{feast_core[0]}:{feast_core[1]}",
serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
spark_launcher="standalone",
@@ -62,6 +63,22 @@ def feast_client(
),
**job_service_env,
)
elif pytestconfig.getoption("env") == "aws":
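        # Mirrors the branches above, but wires up the EMR launcher using the
        # options registered in tests/e2e/conftest.py.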
return Client(
core_url=f"{feast_core[0]}:{feast_core[1]}",
serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
spark_launcher="emr",
emr_cluster_id=pytestconfig.getoption("emr_cluster_id"),
emr_region=pytestconfig.getoption("emr_region"),
spark_staging_location=os.path.join(local_staging_path, "emr"),
emr_log_location=os.path.join(local_staging_path, "emr_logs"),
spark_ingestion_jar=ingestion_job_jar,
redis_host=pytestconfig.getoption("redis_url").split(":")[0],
redis_port=pytestconfig.getoption("redis_url").split(":")[1],
historical_feature_output_location=os.path.join(
local_staging_path, "historical_output"
),
)
else:
raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}")

11 changes: 9 additions & 2 deletions tests/e2e/test_historical_features.py
@@ -21,11 +21,18 @@ def read_parquet(uri):
return pd.read_parquet(parsed_uri.path)
elif parsed_uri.scheme == "gs":
fs = gcsfs.GCSFileSystem()
files = ["gs://" + path for path in gcsfs.GCSFileSystem().glob(uri + "/part-*")]
files = ["gs://" + path for path in fs.glob(uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
elif parsed_uri.scheme == "s3":
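        # Import lazily so environments that never touch S3 don't need s3fs.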
import s3fs

fs = s3fs.S3FileSystem()
files = ["s3://" + path for path in fs.glob(uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
else:
raise ValueError("Unsupported scheme")
raise ValueError(f"Unsupported URL scheme {uri}")


def generate_data():
8 changes: 5 additions & 3 deletions tests/e2e/test_online_features.py
@@ -133,11 +133,11 @@ def test_streaming_ingestion(
job = feast_client.start_stream_to_online_ingestion(feature_table)

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 60
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)

wait_retry_backoff(
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 60
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
)

try:
@@ -183,7 +183,9 @@ def ingest_and_verify(
original.event_timestamp.max().to_pydatetime() + timedelta(seconds=1),
)

wait_retry_backoff(lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 60)
wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180
)

features = feast_client.get_online_features(
[f"{feature_table.name}:unique_drivers"],