initial pass at a pipelining transform #424

Draft. Wants to merge 10 commits into base: dev.
@@ -100,8 +100,9 @@ def get_table(self, path: str) -> tuple[pa.table, int]:
"""
Get pyArrow table for a given path
:param path - file path
- :return: pyArrow table or None, if the table read failed and number of operation retries.
- Retries are performed on operation failures and are typically due to the resource overload.
+ :return: Tuple containing
+ pyarrow.Table: PyArrow table if read successfully, None otherwise.
+ the number of retries. Retries are performed on operation failures and are typically due to resource overload.
"""
pass

@@ -260,8 +260,9 @@ def get_table(self, path: str) -> tuple[pa.table, int]:
Args:
path (str): Path to the file containing the table.

- Returns:
+ Returns: Tuple containing
pyarrow.Table: PyArrow table if read successfully, None otherwise.
+ the number of retries.
"""

try:
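For reference, a minimal sketch of how a caller might consume the new return shape of get_table(); the data_access variable and file path here are assumptions for illustration, not part of this diff:

# Hypothetical caller of get_table(); 'data_access' is an assumed instance of the class edited above.
table, retries = data_access.get_table(path="some/file.parquet")
if table is None:
    print(f"read failed after {retries} retries")
else:
    print(f"read {table.num_rows} rows with {retries} retries")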
@@ -0,0 +1,102 @@
import pathlib
from typing import Any

from data_processing.transform import AbstractBinaryTransform


name = "pipeline"
cli_prefix = f"{name}_"
transform_key = "transforms"


class PipelinedBinaryTransform(AbstractBinaryTransform):
"""
Enables the sequencing of transforms.
Configuration is done by providing a list of configured AbstractBinaryTransform instances under the "transforms"
key in the dictionary provided to the initializer.
Features/considerations include:
* Transforms must be sequenced such that the output of a given transform, identified by the extension produced,
is compatible with the next transform in the sequence.
* Intermediate input file names are only informative and do not actually exist on disk. The file extensions
used are those produced by the output of the previous transform. The base names are constructed
from the generating transform's class name, but should not be relied on.
* If a transform produces multiple outputs (which must all have the same extension), each output is applied through
the subsequent transforms in the pipeline.
Restrictions include:
* Metadata produced is merged across all transforms for any given call to the transform/flush_binary() methods.
"""

def __init__(self, config: dict[str, Any]):
"""
Create the pipeline using a list of initialized transforms
Args:
config: dictionary holding the following keys
transforms : a list of AbstractBinaryTransform instances. All transforms must expect and produce
the same data type (within the binary array) represented by the file extensions passed into and
returned by the transform/flush_binary() methods.
"""
super().__init__(config)
self.input_extension = None
self.transforms = config.get(transform_key, None)
if self.transforms is None:
raise ValueError(f"Missing configuration key {transform_key} specifying the list of transforms to run")
for transform in self.transforms:
if not isinstance(transform, AbstractBinaryTransform):
raise ValueError(f"{transform} is not an instance of AbstractBinaryTransform")
Collaborator:

Every transform here can have its own config parameters. Where are transforms initialized?

Member Author:

Good question. Not sure we need that in the first pass, but open to suggestions. Initially this may be for the non-launched/embedded/notebook use cases.
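For context, a minimal sketch of the configured-instances pattern being discussed, using only classes that appear in this PR; the NOOPTransform configuration mirrors the test file below:

from data_processing.test_support.transform.noop_transform import NOOPTransform
from data_processing.transform.binary_pipeline import PipelinedBinaryTransform, transform_key

# Each transform is constructed and configured by the caller (e.g. in a notebook),
# then handed to the pipeline under the "transforms" key.
noop = NOOPTransform({"sleep": 0})
pipeline = PipelinedBinaryTransform({transform_key: [noop]})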


def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Applies the list of transforms, provided to the initializer, to the input data.
If a transform produces multiple byte arrays, each will be applied through the downstream transforms.
Args:
file_name: name of the input file provided to the first transform in the pipeline.
byte_array: contents of the input file to be transformed.
Returns:
the list of (bytes, extension) tuples produced by the last transform in the pipeline, and the metadata merged across all transforms.
"""
pending_to_process = [(file_name, byte_array)]
r_metadata = {}
for transform in self.transforms:
transform_name = type(transform).__name__
to_process = pending_to_process
pending_to_process = []
for tp in to_process: # Over all outputs from the last transform (or the initial input)
fname = tp[0]
byte_array = tp[1]
transformation_tuples, metadata = transform.transform_binary(fname, byte_array)
# Capture the list of outputs from this transform as inputs to the next (or as the return values).
for transformation in transformation_tuples:
transformed, extension = transformation
fname = transform_name + "-output" + extension
Collaborator:

This will break transform logic if they need the input file name, for example codetoparquet or pdf conversion.

Member Author:

don't they all only operate on the extension?

Collaborator:

not all of them

Member Author:

I could use the original name, but then using the same name for all outputs seems incorrect. Suggestion?

next = (fname, transformed)
pending_to_process.append(next)
# TODO: this is not quite right and might overwrite previous values.
# Would be better if we could somehow support lists.
r_metadata = r_metadata | metadata
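# (Hypothetical alternative, not implemented here) Accumulating values into lists per key,
# e.g. r_metadata.setdefault(key, []).append(value), would avoid silently overwriting
# values produced by earlier transforms.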

r_bytes = []
for tp in pending_to_process:
fname = tp[0]
byte_array = tp[1]
extension = pathlib.Path(fname).suffix
tp = (byte_array, extension)
r_bytes.append(tp)
return r_bytes, r_metadata

def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Call flush on all transforms in the pipeline and aggregate the results.
Returns:
the accumulated list of (bytes, extension) tuples and the metadata merged across all flushed transforms.
"""
r_bytes = []
r_metadata = {}
for transform in self.transforms:
transformation_tuples, metadata = transform.flush_binary()
Collaborator:

This is completely wrong. Flush from the first transform has to be processed by the rest of them, then flush from the second transform has to be processed by the remaining ones, and so on.

Member Author:

Ah, good. Yes, for A->B->C, A.flush() has to be fed to B.transform(). Ugly, but true.
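A hypothetical sketch of the chaining described above, reusing the naming convention and attributes from this file; this is not the code in this diff:

# Sketch: flush transform i, then push its outputs through transforms i+1..n via transform_binary().
r_bytes = []
r_metadata = {}
for i, transform in enumerate(self.transforms):
    flushed, metadata = transform.flush_binary()
    r_metadata = r_metadata | metadata
    pending = [(type(transform).__name__ + "-output" + ext, data) for data, ext in flushed]
    for downstream in self.transforms[i + 1:]:
        next_pending = []
        for fname, data in pending:
            outputs, metadata = downstream.transform_binary(fname, data)
            r_metadata = r_metadata | metadata
            next_pending.extend((type(downstream).__name__ + "-output" + ext, d) for d, ext in outputs)
        pending = next_pending
    r_bytes.extend((data, pathlib.Path(fname).suffix) for fname, data in pending)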

for transformed_tuples in transformation_tuples:
r_bytes.append(transformed_tuples)
# TODO: this is not quite right and might overwrite previous values.
# Would be better if we could somehow support lists.
r_metadata = r_metadata | metadata

return r_bytes, r_metadata
@@ -0,0 +1,103 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import pathlib
from typing import Any, Tuple

import pyarrow as pa
from data_processing.test_support.transform import AbstractBinaryTransformTest
from data_processing.test_support.transform.noop_transform import NOOPTransform
from data_processing.transform import AbstractBinaryTransform
from data_processing.transform.binary_pipeline import (
PipelinedBinaryTransform,
transform_key,
)
from data_processing.utils import TransformUtils


table = pa.Table.from_pydict({"name": pa.array(["Tom", "Dick", "Harry"]), "age": pa.array([0, 1, 2])})
expected_table = table # We only use NOOP

# Because the test is calling transform/flush_binary(), we get the additional metadata *_doc_count.
expected_metadata_list = [
{"nfiles": 1, "nrows": 3, "result_doc_count": 3, "source_doc_count": 3}, # transform() result
{"result_doc_count": 0}, # flush() result
]


class DoublerTransform(AbstractBinaryTransform):
def __init__(self):
self.extension = None
self.buffer = []

def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
self.extension = pathlib.Path(file_name).suffix
the_tuple = (byte_array, self.extension)
self.buffer.append(the_tuple)
return [the_tuple], {}

def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
r = self.buffer
self.buffer = None
return r, {}


class TestPipelinedBinaryTransform(AbstractBinaryTransformTest):
"""
Extends the super-class to define the test data for the tests defined there.
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
"""

def get_test_transform_fixtures(self) -> list[Tuple]:
# Defines correct evaluation of the pipeline for the expected number of tables produced.
# It does NOT test the transformation of the transforms contained in the pipeline, other
# than to make sure the byte arrays are not changed, due to using a NOOPTransform in the pipeline.
# .parquet is used as the extension because the transforms being used are AbstractTableTransforms
# which use/expect parquet files.
fixtures = []
noop0 = NOOPTransform({"sleep": 0})
noop1 = NOOPTransform({"sleep": 0})
Collaborator:

This is complete cheating

Member Author:

It's a start. That said, we're not testing the application of the underlying transforms as much as the structure of the output. But yes, it would be nice to have a better test, though that would require having transforms other than NOOP in the test_support packages.

Member Author:

Ah, you are referring to the configuration part. We have always been saying that transforms can be configured outside of the CLI/runtime mechanics; I'm doing that here. However, it is true that running a pipeline transform in a runtime may require more work. This is more for the Python-only, non-runtime users, at least initially.

Collaborator:

I repeat my sentiment. It's circumventing the "normal" execution.

config = {transform_key: [noop0]}
binary_table = TransformUtils.convert_arrow_to_binary(table)
binary_expected_table = TransformUtils.convert_arrow_to_binary(expected_table)

# Simple test to make sure a single transform works
fixtures.append(
(
PipelinedBinaryTransform(config),
[("foo.parquet", binary_table)],
[(binary_expected_table, ".parquet")],
expected_metadata_list,
)
)

# Put two transforms together
config = {transform_key: [noop0, noop1]}
fixtures.append(
(
PipelinedBinaryTransform(config),
[("foo.parquet", binary_table)],
[(binary_expected_table, ".parquet")],
expected_metadata_list,
)
)

# Add a transform to the pipeline that a) produces multiple tables and b) uses flush() to do it.
config = {transform_key: [noop0, DoublerTransform()]}
fixtures.append(
(
PipelinedBinaryTransform(config),
[("foo.parquet", binary_table)],
[(binary_expected_table, ".parquet"), (binary_expected_table, ".parquet")],
expected_metadata_list,
)
)
return fixtures
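For reference, a minimal standalone sketch of driving the last fixture outside the test harness, using only names defined in this test file; this is a hypothetical driver, not part of the diff:

# Hypothetical standalone driver mirroring the last fixture above.
pipeline = PipelinedBinaryTransform({transform_key: [NOOPTransform({"sleep": 0}), DoublerTransform()]})
binary = TransformUtils.convert_arrow_to_binary(table)
outputs, metadata = pipeline.transform_binary("foo.parquet", binary)  # one (.parquet bytes) output
flushed, flush_metadata = pipeline.flush_binary()  # the DoublerTransform's buffered copy
# Together these account for the two identical expected tables in the last fixture.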