
Asynchronous Disk Access in Workers #4424

Open
mrocklin opened this issue Jan 13, 2021 · 42 comments · Fixed by dask/zict#82, dask/zict#84, dask/zict#87, dask/zict#90 or #7691

@mrocklin
Member

Currently, reading from or writing to disk blocks the event loop:

def put_key_in_memory(self, ts, value, transition=True):
    if ts.key in self.data:
        ts.state = "memory"
        return

    if ts.key in self.actors:
        self.actors[ts.key] = value
    else:
        start = time()
        self.data[ts.key] = value
        ts.state = "memory"
        stop = time()
        if stop - start > 0.020:
            ts.startstops.append(
                {"action": "disk-write", "start": start, "stop": stop}
            )

This can cause workers to become unresponsive, especially on systems with very slow disk access. Ideally, disk I/O would happen concurrently. There are a couple of ways to do this.

Offload to separate thread

We could move all manipulation of the Worker.data MutableMapping to a separate thread, as we do with the offload function, which we use today for deserialization.

However, if we do this then we need to do it for all access to Worker.data, including seemingly innocuous checks like if key in self.data, which may become annoying.
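
For illustration, a minimal sketch of the thread-offloading approach using plain asyncio (distributed's own offload helper could play the same role; the helper names here are hypothetical):

import asyncio
from concurrent.futures import ThreadPoolExecutor

# A single thread dedicated to Worker.data access, so disk-backed reads
# and writes never block the event loop.
_storage_executor = ThreadPoolExecutor(1, thread_name_prefix="storage")

async def data_get(data, key):
    """Read data[key] (possibly hitting disk) off the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_storage_executor, data.__getitem__, key)

async def data_set(data, key, value):
    """Write data[key] = value (possibly spilling) off the event loop."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_storage_executor, data.__setitem__, key, value)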

Handle Disk logic directly in the worker

We could also break apart the MutableMapping abstraction, and unpack the zict logic directly into the Worker code. This would allow us to keep a lot of the fast access in the event loop, while treating disk access specially. It would also open the door for more performance improvements, like trying to schedule tasks for data that is currently in memory rather than data that is currently on disk. In general if we want to improve out-of-memory handling in Dask we'll eventually need to break this abstraction.

However, breaking this abstraction comes at considerable cost. First, it means that there is more to manage in a monolithic Worker codebase (zict has tricky logic that we haven't really had to touch or maintain in years). Second, it means that we'll have to find a way that still lets other groups like RAPIDS extend the storage hierarchy (they have device->host->disk rather than just host->disk).

cc @quasiben @pentschev @jrbourbeau @fjetter

@fjetter
Member

fjetter commented Jan 13, 2021

Is it a possibility to use a bigger/different interface than MutableMapping? I'm not too worried about no longer using zict, but I am concerned about the monolithic-worker problem you mentioned, and I think we should keep some level of abstraction here, not least for easier testing as the logic continues to grow with the out-of-memory topic you mentioned.

What I would also hope to address, if we are reworking this area, is the potential data duplication caused by spilling (#3756). I never followed up on that topic, but fixing it would also require breaking up that abstraction to some extent.

@mrocklin
Member Author

mrocklin commented Jan 13, 2021 via email

@quasiben
Member

cc @madsbk who has also recently been doing work around spilling

@gjoseph92
Collaborator

gjoseph92 commented Apr 30, 2021

I'm curious about the ripple effects this could be having on performance. See ocean-transport/coiled_collaboration#9, where the same workload is running with and without dask.config.set({"distributed.worker.memory.spill": None}), doing much better with spilling turned off. Copying the links for convenience:

EDIT: the spill config wasn't actually being applied to workers: ocean-transport/coiled_collaboration#9 (comment). So whatever's causing the difference is unrelated. However, I think the observations of blocking IO might still be relevant?

The interesting thing to me is that with spilling on, the task stream has lots of blank space in it—workers are "idle" (not doing user work) a lot of the time. With spilling off, work is fully packed, and there's no idle time. I think ideally, spilling should not cause more idle time in the task stream. The red transfer times might be longer, and we'd see more yellow disk-read tasks, but we'd hope other useful work could happen concurrently, instead of blank space.

I notice a pattern looking at the task stream in the spilling case. You can kind of see white/"idle" patches on one worker that match up with data-transfer waits on another worker. This might just be a Rorschach interpretation. But it seems to indicate that the sender of the data is blocked from running other tasks until the data it's sending has been un-spilled from disk. (This makes sense, since if the event loop is blocked, how could new tasks be submitted to run?)

[Screenshot: task stream with spilling enabled, showing idle gaps that line up with transfers on other workers]

(@jrbourbeau: here's another case where the "inverted task stream" / "what is my worker doing when it's not working" visualization we were talking about could be useful)

I'm skeptical that disk IO could be taking 40-60sec in some of these cases (getting that number from the corresponding transfer durations). That seems really slow. But, in the administrative worker profile, we can see that ~1200 seconds are spent in f.read() from zict/file.py, called from get_data. 120sec per worker is too low to add up to the total transfer time we see, but is still a lot. It would be helpful to know the distribution of these f.read() times.

Basically it seems that transferring spilled data locks up the sender of the data for part of the time. Blocking the worker's IO loop seems consequential to me. Based on my incomplete understanding of the system—please correct what I'm getting wrong here—I think this could be the fallout (assuming IO is blocking for ~seconds at a time):

  • Can't submit new tasks to run in the executor
  • All worker comms are down
    • In-progress data transfers to other workers are blocked
    • Can't get new tasks from the scheduler
    • Can't update scheduler on completed tasks
      • Scheduler can't tell other workers that data they need is now available
      • Workers can't start transferring that data

Semi-related thing I noticed: in get_data, we're deserializing data stored on disk, then immediately re-serializing it to send over the comm. It would be cool if, in the get_data case, we could just wrap the raw bytes from disk in Serialized. Even cooler if we could avoid slurping all the data from disk into memory first, and instead stream it. That would also allow us to remove the worker paused throttle, since transferring spilled data then wouldn't increase the memory usage of a worker (beyond some small buffer size). I wouldn't be surprised if that throttle is a bottleneck as well when workers come under memory pressure.
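
A rough sketch of the streaming idea; spill_path and send_frame are hypothetical names, not the distributed API, and the point is only that the spilled bytes are forwarded without deserializing them or slurping them all into memory:

CHUNK_SIZE = 4 * 2**20  # 4 MiB; arbitrary buffer size for illustration

def stream_spilled_value(spill_path: str, send_frame) -> None:
    """Forward the raw spilled bytes of one key to a comm, chunk by chunk."""
    with open(spill_path, "rb") as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            send_frame(chunk)  # assumed callback that writes one frame to the comm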

@fjetter
Member

fjetter commented Apr 30, 2021

@gjoseph92
You might want to look into the crick statistics for get-data-load-duration, see

self.outgoing_current_count += 1
data = {k: self.data[k] for k in keys if k in self.data}
if len(data) < len(keys):
    for k in set(keys) - set(data):
        if k in self.actors:
            from .actor import Actor

            data[k] = Actor(type(self.actors[k]), self.address, k)

msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}}
nbytes = {k: self.tasks[k].nbytes for k in data if k in self.tasks}
stop = time()
if self.digests is not None:
    self.digests["get-data-load-duration"].add(stop - start)

That's the time spent loading the data before the sender submits it. That's 100% disk IO.

Similarly, there is the get-data-send-duration

try:
    compressed = await comm.write(msg, serializers=serializers)
    response = await comm.read(deserializers=serializers)
    assert response == "OK", response
except EnvironmentError:
    logger.exception(
        "failed during get data with %s -> %s", self.address, who, exc_info=True
    )
    comm.abort()
    raise
finally:
    self.outgoing_current_count -= 1
    stop = time()
    if self.digests is not None:
        self.digests["get-data-send-duration"].add(stop - start)
which is the corresponding send. This does not give you a time series, but it might be a start. Some of these metrics are also exported via Prometheus, which would give a time series if scraped.

That should at least help us understand where the time is spent on the sender. In particular, if no task was queued for execution before this transfer was initiated, the worker will idle until it is over; that part is correct.

@fjetter
Copy link
Member

fjetter commented Apr 30, 2021

I've thought about @gjoseph92's suggestion w.r.t. a streaming-like interface, and I believe it aligns pretty well with the goal of avoiding data duplication. I tried to put my idea of how this interface could look into some code and would be glad to receive feedback. To be entirely honest, this proposal would not allow for direct from-disk streaming, but I think that could easily be added.

https://gist.github.com/fjetter/25fae963c70c9b756b591213244af96a

This proposal keeps the mutable mapping and still allows for fast "in" checks or .keys(), but introduces an object DataProxy which would act as a file-like object and only allow data access via open/close, which lets us track whether it is currently in use. If it is not in use, the data may be spilled. Instead of referring to spilling directly, I figured we might want to have multiple layers, e.g. in-memory, in-memory-compressed and disk/remote (I remember the GPU folks having a concept of multi-layered memory?) and would suggest implementing this transition in another class I called "Processor", which would allow for pluggable storage. This draft is not functional, but there is enough meat to deliver the idea, I hope.

The interface itself would be written async, and whether or not the Data implements actual async access is then an implementation detail.
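
For illustration, a rough sketch of such a proxy (hypothetical code, not the gist's): the value is only reachable between open() and close(), so the buffer can tell whether it is safe to spill.

import threading

class DataProxy:
    """Hypothetical proxy guarding access to a stored value."""

    def __init__(self, value):
        self._value = value
        self._users = 0
        self._lock = threading.Lock()

    def open(self):
        with self._lock:
            self._users += 1
        return self._value

    def close(self):
        with self._lock:
            self._users -= 1

    @property
    def in_use(self) -> bool:
        # The "Processor" may only spill this value while nobody holds it open.
        return self._users > 0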

@jrbourbeau
Member

jrbourbeau commented Apr 30, 2021

cc'ing @madsbk, who has worked on spilling over in dask-cuda and has experience with using data proxy objects (though to be clear, I've not yet looked deeply into how @madsbk's and @fjetter's proposals may overlap)

@madsbk
Contributor

madsbk commented May 3, 2021

@gjoseph92 I think you are right in your observations. I made some similar ones, which turned into the JIT-unspilling approach for GPU -> CPU memory spilling: rapidsai/dask-cuda#451. Notice that this approach delegates the de-serialization to the worker thread executing the task. It also handles the de-serializing & re-serializing issue in get_data().
I have summarized the shortcomings of the current spilling here: #4568 (comment), which are all fixed in Dask-CUDA.
Our plan is to generalize the approach to include CPU -> Disk spilling.

... and a small commercial, I will be talking about this at the Dask Distributed Summit 2021 :)

@madsbk
Contributor

madsbk commented May 3, 2021

I've not yet looked deeply into how @madsbk and @fjetter proposals may overlap

I think there is a significant overlap. If I am not mistaken, the interface @fjetter proposes is very similar to the ProxifyHostFile in Dask-CUDA. The main difference is that Dask-CUDA leaks the proxy objects into the executing task. However, there is no reason why a worker couldn't un-proxify the input arguments before executing tasks.

@fjetter
Member

fjetter commented May 3, 2021

If I am not mistaken, the interface @fjetter proposes is very similar to the

There are at least similar ideas. The execution is quite different but it is also not a fair comparison since my code is not actually working :)

However, there is no reason why a worker couldn't un-proxify the input arguments before executing tasks.

Considering that the goal of this issue is to allow for async data access, this would require us to change the interface for the worker anyhow and I would consider un-proxification a hard requirement for this feature since generic user functions should not be required to deal with our internal abstraction.

I could not 100% follow the implementation of the cuda proxies but it looks reasonably unspecific to GPUs which is good. From what I understand, most of the complexity is introduced since the implementation deduplicates by id (at least this is what I understand from the ProxiesTally) instead of merely by key and I'm wondering what the reason for this decision is.
I assume that I would need to put in locks for multithreading as well and the two approaches, assuming un-proxification, would probably end up looking fairly similar indeed.


I'm wondering how much of the "how do we want to spill" discussion should/must leak into the "what is a good asyncio interface" discussion.

@mrocklin
Member Author

mrocklin commented May 3, 2021

I'm wondering how much of the "how do we want to spill" discussion should/must leak into the "what is a good asyncio interface" discussion.

I will put forward three options here for asyncio interfaces. I'm not recommending these interfaces in particular; instead my objective is to highlight a choice we have.

AsyncMutableMapping

We could make an async extension of the MutableMapping interface with async def getitem and async def setitem methods. This would be a relatively small change, but it would stop the stalling that we currently see in workers.

However, it still keeps the memory / disk / GPU memory location of keys opaque. This is good because it's a separate concern and extensible, but also somewhat bad because we can't use this information for scheduling decisions or diagnostics.
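
A minimal sketch of what that interface could look like (illustrative names, not an existing zict or distributed API); concrete subclasses would still implement the usual synchronous MutableMapping methods:

from collections.abc import MutableMapping

class AsyncMutableMapping(MutableMapping):
    """Hypothetical mixin: keep the cheap synchronous dict protocol
    (``in``, ``len``, iteration) and add coroutines for the accesses
    that may touch disk."""

    async def async_getitem(self, key):
        # Default implementation falls back to the blocking path;
        # a disk-backed subclass would override this to avoid blocking.
        return self[key]

    async def async_setitem(self, key, value):
        self[key] = value

    async def async_delitem(self, key):
        del self[key]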

No abstraction at all

Alternatively, we could just fully bake the ideas of memory / disk / GPU memory into Dask Workers. After several years there have only been two serious implementations of this abstraction: the default memory/disk layout and dask-cuda's DeviceHostDisk layout. Let's just absorb these two options into the Worker and be done with it.

This closes the door for easy addition of future layers to the memory hierarchy (object stores? compressed-in-memory? other accelerators?) but opens the door for smarter scheduling. In particular there are often workloads where we should be computing on data that we have close at hand in main memory first, but instead we read in data from disk, and compute on that data while shoving the easy-to-access in-memory data to disk, only to have to read it later.

Storage Layer abstraction (middle ground)

Maybe in the middle, we create an abstraction for how to asynchronously get and set data into some storage system, and the Worker has a stack of these things with associated size limitations.

class File(StorageLayer):
    async def getitem(self, key):
        ...
    async def setitem(self, key, value):
        ...
    async def delitem(self, key):
        ...

The stack-of-layers/zict.Buffer/zict.LRU logic is hard-coded into the Worker while the logic of each storage-layer is abstracted into a separate class.
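
For illustration, a rough sketch of how the Worker-side stack logic could chain such layers; StorageStack and its limits/sizeof arguments are hypothetical, and each layer is assumed to expose the async getitem/setitem/delitem methods above:

class StorageStack:
    """Hypothetical stack of StorageLayer-like objects, fastest first.
    Reads come from the highest layer holding the key; writes go to the
    top layer, and overflow is demoted one layer down. The last layer is
    assumed to be unbounded (its limit is None)."""

    def __init__(self, layers, limits, sizeof):
        self.layers = layers          # e.g. [memory_layer, disk_layer]
        self.limits = limits          # max bytes per layer; None = unbounded
        self.sizeof = sizeof          # callable estimating a value's size
        self.sizes = [{} for _ in layers]

    async def get(self, key):
        for layer, sizes in zip(self.layers, self.sizes):
            if key in sizes:
                return await layer.getitem(key)
        raise KeyError(key)

    async def set(self, key, value, level=0):
        await self.layers[level].setitem(key, value)
        self.sizes[level][key] = self.sizeof(value)
        # Demote the oldest keys while this layer exceeds its byte limit.
        while self.limits[level] and sum(self.sizes[level].values()) > self.limits[level]:
            victim = next(iter(self.sizes[level]))
            demoted = await self.layers[level].getitem(victim)
            await self.layers[level].delitem(victim)
            del self.sizes[level][victim]
            await self.set(victim, demoted, level + 1)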

@mrocklin
Member Author

mrocklin commented May 3, 2021

Hrm, or perhaps that's too simple. I now realize that the DeviceHostDisk abstraction isn't just a stack of storage layers where one overflows to the other, but that there is some more complex logic about what to put where.

@gjoseph92
Collaborator

opens the door for smarter scheduling. In particular there are often workloads where we should be computing on data that we have close at hand in main memory first

As another option (or an extension of the Storage Layer abstraction), could we define an interface for storage layers to give us enough information to make such smarter scheduling decisions? What are the questions this would need to answer? It sounds like we want to know "which keys are fast to load". Maybe also "how much memory will this key take if I load it", "give me the ranking of (these keys|all keys) in 'fastness'"?

@mrocklin
Member Author

mrocklin commented May 4, 2021

Perhaps it would make sense to just start with adding async methods to the MutableMappings in zict so that we can avoid blocking the event loop. This is probably an easy(ish) change and would relieve short term pressure.

@mrocklin
Member Author

mrocklin commented May 4, 2021

It would also avoid us having to come to some grand design here

@fjetter
Member

fjetter commented Feb 22, 2022

Thought about this again in terms of what the minimal interface needs to be to unblock the event loop. This might be very obvious, but I stumbled over it, so I'll write it down:

  • setitem, keys, etc. may be synchronous. We'll pass ownership to the Buffer (keeping zict terminology but that doesn't mean we cannot ditch it) and typically do not care whether it is in memory or not. The Buffer can decide whether to move things to disk or not. This likely happens in a background thread anyhow.
  • del is the same. We only need to remove it from bookkeeping and trust the buffer to clean up eventually
  • If we keep evict this should likely be a coroutine since this is a place we actually care about where the data is
  • getitem must be a coroutine since we must not block the event loop while the buffer fetches the data

I'm still in favour of keeping the abstraction for testing purposes and cleaner code but I don't care too much about where the implementation lives. We do have our own SpillBuffer class by now anyhow
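
A minimal sketch of the surface implied by the bullets above (hypothetical names; the point is that only the calls that may wait on disk are coroutines):

from typing import Protocol

class MinimalAsyncBuffer(Protocol):
    """Hypothetical minimal buffer interface, per the bullets above."""

    def __setitem__(self, key: str, value: object) -> None:
        """Synchronous; the buffer may spill in a background task later."""

    def __delitem__(self, key: str) -> None:
        """Synchronous; only removes the key from bookkeeping, cleanup is lazy."""

    def __contains__(self, key: str) -> bool:
        ...

    async def evict(self) -> None:
        """Coroutine: here we do care when the data has actually hit disk."""

    async def get(self, key: str) -> object:
        """Coroutine: may need to read the value back from disk."""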

@gjoseph92
Collaborator

Recent workloads I've run have made me think streaming spilled bytes straight from disk to other workers (transferring spilled data without either deserializing it, or loading all the bytes into memory) needs to be a core part of this design.

I made a simple workload that resulted in spilling lots of data to disk (basically df - df.mean(), where df was larger than the cluster). We all know spilling to disk is often the death knell of a cluster, so I expected this to blow up. But it actually ran great, because the tasks that needed the spilled data always ran on the workers where the data already had been spilled. No transfers were necessary. Additionally, the event loop being blocked by disk IO might not have been as big a deal, since the workers didn't have much communication to do anyway (just with the scheduler).

[Task stream screenshot: same-worker-uses-spilled-data]

Notice there are lots of disk reads, but few transfers, little white space—task stream looks great

Compare this to an (a @ a.T).mean(), which blows up the cluster and grinds to a halt. These tasks fundamentally require more inputs and therefore more memory, true. But in this case, data that’s been spilled on one worker needs to be transferred to another worker. Because doing this un-spills that data, the worker is playing memory whack-a-mole: it’s reading data back into memory for transfers about as fast as (maybe even faster than?) it’s dumping it to disk. And the blocking of the event loop is gumming up these data transfers to other workers, slowing things down even more.

[Task stream screenshot: spilled-data-transferred-to-different-worker]

I’d just never seen spilling work well, like it did in the first example. That made me realize that maybe the biggest problems aren't so much with spilling itself, but with how spilling interacts with data transfer. So I think it's essential that we make that a core part of any redesign.

@fjetter
Member

fjetter commented Mar 4, 2022

The biggest problem I am aware of, and have proof for, connecting spill + transfer is #3756, where up to 40% of the memory was allocated by untracked data copies; that's not including (de-)serialization overhead, only managed memory. There have been suggestions to solve this by using weakrefs, which would not require any big redesign and may already solve a significant part of this problem.

If RSS blows up uncontrolled, this can cause all sorts of side effects and even cause a vicious cycle (high RSS -> consistent LRU eviction -> more spill to disk -> more data duplication)

Because doing this un-spills that data, the worker is playing memory whack-a-mole: it’s reading data back into memory for transfers about as fast as (maybe even faster than?) it’s dumping it to disk. And the blocking of the event loop is gumming up these data transfers to other workers, slowing things down even more.

I would be very interested to talk about instrumentation to confirm this. As I said, the data duplication problem above is amplified by the LRU logic, particularly with the random access pattern of a shuffle.

What kind of instrumentation would be helpful here?

  • LRU cache hits/misses
  • Compression ratios of the spilled data
  • Actual measurements of the payload once serialized
  • More granular timing about disk IO and (de-)serialization timings
  • event loop blocking timings (probably redundant with better timings about disk IO and serialization above; see the sketch below for a simple loop-lag probe)
  • Some proxy to infer how "severe" the event loop blockage is. Not sure how to do this but a measure of incoming messages, number and timing of when handlers are called. With (Worker) State Machine determinism and replayability #5736 we could also measure timings like "time between decision to execute something and time until it was actually submitted to the threadpool"
  • ???

It's obvious that not doing the (de-)serialization in get_data is beneficial if possible, but the complexity it introduces may be significant.
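
As a concrete example of the event-loop-blocking item above, a minimal loop-lag probe could look like this (a sketch, not existing instrumentation):

import asyncio
import time

async def monitor_event_loop(interval: float = 0.5, threshold: float = 0.1):
    """If a sleep wakes up much later than scheduled, something
    (e.g. synchronous disk IO or deserialization) blocked the event loop."""
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - start - interval
        if lag > threshold:
            print(f"event loop blocked for ~{lag:.2f}s")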

@crusaderky
Collaborator

Once the Active Memory Manager Rebalance comes into play later on, there's going to be an extra burden of data that is fully deserialised upon arrival, but it's actually just "parked" on the worker and may not be needed in its deserialised form for a long time. I think it would be fairly straightforward to have an interface where you can deserialise data only when a task actually needs it. I suspect garbage collection times will improve drastically too when data isn't primarily made of large numpy arrays.

@crusaderky
Collaborator

This discussion feels OT for the ticket. Moving it to #5900.

@mrocklin
Member Author

mrocklin commented Mar 4, 2022

FWIW my gut feeling here is to not invent an abstraction and just put memory/disk/gpu memory directly on the worker. I think that an abstraction will be hard to create here, and that it will get in the way. I think that things will be simpler if we have state and methods like

class Worker:
    def __init__(self):
        self.memory_storage = {}
        self.disk_storage = zict.File(...)
        self.device_storage = {}

    def get_data_from_storage(self, key: str) -> object:
        ...

    def put_data_into_storage(self, key: str, value: object) -> None:
        ...

And then we can start to use this broken out state incrementally throughout the worker logic. This would be common in scheduling (we strongly prefer tasks for which the data is already in RAM) and as Gabe points out, in communication (when sending data-on-disk to other machines there's no reason to deserialize it). These additions could be done after the fact rather than all at once.
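
For example, the scheduling preference could then be a small helper over this broken-out state (ready_tasks, dependencies, and priority are illustrative names, not the actual worker attributes):

def pick_next_task(ready_tasks, memory_storage):
    """Prefer tasks whose dependencies are already in RAM, so we don't force
    disk reads while easy-to-access in-memory data sits idle."""
    in_memory = [
        ts for ts in ready_tasks
        if all(dep.key in memory_storage for dep in ts.dependencies)
    ]
    candidates = in_memory or ready_tasks
    return min(candidates, key=lambda ts: ts.priority)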

@gjoseph92
Collaborator

I very much agree with Matt. I don't think we should abstract away where the data is stored, since in different contexts we want to use different logic depending on the storage location. Eventually an abstraction could be appropriate, but right now we don't know what to use, so we should just not use one yet. This is what I'm trying to say in #5891 (comment).

@mrocklin
Member Author

mrocklin commented Mar 7, 2022

When Gabe and I agree, you know we're on to something 🙂

@gjoseph92
Collaborator

I'm curious what the reasons are against modelling spilled keys with a new state in the state machine. It's a bit more code, but more explicit and therefore easier to maintain, I imagine. Un-spilling feels quite analogous to fetching data, which has been nice to model this way. I think we'd all agree that the worker state machine is great and generally a huge boon to stability. Putting more things in it seems good.

It would be a bigger diff, since we'd no longer use the layering of zict mappings to explicitly spill and un-spill stuff. WorkerState would have separate data and spilled fields, and keys would move between them via explicit transitions. get_data and the memory manager would cause spill/un-spill via state machine events, instead of just doing it implicitly.

I imagine we'd get benefits that aren't easy with an implicit spilling design, since the worker could generally make more explicit scheduling decisions around spilling and memory. For example, we could explicitly decide when to un-spill keys prior to running a task, in parallel with other tasks executing—just like we decide when to fetch data, in parallel with execution. This would pipeline disk IO and execution (#7643). We could return "busy" for a get-data request if un-spilling the data would put us over the memory limit. We could let a task with all inputs in memory run ahead of a task that still needs to un-spill but is higher priority.

I don't know if there are any huge new features this would enable in the short term; I mostly think of it for maintainability, stability, and debug-ability, and because using the state machine for coordinating these asynchronous operations has just been so dang nice.
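
For illustration only, the extra per-key states such a design might introduce (this is not existing distributed code):

from enum import Enum

class SpillState(Enum):
    """Hypothetical per-key data states under an explicit spilling design."""

    MEMORY = "memory"          # value held in Worker.data
    SPILLING = "spilling"      # disk write in flight
    SPILLED = "spilled"        # value lives only on disk
    UNSPILLING = "unspilling"  # disk read in flight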

@fjetter
Member

fjetter commented Mar 13, 2023

A couple of thoughts about the two suggestions above:

async SpillBuffer (Guido's approach)

I'm not entirely convinced that we'll need an async set or update. In this architecture, the SpillBuffer would own the data and would be responsible for spilling/unspilling. We could very easily keep the set methods synchronous and perform the actual spilling in a background task that removes the key from fast once it's on disk.
I understand that if we want to implement the current behavior exactly we'll need to use an async API because we need to block for the duration of this spill (if target is surpassed) but I'm not entirely convinced this is beneficial and I'd be very open to throwing this behavior away if it helps us reduce complexity.

Spilled keys as worker states (Gabe's approach)

I imagine this would be more difficult to pull off in a backwards-compatible way (thinking of the RAPIDS folks), but it should be doable.

I don't know if there are any huge new features this would enable in the short term

I don't think there are many new features this would enable right away but I agree that this would grant us much more control. I also see how this could reduce complexity in some cases. One example is the weakref caching we put in to avoid data duplication

In an offline conversation @crusaderky and I discussed the possibility of using state machine events to optimistically unspill things (assuming there is an async buffer), i.e. the two approaches could be unified for maximum complexity :)

The suggestion of using state machine events for this sounds appealing. I could see a multitude of optimizations being implemented without a hierarchical spill buffer (e.g. use parquet for arrow tables, do not spill regardless of size if we know that it will be used immediately afterwards, etc.)

I wonder what @crusaderky thinks of this

@fjetter
Member

fjetter commented Mar 13, 2023

@madsbk pinging you again for visibility. To understand @gjoseph92's proposal you'll likely need a bit more insight into how the worker functions nowadays. There is some documentation at https://distributed.dask.org/en/stable/worker-state.html (the suggestion is to use an Instruction for spill/unspill), but I'm also happy to catch you up in case you are interested.

@crusaderky
Collaborator

We could very easily keep the set methods synchronous and perform the actual spilling in a background task that removes the key from fast once it's on disk.

Sorry for the misunderstanding - yes, that's exactly what I proposed. async_set would instantly put the key in fast and then, in its own time, move it to slow. There will be a few microseconds where the key is both in fast and slow (which is ok) but never a moment where the key is in neither. The function returns an asyncio.Future[None], but it's only for keeping tabs; it's otherwise a fire-and-forget activity.
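
A minimal sketch of those semantics, assuming hypothetical names (AsyncBufferSketch, fast, slow) and that async_set is called from the event loop thread:

import asyncio

class AsyncBufferSketch:
    """Hypothetical: the key lands in fast immediately; the move to slow
    happens later in a background task, so the event loop never blocks."""

    def __init__(self, slow):
        self.fast = {}      # in-memory values
        self.slow = slow    # disk-backed mapping; writes may be slow

    def async_set(self, key, value) -> asyncio.Future:
        self.fast[key] = value
        # Fire-and-forget; the returned future is only for keeping tabs.
        return asyncio.ensure_future(self._spill(key))

    async def _spill(self, key):
        value = self.fast[key]
        # Do the blocking disk write in a thread, off the event loop.
        await asyncio.to_thread(self.slow.__setitem__, key, value)
        # Drop from fast only once the value is safely in slow, so there
        # is never a moment where the key is in neither.
        self.fast.pop(key, None)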

I understand that if we want to implement the current behavior exactly we'll need to use an async API because we need to block for the duration of this spill (if target is surpassed) but I'm not entirely convinced this is beneficial

I don't think it is beneficial either. Since set_async immediately puts keys in fast and returns, it means that you may reach the pause threshold faster. "May", because on one hand the whole event loop today slows down between tasks, but on the other hand it also means that tasks that have actually finished are just waiting with their return value in unmanaged memory for longer.

could see a multitude of optimizations being implemented without a hierarchical spill buffer (e.g. use parquet for arrow tables

I don't see why such an optimization shouldn't be encapsulated in the SpillBuffer? You just described a Sieve:

slow = zict.Sieve(
    mappings={
        "file": zict.Func(dumps, loads, zict.File(local_directory / "file")),
        "parquet": zict.Parquet(local_directory / "parquet"),
    },
    selector=lambda k, v: "parquet" if isinstance(v, pd.DataFrame) else "file",
)

do not spill regardless of size if we know that it will be used immediately afterwards, etc.)

I've been thinking about something similar recently, specifically about preventing a key from being spilled if it's currently being used by execute or get_data. I think it could straightforwardly be achieved via hints to the SpillBuffer.

@crusaderky
Collaborator

crusaderky commented Mar 30, 2023

Delivery timeline

  1. Release zict 3.0.0
  2. Merge [WIP] Asynchronous SpillBuffer #7686.
    This adds a new public API for SpillBuffer alternatives (e.g. Dask-CUDA), AsyncBufferProto, which however is not going to be final until the next step. This interface is in addition to the legacy ManualEvictProto.
  3. Deliver Don't use pure managed memory for the target threshold #7421. This will require a PR in distributed (but not in zict), as well as a change in the AsyncBufferProto.
    Officially document the new AsyncBufferProto.
  4. Dask-CUDA team can now integrate with the new AsyncBufferProto API
  5. Dask-CUDA team may push some internal refactoring to the new zict.AsyncBuffer class in order to increase code reuse
  6. Dask-CUDA team completes upgrade from ManualEvictProto to AsyncBufferProto
  7. Drop support for zict 2.2.0 and ManualEvictProto. This will remove a lot of dead code from dask/distributed. As I don't really think anybody besides Dask-CUDA overrides the SpillBuffer with a spillable interface, I think it's safe to do so as soon as the new Dask-CUDA release is out, without waiting for several months?

New public API

dask/distributed will indefinitely continue to support passing a generic MutableMapping[str, object] instead of the SpillBuffer when you initialise the Worker.
I would like to deprecate ManualEvictProto (which is an extension to MutableMapping[str, object]) and instead create a new API AsyncBufferProto, which crucially is not a MutableMapping.

This is the new API, downstream of #7686 + #7421:

@runtime_checkable
class AsyncBufferProto(Protocol, Collection[str]):
    """Duck-type API that a third-party alternative to SpillBuffer must respect if it
    wishes to support spilling.

    Notes
    -----
    All methods must return (almost) immediately.
    ``__setitem__`` and ``set_offset`` *may* trigger asynchronous spilling activity, 
    which however this API is opaque to.

    ``async_get`` must raise KeyError if and only if a key is not in the collection at
    some point during the call. ``__setitem__`` immediately followed by ``async_get``
    *never* raises KeyError; ``__delitem__`` immediately followed by ``async_get``
    *always* raises KeyError. Likewise, ``__contains__`` and ``__len__`` must
    immediately reflect the changes wrought by ``__setitem__`` / ``__delitem__``.

    This is public API.
    """

    def __setitem__(self, key: str, value: object) -> None:
        ...

    def __delitem__(self, key: str) -> None:
        ...

    def async_get(
        self, keys: Collection[str], missing: Literal["raise", "omit"] = "raise"
    ) -> Awaitable[dict[str, object]]:
        """Fetch zero or more key/value pairs.

        Parameters
        ----------
        keys:
            Zero or more keys
        missing: raise | omit, optional
            raise
                If any key is missing, raise KeyError
            omit
                Omit missing keys from the returned dict
        """
        ...  # pragma: nocover

    def memory_total(self) -> int:
        """(estimated) bytes currently held in fast memory.
        Does not include offset or spilled bytes.
        """
        ...  # pragma: nocover

    def set_offset(self, n: int) -> None:
        """Change the offset to be added to memory_total in order to determine if
        key/value pairs should be internally spilled. It may be negative. This may
        internally trigger or cancel spilling activity but, like in ``__setitem__``,
        this is opaque to this API.
        """

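
For example, usage from the worker's event loop might look roughly like this (the buffer instance and keys are illustrative):

async def example(buffer: AsyncBufferProto, value: object) -> dict[str, object]:
    # __setitem__ returns immediately; any spilling happens in the background.
    buffer["x"] = value

    # Account for unmanaged memory; this may trigger spilling internally.
    buffer.set_offset(0)

    # async_get may unspill from disk, so it is awaited and never blocks the loop.
    return await buffer.async_get(["x", "y"], missing="omit")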
CC @madsbk is this feasible to implement on your side?

@madsbk
Contributor

madsbk commented Mar 30, 2023

CC @madsbk is this feasible to implement on your side?

Yes, it looks feasible. For dask-cuda's JIT-unspill, this is almost trivial. Dask-cuda's default spilling requires a bit more work but definitely feasible.

@crusaderky
Collaborator

Status update:

dask/zict#92 is currently undergoing peer review.
I've run a first batch of performance tests.

The good news:

  • The change works exactly as intended. The event loop is not blocked anymore while spilling/unspilling is in effect.
  • If a task does not depend on spilled-out data, it will be completely unaffected by any ongoing spilling/unspilling activity.

The looks-bad-but-it's-actually-good news:

In all use cases where there was spilling and unspilling caused by a lot of independent tasks, this change will cause workers to reach the paused state a lot faster - effectively resulting in higher memory consumption. The reason is that, in main, once you pass the target threshold you have effectively a mini-pause at the end of every task, until as much data as the task's output has been spilled out from the bottom of the LRU. With this change, there's nothing slowing tasks until you hit the pause threshold - which means that memory will actually get used.

We may need to tweak the pause threshold down a bit - maybe 5% - or people will start experiencing OOM restarts whereas before they weren't, simply because their heap (across all threads) is more than 15% of total memory.

The bad news:

This change in behaviour uncovered a very, very ugly bug, which completely destroys performance: #7742.
I've worked around it by introducing an artificial delay in Worker.execute - effectively, an undeclared mini-pause when you start a new task if there's spilling activity going on - which degrades performance but produces a memory profile more similar to main.

The second piece of bad news is that it turns out that the current coiled-runtime test bench is not very impacted by a choked event loop. The two tests in test_spill are not:

  • test_spill.py::test_spilling has trivial tasks and does not perform any meaningful amounts of network transfers
  • test_spill.py::test_dot_product_spill has very heavy tasks, that are long to execute and to transfer, which means that while a lot of time is spent with a blocked event loop, it doesn't impact runtime much:

There's only one test which features fast-running tasks and a lot of memory generation: test_join.py::test_set_index. I'm still running A/B tests, but preliminary results show that the test now runs in half the time compared to main.

test_spilling [uncompressible]

branch           main      AsyncBuffer [1]   AsyncBuffer [2]
idle             139.65    490.34 [6]        117.39
wait-unpause[3]            10.48
compress         0.04      0.00              0.00
decompress       0.00      0.00              0.00
deserialize      1.32      0.62              0.62
disk-read        291.67    285.84            277.52
disk-write       307.91    0.00              0.22
other[4]         514.73    12.96             14.03
serialize        0.04      0.00              0.00
thread-cpu       106.29    149.31            151.09
thread-noncpu    14.96     30.70             29.50
zict-offload               366.36            802.97 [5]
TOTAL            1377.50   1346.61           1393.34

[1] hack in the test code, as shown in the last video in #7742, which waits for workers to unpause
[2] hack in the worker code, as described above, which prevents more tasks from running if there's current spill activity
[3] these are 10.5 seconds over 10 threads, so 1.05 wall seconds
[4] You can see how other drops to nothing - that's time with a blocked event loop!
[5] This includes the hack slowdown
[6] The increase is caused by workers getting paused

The test has the same end-to-end runtime in the three use cases; this is unsurprising as it's dominated by raw disk throughput.

test_dot_product_spill [uncompressible]

branch           main      AsyncBuffer
idle             127.43    43.91
compress         0.13      0.01
decompress       0.01      0.01
deserialize      1.33      0.65
disk-read        218.50    218.60
disk-write       18.28     10.47
other            232.62    30.99
serialize        0.01      0.00
thread-cpu       3168.60   3173.47
thread-noncpu    480.43    519.63
zict-offload     0.00      312.92
TOTAL            4247.34   4310.67

Again, note how other has dropped to zero; there is also a substantial drop in unknown time, which is where workers are waiting on network transfers from other workers or the scheduler.
The zict-offload time shows how you frequently have a task that needs to unspill its dependencies, while either another task is doing the same or older tasks are being spilled.

test_set_index

I've failed to reproduce this test in a Jupyter notebook so far, which I need in order to extract the metrics.
As a side note, I've found the metrics above exceptionally useful, and I'd definitely want to have them saved in the coiled-runtime CI artifacts.

@fjetter
Member

fjetter commented Apr 3, 2023

I've failed to reproduce this test in a jupyter notebook so far

How come? Did you fail to reproduce the runtime difference or are you not able to run the test?

@fjetter
Member

fjetter commented Apr 3, 2023

test_spilling [uncompressible]

For these tests I'd be interested to see the raw disk IO rates. Did the AsyncBuffer achieve higher peak rates, or how does this look? If you could share the Grafana link, that'd be helpful (the benchmark stuff should be publicly available in Grafana).

@fjetter
Member

fjetter commented Apr 3, 2023

There's only one test which features fast-running tasks and a lot of memory generation:

I assume this refers to set_index w/ tasks?

@crusaderky
Collaborator

I've failed to reproduce this test in a jupyter notebook so far

How come? Did you fail to reproduce the runtime difference or are you not able to run the test?

I've copied the test verbatim (including the call to dask.config.set to choose the shuffle engine) but it completes in ~20 seconds, without ever getting even close to spilling, instead of ~200s with heavy spilling. Could not figure out what I'm doing wrong.

There's only one test, which features fast-running and a lot of memory generation:

I assume this refers to set_index w/ tasks?

Yes

@crusaderky
Collaborator

test_spilling [uncompressible]

For these tests I'd be interested to see the raw disk IO rates. Did the AsyncBuffer achieve higher peak rates or how does this look like? If you could share the grafana link, that'd be helpful (the benchmark stuff should be publicly available in grafana)

They're roughly the same. In the new version, disk write and task execution are pipelined, whereas before they were interleaved; however in test_spilling specifically task execution takes a very modest amount of time so you can't tell the impact of the difference.

@fjetter
Member

fjetter commented Apr 4, 2023

There's only one test which features fast-running tasks and a lot of memory generation: test_join.py::test_set_index. I'm still running A/B tests, but preliminary results show that the test now runs in half the time compared to main.

I could only confirm a runtime decrease of about 33%. Still nice, but a little less impressive than what your measurements are showing. I can also only confirm this speedup if we are truly under very heavy memory pressure, in the case of memory_multiplier=1 and persist=True, i.e. this is a very unhealthy place for the cluster to be in. Without persisting data I can even see the opposite effect, i.e. the async spilling implementation performs worse (I haven't collected the data).

@crusaderky
Collaborator

This issue has been postponed indefinitely following disappointing A/B test results.
