Remove singleton from structured dataset transformer engine #848
Merged · +88 −73 · Changes from 6 commits (of 11 total)
Commits:
- a03bc8f: import singleton (wild-endeavor)
- 30fed4f: make compat a different job (wild-endeavor)
- d6be0ee: nit (wild-endeavor)
- 9603d86: wrong import location (wild-endeavor)
- 6daa7b5: turn certain functions into class methods (wild-endeavor)
- 4657497: make class (wild-endeavor)
- 4ff5983: revert warning to debug (wild-endeavor)
- fa4a809: Fixed tests (pingsutw)
- bff85be: Fixed tests (pingsutw)
- bd3be41: fix lint (wild-endeavor)
- d201095: rename register_handler to register (wild-endeavor)
```diff
@@ -24,8 +24,7 @@
 from flytekit.configuration.sdk import USE_STRUCTURED_DATASET
 from flytekit.core.context_manager import FlyteContext, FlyteContextManager
-from flytekit.core.type_engine import TypeTransformer
-from flytekit.extend import TypeEngine
+from flytekit.core.type_engine import TypeTransformer, TypeEngine
 from flytekit.loggers import logger
 from flytekit.models import literals
 from flytekit.models import types as type_models
```
```diff
@@ -106,15 +105,15 @@ def all(self) -> DF:
         if self._dataframe_type is None:
             raise ValueError("No dataframe type set. Use open() to set the local dataframe type you want to use.")
         ctx = FlyteContextManager.current_context()
-        return FLYTE_DATASET_TRANSFORMER.open_as(
+        return flyte_dataset_transformer.open_as(
             ctx, self.literal, self._dataframe_type, updated_metadata=self.metadata
         )

     def iter(self) -> Generator[DF, None, None]:
         if self._dataframe_type is None:
             raise ValueError("No dataframe type set. Use open() to set the local dataframe type you want to use.")
         ctx = FlyteContextManager.current_context()
-        return FLYTE_DATASET_TRANSFORMER.iter_as(
+        return flyte_dataset_transformer.iter_as(
             ctx, self.literal, self._dataframe_type, updated_metadata=self.metadata
         )
```
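In the hunk above, `all()` materializes the whole dataframe while `iter()` returns a generator that yields one dataframe at a time. A toy sketch of that all-vs-iter split (the names and the use of plain ints in place of dataframes are illustrative, not flytekit's API):

```python
from typing import Generator, List


def open_all(chunks: List[int]) -> List[int]:
    # Like all(): materialize every chunk into memory at once.
    return list(chunks)


def open_iter(chunks: List[int]) -> Generator[int, None, None]:
    # Like iter(): yield one chunk at a time, so callers can stream.
    for c in chunks:
        yield c


print(open_all([1, 2, 3]))         # [1, 2, 3]
print(next(open_iter([1, 2, 3])))  # 1
```

The generator form matters for large datasets: only one chunk needs to be resident at a time.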
```diff
@@ -170,7 +169,7 @@ class StructuredDatasetEncoder(ABC):
     def __init__(self, python_type: Type[T], protocol: str, supported_format: Optional[str] = None):
         """
         Extend this abstract class, implement the encode function, and register your concrete class with the
-        FLYTE_DATASET_TRANSFORMER defined at this module level in order for the core flytekit type engine to handle
+        StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
         dataframe libraries. This is the encoding interface, meaning it is used when there is a Python value that the
         flytekit type engine is trying to convert into a Flyte Literal. For the other way, see
         the StructuredDatasetEncoder
```
```diff
@@ -230,7 +229,7 @@ class StructuredDatasetDecoder(ABC):
     def __init__(self, python_type: Type[DF], protocol: str, supported_format: Optional[str] = None):
         """
         Extend this abstract class, implement the decode function, and register your concrete class with the
-        FLYTE_DATASET_TRANSFORMER defined at this module level in order for the core flytekit type engine to handle
+        StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
         dataframe libraries. This is the decoder interface, meaning it is used when there is a Flyte Literal value,
         and we have to get a Python value out of it. For the other way, see the StructuredDatasetEncoder
```
```diff
@@ -337,7 +336,8 @@ class StructuredDatasetTransformerEngine(TypeTransformer[StructuredDataset]):
     Handlers = Union[StructuredDatasetEncoder, StructuredDatasetDecoder]

-    def _finder(self, handler_map, df_type: Type, protocol: str, format: str):
+    @staticmethod
+    def _finder(handler_map, df_type: Type, protocol: str, format: str):
         try:
             return handler_map[df_type][protocol][format]
         except KeyError:
```
```diff
@@ -352,18 +352,21 @@ def _finder(self, handler_map, df_type: Type, protocol: str, format: str):
             ...
         raise ValueError(f"Failed to find a handler for {df_type}, protocol {protocol}, fmt {format}")

-    def get_encoder(self, df_type: Type, protocol: str, format: str):
-        return self._finder(self.ENCODERS, df_type, protocol, format)
+    @classmethod
+    def get_encoder(cls, df_type: Type, protocol: str, format: str):
+        return cls._finder(StructuredDatasetTransformerEngine.ENCODERS, df_type, protocol, format)

-    def get_decoder(self, df_type: Type, protocol: str, format: str):
-        return self._finder(self.DECODERS, df_type, protocol, format)
+    @classmethod
+    def get_decoder(cls, df_type: Type, protocol: str, format: str):
+        return cls._finder(StructuredDatasetTransformerEngine.DECODERS, df_type, protocol, format)

-    def _handler_finder(self, h: Handlers) -> Dict[str, Handlers]:
+    @classmethod
+    def _handler_finder(cls, h: Handlers) -> Dict[str, Handlers]:
         # Maybe think about default dict in the future, but is typing as nice?
         if isinstance(h, StructuredDatasetEncoder):
-            top_level = self.ENCODERS
+            top_level = cls.ENCODERS
         elif isinstance(h, StructuredDatasetDecoder):
-            top_level = self.DECODERS
+            top_level = cls.DECODERS
         else:
             raise TypeError(f"We don't support this type of handler {h}")
         if h.python_type not in top_level:
```
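The `_finder` helper in these hunks walks a three-level dict keyed by dataframe type, then protocol, then format, and collapses any miss into a single error. A minimal standalone sketch of that lookup pattern (the `FakeHandler` class and `find_handler` function are illustrative stand-ins, not flytekit's actual classes):

```python
from typing import Dict, Type


class FakeHandler:
    """Stand-in for an encoder/decoder; illustrative only."""

    def __init__(self, name: str):
        self.name = name


def find_handler(
    handler_map: Dict[Type, Dict[str, Dict[str, FakeHandler]]],
    df_type: Type,
    protocol: str,
    fmt: str,
) -> FakeHandler:
    # Mirrors _finder: three nested lookups, with one uniform error for any miss.
    try:
        return handler_map[df_type][protocol][fmt]
    except KeyError:
        raise ValueError(f"Failed to find a handler for {df_type}, protocol {protocol}, fmt {fmt}")


handlers = {dict: {"s3": {"parquet": FakeHandler("pandas-parquet")}}}
print(find_handler(handlers, dict, "s3", "parquet").name)  # pandas-parquet
```

Because the lookup needs no instance state, the PR can make it a `@staticmethod` and have the classmethod accessors pass in the class-level `ENCODERS`/`DECODERS` maps.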
```diff
@@ -376,7 +379,8 @@ def __init__(self):
         super().__init__("StructuredDataset Transformer", StructuredDataset)
         self._type_assertions_enabled = False

-    def register_handler(self, h: Handlers, default_for_type: Optional[bool] = True, override: Optional[bool] = False):
+    @classmethod
+    def register_handler(cls, h: Handlers, default_for_type: Optional[bool] = True, override: Optional[bool] = False):
         """
         Call this with any handler to register it with this dataframe meta-transformer
```
```diff
@@ -386,21 +390,22 @@ def register_handler(self, h: Handlers, default_for_type: Optional[bool] = True,
             logger.info(f"Structured datasets not enabled, not registering handler {h}")
             return

-        lowest_level = self._handler_finder(h)
+        lowest_level = cls._handler_finder(h)
         if h.supported_format in lowest_level and override is False:
             raise ValueError(f"Already registered a handler for {(h.python_type, h.protocol, h.supported_format)}")
         lowest_level[h.supported_format] = h
         logger.debug(f"Registered {h} as handler for {h.python_type}, protocol {h.protocol}, fmt {h.supported_format}")

         if default_for_type:
             # TODO: Add logging, think about better ux, maybe default False and warn if doesn't exist.
-            self.DEFAULT_FORMATS[h.python_type] = h.supported_format
-            self.DEFAULT_PROTOCOLS[h.python_type] = h.protocol
+            cls.DEFAULT_FORMATS[h.python_type] = h.supported_format
+            cls.DEFAULT_PROTOCOLS[h.python_type] = h.protocol

         # Register with the type engine as well
         # The semantics as of now are such that it doesn't matter which order these transformers are loaded in, as
         # long as the older Pandas/FlyteSchema transformer do not also specify the override
-        TypeEngine.register_additional_type(self, h.python_type, override=True)
+        engine = StructuredDatasetTransformerEngine()
+        TypeEngine.register_additional_type(engine, h.python_type, override=True)

     def assert_type(self, t: Type[StructuredDataset], v: typing.Any):
         return
```

Review comment on the `engine = StructuredDatasetTransformerEngine()` line: this is the main change. Every time we register a new handler, we create a new instance of this class. Not the best, but okay. We could also cache it as a global variable (one already exists at the bottom of this .py file)... either way.
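The conversion above moves registration from instance state to class state, which is what lets the PR drop the module-level singleton: any instance (or the class itself) sees the same registries. A minimal sketch of that pattern, using a simplified single-level registry rather than flytekit's nested type/protocol/format maps:

```python
from typing import ClassVar, Dict


class Registry:
    # Class-level storage: shared by every instance, so no singleton object
    # has to be passed around just to hold the handler maps.
    HANDLERS: ClassVar[Dict[str, object]] = {}

    @classmethod
    def register(cls, key: str, handler: object, override: bool = False) -> None:
        if key in cls.HANDLERS and not override:
            raise ValueError(f"Already registered a handler for {key}")
        cls.HANDLERS[key] = handler


Registry.register("parquet", "my-parquet-handler")
# Instances and the class read the same mapping.
print(Registry().HANDLERS["parquet"])  # my-parquet-handler
```

With this shape, `Registry()` instances are cheap and interchangeable, which is why creating a fresh engine instance inside `register_handler` (as the review comment notes) is acceptable, if not ideal.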
```diff
@@ -723,8 +728,8 @@ def guess_python_type(self, literal_type: LiteralType) -> Type[T]:
 if USE_STRUCTURED_DATASET.get():
-    logger.debug("Structured dataset module load... using structured datasets!")
-    FLYTE_DATASET_TRANSFORMER = StructuredDatasetTransformerEngine()
-    TypeEngine.register(FLYTE_DATASET_TRANSFORMER)
+    logger.warning("Structured dataset module load... using structured datasets!")
+    flyte_dataset_transformer = StructuredDatasetTransformerEngine()
+    TypeEngine.register(flyte_dataset_transformer)
 else:
-    logger.debug("Structured dataset module load... not using structured datasets")
+    logger.warning("Structured dataset module load... not using structured datasets")
```
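The final hunk gates engine registration behind a config flag at module import time. A generic sketch of that load-time pattern, with the flag passed as a parameter in place of `USE_STRUCTURED_DATASET.get()` (the function and list names are hypothetical):

```python
import logging
from typing import List

logger = logging.getLogger(__name__)


def load_module(use_structured_dataset: bool) -> List[str]:
    """Stand-in for the module-level block: register the engine only when the flag is on."""
    registered: List[str] = []
    if use_structured_dataset:
        logger.debug("Structured dataset module load... using structured datasets!")
        registered.append("StructuredDatasetTransformerEngine")
    else:
        logger.debug("Structured dataset module load... not using structured datasets")
    return registered


print(load_module(True))   # ['StructuredDatasetTransformerEngine']
print(load_module(False))  # []
```

Keeping the check at import time means downstream code never has to re-test the flag; either the transformer is in the type engine or it is not.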
Review comment on the CI change: can revert this; either way, it doesn't really matter. I think I like not running the compat stuff with coverage.