From a12e0e92db73de610cb329d8856d8c097506cfc5 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Mon, 21 Mar 2022 12:57:10 -0700 Subject: [PATCH 1/2] Adding the simplest state handler --- ray_beam_runner/portability/execution.py | 107 ++++++++++++++++++ ray_beam_runner/portability/execution_test.py | 42 +++++++ setup.py | 14 ++- 3 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 ray_beam_runner/portability/execution.py create mode 100644 ray_beam_runner/portability/execution_test.py diff --git a/ray_beam_runner/portability/execution.py b/ray_beam_runner/portability/execution.py new file mode 100644 index 0000000..0e5a80b --- /dev/null +++ b/ray_beam_runner/portability/execution.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Set of utilities for execution of a pipeline by the FnApiRunner.""" + +# mypy: disallow-untyped-defs + +import contextlib +import collections +from typing import Iterator +from typing import Optional +from typing import Tuple + +import ray + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.runners.worker import sdk_worker + + +@ray.remote +class _ActorStateManager: + def __init__(self): + self._data = collections.defaultdict(lambda : []) + + def get_raw( + self, + bundle_id: str, + state_key: str, + continuation_token: Optional[bytes] = None, + ) -> Tuple[bytes, Optional[bytes]]: + if continuation_token: + continuation_token = int(continuation_token) + else: + continuation_token = 0 + + new_cont_token = continuation_token + 1 + if len(self._data[(bundle_id, state_key)]) == new_cont_token: + return self._data[(bundle_id, state_key)][continuation_token], None + else: + return (self._data[(bundle_id, state_key)][continuation_token], + str(continuation_token + 1).encode('utf8')) + + def append_raw( + self, + bundle_id: str, + state_key: str, + data: bytes + ): + self._data[(bundle_id, state_key)].append(data) + + def clear(self, bundle_id: str, state_key: str): + self._data[(bundle_id, state_key)] = [] + + +class RayStateManager(sdk_worker.StateHandler): + def __init__(self, state_actor: Optional[_ActorStateManager] = None): + self._state_actor = state_actor or _ActorStateManager.remote() + self._instruction_id: Optional[str] = None + + @staticmethod + def _to_key(state_key: beam_fn_api_pb2.StateKey): + return state_key.SerializeToString() + + def get_raw( + self, + state_key, # type: beam_fn_api_pb2.StateKey + continuation_token=None # type: Optional[bytes] + ) -> Tuple[bytes, Optional[bytes]]: + assert self._instruction_id is not None + return ray.get( + self._state_actor.get_raw.remote(self._instruction_id, RayStateManager._to_key(state_key), continuation_token)) + + def append_raw( + self, + state_key: beam_fn_api_pb2.StateKey, + data: bytes + ) -> sdk_worker._Future: + assert self._instruction_id is not None + return self._state_actor.append_raw.remote(self._instruction_id, RayStateManager._to_key(state_key), data) + + def clear(self, state_key: beam_fn_api_pb2.StateKey) -> sdk_worker._Future: + # TODO(pabloem): Does the ray future work as a replacement of Beam _Future? + assert self._instruction_id is not None + return self._state_actor.clear.remote(self._instruction_id, RayStateManager._to_key(state_key)) + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id: str) -> Iterator[None]: + self._instruction_id = bundle_id + yield + self._instruction_id = None + + def done(self): + pass diff --git a/ray_beam_runner/portability/execution_test.py b/ray_beam_runner/portability/execution_test.py new file mode 100644 index 0000000..ee95ceb --- /dev/null +++ b/ray_beam_runner/portability/execution_test.py @@ -0,0 +1,42 @@ +import hamcrest as hc +import unittest + +import ray + +import apache_beam.portability.api.beam_fn_api_pb2 +from ray_beam_runner.portability.execution import RayStateManager + +class StateHandlerTest(unittest.TestCase): + SAMPLE_STATE_KEY = apache_beam.portability.api.beam_fn_api_pb2.StateKey() + SAMPLE_INPUT_DATA = [ + b'bobby' + b'tables', + b'drop table', + b'where table_name > 12345' + ] + + @classmethod + def setUpClass(cls) -> None: + if not ray.is_initialized(): + ray.init() + + @classmethod + def tearDownClass(cls) -> None: + ray.shutdown() + + def test_data_stored_properly(self): + sh = RayStateManager() + with sh.process_instruction_id('anyinstruction'): + for data in StateHandlerTest.SAMPLE_INPUT_DATA: + sh.append_raw(StateHandlerTest.SAMPLE_STATE_KEY, data) + + with sh.process_instruction_id('anyinstruction'): + continuation_token = None + all_data = [] + while True: + data, continuation_token = sh.get_raw(StateHandlerTest.SAMPLE_STATE_KEY, continuation_token) + all_data.append(data) + if continuation_token is None: + break + + hc.assert_that(all_data, hc.contains_exactly(*StateHandlerTest.SAMPLE_INPUT_DATA)) diff --git a/setup.py b/setup.py index eb0eff4..b0104b8 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,11 @@ from setuptools import find_packages, setup +TEST_REQUIREMENTS = [ + 'pyhamcrest', + 'pytest', + 'tenacity', +] + setup( name="ray_beam_runner", packages=find_packages(where=".", include="ray_beam_runner*"), @@ -10,5 +16,9 @@ "distributed computing framework Ray.", url="https://github.com/ray-project/ray_beam_runner", install_requires=[ - "ray", "apache_beam" - ]) + "ray[data]", "apache_beam" + ], + extras_require={ + 'test': TEST_REQUIREMENTS, + } +) From 6afc5221a365aed59587c536a5cc7b9eb78d794a Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Mon, 21 Mar 2022 13:00:40 -0700 Subject: [PATCH 2/2] fixup --- ray_beam_runner/portability/execution.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ray_beam_runner/portability/execution.py b/ray_beam_runner/portability/execution.py index 0e5a80b..bf7cffc 100644 --- a/ray_beam_runner/portability/execution.py +++ b/ray_beam_runner/portability/execution.py @@ -15,7 +15,7 @@ # limitations under the License. # -"""Set of utilities for execution of a pipeline by the FnApiRunner.""" +"""Set of utilities for execution of a pipeline by the RayRunner.""" # mypy: disallow-untyped-defs diff --git a/setup.py b/setup.py index b0104b8..f30aeb3 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,9 @@ from setuptools import find_packages, setup TEST_REQUIREMENTS = [ + 'apache_beam[test]', 'pyhamcrest', 'pytest', - 'tenacity', ] setup(