Spill to disk may cause data duplication #3756
Comments
I think that your understanding that we can both spill and hold onto data is correct. I would expect this inconsistency to be short-lived though, only as long as the task execution or data communication. Given the LRU policy, I would expect that this inconsistency would not happen until all other data had been spilled. At this point my guess is that the impact of this inconsistency is usually small (the workload was probably going to run out of memory anyway) but that's just a guess. It looks like you have more hands-on experience here.
I would also think that it is short-lived. I don't think we actually hold on permanently to references.
Yes, this can only happen once the LRU is essentially depleted. I'll try to collect a bit more data to see if this actually has a huge impact. From what I've seen, a very busy cluster can communicate/execute for minutes. We may need to decrease task sizes but still, if the network is busy we cannot rely on very fast transfers. A minute or two with strong data duplication may already be enough to drive workers OOM. However, if it is marginal the complexity is hardly worth the effort.
Following up with a few observations:

Context

The computation I'm running in this example is a huge

The dask version I am using is based on dask/dask#6154, since otherwise spill-to-disk is effectively useless and the first workers go OOM after about 10 minutes. The distributed version is more or less 2.14.0 with some additional patches to observe and count the keys duplicated by get_data and execute.

Observations

The following is a Grafana dashboard showing the task states, CPU and memory usage statistics of the workers. The memory usage per worker remains consistently stable due to passive spill-to-disk (result is finished, put into buffer, buffer evicts results if necessary to maintain the threshold). Once the first user functions are evaluated (at around 18:30) we can see that the memory profile is a bit more volatile, since the function itself collects huge chunks of data and works on these. This is also the time when we see the first logs about active spill-to-disk by the memory monitor (70% threshold hit, evict data until back at target).

The following plot shows all data currently held by the workers which is not part of the

Summary

I was surprised to see that this duplication kicks in even at early stages of the pipeline and is more or less continuously an issue over the entire run. The impact is mostly below 5% memory, which is probably not worth the effort, but that it peaks at 40% is a bit concerning. In the past I've often seen workers with up to 80% memory usage while reporting "Memory use is high but worker has no data", and now I'm wondering if this is/was the issue (in combination with in-flight data, but that's another topic). Is this something we should address or am I paranoid?
I wonder whether this is still the case with PR #3849. For clarity, that would impact spilling that uses pickle in addition to data transmission. In other words, out-of-band buffers would be collected, but not copied. This means one would expect the overall memory usage to be lower when pickling objects that support out-of-band buffer extraction, because the same memory could be shared between the spilled and non-spilled forms.
Once the data is spilled and the reference to it is lost, there is no way to connect the serialized object to its original, is there? I'm thinking of a scenario like this
In this scenario we have three distinct copies of the same data, don't we? In my naive understanding, pickle5 would help if we were trying to spill/submit/pickle concurrently but I don't believe this is causing this issue.
Just for clarity, what I mean is that data pickled with protocol 5 will take views of the data as part of pickling, as opposed to copying it:

    import pickle5 as pickle
    import numpy as np

    a = np.arange(6)
    b = []
    # Out-of-band buffers are handed to the callback instead of being copied
    d = pickle.dumps(a, protocol=5, buffer_callback=b.append)
    assert np.may_share_memory(a, b[0])

This would mean the high-water mark would remain a bit lower during the run. Though it's worth noting that compression during spilling may affect this as well, so more exploration of exactly how this would affect things is still needed. That all being said, I've not dug into this issue deeply. Perhaps pickle protocol 5 is not relevant.
There may also be other explanations for high memory usage. For example |
This PR introduces a new _device host file_ that uses `ProxyObject` to implement spilling of individual CUDA objects as opposed to the current host file, which spills entire keys.

- [x] Implement spilling of individual objects
- [x] Handle task level aliasing
- [x] Handle shared device buffers
- [x] Write docs

To use, set `DASK_JIT_UNSPILL=True`

## Motivation

### Aliases at the task level

Consider the following two tasks:
```python
import cudf

def task1():
    # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):
    # Get the second item
    return dfs[1]
```
Running the two tasks on a worker we get something like:
```python
>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}
```
Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used: `sizeof(df)*3`. But even worse, if it decides to spill `k2`, no device memory is freed since `k1` still holds a reference to `df2`!

The new spilling implementation fixes this issue by wrapping identical CUDA objects in a shared `ProxyObject`, thus in this case `df2` in both `k1` and `k2` will refer to the same `ProxyObject`.

### Sharing device buffers

Consider the following code snippet:
```python
>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]
```
In this case `v1` and `v2` are separate objects and are handled separately both in the current and the new spilling implementation. However, the `shuffle_group()` in cudf actually returns a single device memory buffer such that `v1` and `v2` point to the same underlying memory buffer. Thus the current implementation will again overestimate the memory use and spill one of the dataframes without any effect.

The new implementation takes this into account when estimating memory usage and makes sure that either both dataframes are spilled or none of them are.

cc. @beckernick, @VibhuJawa

xref: dask/distributed#3756

Authors:
- Mads R. B. Kristensen

Approvers:
- Peter Andreas Entschev

URL: #451
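To make the overestimation from task-level aliasing concrete, here is a minimal sketch in plain Python. It is not the dask-cuda implementation: `naive_size` and `deduplicated_size` are hypothetical helpers, `bytearray` objects stand in for the dataframes, and `len` stands in for `sizeof`.

```python
def _objects(value):
    # A key may hold a single object or a list of objects
    return value if isinstance(value, list) else [value]


def naive_size(data):
    """Sum sizes key by key; an object shared between keys is counted repeatedly."""
    return sum(len(obj) for value in data.values() for obj in _objects(value))


def deduplicated_size(data):
    """Count every distinct object only once, identified by id()."""
    seen = set()
    total = 0
    for value in data.values():
        for obj in _objects(value):
            if id(obj) not in seen:
                seen.add(id(obj))
                total += len(obj)
    return total


df1 = bytearray(1000)  # stand-in for a dataframe of 1000 bytes
df2 = bytearray(1000)
data = {"k1": [df1, df2], "k2": df2}  # "k2" aliases the second item of "k1"

print(naive_size(data))         # 3000: df2 is counted twice
print(deduplicated_size(data))  # 2000: the memory actually held
```

Counting by object identity only handles aliasing of whole objects; buffers shared between distinct objects, as in the `shuffle_group()` case, need tracking at the buffer level.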
@gjoseph92 am wondering if PR #5208 would help here as well. Do you know? If not, no worries (mostly curious).
I have no idea unfortunately. As I understood, this issue was more about a bookkeeping error, so I'd doubt it. It might help make the case where we re-materialize data in order to send it to a worker slightly less bad, since at least there wouldn't be another copy during the frames merge, but there would still be the data re-read from disk either way.
From my understanding the zero-copy fix doesn't help, since the duplication I am referring to here is not caused by memcopies but rather by having distinct Python objects. I'll try to explain with some simplified pseudocode for the worker and the zict buffer:

    class ZictSpillData:
        def __getitem__(self, key):
            if key in self.memory:
                return self.memory[key]
            else:
                # Every load from the slow store creates a new Python object
                data = load_from_store(key)
                if sizeof(data) < self.memory_target:
                    self.memory[key] = data
                return data

    class Worker:
        async def execute(self, ts, ...):
            assert isinstance(self.data, ZictSpillData)
            data = {}
            for key in ts.dependencies:
                data[key] = self.data[key]
                assert key not in self.data.memory  # data was too large, the buffer forgot it again
            run_task(ts, data)

        def get_data(self, keys, ...):
            data = {}
            for key in keys:
                data[key] = self.data[key]
                assert key not in self.data.memory  # data was too large, the buffer forgot it again
            return data

Let's assume, for simplicity, the data we're concerned about is large and will be spilled by the buffer immediately because it is beyond a given threshold. In this extreme situation this is hopefully easy to follow.

In more realistic scenarios we're spilling data concurrently with other operations. For instance, data is being used as part of an execute but the buffer spills the data. This results in the data no longer being tracked by the buffer (in our new terminology this means the data is "unmanaged memory") but it is still in memory since it is being used by the task execution. If another worker then requests this piece of data we just spilled, the buffer will load it from the store, create a new instance and start serialization. Simultaneously, X other workers could do the same, resulting in X (get_data) + 1 (executing) copies of the same data, because our buffer doesn't know about the keys still being used.

While typing, I'm wondering if we can't fix this by being a bit smart with weakrefs, but I currently don't have time to test this idea.

Edit: Just in case I haven't made myself clear yet: I believe this issue is caused by the code referenced at distributed/distributed/spill.py, line 16 in 8c73a18.
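The extreme case described above can be reproduced with a small runnable toy. This is a sketch under stated assumptions, not the real zict/distributed code: `ToySpillBuffer` is a hypothetical class, `pickle` stands in for the slow store, and `len` stands in for `sizeof`.

```python
import pickle


class ToySpillBuffer:
    """Toy fast/slow buffer (hypothetical): values at or above memory_target live only on 'disk'."""

    def __init__(self, memory_target):
        self.memory_target = memory_target
        self.memory = {}  # fast store: live Python objects
        self.disk = {}    # slow store: serialized bytes

    def __setitem__(self, key, value):
        if len(value) < self.memory_target:
            self.memory[key] = value
        else:
            self.disk[key] = pickle.dumps(value)

    def __getitem__(self, key):
        if key in self.memory:
            return self.memory[key]
        data = pickle.loads(self.disk[key])  # a brand new object on every call
        if len(data) < self.memory_target:
            self.memory[key] = data
        return data


buf = ToySpillBuffer(memory_target=100)
buf["x"] = bytearray(1000)  # too large, goes straight to the slow store

held_by_execute = buf["x"]                        # one copy for the running task
held_by_get_data = [buf["x"] for _ in range(3)]   # one copy per concurrent request

# All four objects are alive at the same time and are distinct instances
distinct = {id(obj) for obj in [held_by_execute, *held_by_get_data]}
print(len(distinct))  # 4
```

The buffer itself reports no data in memory for `"x"`, while four full copies of the payload are alive in the process.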
A very naive worst-case calculation of how severe this issue is. We're targeting The worker limits concurrent incoming That yields a 500-1000MB data duplication for The duplication on

Let's have a look at a worst case: all workers on the same machine, max branching of 32 in a shuffle, key size at 50MB and 2 threads per worker (the two is pretty random, I have to admit). That's up to 84 duplications (10 * 2 + 32 * 2) of a key's data or

I guess this worst case is not too far off from a LocalCluster "deployment" on a big VM. However, most of the numbers are mere guessing and this can easily be scaled up or down, so please take this with a healthy pinch of salt.
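The arithmetic of that worst case can be written out as below. This is only a sketch that multiplies the figures quoted in the comment; the variable names are mine, and reading the factor 10 as a per-worker limit and the factor 2 as the thread count is an assumption.

```python
key_size_mb = 50          # key size from the comment
threads_per_worker = 2    # assumed to be the "* 2" in the quoted formula
shuffle_branching = 32    # max branching of a shuffle, from the comment
limit_factor = 10         # the "10" in the quoted formula (assumed to be a per-worker limit)

# (10 * 2 + 32 * 2) as quoted in the comment
duplications = limit_factor * threads_per_worker + shuffle_branching * threads_per_worker
duplicated_mb = duplications * key_size_mb

print(duplications)   # 84
print(duplicated_mb)  # 4200
```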
This looks like something that could be solved with weakref and a few extra lines in spill.py. I'll take a stab at it after #5543 is merged.
That's what I thought as well.
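The weakref idea mentioned in the last few comments could look roughly like the sketch below. It is not the code in spill.py: `WeakrefSpillBuffer` and `Blob` are hypothetical names, and `Blob` exists only because plain `bytes` objects cannot be weakly referenced.

```python
import pickle
import weakref


class Blob:
    """Weak-referenceable wrapper around a payload (hypothetical helper)."""

    def __init__(self, payload):
        self.payload = payload


class WeakrefSpillBuffer:
    """Hypothetical slow store that remembers spilled objects through weak references."""

    def __init__(self):
        self.slow = {}                              # key -> serialized payload
        self.live = weakref.WeakValueDictionary()   # key -> object, while it is alive elsewhere

    def spill(self, key, value):
        self.slow[key] = pickle.dumps(value.payload)
        self.live[key] = value  # weak reference only, does not keep the object alive

    def __getitem__(self, key):
        value = self.live.get(key)
        if value is None:
            # Nobody holds the object anymore, so loading it again duplicates nothing
            value = Blob(pickle.loads(self.slow[key]))
            self.live[key] = value
        return value


buf = WeakrefSpillBuffer()
in_use = Blob(b"x" * 1000)  # e.g. still referenced by a running task
buf.spill("x", in_use)

served = [buf["x"] for _ in range(3)]  # three concurrent get_data requests
print(all(obj is in_use for obj in served))  # True
```

As long as a task or a transfer still holds the object, requests are served from that same instance. Once the last strong reference is gone, the weak entry disappears and the next access loads from the slow store.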
In aggressive spill-to-disk scenarios I observed that distributed may spill all the data it has in memory while still complaining with the following message that there is no more data to spill
It is true that these workers do indeed still hold on to a lot of memory without it being very transparent about where the data is. GC is useless, i.e. it is not related to dask/zict#19
Investigating this issue made me realise that we may try to spill data which is actually currently in use.
The most common example for this is probably once the worker has collected the dependencies and schedules the execution of the task, see here. Another one would be when data is requested from a worker and we're still serializing/submitting it.
I could prove this assumption by patching the worker code and tracking the keys in-execution/in-spilling with some logging. It turns out that for some jobs I hold on to multiple GBs of memory although it was supposedly already spilled.
If we spill data which is still in use, this is misleading to the user, since the data is still in memory. It may also cause heavy data duplication: if the spilled-but-still-in-memory dependency is requested by another worker, the buffer fetches the key from the slow store and materialises it a second time, because it no longer knows about the original object. If this piece of data is requested by more than one worker, it could even be duplicated multiple times.
In non-aggressive spill-to-disk scenarios we should be protected from this by the LRU in the buffer but if memory pressure is high, the spilling might actually worsen the situation in these cases.
My practical approach to this would be to introduce something like a (un)lock_key_in_store method to the buffer, which protects the key from spilling, and to manually (un)set this in the distributed code. If there is a smarter approach, I'd be glad to hear about it.
Also, if my reasoning is flawed somewhere, I'd appreciate feedback since, so far, I could only prove that we try to spill data currently in use; the duplication itself is still just theory.
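A minimal sketch of what such a locking method might look like, assuming a simple LRU buffer limited by item count. `PinnableLRU` and its attributes are hypothetical; only the method names follow the `(un)lock_key_in_store` suggestion above, and nothing here is an existing zict or distributed API.

```python
from collections import OrderedDict


class PinnableLRU:
    """Hypothetical LRU buffer whose keys can be protected from spilling."""

    def __init__(self, max_items):
        self.max_items = max_items
        self.fast = OrderedDict()  # in-memory values, least recently set first
        self.slow = {}             # stand-in for the on-disk store
        self.locked = set()        # keys currently in use, must not be spilled

    def lock_key_in_store(self, key):
        self.locked.add(key)

    def unlock_key_in_store(self, key):
        self.locked.discard(key)
        self._evict()

    def __setitem__(self, key, value):
        self.fast[key] = value
        self.fast.move_to_end(key)
        self._evict()

    def _evict(self):
        # Spill the oldest unlocked keys until the buffer is within budget
        for key in list(self.fast):
            if len(self.fast) <= self.max_items:
                break
            if key not in self.locked:
                self.slow[key] = self.fast.pop(key)


store = PinnableLRU(max_items=1)
store["a"] = 1
store.lock_key_in_store("a")   # "a" is being used by a task
store["b"] = 2                 # over budget: "b" is spilled, "a" stays in memory
print(list(store.fast), list(store.slow))  # ['a'] ['b']

store.unlock_key_in_store("a")
store["c"] = 3                 # "a" may now be spilled again
print(list(store.fast), sorted(store.slow))  # ['c'] ['a', 'b']
```

A set of locked keys is the simplest choice; if the same key can be in use by several tasks or transfers at once, a per-key counter would be needed so that one unlock does not release a key another user still holds.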
Related issues:
dask/dask#2456