Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Add true streaming APIs to reduce client-side memory usage #5299

Merged
merged 33 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c2c3632
Extract streaming to own PR
svotaw Jun 16, 2022
6456bb4
small merge fixes and cleanup
svotaw Jun 16, 2022
f3db2f5
linting fixes
svotaw Jun 16, 2022
5224234
fix cast warning
svotaw Jun 17, 2022
0d2c2f7
Fix accidental deletion during branch transfer
svotaw Jun 17, 2022
cd566c5
responded to initial triage comments
svotaw Jun 20, 2022
c74a91e
Added more tests to use create-from-samples APIs
svotaw Jul 1, 2022
da83bfb
added mutex and adjusted nclasses logic
svotaw Jul 9, 2022
9f17b39
Fix thread-safety for pushing data to sparse bins through Push APIs
svotaw Jul 19, 2022
1164bb8
lint and doc fixes
svotaw Jul 19, 2022
e0e9a68
Small SWIG fix
svotaw Jul 22, 2022
06e5021
nit fix
svotaw Jul 23, 2022
3b254d0
Responded to StrikerRUS comments
svotaw Jul 24, 2022
1cc4d7c
fix breaking change after merge with master
svotaw Jul 24, 2022
6769682
Extract streaming to own PR
svotaw Jun 16, 2022
eb1deab
small merge fixes and cleanup
svotaw Jun 16, 2022
b0ee60c
Fix accidental deletion during branch transfer
svotaw Jun 17, 2022
a08a493
responded to initial triage comments
svotaw Jun 20, 2022
d533a06
Added more tests to use create-from-samples APIs
svotaw Jul 1, 2022
c50f419
Fix rstcheck call in ci
svotaw Jul 25, 2022
7b1bd50
remove TODOs
svotaw Jul 25, 2022
7458ac4
Extract streaming to own PR
svotaw Jun 16, 2022
780cd7f
small merge fixes and cleanup
svotaw Jun 16, 2022
4107ffb
Fix accidental deletion during branch transfer
svotaw Jun 17, 2022
f60842a
responded to initial triage comments
svotaw Jun 20, 2022
7a29a3b
Added more tests to use create-from-samples APIs
svotaw Jul 1, 2022
16ee03e
Small SWIG fix
svotaw Jul 22, 2022
958ea22
remove ci change
svotaw Jul 25, 2022
386978c
responded to shiyu1994 comments
svotaw Jul 27, 2022
be3c368
Merge branch 'master' into streaming
svotaw Jul 28, 2022
cef90e9
responded to StrikerRUS comments
svotaw Jul 31, 2022
91ca0b7
Fixes from StrikerRUS comments
svotaw Aug 2, 2022
7adbde3
Merge branch 'microsoft:master' into streaming
svotaw Aug 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ if(BUILD_CPP_TEST)
FetchContent_MakeAvailable(googletest)
add_library(GTest::GTest ALIAS gtest)
endif()

set(LightGBM_TEST_HEADER_DIR ${PROJECT_SOURCE_DIR}/tests/cpp_tests)
include_directories(${LightGBM_TEST_HEADER_DIR})

file(GLOB CPP_TEST_SOURCES tests/cpp_tests/*.cpp)
if(MSVC)
set(
Expand All @@ -597,7 +601,7 @@ if(BUILD_CPP_TEST)
endforeach()
endif()
add_executable(testlightgbm ${CPP_TEST_SOURCES})
target_link_libraries(testlightgbm PRIVATE lightgbm_objs GTest::GTest)
target_link_libraries(testlightgbm PRIVATE lightgbm_objs lightgbm_capi_objs GTest::GTest)
endif()

install(
Expand Down
7 changes: 6 additions & 1 deletion include/LightGBM/bin.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,13 @@ class Bin {
/*! \brief virtual destructor */
virtual ~Bin() {}
/*!
* \brief Initialize for pushing. By default, no action needed.
* \param num_thread The number of external threads that will be calling the push APIs
svotaw marked this conversation as resolved.
Show resolved Hide resolved
svotaw marked this conversation as resolved.
Show resolved Hide resolved
*/
// Base implementation is an intentional no-op; the parameter name is commented out because it is unused here.
virtual void InitStreaming(uint32_t /*num_thread*/) { }
/*!
* \brief Push one record
* \pram tid Thread id
* \param tid Thread id
* \param idx Index of record
* \param value bin value of record
*/
Expand Down
98 changes: 98 additions & 0 deletions include/LightGBM/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,23 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateByReference(const DatasetHandle referenc
int64_t num_total_row,
DatasetHandle* out);

/*!
* \brief Initialize the Dataset for streaming.
* \param dataset Handle of dataset
* \param has_weights Whether the dataset has Metadata weights
* \param has_init_scores Whether the dataset has Metadata initial scores
* \param has_queries Whether the dataset has Metadata queries/groups
* \param nclasses Number of initial score classes
* \param nthreads Number of external threads that will use the PushRows APIs
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetInitStreaming(DatasetHandle dataset,
int32_t has_weights,
int32_t has_init_scores,
int32_t has_queries,
int32_t nclasses,
int32_t nthreads);

/*!
* \brief Push data to existing dataset, if ``nrow + start_row == num_total_row``, will call ``dataset->FinishLoad``.
* \param dataset Handle of dataset
Expand All @@ -162,6 +179,38 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRows(DatasetHandle dataset,
int32_t ncol,
int32_t start_row);

/*!
* \brief Push data to existing dataset.
* The general flow for a streaming scenario is:
* 1. create Dataset "schema" (e.g. ``LGBM_DatasetCreateFromSampledColumn``)
* 2. initialize it for thread-safe streaming (``LGBM_DatasetInitStreaming``)
* 3. push data (``LGBM_DatasetPushRowsWithMetadata`` or ``LGBM_DatasetPushRowsByCSRWithMetadata``)
* 4. call ``LGBM_DatasetMarkFinished``
* \param dataset Handle of dataset
* \param data Pointer to the data space
* \param data_type Type of ``data`` pointer, can be ``C_API_DTYPE_FLOAT32`` or ``C_API_DTYPE_FLOAT64``
* \param nrow Number of rows
* \param ncol Number of feature columns
* \param start_row Row start index, i.e., the index at which to start inserting data
* \param label Pointer to array with nrow labels
* \param weight Optional pointer to array with nrow weights
* \param init_score Optional pointer to array with nrow*nclasses initial scores, in column format
* \param query Optional pointer to array with nrow query values
* \param tid The id of the calling thread, from 0...N-1 threads
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset,
const void* data,
int data_type,
int32_t nrow,
int32_t ncol,
int32_t start_row,
const float* label,
const float* weight,
const double* init_score,
const int32_t* query,
int32_t tid);

/*!
* \brief Push data to existing dataset, if ``nrow + start_row == num_total_row``, will call ``dataset->FinishLoad``.
* \param dataset Handle of dataset
Expand All @@ -187,6 +236,55 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsByCSR(DatasetHandle dataset,
int64_t num_col,
int64_t start_row);

/*!
* \brief Push CSR data to existing dataset. (See ``LGBM_DatasetPushRowsWithMetadata`` for more details.)
* \param dataset Handle of dataset
* \param indptr Pointer to row headers
* \param indptr_type Type of ``indptr``, can be ``C_API_DTYPE_INT32`` or ``C_API_DTYPE_INT64``
* \param indices Pointer to column indices
* \param data Pointer to the data space
* \param data_type Type of ``data`` pointer, can be ``C_API_DTYPE_FLOAT32`` or ``C_API_DTYPE_FLOAT64``
* \param nindptr Number of rows in the matrix + 1
* \param nelem Number of nonzero elements in the matrix
* \param start_row Row start index
* \param label Pointer to array with nindptr-1 labels
* \param weight Optional pointer to array with nindptr-1 weights
* \param init_score Optional pointer to array with (nindptr-1)*nclasses initial scores, in column format
* \param query Optional pointer to array with nindptr-1 query values
* \param tid The id of the calling thread, from 0...N-1 threads
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset,
const void* indptr,
int indptr_type,
const int32_t* indices,
const void* data,
int data_type,
int64_t nindptr,
int64_t nelem,
int64_t start_row,
const float* label,
const float* weight,
const double* init_score,
const int32_t* query,
int32_t tid);

/*!
* \brief Set whether or not the Dataset waits for a manual MarkFinished call or calls FinishLoad on itself automatically.
* Set to 1 for streaming scenario, and use ``LGBM_DatasetMarkFinished`` to manually finish the Dataset.
* \param dataset Handle of dataset
* \param wait Whether to wait or not (1 or 0)
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetSetWaitForManualFinish(DatasetHandle dataset, int wait);

/*!
* \brief Mark the Dataset as complete by calling ``dataset->FinishLoad``.
* \param dataset Handle of dataset
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetMarkFinished(DatasetHandle dataset);

/*!
* \brief Create a dataset from CSR format.
* \param indptr Pointer to row headers
Expand Down
121 changes: 118 additions & 3 deletions include/LightGBM/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,30 @@ class Metadata {
~Metadata();

/*!
* \brief Initial work, will allocate space for label, weight(if exists) and query(if exists)
* \brief Initial work, will allocate space for label, weight (if exists) and query (if exists)
* \param num_data Number of training data
* \param weight_idx Index of weight column, < 0 means it doesn't exist
* \param query_idx Index of query id column, < 0 means it doesn't exist
*/
void Init(data_size_t num_data, int weight_idx, int query_idx);

/*!
* \brief Allocate space for label, weight (if exists), initial score (if exists) and query (if exists)
* \param num_data Number of data
* \param reference Reference metadata
*/
void InitByReference(data_size_t num_data, const Metadata* reference);

/*!
* \brief Allocate space for label, weight (if exists), initial score (if exists) and query (if exists)
* \param num_data Number of data rows
* \param has_weights Whether the metadata has weights
* \param has_init_scores Whether the metadata has initial scores
* \param has_queries Whether the metadata has queries
* \param nclasses Number of classes for initial scores
*/
void Init(data_size_t num_data, int32_t has_weights, int32_t has_init_scores, int32_t has_queries, int32_t nclasses);

/*!
* \brief Partition label by used indices
* \param used_indices Indices of local used
Expand Down Expand Up @@ -138,6 +155,19 @@ class Metadata {
weights_[idx] = value;
}

/*!
* \brief Set initial scores for one record. Note that init_score might have multiple columns and is
*        stored in column format: the value for class k of record idx is written to
*        init_score_[k * num_data_ + idx].
* \param idx Index of this record
* \param values Initial score values for this record, one per class (num_classes() values are read)
*/
inline void SetInitScoreAt(data_size_t idx, const double* values) {
  // Do all offset arithmetic in 64 bits. With a plain int index, both the bound
  // (nclasses * num_data_) and the running offset can overflow on large multi-class
  // datasets, and signed overflow is undefined behavior.
  const int64_t stride = static_cast<int64_t>(num_data_);
  const int64_t end = static_cast<int64_t>(num_classes()) * stride;
  const double* val_ptr = values;
  for (int64_t offset = static_cast<int64_t>(idx); offset < end; offset += stride, ++val_ptr) {
    init_score_[offset] = *val_ptr;
  }
}

/*!
* \brief Set Query Id for one record
* \param idx Index of this record
Expand All @@ -150,6 +180,26 @@ class Metadata {
/*! \brief Load initial scores from file */
void LoadInitialScore(const std::string& data_filename);

/*!
* \brief Insert the given metadata records into the current metadata, starting at a specified index
* \param start_index The target index to begin the insertion
* \param count Number of records to insert
* \param labels Pointer to label data
* \param weights Pointer to weight data, or null
* \param init_scores Pointer to init-score data, or null
* \param queries Pointer to query data, or null
*/
void InsertAt(data_size_t start_index,
data_size_t count,
const float* labels,
const float* weights,
const double* init_scores,
const int32_t* queries);

/*!
* \brief Perform any extra operations after all data has been loaded
*/
void FinishLoad();
/*!
* \brief Get weights, if not exists, will return nullptr
* \return Pointer of weights
Expand Down Expand Up @@ -212,6 +262,16 @@ class Metadata {
*/
inline int64_t num_init_score() const { return num_init_score_; }

/*!
* \brief Get number of classes
*/
inline int32_t num_classes() const {
if (num_data_ && num_init_score_) {
return static_cast<int>(num_init_score_ / num_data_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is num_init_score_ always set to non zero values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, sometimes there are no initial scores. Should we return 1 even in the case of no initial score inputs?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but when there is no num_init_score_, num_classes could be > 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, makes sense to return 1 for all non-multiclass scenarios, so I made the change.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that when multiclass, and num_init_score_ == 0, this could still return 1?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@svotaw is this addressed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like I missed this one. Is there a particular concern/bug? What is it that you suggest it should return? This is currently used as part of allocation. If num_init_score_ == 0, this is not used. Happy to fix it if needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_init_score_ does not always have values.
I believe the num_class is a hyper-parameter, which can get from config. So I am not sure why you use num_init_score_ to get num_class.

If dataset->num_classes() is currently only used for init_score assignment, maybe we should use a different name to avoid future bugs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. How about num_init_score_classes()?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks good to me

}
return 1;
}

/*! \brief Disable copy */
Metadata& operator=(const Metadata&) = delete;
/*! \brief Disable copy */
Expand All @@ -230,8 +290,18 @@ class Metadata {
void LoadWeights();
/*! \brief Load query boundaries from file */
void LoadQueryBoundaries();
/*! \brief Load query wights */
void LoadQueryWeights();
/*! \brief Calculate query weights from queries */
void CalculateQueryWeights();
/*! \brief Calculate query boundaries from queries */
void CalculateQueryBoundaries();
/*! \brief Insert labels at the given index */
void InsertLabels(const label_t* labels, data_size_t start_index, data_size_t len);
/*! \brief Insert weights at the given index */
void InsertWeights(const label_t* weights, data_size_t start_index, data_size_t len);
/*! \brief Insert initial scores at the given index */
void InsertInitScores(const double* init_scores, data_size_t start_index, data_size_t len, data_size_t source_size);
/*! \brief Insert queries at the given index */
void InsertQueries(const data_size_t* queries, data_size_t start_index, data_size_t len);
/*! \brief Filename of current data */
std::string data_filename_;
/*! \brief Number of data */
Expand Down Expand Up @@ -374,6 +444,27 @@ class Dataset {
/*! \brief Destructor */
LIGHTGBM_EXPORT ~Dataset();

/*!
* \brief Initialize this Dataset's metadata using the metadata of a reference Dataset
* \param num_data Number of data
* \param reference Reference dataset whose metadata is handed to Metadata::InitByReference
*/
LIGHTGBM_EXPORT void InitByReference(data_size_t num_data, const Dataset* reference) {
  const Metadata* reference_metadata = &reference->metadata();
  metadata_.InitByReference(num_data, reference_metadata);
}

/*!
* \brief Initialize the Dataset for streaming: set up the metadata, then initialize every feature group
* \param num_data Number of data rows
* \param has_weights Whether the metadata has weights
* \param has_init_scores Whether the metadata has initial scores
* \param has_queries Whether the metadata has queries
* \param nclasses Number of classes for initial scores
* \param nthreads Number of external threads, forwarded to each feature group
*/
LIGHTGBM_EXPORT void InitStreaming(data_size_t num_data,
                                   int32_t has_weights,
                                   int32_t has_init_scores,
                                   int32_t has_queries,
                                   int32_t nclasses,
                                   int32_t nthreads) {
  // Metadata is initialized first, then the feature groups in index order.
  metadata_.Init(num_data, has_weights, has_init_scores, has_queries, nclasses);
  for (int group_idx = 0; group_idx < num_groups_; ++group_idx) {
    auto& group = feature_groups_[group_idx];
    group->InitStreaming(nthreads);
  }
}

LIGHTGBM_EXPORT bool CheckAlign(const Dataset& other) const {
if (num_features_ != other.num_features_) {
return false;
Expand Down Expand Up @@ -452,6 +543,15 @@ class Dataset {
}
}

/*!
* \brief Insert metadata for a run of records by forwarding all arguments to Metadata::InsertAt
* \param start_index The target index to begin the insertion
* \param count Number of records to insert
* \param labels Pointer to label data
* \param weights Pointer to weight data, or null
* \param init_scores Pointer to init-score data, or null
* \param queries Pointer to query data, or null
*/
inline void InsertMetadataAt(data_size_t start_index,
data_size_t count,
const label_t* labels,
const label_t* weights,
const double* init_scores,
const data_size_t* queries) {
metadata_.InsertAt(start_index, count, labels, weights, init_scores, queries);
}

inline int RealFeatureIndex(int fidx) const {
return real_feature_idx_[fidx];
}
Expand Down Expand Up @@ -743,6 +843,18 @@ class Dataset {
/*! \brief Get Number of data */
inline data_size_t num_data() const { return num_data_; }

/*! \brief Get whether the Dataset waits for a manual MarkFinished call (true) rather than
* having FinishLoad called automatically when the last row is pushed (false).
* NOTE(review): this read does not take mutex_, unlike set_wait_for_manual_finish - confirm that is intended.
*/
inline bool wait_for_manual_finish() const { return wait_for_manual_finish_; }

/*! \brief Set whether the Dataset is finished automatically when last row is pushed or with a manual
* MarkFinished API call. Set to true for thread-safe streaming and/or if will be coalesced later.
* FinishLoad should not be called on any Dataset that will be coalesced.
*/
inline void set_wait_for_manual_finish(bool value) {
std::lock_guard<std::mutex> lock(mutex_);
wait_for_manual_finish_ = value;
}

/*! \brief Disable copy */
Dataset& operator=(const Dataset&) = delete;
/*! \brief Disable copy */
Expand Down Expand Up @@ -834,12 +946,15 @@ class Dataset {
bool zero_as_missing_;
std::vector<int> feature_need_push_zeros_;
std::vector<std::vector<float>> raw_data_;
bool wait_for_manual_finish_;
svotaw marked this conversation as resolved.
Show resolved Hide resolved
bool has_raw_;
/*! map feature (inner index) to its index in the list of numeric (non-categorical) features */
std::vector<int> numeric_feature_map_;
int num_numeric_features_;
std::string device_type_;
int gpu_device_id_;
/*! \brief mutex for threading safe call */
std::mutex mutex_;

#ifdef USE_CUDA_EXP
std::unique_ptr<CUDAColumnData> cuda_column_data_;
Expand Down
20 changes: 17 additions & 3 deletions include/LightGBM/feature_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,28 @@ class FeatureGroup {
/*! \brief Destructor */
~FeatureGroup() {}

/*!
* \brief Initialize for pushing in a streaming fashion by forwarding to the underlying bin data:
*        every per-feature bin when the group is multi-value, otherwise the single shared bin.
* \param num_thread The number of external threads that will be calling the push APIs
*/
void InitStreaming(int32_t num_thread) {
  if (!is_multi_val_) {
    bin_data_->InitStreaming(num_thread);
    return;
  }
  for (int feature_idx = 0; feature_idx < num_feature_; ++feature_idx) {
    multi_bin_data_[feature_idx]->InitStreaming(num_thread);
  }
}

/*!
* \brief Push one record, will auto convert to bin and push to bin data
* \param tid Thread id
* \param idx Index of record
* \param sub_feature_idx Index of the subfeature
* \param line_idx Index of record
* \param value feature value of record
*/
inline void PushData(int tid, int sub_feature_idx, data_size_t line_idx,
double value) {
inline void PushData(int tid, int sub_feature_idx, data_size_t line_idx, double value) {
uint32_t bin = bin_mappers_[sub_feature_idx]->ValueToBin(value);
if (bin == bin_mappers_[sub_feature_idx]->GetMostFreqBin()) {
return;
Expand Down
Loading