
[dask] Dask estimators sometimes return an incomplete booster #3918

Closed
jameslamb opened this issue Feb 7, 2021 · 2 comments

@jameslamb
Collaborator

jameslamb commented Feb 7, 2021

Short description of the problem

DaskLGBMRanker and DaskLGBMRegressor sometimes return a model with an incomplete booster. With num_iterations = 50 and no early stopping, .fit() from these classes sometimes returns a booster with fewer than 50 trees.

Reproducible Example

The example code below uses the functions from lightgbm's tests to create datasets. I'll try to come back and simplify it further.

import itertools

import dask.array as da
import dask.dataframe as dd
import lightgbm as lgb
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster
from scipy.stats import spearmanr
from sklearn.datasets import make_blobs, make_regression

n_workers = 3
cluster = LocalCluster(n_workers=n_workers)
client = Client(cluster)
client.wait_for_workers(n_workers)

print(f"View the dashboard: {cluster.dashboard_link}")

from itertools import groupby
from sklearn.utils import check_random_state

def make_ranking(n_samples=100, n_features=20, n_informative=5, gmax=2,
                 group=None, random_gs=False, avg_gs=10, random_state=0):
    rnd_generator = check_random_state(random_state)

    y_vec, group_id_vec = np.empty((0,), dtype=int), np.empty((0,), dtype=int)
    gid = 0

    # build target, group ID vectors.
    relvalues = range(gmax + 1)

    # build y/target and group-id vectors with user-specified group sizes.
    if group is not None and hasattr(group, '__len__'):
        n_samples = np.sum(group)

        for i, gsize in enumerate(group):
            y_vec = np.concatenate((y_vec, rnd_generator.choice(relvalues, size=gsize, replace=True)))
            group_id_vec = np.concatenate((group_id_vec, [i] * gsize))

    # build y/target and group-id vectors according to n_samples, avg_gs, and random_gs.
    else:
        while len(y_vec) < n_samples:
            gsize = avg_gs if not random_gs else rnd_generator.poisson(avg_gs)

            # groups should contain > 1 element for pairwise learning objective.
            if gsize < 1:
                continue

            y_vec = np.append(y_vec, rnd_generator.choice(relvalues, size=gsize, replace=True))
            group_id_vec = np.append(group_id_vec, [gid] * gsize)
            gid += 1

        y_vec, group_id_vec = y_vec[:n_samples], group_id_vec[:n_samples]

    # build feature data, X. Transform first few into informative features.
    n_informative = max(min(n_features, n_informative), 0)
    X = rnd_generator.uniform(size=(n_samples, n_features))

    for j in range(n_informative):
        bias, coef = rnd_generator.normal(size=2)
        X[:, j] = bias + coef * y_vec

    return X, y_vec, group_id_vec

def _create_ranking_data(n_samples=100, chunk_size=50, **kwargs):
    X, y, g = make_ranking(n_samples=n_samples, random_state=42, **kwargs)
    rnd = np.random.RandomState(42)
    w = rnd.rand(X.shape[0]) * 0.01
    g_rle = np.array([len(list(grp)) for _, grp in groupby(g)])

    # ranking arrays: one chunk per group. Each chunk must include all columns.
    p = X.shape[1]
    dX, dy, dw, dg = [], [], [], []
    for g_idx, rhs in enumerate(np.cumsum(g_rle)):
        lhs = rhs - g_rle[g_idx]
        dX.append(da.from_array(X[lhs:rhs, :], chunks=(rhs - lhs, p)))
        dy.append(da.from_array(y[lhs:rhs]))
        dw.append(da.from_array(w[lhs:rhs]))
        dg.append(da.from_array(np.array([g_rle[g_idx]])))

    dX = da.concatenate(dX, axis=0)
    dy = da.concatenate(dy, axis=0)
    dw = da.concatenate(dw, axis=0)
    dg = da.concatenate(dg, axis=0)

    return X, y, w, g_rle, dX, dy, dw, dg

def _create_data(objective, n_samples=100, centers=2, chunk_size=50):
    if objective == 'classification':
        X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42)
    elif objective == 'regression':
        X, y = make_regression(n_samples=n_samples, random_state=42)
    rnd = np.random.RandomState(42)
    weights = rnd.random(X.shape[0]) * 0.01
    dX = da.from_array(X, (chunk_size, X.shape[1]))
    dy = da.from_array(y, chunk_size)
    dw = da.from_array(weights, chunk_size)
    return X, y, weights, dX, dy, dw

params = {
    "random_state": 42,
    "n_estimators": 50,
    "num_leaves": 20,
    "min_child_samples": 1
}

# ----- ranking ----- #
dask_ranker = lgb.DaskLGBMRanker(
    client=client,
    time_out=5,
    local_listen_port=12400,
    tree_learner_type='data_parallel',
    **params
)
local_ranker = lgb.LGBMRanker(**params)

X, y, w, g, dX, dy, dw, dg = _create_ranking_data()

dask_summaries = {}
local_summaries = {}
for _ in range(100):
    dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg)
    num_trees = dask_ranker.booster_.num_trees()
    dask_summaries[num_trees] = dask_summaries.get(num_trees, 0) + 1
    
    local_ranker.fit(X, y, sample_weight=w, group=g)
    num_trees = local_ranker.booster_.num_trees()
    local_summaries[num_trees] = local_summaries.get(num_trees, 0) + 1

print("   dask: " + str(dask_summaries))
print("sklearn: " + str(local_summaries))

# ----- regression ----- #
dask_regressor = lgb.DaskLGBMRegressor(
    client=client,
    time_out=5,
    local_listen_port=12400,
    tree_learner_type='data_parallel',
    **params
)
local_regressor = lgb.LGBMRegressor(**params)

X, y, w, dX, dy, dw = _create_data(objective='regression')

dask_summaries = {}
local_summaries = {}
for _ in range(100):
    dask_regressor.fit(dX, dy, sample_weight=dw)
    num_trees = dask_regressor.booster_.num_trees()
    dask_summaries[num_trees] = dask_summaries.get(num_trees, 0) + 1
    
    local_regressor.fit(X, y, sample_weight=w)
    num_trees = local_regressor.booster_.num_trees()
    local_summaries[num_trees] = local_summaries.get(num_trees, 0) + 1

print("   dask: " + str(dask_summaries))
print("sklearn: " + str(local_summaries))

# ----- binary classification ----- #
dask_classifier = lgb.DaskLGBMClassifier(
    client=client,
    time_out=5,
    local_listen_port=12400,
    tree_learner_type='data_parallel',
    **params
)
local_classifier = lgb.LGBMClassifier(**params)

X, y, w, dX, dy, dw = _create_data(objective='classification')

dask_summaries = {}
local_summaries = {}
for _ in range(100):
    dask_classifier.fit(dX, dy, sample_weight=dw)
    num_trees = dask_classifier.booster_.num_trees()
    dask_summaries[num_trees] = dask_summaries.get(num_trees, 0) + 1
    
    local_classifier.fit(X, y, sample_weight=w)
    num_trees = local_classifier.booster_.num_trees()
    local_summaries[num_trees] = local_summaries.get(num_trees, 0) + 1

print("   dask: " + str(dask_summaries))
print("sklearn: " + str(local_summaries))

# ----- multiclass classification ----- #
dask_classifier = lgb.DaskLGBMClassifier(
    client=client,
    time_out=5,
    local_listen_port=12400,
    tree_learner_type='data_parallel',
    **params
)
local_classifier = lgb.LGBMClassifier(**params)

X, y, w, dX, dy, dw = _create_data(objective='classification', centers=3)

dask_summaries = {}
local_summaries = {}
for _ in range(100):
    dask_classifier.fit(dX, dy, sample_weight=dw)
    num_trees = dask_classifier.booster_.num_trees()
    dask_summaries[num_trees] = dask_summaries.get(num_trees, 0) + 1
    
    local_classifier.fit(X, y, sample_weight=w)
    num_trees = local_classifier.booster_.num_trees()
    local_summaries[num_trees] = local_summaries.get(num_trees, 0) + 1

print("   dask: " + str(dask_summaries))
print("sklearn: " + str(local_summaries))

Running that example, I got results like this:

# ----- ranking ----- #
   dask: {50: 90, 20: 2, 45: 2, 25: 1, 16: 1, 18: 1, 29: 1, 40: 1, 14: 1}
sklearn: {50: 100}

# ----- regression ----- #
   dask: {50: 100}
sklearn: {50: 100}

# ----- binary classification ----- #
   dask: {47: 53, 46: 47}
sklearn: {48: 100}

# ----- multiclass classification ----- #
   dask: {117: 54, 111: 46}
sklearn: {150: 100}

So it seems like the problem is mostly specific to distributed training.

Although it's confusing to see 48 trees from the non-Dask LGBMClassifier in the binary classification case.

Environment Info

Operating System: Ubuntu 18.04

C++ compiler version: gcc 9.3.0

CMake version: 3.16.3

Python version: 3.8.5, see conda info below

conda info output:
     active environment : None
       user config file : /home/jovyan/.condarc
 populated config files : /opt/conda/.condarc
          conda version : 4.8.3
    conda-build version : not installed
         python version : 3.8.5.final.0
       virtual packages : __glibc=2.31
       base environment : /opt/conda  (writable)
           channel URLs : https://conda.anaconda.org/conda-forge/linux-64
                          https://conda.anaconda.org/conda-forge/noarch
                          https://repo.anaconda.com/pkgs/main/linux-64
                          https://repo.anaconda.com/pkgs/main/noarch
                          https://repo.anaconda.com/pkgs/r/linux-64
                          https://repo.anaconda.com/pkgs/r/noarch
          package cache : /opt/conda/pkgs
                          /home/jovyan/.conda/pkgs
       envs directories : /opt/conda/envs
                          /home/jovyan/.conda/envs
               platform : linux-64
             user-agent : conda/4.8.3 requests/2.24.0 CPython/3.8.5 Linux/5.4.39-linuxkit ubuntu/20.04 glibc/2.31
                UID:GID : 1000:100
             netrc file : None
           offline mode : False

LightGBM version or commit hash: https://github.com/microsoft/LightGBM/tree/ffebc43fea44ba95a0bc2b4366fe9b4ff8275c22 (latest master)

Other Notes

  • I'm writing this up now that I've noticed it and have a reproducible example. I don't know yet if this is specific to LGBMRanker, or if the same problem affects LGBMClassifier and LGBMRegressor.
  • If this problem is specific to the Dask interface, I suspect it is here (see the sketch after this list):
    results = client.gather(futures_classifiers)
    results = [v for v in results if v]
    return results[0]
  • It's possible that this isn't an issue in the Dask module and that it's something in LightGBM distributed training generally. If that's true, it might require setting up tests on distributed training with the LightGBM CLI (Write tests for parallel code #3841).
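
To make the suspicion in the second bullet concrete, here is a rough, hypothetical sketch of that gather-then-filter pattern. This is not the actual lightgbm.dask code, and _train_part here is just a stand-in: worker tasks return either a model or None, the futures are gathered, empty results are dropped, and the first surviving result is returned. If the surviving result ever came from a worker whose local training stopped early, the caller would see an incomplete booster.

from dask.distributed import Client

def _train_part(part_id):
    # stand-in for per-worker distributed training; only one worker
    # returns a model-like object here, the others return None
    return {"worker": part_id, "num_trees": 50} if part_id == 0 else None

client = Client(processes=False)
futures = [client.submit(_train_part, i, pure=False) for i in range(2)]
results = client.gather(futures)
results = [v for v in results if v]  # drop empty results
model = results[0]                   # assumes the first survivor is the complete model
print(model)
client.close()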

Edits

EDIT 1: Added a check with the non-distributed lgb.sklearn.LGBMRanker. The problem doesn't show up there, so it seems to be specific to distributed training.
EDIT 2: Added tests for the regressor and classifier. Binary and multiclass classification also seem to suffer from this problem.

jameslamb changed the title from "[dask] DaskLGBMRanker sometimes returns an incomplete booster" to "[dask] Dask estimators sometimes return an incomplete booster" on Feb 7, 2021
@jameslamb
Collaborator Author

I ran a few more experiments tonight and found some interesting things. Basically, I don't think there is a bug; this is just a problem that can arise when using very small training data (relative to n_estimators and max_depth / num_leaves).

So I think this can be closed.


For binary classification, increasing the dataset size from 100 to 1000 observations makes the problem go away.

# ----- binary classification ----- #
# reuses dask_classifier, local_classifier, and _create_data() from the example above
from dask.distributed import wait

client.restart()
X, y, w, dX, dy, dw = _create_data(n_samples=1000, objective="classification")

dX = dX.persist()
dy = dy.persist()
dw = dw.persist()
_ = wait([dX, dy, dw])

dask_summaries = {}
local_summaries = {}
for i in range(100):
    if i % 5 == 0:
        print(i)
    dask_classifier.fit(dX, dy, sample_weight=dw)
    num_trees = dask_classifier.booster_.num_trees()
    dask_summaries[num_trees] = dask_summaries.get(num_trees, 0) + 1

    local_classifier.fit(X, y, sample_weight=w)
    num_trees = local_classifier.booster_.num_trees()
    local_summaries[num_trees] = local_summaries.get(num_trees, 0) + 1

print("   dask: " + str(dask_summaries))
print("sklearn: " + str(local_summaries))



My working theory right now is that with very small data and mostly random features, splitting that data into two pieces makes it easy to randomly end up in a situation where it isn't possible to boost for the desired number of rounds while still finding splits that satisfy default conditions like min_sum_hessian_in_leaf = 1e-3. Distributed training is even more sensitive to this, since each worker estimates split gains locally and only holds roughly #data / #workers rows (in the perfectly balanced case).

I found that when I cut n_samples to 50, even lgb.sklearn.LGBMRanker only boosts for 33 rounds (with the params shown in the description above, which includes n_estimators=50).
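
As a hedged, non-distributed way to check that last observation, the snippet below reuses _create_ranking_data() and params from the reproducible example in the description. The exact tree count will vary with the random data, but on data this small even the non-Dask estimator can stop short of n_estimators.

# shrink the ranking dataset and train the plain (non-Dask) ranker
X, y, w, g, _, _, _, _ = _create_ranking_data(n_samples=50)
small_ranker = lgb.LGBMRanker(**params)
small_ranker.fit(X, y, sample_weight=w, group=g)
# with n_estimators=50 in params, this printed 33 in the run described above
print(small_ranker.booster_.num_trees())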
