Skip to content

Commit

Permalink
Implement list job and get job methods for Dataproc launcher (#1106)
Browse files Browse the repository at this point in the history
* Implement list job and get job methods for Dataproc launcher

Signed-off-by: Khor Shu Heng <[email protected]>

* Add error message to SparkJobFailure

Signed-off-by: Khor Shu Heng <[email protected]>

* Add integration test for dataproc launcher

Signed-off-by: Khor Shu Heng <[email protected]>

* Remove extra line space

Signed-off-by: Khor Shu Heng <[email protected]>

* Fix argument name to integration test script

Signed-off-by: Khor Shu Heng <[email protected]>

* Add error handling for cancelled job

Signed-off-by: Khor Shu Heng <[email protected]>

* Remove dummy test

Signed-off-by: Khor Shu Heng <[email protected]>

Co-authored-by: Khor Shu Heng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored Nov 3, 2020
1 parent f57bc38 commit 3a634cf
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 46 deletions.
2 changes: 1 addition & 1 deletion infra/scripts/test-integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ python -m pip install --upgrade pip setuptools wheel
make install-python
python -m pip install -qr tests/requirements.txt

# Run the integration suite against the Dataproc e2e cluster.
# (The pre-change plain `pytest tests/integration/` invocation was removed by
# this commit; keeping both lines would run the suite twice.)
pytest tests/integration --dataproc-cluster-name feast-e2e --dataproc-project kf-feast --dataproc-region us-central1 --dataproc-staging-location gs://feast-templocation-kf-feast
51 changes: 42 additions & 9 deletions sdk/python/feast/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ class SparkJobStatus(Enum):
COMPLETED = 3


class SparkJobType(Enum):
    """Kind of Feast Spark job; used e.g. to derive human-readable job names."""

    HISTORICAL_RETRIEVAL = 0
    BATCH_INGESTION = 1
    STREAM_INGESTION = 2

    def to_pascal_case(self) -> str:
        """Return the member name in PascalCase, e.g. ``BatchIngestion``."""
        return self.name.title().replace("_", "")


class SparkJob(abc.ABC):
"""
Base class for all spark jobs
def get_status(self) -> SparkJobStatus:
    """
    Job status retrieval.

    Returns:
        SparkJobStatus: Job status.
    """
    # Abstract: concrete launchers (e.g. Dataproc) must implement this.
    raise NotImplementedError
Expand All @@ -62,22 +72,34 @@ class SparkJobParameters(abc.ABC):
def get_name(self) -> str:
    """
    Getter for job name.

    Returns:
        str: Job name.
    """
    raise NotImplementedError

@abc.abstractmethod
def get_job_type(self) -> SparkJobType:
    """
    Getter for job type.

    Returns:
        SparkJobType: Job type enum.
    """
    raise NotImplementedError

@abc.abstractmethod
def get_main_file_path(self) -> str:
    """
    Getter for the main jar / python file path.

    Returns:
        str: Full path to file.
    """
    raise NotImplementedError

def get_class_name(self) -> Optional[str]:
    """
    Getter for the main class name, if applicable.

    Returns:
        Optional[str]: java class path, e.g. feast.ingestion.IngestionJob,
            or None when the job has no main class (e.g. a python job).
    """
    return None

def get_arguments(self) -> List[str]:
    """
    Getter for job arguments, e.g. ``["--source", '{"kafka":...}', ...]``.

    Returns:
        List[str]: List of arguments.
    """
    # Abstract: each parameter class builds its own CLI argument list.
    raise NotImplementedError

@abc.abstractmethod
def get_extra_options(self) -> str:
    """
    Spark job dependencies (expected to be resolved from maven).

    Returns:
        str: Spark job dependencies.
    """
    raise NotImplementedError

Expand Down Expand Up @@ -222,7 +246,10 @@ def __init__(

def get_name(self) -> str:
    """
    Build a job name from the job type and all requested feature table names.

    Returns:
        str: e.g. "HistoricalRetrieval-table1-table2".
    """
    all_feature_tables_names = [ft["name"] for ft in self._feature_tables]
    # NOTE: the stale pre-change `return f"HistoryRetrieval-..."` diff-residue
    # line is dropped; the name prefix now comes from the job type enum.
    return f"{self.get_job_type().to_pascal_case()}-{'-'.join(all_feature_tables_names)}"

def get_job_type(self) -> SparkJobType:
    """Return the job type for historical retrieval jobs."""
    return SparkJobType.HISTORICAL_RETRIEVAL

def get_main_file_path(self) -> str:
return os.path.join(
Expand Down Expand Up @@ -302,10 +329,13 @@ def __init__(

def get_name(self) -> str:
    """
    Build a job name from the job type, feature table and ingestion window.

    Returns:
        str: e.g. "BatchIngestion-table-2020-01-01-2020-01-02".
    """
    # NOTE: the stale pre-change f-string segment "BatchIngestion-..." from the
    # scraped diff is dropped; the prefix now comes from the job type enum.
    return (
        f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}-"
        f"{self._start.strftime('%Y-%m-%d')}-{self._end.strftime('%Y-%m-%d')}"
    )

def get_job_type(self) -> SparkJobType:
    """Return the job type for batch ingestion jobs."""
    return SparkJobType.BATCH_INGESTION

def _get_redis_config(self):
return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)

Expand Down Expand Up @@ -360,7 +390,10 @@ def __init__(
self._extra_options = extra_options

def get_name(self) -> str:
    """
    Build a job name from the job type and the feature table name.

    Returns:
        str: e.g. "StreamIngestion-table".
    """
    # NOTE: the stale pre-change `return f"StreamIngestion-..."` diff-residue
    # line is dropped; the prefix now comes from the job type enum.
    return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"

def get_job_type(self) -> SparkJobType:
    """Return the job type for stream ingestion jobs."""
    return SparkJobType.STREAM_INGESTION

def _get_redis_config(self):
return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)
Expand Down
Loading

0 comments on commit 3a634cf

Please sign in to comment.