Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discussion: Composition without Memory Bloat #709

Open
ranchodeluxe opened this issue Mar 17, 2024 · 0 comments
Open

Discussion: Composition without Memory Bloat #709

ranchodeluxe opened this issue Mar 17, 2024 · 0 comments
Labels
design question A question of the design of Pangeo Forge

Comments

@ranchodeluxe
Copy link
Contributor

ranchodeluxe commented Mar 17, 2024

Sorry for the verbosity, just throwing more spaghetti at the wall

Issue:

I enjoy the composition and dataset chaining that we offer job writers in this repo but lately I've been thinking about the tradeoffs (probably b/c I'm in the unlucky position of looking at memory graphs for Flink 😆). My gut tells me that fsspec openers should be transient and therefore as close as possible to the source of loading.

I've noticed in some workflows (namely beam.Create() | OpenURLWithFSSpec | OpenWithXarray | WhateverTransform) that we open fsspec file-like handlers outside of context managers and pass them to downstream transforms via xr.Dataset(s). This makes sense given that memory is cheap and that we don't know if downstream transforms (WhateverTransform for example) will be operating on them lazily or eagerly.

But the memory bloat can be gross for workflows that need to only work on metadata (see the example below). Personally I'd call this type of interaction a memory leak not in the strong sense of "we lost a reference and cannot deallocate memory" but in the mild sense that for the duration of the job we are holding onto "memory which is no longer needed but not released". I see how it depends on what the next downstream transform is doing. I realize this might also be the only way to build chaining for xr.Dataset(s) between steps across apache.beam. And I'm wondering out loud here about possible other alternatives.

For example, here is a simple recipe that doesn't do much but beam.Map over some lazily loaded xr.Dataset(s) (no context manager version and with context manager version). Yet with a small number of yearly inputs it hogs memory when the file-like handlers are left open compared to when closed as the image below shows:

Screen Shot 2024-03-16 at 8 38 27 PM

Ideas:

If downstream transforms stop expecting xr.Dataset(s) as input and start expecting something else that can force them to choose how to set the fsspec context duration then that "might" help. It definitely will force other tradeoffs and might break the chaining of datasets. But I don't know xarray well. I keep thinking about lazy ORMs and how they'd handle this situation comparatively. I don't see much that allows me to manipulate the opening/closing of xarray's internal fsspec backend while maintaining the previous modifications to the dataset and probably for very good reasons that I am naive about. Below is an example allowing the transform to make the decision about the fsspec context duration. But, yes, this pattern would get tedious fast if we can't chain:

@dataclass
class FSXRWrapper:
    """i suck at naming things"""

    filepath: Optional[str] = None
    fsspec_default_kwargs: Dict[Any, Any] = field(default_factory=dict)
    xarray_default_kwargs: Dict[Any, Any] = field(default_factory=dict)

    @contextmanager
    def open_with_xarray(self, fsspec_override_kwargs=None, xarray_override_kwargs=None) -> Iterator[xr.Dataset]:
        fsspec_kwargs = fsspec_override_kwargs or self.fsspec_default_kwargs
        xarray_kwargs = xarray_override_kwargs or self.xarray_default_kwargs
        # yes, two .open() calls :facepalm:
        of = fsspec.open(self.filepath, mode="rb", **fsspec_kwargs).open()
        ds = xr.Dataset(of, **xarray_kwargs)
        yield ds
        of.close()


@dataclass
class FSXRFactory(beam.PTransform):
    """i suck at naming things"""

    fsspec_kwargs: Dict[Any, Any] = field(default_factory=dict)
    xarray_kwargs: Dict[Any, Any] = field(default_factory=dict)

    @staticmethod
    def create(item, fsspec_kwargs, xarray_kwargs) -> FSXRWrapper:
        index, url = item
        return index, FSXRWrapper(url, fsspec_kwargs, xarray_kwargs)

    def expand(
        self,
        urls: beam.PCollection[Tuple[Index, str]],
    ) -> beam.PCollection[FSXRWrapper]:
        return urls | beam.Map(self.create, self.fsspec_kwargs, self.xarray_kwargs)



@dataclass
class WhateverOnMetadata(beam.PTransform):
 
    @staticmethod
    def par_meta(item) -> Tuple[Index, Tuple[xr.Dataset, FSXRWrapper]]:
        """warned you i was bad at naming"""
        index, fsxr = item
        new_values = []
        with fsxr.open_with_xarray() as ds:
            # do stuff on metadata and but also pass along fsxr if needed
            new_values.append([ds_meta, fsxr])
            return index, new_values

    def expand(
        self,
        urls: beam.PCollection[Tuple[Index, str]],
    ) -> beam.PCollection[Tuple[Index, Tuple[xr.Dataset, FSXRWrapper]]]:
            x = urls | beam.Map(self.par_meta)
            return x


beam.Create(pattern.items())
    | FSXRFactory(fsspec_kwargs={}, xarray_kwargs={})
    | WhateverOnMetadata()

Even if this is just me talking to myself this was useful to think about b/c I see how the tradeoff here is about chaining vs memory and I'm looking for the middle way. Let me know where I'm not thinking clearly here or what I've missed please 🙇‍♂️

@ranchodeluxe ranchodeluxe added the design question A question of the design of Pangeo Forge label Mar 17, 2024
@ranchodeluxe ranchodeluxe changed the title Discussion: Composability without Memory Bloat Discussion: Composition without Memory Bloat Mar 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
design question A question of the design of Pangeo Forge
Projects
None yet
Development

No branches or pull requests

1 participant