Massive memory (100GB) used by dask-scheduler #6388

Closed

pseudotensor opened this issue Nov 13, 2020 · 37 comments

Comments

@pseudotensor
Contributor

pseudotensor commented Nov 13, 2020

I'm using RAPIDS 0.14 (conda) on Ubuntu 16.04 with Python 3.6 and xgboost 1.2.

I can't tell whether this is a dask problem or an xgboost one: dask/dask#6833

So I'm cross-posting here. It's a serious problem.

I'm just running a fit on a sample of the airlines data, about 5M rows x 10 columns. The workers never show much memory use, but dask-scheduler keeps accumulating memory. The work is well distributed across the cluster, using all GPUs on a 3-node cluster with 2 GPUs per node.

I can possibly imagine that xgboost is persisting data and that, when the work is done, that data gets pushed to the scheduler, but I'm not clear on what is going on. I can also imagine my use of the data is embedding it in the task graph, but it's unclear why that would persist once xgboost is done and the fork is closed.
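One hedged way to check whether the data is ending up inside the graph (a sketch with synthetic data, not the airlines frame) is to measure the serialized size of the task graph that dd.from_array produces, since that graph is what passes through the scheduler:

import pickle

import numpy as np
import dask.dataframe as dd

# Synthetic stand-in for the real frame, sized for illustration only.
X_np = np.random.rand(1_000_000, 10)            # ~80 MB of float64
ddf = dd.from_array(X_np, chunksize=100_000)

# The chunk data is embedded in the tasks, so the pickled graph is roughly
# the size of the data itself.
graph_bytes = len(pickle.dumps(dict(ddf.__dask_graph__())))
print(f"serialized graph: {graph_bytes / 2**20:.1f} MiB")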

Any ideas @trivialfis ? Thanks!

@trivialfis
Member

Here is the basic workflow in xgboost:

  • persist
  • identify the owning worker of each partition.
  • construct DMatrix local to that worker by referencing identified partitions.
  • train on each worker.
  • return the booster.

I don't understand why the scheduler blows up...
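A minimal sketch of that flow using only public dask.distributed APIs (an illustration of the steps above, not the actual xgboost.dask source):

from dask.distributed import Client, futures_of, wait

def sketch_of_dask_train(client: Client, X, y):
    # 1. persist: materialize the partitions on the workers
    X, y = client.persist([X, y])
    parts = futures_of(X) + futures_of(y)
    wait(parts)

    # 2. identify the owning worker of each partition
    who_has = client.who_has(parts)   # {partition key: [worker address, ...]}

    # 3./4. a training task is submitted to each worker (pinned with
    #       workers=[address]); it builds a DMatrix from only the partitions
    #       that worker owns and trains there, with allreduce between peers
    # 5. the resulting booster is returned from one worker to the client
    ...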

@jameslamb
Contributor

I think histogram-merging happens on the scheduler because that's where the rabit tracker process runs, right?

env = await client.run_on_scheduler(_start_tracker, n_workers)

Doesn't that mean the scheduler acts as the rabit "master"? So it would be handling the feature histograms sent back from workers and the work of merging them, right?

@trivialfis
Member

No. The tracker only coordinates the allreduce tree/ring, e.g. assigning worker ranks. No actual data flows through it.

@trivialfis
Member

Once the allreduce tree is constructed, all peers pass messages directly.
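For context, a rough sketch of the worker side (hedged; the env-var names are the usual rabit/dmlc ones, and this is not the exact xgboost.dask code):

import xgboost as xgb

def worker_side(rabit_env: dict):
    # rabit_env comes from the tracker started on the scheduler, e.g.
    # {"DMLC_TRACKER_URI": ..., "DMLC_TRACKER_PORT": ..., "DMLC_NUM_WORKER": ...}
    args = [f"{k}={v}".encode() for k, v in rabit_env.items()]
    xgb.rabit.init(args)          # contacts the tracker only to join and get a rank
    try:
        rank = xgb.rabit.get_rank()
        # ... DMatrix construction and training happen here; histogram
        # allreduce traffic flows worker-to-worker, not through the tracker ...
    finally:
        xgb.rabit.finalize()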

@jameslamb
Contributor

Ah ok, I see! I misunderstood the role of the tracker in the process @hcho3 explained to me.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

@trivialfis I'll write a complete repro soon that is hopefully fairly minimal (ideally only one node).

Also, what would you recommend? If I have an in-CPU-memory frame, what is the best way (at low CPU and GPU memory cost) to get it to xgboost? I can't use dask_cudf.read_csv() since the file lives on only one node in the cluster and the disk is not shared, and I don't want to use s3/hdfs/etc. since this is just an on-prem cluster.

I'm currently doing what I showed in the dask issue:

with client:
    import xgboost as xgb

    import dask.dataframe as dd
    chunksize=500000
    X = dd.from_array(X, columns=columns, chunksize=chunksize)
    y = dd.from_array(y, chunksize=chunksize)
    model = xgb.dask.DaskXGBClassifier()
    model.fit(X, y)

over and over in separate forks, where the client is just connecting to the scheduler.

I see excessive scheduler memory use and accumulation even when I add a persist call (which I shouldn't need, because you already do that) and call client.cancel(X) and client.cancel(y) after the fit is done. The scheduler still accumulates memory with every fit.

Do I also have to run client.scatter(X) and client.scatter(y) to get the data off the scheduler? That seems like a lot of extra work.

Also, what if I want to get the data onto the GPU ASAP, but never want all of the data on one GPU? Can I call dask_cudf.DataFrame(dd.from_array(X, columns=columns, chunksize=chunksize))? Or does xgboost basically already do that when given a dask frame and asked to run on the GPU with gpu_hist?
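One hedged option for the data-distribution question (a sketch reusing X_np, columns, and the scheduler address from the snippets in this thread, and assuming the array fits in client memory): scatter the chunks to the workers explicitly and build the dask collection from the resulting futures, so the chunk data is not embedded in the task graph that the scheduler has to hold:

import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client

client = Client("tcp://192.168.0.26:8790")        # same scheduler as above

chunksize = 500000
chunks = [X_np[i:i + chunksize] for i in range(0, X_np.shape[0], chunksize)]
futures = client.scatter(chunks)                  # data goes client -> workers

blocks = [da.from_delayed(f, shape=c.shape, dtype=c.dtype)
          for f, c in zip(futures, chunks)]
X = dd.from_dask_array(da.concatenate(blocks), columns=columns)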

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

# In one shell, run:
# dask-scheduler --port 8790 --host 192.168.0.26 --pid-file dask_cuda_scheduler.pid --protocol tcp --dashboard-address 8791

# In another shell, run:
# dask-cuda-worker tcp://192.168.0.26:8790 --pid-file dask_cuda_worker_n_jobs.pid --nthreads 1 --rmm-pool-size 1GB
# then run this script:

# suppose a lot of things were done, and we now have an in-memory frame.  We will mimic this by loading a file.
import datatable as dt
# -rw------- 1 jon jon 2816407858 Oct 25 02:16  /home/jon/Downloads/HIGGS.head.csv
X = dt.fread("/home/jon/Downloads/HIGGS5M.csv")
X = X[:, [bool, int, float]]
target = 'C1'
y_np = X[:, target].to_numpy().ravel()
del X[:, target]
X_np = X.to_numpy()
columns = list(X.names)
del X
print("Done getting data into memory")

# from here on we just have some in-memory numpy frame
for i in range(0, 100):
    from dask.distributed import get_client
    with get_client(address="%s://%s:%s" % ("tcp", "192.168.0.26", "8790")) as client:

        params = {'tree_method': 'gpu_hist', 'n_estimators': 2, 'objective': 'binary:logistic'}
        import xgboost as xgb
        model = xgb.dask.DaskXGBClassifier(**params)

        chunksize = 5000
        import dask.dataframe as dd
        X = dd.from_array(X_np, columns=columns, chunksize=chunksize)
        y = dd.from_array(y_np, chunksize=chunksize)
        print("Done getting data into dask")
        model.fit(X, y)
        print("Done with fit")
        preds = model.predict(X)
        print("Done with predict")
        print(preds[0].compute())

        client.cancel(X)
        client.cancel(y)
        del X, y
        print("Done with del")

Here HIGGS5M.csv is just HIGGS with 5,000,000 rows and 29 columns.

Problem 1) On a 64GB system, the first model.fit() call (iteration 0 of the loop) goes through all my system memory and the OOM killer kills it. I don't see why model.fit should use so much memory; the data isn't even on the GPU yet.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

If I lower the row count to 1M and the chunksize to 50000:

# In one shell, run:
# dask-scheduler --port 8790 --host 192.168.0.26 --pid-file dask_cuda_scheduler.pid --protocol tcp --dashboard-address 8791

# In another shell, run:
# dask-cuda-worker tcp://192.168.0.26:8790 --pid-file dask_cuda_worker_n_jobs.pid --nthreads 1 --rmm-pool-size 1GB

# then run this script:

# suppose a lot of things were done, and we now have an in-memory frame.  We will mimic this by loading a file.
import datatable as dt
# -rw------- 1 jon jon 2816407858 Oct 25 02:16  /home/jon/Downloads/HIGGS.head.csv
X = dt.fread("/home/jon/Downloads/HIGGS5M.csv")
X = X[0:1000000, [bool, int, float]]
target = 'C1'
y_np = X[:, target].to_numpy().ravel()
del X[:, target]
X_np = X.to_numpy()
columns = list(X.names)
del X
print("Done getting data into memory")

# from here on we just have some in-memory numpy frame
for i in range(0, 100):
    from dask.distributed import Client
    with Client(address="%s://%s:%s" % ("tcp", "192.168.0.26", "8790")) as client:

        params = {'tree_method': 'gpu_hist', 'n_estimators': 2, 'objective': 'binary:logistic'}
        import xgboost as xgb
        model = xgb.dask.DaskXGBClassifier(**params)

        chunksize = 50000
        import dask.dataframe as dd
        X = dd.from_array(X_np, columns=columns, chunksize=chunksize)
        y = dd.from_array(y_np, chunksize=chunksize)
        print("Done getting data into dask")
        model.fit(X, y)
        print("Done with fit")
        preds = model.predict(X)
        print("Done with predict")
        print(preds[0].compute())

        client.cancel(X)
        client.cancel(y)
        del X, y
        print("Done with del")

Then I get a GPU OOM and xgboost hangs in the client code.

The worker restarts fine:

distributed.core - INFO - Starting established connection
task [xgboost.dask]:tcp://192.168.0.26:37071 connected to the tracker
task [xgboost.dask]:tcp://192.168.0.26:37071 got new rank 0
terminate called after throwing an instance of 'thrust::system::system_error'
  what():  parallel_for failed: out of memory
distributed.nanny - INFO - Worker process 10804 was killed by signal 6
distributed.nanny - WARNING - Restarting worker

but the client code using xgboost hangs:

(base) jon@pseudotensor:~/h2oai.fullcondatest$ PYTHONPATH=. python scheduler_problem.py
Done getting data into memory
Done getting data into dask
/home/jon/minicondadai/lib/python3.6/site-packages/distributed/client.py:3479: RuntimeWarning: coroutine 'Client._update_scheduler_info' was never awaited
  self.sync(self._update_scheduler_info)
^CTraceback (most recent call last):
  File "scheduler_problem.py", line 37, in <module>
    print("Done getting data into dask")
  File "/home/jon/minicondadai/lib/python3.6/site-packages/xgboost/dask.py", line 1080, in fit
    eval_set, sample_weight_eval_set, verbose)
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/client.py", line 824, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/utils.py", line 336, in sync
    e.wait(10)
  File "/home/jon/minicondadai/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/home/jon/minicondadai/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

Hanging is not good behavior for the client side. The worker going down is something for xgboost to deal with, I guess.

@pseudotensor
Contributor Author

If I lower to 1M rows but only 11 columns:

# In one shell, run:
# dask-scheduler --port 8790 --host 192.168.0.26 --pid-file dask_cuda_scheduler.pid --protocol tcp --dashboard-address 8791

# In another shell, run:
# dask-cuda-worker tcp://192.168.0.26:8790 --pid-file dask_cuda_worker_n_jobs.pid --nthreads 1 --rmm-pool-size 1GB

# then run this script:

# suppose a lot of things were done, and we now have an in-memory frame.  We will mimic this by loading a file.
import datatable as dt
# -rw------- 1 jon jon 2816407858 Oct 25 02:16  /home/jon/Downloads/HIGGS.head.csv
X = dt.fread("/home/jon/Downloads/HIGGS5M.csv")
X = X[0:1000000, [bool, int, float]]
X = X[:, ["C%d" % i for i in range(0, 10)]]
target = 'C0'
y_np = X[:, target].to_numpy().ravel()
del X[:, target]
X_np = X.to_numpy()
columns = list(X.names)
del X
print("Done getting data into memory")

# from here on we just have some in-memory numpy frame
for i in range(0, 100):
    from dask.distributed import Client
    with Client(address="%s://%s:%s" % ("tcp", "192.168.0.26", "8790")) as client:

        params = {'tree_method': 'gpu_hist', 'n_estimators': 2, 'objective': 'binary:logistic'}
        import xgboost as xgb
        model = xgb.dask.DaskXGBClassifier(**params)

        chunksize = 50000
        import dask.dataframe as dd
        X = dd.from_array(X_np, columns=columns, chunksize=chunksize)
        y = dd.from_array(y_np, chunksize=chunksize)
        print("Done getting data into dask")
        model.fit(X, y)
        print("Done with fit")
        preds = model.predict(X)
        print("Done with predict")
        print(preds[0].compute())

        client.cancel(X)
        client.cancel(y)
        del X, y
        print("Done with del")

then things finally run.

jon@pseudotensor:~/h2oai.fullcondatest$ nvidia-smi
Thu Nov 12 23:59:58 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.33.01    Driver Version: 440.33.01    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce RTX 2080    On   | 00000000:01:00.0  On |                  N/A |
| 41%   52C    P0    50W / 215W |   3157MiB /  7979MiB |      3%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0      2552      G   /usr/lib/xorg/Xorg                            55MiB |
|    0      2791      G   /usr/bin/gnome-shell                          57MiB |
|    0      3358      G   /usr/lib/xorg/Xorg                           595MiB |
|    0      3492      G   /usr/bin/gnome-shell                         501MiB |
|    0      3953      G   ...AAAAAAAAAAAAAAgAAAAAAAAA --shared-files    22MiB |
|    0      4672      G   ...AAAAAAAAAAAACAAAAAAAAAA= --shared-files   334MiB |
|    0      5146      G   /usr/bin/nvidia-settings                      46MiB |
|    0      6039      G   ...charm-community-2019.1.1/jre64/bin/java    38MiB |
|    0     16029      C   dask-worker [tcp://192.168.0.26:43127]      1489MiB |
+-----------------------------------------------------------------------------+

But so far I'm not able to reproduce the creep-up in dask-scheduler memory use with this code. Here it just bounces up and down.

So something else is required, even though I'm following my original usage pretty closely. One difference is that I'm using multiple nodes in the bad case where the scheduler accumulates memory.
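For tracking this between iterations, a small hedged helper (using the client from the loop above, and requiring psutil to be importable on the scheduler and worker machines) that reports resident memory through the public run/run_on_scheduler APIs:

import psutil

def rss_mib():
    # resident set size of the current process, in MiB
    return psutil.Process().memory_info().rss / 2**20

print("scheduler MiB:", client.run_on_scheduler(rss_mib))
print("worker MiB:", client.run(rss_mib))         # {worker address: MiB}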

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

However, I do see the worker using excessive memory even after the entire loop script has finished completely and cleanly. And the scheduler is using 10% of 64GB even though it started at 1%.

[screenshot: memory usage]

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

After another full run of the Python script, both processes are using even more memory:

[screenshot: memory usage]

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

Another cycle, and the scheduler is using more memory. The worker went through a few OOM-killer events during the loop, so it also has issues, though they are not as obvious at this exact moment after the script completed:

[screenshot: memory usage]

In another cycle, even the scheduler hits the OOM killer.

The scheduler shows:

distributed.core - INFO - Starting established connection
Killed
(base) jon@pseudotensor:~/h2oai.fullcondatest$ 

The worker sees:

task [xgboost.dask]:tcp://192.168.0.26:40569 got new rank 0
tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://192.168.0.26:8790' processes=1 threads=1, memory=67.38 GB>>
Traceback (most recent call last):
  File "/home/jon/minicondadai/lib/python3.6/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/client.py", line 1157, in _heartbeat
    self.scheduler_comm.send({"op": "heartbeat-client"})
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
distributed.client - ERROR - Failed to reconnect to scheduler after 3.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
distributed.worker - INFO - Stopping worker at tcp://192.168.0.26:40569
distributed.nanny - INFO - Worker closed
distributed.nanny - INFO - Closing Nanny at 'tcp://192.168.0.26:38027'
(base) jon@pseudotensor:~/h2oai.fullcondatest$ 

The client sees:

Done getting data into dask
Traceback (most recent call last):
  File "scheduler_problem.py", line 38, in <module>
    model.fit(X, y)
  File "/home/jon/minicondadai/lib/python3.6/site-packages/xgboost/dask.py", line 1080, in fit
    eval_set, sample_weight_eval_set, verbose)
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/client.py", line 824, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/home/jon/minicondadai/lib/python3.6/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
concurrent.futures._base.CancelledError

Once this happens everything is gone.

@pseudotensor
Contributor Author

Moved the dask-specific discussion to dask/distributed#4243, since it may be a dask.distributed problem only.

However, @teju85, this issue makes using dask and xgboost multi-GPU or multi-node with NVIDIA RAPIDS impossible.

As for the other problem (#6388 (comment)), it seems a highly excessive amount of memory is being used by xgboost during model.fit, before any work goes to dask or the GPU. That is also a major issue.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

Well, I'm not sure the memory problem isn't xgboost. If I perform the exact same task but without xgboost, there is never any accumulation of memory:

# In one shell, run:
# dask-scheduler --port 8790 --host 192.168.0.26 --pid-file dask_cuda_scheduler.pid --protocol tcp --dashboard-address 8791

# In another shell, run:
# dask-cuda-worker tcp://192.168.0.26:8790 --pid-file dask_cuda_worker_n_jobs.pid --nthreads 1 --rmm-pool-size 1GB

# then run this script:

# suppose a lot of things were done, and we now have an in-memory frame.  We will mimic this by loading a file.
import datatable as dt
# -rw------- 1 jon jon 2816407858 Oct 25 02:16  /home/jon/Downloads/HIGGS.head.csv
X = dt.fread("/home/jon/Downloads/HIGGS5M.csv")
X = X[0:1000000, [bool, int, float]]
X = X[:, ["C%d" % i for i in range(0, 10)]]
target = 'C0'
y_np = X[:, target].to_numpy().ravel()
del X[:, target]
X_np = X.to_numpy()
columns = list(X.names)
del X
print("Done getting data into memory")

# from here on we just have some in-memory numpy frame
for i in range(0, 10000):
    from dask.distributed import Client
    with Client(address="%s://%s:%s" % ("tcp", "192.168.0.26", "8790")) as client:

        params = {'tree_method': 'gpu_hist', 'n_estimators': 2, 'objective': 'binary:logistic'}
        import xgboost as xgb
        model = xgb.dask.DaskXGBClassifier(**params)

        chunksize = 50000
        import dask.dataframe as dd
        X = dd.from_array(X_np, columns=columns, chunksize=chunksize)
        y = dd.from_array(y_np, chunksize=chunksize)
        print("Done getting data into dask")
        if False:
            model.fit(X, y)
            print("Done with fit")
            preds = model.predict(X)
            print("Done with predict")
            print(preds[0].compute())

        client.cancel(X)
        client.cancel(y)
        del X, y
        print("Done with del")

This never accumulates memory, whether running 10000 iterations in one go or running the script over and over like before.

@teju85, so the new theory is that xgboost is creating frames and leaking them, which would make all of the problems mentioned xgboost's fault.

@trivialfis
Member

Could you please run this script and see how it goes:

dask-scheduler --scheduler-file='scheduler.json'
# new shell
dask-cuda-worker --scheduler-file='scheduler.json'
import pandas as pd
import dask.dataframe as dd
import xgboost as xgb


def main():
    colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
    df = pd.read_csv('HIGGS.csv',  names=colnames)

    y = df['label']
    X = df[df.columns.difference(['label'])]
    print('X.shape', X.shape)

    for i in range(0, 100):
        from dask.distributed import Client
        with Client(scheduler_file='scheduler.json') as client:
            params = {
                'tree_method': 'gpu_hist',
                'n_estimators': 2,
                'objective': 'binary:logistic'
            }

            model = xgb.dask.DaskXGBClassifier(**params)

            chunksize = 50000
            dX = dd.from_pandas(X, chunksize=chunksize)
            dy = dd.from_pandas(y, chunksize=chunksize)

            print("Done getting data into dask")
            model.fit(dX, dy)
            print("Done with fit")
            preds = model.predict(dX)
            print("Done with predict")
            print(preds[0].compute())

            client.cancel(dX)
            client.cancel(dy)
            print("Done with del")


if __name__ == '__main__':
    main()

HIGGS.csv is the unmodified HIGGS dataset, with X of shape (11000000, 28).

@pseudotensor
Contributor Author

Could you please run this script and see how it goes: [script quoted above]

What is scheduler.json here?

@trivialfis
Member

It's the file created by the dask-scheduler command when you pass --scheduler-file.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

Also, I'm not sure what you are asking me to do. It appears that your script is basically the same as mine, but I'll try the scheduler-file approach. You also wrapped things in a main().

But even on 5M-row HIGGS with all 28 columns I already hit the massive memory use from #6388 (comment). With 10 columns I hit the GPU OOM on my single-GPU 1080ti. Only with 1M x 10 did I avoid those issues.

@trivialfis
Member

Yup. I modified your script to be minimal. I'm curious because I've gotten to 12 rounds and there's no sign of a memory leak.

@pseudotensor
Contributor Author

12 rounds? You mean 12 of the 100 iterations? That's probably too few to see anything. I showed the increase is about 10% of 64GB per 100 iterations for the scheduler, and more for the worker.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

I ran your exact case. There are some differences:

  1. 5M x 28 columns runs without the rapid CPU memory use problem on the first model.fit iteration: it doesn't use much memory when starting model.fit for that first iteration and doesn't run out of GPU memory.

So somehow my version of the setup and script burns through all 64GB of memory just doing model.fit, and even 5M x 10 hits a GPU OOM.

How do you explain? Did you try my version?

  2. I still see dask-worker eating up tons of memory and accumulating fast. As before, the dask-scheduler also accumulates memory, just much more slowly. Again, this is the opposite of my original airlines-data case, where only the scheduler rapidly (crazily) uses lots of memory.

Even within one overall cycle of 100 iterations, at about 50 iterations the worker dies, killed by the OOM killer:

distributed.nanny - INFO - Worker process 9018 was killed by signal 9
distributed.nanny - WARNING - Restarting worker
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.worker - INFO -       Start worker at:   tcp://192.168.0.26:42173
distributed.worker - INFO -          Listening to:   tcp://192.168.0.26:42173
distributed.worker - INFO -          dashboard at:         192.168.0.26:40011
distributed.worker - INFO - Waiting to connect to:    tcp://192.168.0.26:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   67.38 GB
distributed.worker - INFO -       Local Directory: /home/jon/h2oai.fullcondatest/worker-0cqyubhi
distributed.worker - INFO - Starting Worker plugin <dask_cuda.utils.RMMPool object at 0x7fce3caab390>-1fb48df2-9e8a-4a0b-8ea9-11270a198c15
distributed.worker - INFO - Starting Worker plugin <dask_cuda.utils.CPUAffinity object at 0x7fce3caab-41432ce7-62a3-4944-8d4a-1b61ee314d49
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    tcp://192.168.0.26:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

At that point xgboost/the client seems to hang, unlike before; the loop makes no further progress.

CTRL-C on the hung client code shows:

^CTraceback (most recent call last):
  File "scheduler_problem_tj.py", line 42, in <module>
    main()
  File "scheduler_problem_tj.py", line 30, in main
    model.fit(dX, dy)
  File "/home/jon/minicondadai/lib/python3.6/site-packages/xgboost/dask.py", line 1080, in fit
    eval_set, sample_weight_eval_set, verbose)
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/client.py", line 824, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jon/minicondadai/lib/python3.6/site-packages/distributed/utils.py", line 336, in sync
    e.wait(10)
  File "/home/jon/minicondadai/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/home/jon/minicondadai/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

@pseudotensor
Contributor Author

To be clear, as mentioned at the start, I'm using RAPIDS 0.14 and xgboost 1.2, so there may have been improvements since. But maybe you would know whether such things were fixed.

@trivialfis
Member

trivialfis commented Nov 13, 2020

How do you explain? Did you try my version?

Sure. Could you provide a script that I can exactly follow without making any edits (except for data path)?

@trivialfis
Member

To be clear, as mentioned at the start, I'm using RAPIDS 0.14 and xgboost 1.2, so there may have been improvements since. But maybe you would know whether such things were fixed.

I'm not aware of a memory leak in xgboost. We run benchmarks with larger datasets every now and then.

@trivialfis
Member

12 rounds? You mean 12 of the 100 iterations? That's probably too few to see anything.

In the meantime, it's still running. Hopefully I can reproduce it later.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

If I start to bisect between our scripts to look into the first-iteration-CPU-OOM-in-model.fit issue and the GPU OOM issue, some notes:

  1. Running the scheduler/worker/client like your case has no effect:
dask-scheduler --port 8790 --host 192.168.0.26 --pid-file dask_cuda_scheduler.pid --protocol tcp --dashboard-address 8791 --scheduler-file='scheduler.json'
dask-cuda-worker --scheduler-file='scheduler.json'
and `Client(scheduler_file='scheduler.json') as client:` in the script.
  2. Adding main() has no effect.

  3. Consuming pandas instead of numpy fixes the first-iteration-CPU-OOM-in-model.fit problem, at least. Why should model.fit() consume all 64GB of memory just because I used from_array instead of from_pandas?

To be clear, this first-iteration CPU OOM is not the same issue as the worker or scheduler hitting OOM. Instead, the client process itself hits a CPU OOM during model.fit(). If you have more than 64GB, you should watch top or similar to see the memory usage.

Can you try your script but with from_array()? Just use dX = dd.from_array(X.values, columns=X.columns, chunksize=chunksize)

  4. Avoiding the RMM pool does not fix the GPU OOM problem; I was mistaken originally.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

That is, if I use your exact sequence but with only 5M rows of HIGGS, I hit the CPU OOM inside the very first model.fit() on the client. Again, this is a different issue from the worker/scheduler memory problems or the GPU OOM.

import pandas as pd
import dask.dataframe as dd
import xgboost as xgb


def main():
    colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
    df = pd.read_csv('/home/jon/Downloads/HIGGS5M.csv',  names=colnames)

    y = df['label']
    X = df[df.columns.difference(['label'])]
    print('X.shape', X.shape)

    for i in range(0, 100):
        from dask.distributed import Client
        with Client(scheduler_file='scheduler.json') as client:
            params = {
                'tree_method': 'gpu_hist',
                'n_estimators': 2,
                'objective': 'binary:logistic'
            }

            model = xgb.dask.DaskXGBClassifier(**params)

            chunksize = 50000
            #dX = dd.from_pandas(X, chunksize=chunksize)
            dX = dd.from_array(X.values, columns=X.columns, chunksize=chunksize)   # only diff.
            dy = dd.from_pandas(y, chunksize=chunksize)

            print("Done getting data into dask")
            model.fit(dX, dy)
            print("Done with fit")
            preds = model.predict(dX)
            print("Done with predict")
            print(preds[0].compute())

            client.cancel(dX)
            client.cancel(dy)
            print("Done with del")


if __name__ == '__main__':
    main()

@trivialfis
Member

trivialfis commented Nov 13, 2020

Replying to: #6388 (comment)

@pseudotensor So here we have a number of issues:

  • On your setup, the client process hits a CPU OOM at the first iteration.

After using from_pandas instead of from_array, this issue is mitigated, and more issues follow:

  • The scheduler OOMs for some reason? Since the scheduler doesn't use the GPU, I believe it's a CPU OOM.
  • The worker hits a GPU OOM, which is mitigated by not using RMM.

Is this a fair summary? Some more questions: How did you install xgboost? pip/conda/conda with rapids build/built from source?

Replying to #6388 (comment):

Considering that I'm running it on 11M rows right now, should I cut it down to 5M and try again? It seems very strange; my machine has exactly 64 GB of memory.

[Screenshot from 2020-11-13 18-59-41]

@trivialfis
Member

Feel free to correct me if I'm wrong about the summary, but it's a bit difficult for me to follow with so many comments mixed together. It would be great if you could list some precise details:

with which script, under what conditions, how large your data is, which process OOMs, and whether the OOM happens on CPU or GPU.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

I'll summarize:

  1. Tried 5M x 28 in my original script and hit a CPU OOM on the first model.fit. Yes, this is "mitigated" by using from_pandas, but it seems like a serious issue that from_array leads to such problems. I modified your script to just use from_array and it also hits the CPU OOM in model.fit on the client process.

  2. Tried 1M x 28 in my original script, but hit a GPU OOM on the first model.fit, i.e. with this exact script: #6388 (comment). Every time, the first fit hits the GPU OOM, hangs the client, and crashes the worker. I have not yet tried to bisect this to see how changing the script affects things.

task [xgboost.dask]:tcp://192.168.0.26:40307 connected to the tracker
task [xgboost.dask]:tcp://192.168.0.26:40307 got new rank 0
terminate called after throwing an instance of 'thrust::system::system_error'
  what():  parallel_for failed: out of memory

Whether the RMM option is there or not doesn't matter; it still hits the GPU OOM. The client hangs, which is bad behavior as well. from_pandas does nothing to fix this. No mitigation so far.

  3. I tried 1M x 10 so I could get around issues 1 and 2 above. This reproduces memory accumulation in the worker (especially) but also in the scheduler, across iterations within one Python client script run and across any number of separate clean runs of the script as well. This is similar to what I originally saw with the airlines data, though in that case the scheduler's memory use increases massively and very quickly.

No mitigation so far. Your script shows the same accumulation. When the worker is killed by the OOM killer after 31 loops (for the 5M-row case, but otherwise your original script), the client running xgboost hangs, which is also bad.

To generate HIGGS5M.csv I'm just taking the original full HIGGS and doing head -5000000 HIGGS.csv > HIGGS5M.csv

@trivialfis
Member

trivialfis commented Nov 13, 2020

Thanks for summarizing. How about:

How did you install xgboost? pip / conda / conda with the RAPIDS build / built from source?

I'm assuming you are using 1.2.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

Thanks for summarizing. How about:

How did you install xgboost? pip / conda / conda with the RAPIDS build / built from source?

Yes, as of last month all my issues have been on the exact same software setup: #6232 (comment). RAPIDS 0.14, conda, all binaries from the conda repos, xgboost 1.2. I can share the full details of my conda solution if needed. The only difference for this issue is that I'm on a 1080ti in all cases (both the full airlines 3-node cluster case and this MRE case).

@trivialfis
Member

Great! I will try reproducing it over this weekend.

@pseudotensor
Contributor Author

pseudotensor commented Nov 13, 2020

I made a comment above, but your version of the script without xgboost leads to no scheduler/worker memory accumulation. So issue 3 does seem like a pure xgboost issue of (I'm guessing) not cleaning up temporary dask objects, e.g. not cancelling futures.

import pandas as pd
import dask.dataframe as dd


def main():
    colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
    df = pd.read_csv('/home/jon/Downloads/HIGGS5M.csv',  names=colnames)

    y = df['label']
    X = df[df.columns.difference(['label'])]
    print('X.shape', X.shape)

    for i in range(0, 100):
        print("i=%d" % i)
        from dask.distributed import Client
        with Client(scheduler_file='scheduler.json') as client:
            chunksize = 50000
            dX = dd.from_pandas(X, chunksize=chunksize)
            #dX = dd.from_array(X.values, columns=X.columns, chunksize=chunksize)
            dy = dd.from_pandas(y, chunksize=chunksize)

            print("Done getting data into dask")
            print(dX.mean().compute())
            print(dy.mean().compute())
            print("Done with means")

            client.cancel(dX)
            client.cancel(dy)
            print("Done with del")


if __name__ == '__main__':
    main()

With this otherwise identical script but no xgboost work, the worker stays at about 3.6% of 64GB of memory and the scheduler at 2%.

Even after 1000 iterations there is never anything like issue 3.

@quasiben

A number of things have changed since RAPIDS 0.14. Can you try with 0.16? It was just released in late October.

@pseudotensor
Contributor Author

pseudotensor commented Nov 17, 2020

Yes, although I think all three problems are pure xgboost issues: #6388 (comment)

For problem 1) This happens before anything touches the GPU; the numpy frame is poorly handled by xgboost.
For problem 2) This is not RAPIDS-specific.
For problem 3) This is not RAPIDS-specific either; most likely temporary dask frames are not cancelled in xgboost. dask_cudf shouldn't be the issue, since the frames are on the CPU and it is system memory that is exhausted; the frames are not on the GPU yet, so this is not GPU-memory related (see the sketch below).
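If it is leftover task/graph state, a hedged way to check (using the client from the scripts above; run_on_scheduler injects the dask_scheduler argument when the function declares it) is to count the records the scheduler still holds after each fit:

def scheduler_state(dask_scheduler=None):
    # task and client records still tracked by the scheduler; a count that
    # grows every iteration points at state not being released
    return {"tasks": len(dask_scheduler.tasks),
            "clients": len(dask_scheduler.clients)}

print(client.run_on_scheduler(scheduler_state))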

@trivialfis
Member

Discussion continued in dask/dask#6833. A reproducible example without xgboost was created in dask/dask#6833 (comment).
