docs: Improved LightGBM docs (#2003)
* Improved LightGBM docs

* acrolinx fixes

* more acrolinx fixes

* fixes

* nit fix

* annotation fix
svotaw authored Jul 10, 2023
1 parent de94a64 commit e487c69
Showing 2 changed files with 55 additions and 15 deletions.
@@ -50,6 +50,10 @@ object NetworkManager {
* and then sends back the information to the executors.
*
* @param numTasks The total number of training tasks to wait for.
* @param spark The Spark session.
* @param driverListenPort The port to listen for the driver on.
* @param timeout The timeout (in seconds).
* @param useBarrierExecutionMode Whether to use barrier mode.
* @return The NetworkTopology.
*/
def create(numTasks: Int,
@@ -93,7 +97,7 @@ object NetworkManager {
* @param partitionId The partition id.
* @param shouldExecuteTraining Whether this task should be a part of the training network.
* @param measures Instrumentation for perf measurements.
* @returns Information about the network topology.
* @return Information about the network topology.
*/
def getGlobalNetworkInfo(ctx: TrainingContext,
log: Logger,
@@ -120,7 +124,7 @@ object NetworkManager {
out
}

def getNetworkTopologyInfoFromDriver(networkParams: NetworkParams,
private def getNetworkTopologyInfoFromDriver(networkParams: NetworkParams,
taskId: Long,
partitionId: Int,
localListenPort: Int,
@@ -168,7 +172,7 @@ object NetworkManager {
}.get
}

def parseExecutorPartitionList(partitionsByExecutorStr: String, executorId: String): Array[Int] = {
private def parseExecutorPartitionList(partitionsByExecutorStr: String, executorId: String): Array[Int] = {
// extract this executor's partition ids as an array, from a string that is formatted like this:
// executor1=partition1,partition2:executor2=partition3,partition4
val partitionsByExecutor = partitionsByExecutorStr.split(":")
@@ -225,7 +229,7 @@ object NetworkManager {
mainPort.toInt
}

def findOpenPort(ctx: TrainingContext, log: Logger): Option[Socket] = {
private def findOpenPort(ctx: TrainingContext, log: Logger): Option[Socket] = {
val defaultListenPort: Int = ctx.networkParams.defaultListenPort
val basePort = defaultListenPort + (LightGBMUtils.getWorkerId * ctx.numTasksPerExecutor)
if (basePort > LightGBMConstants.MaxPort) {
@@ -257,7 +261,7 @@ object NetworkManager {
taskServerSocket
}

def setFinishedStatus(networkParams: NetworkParams, log: Logger): Unit = {
private def setFinishedStatus(networkParams: NetworkParams, log: Logger): Unit = {
using(new Socket(networkParams.ipAddress, networkParams.port)) {
driverSocket =>
using(new BufferedWriter(new OutputStreamWriter(driverSocket.getOutputStream))) {
@@ -306,7 +310,7 @@ case class NetworkManager(numTasks: Int,

// Concatenate with commas, e.g. host1:port1,host2:port2, etc.
// Also make sure the order is deterministic by sorting on minimum partition id
lazy val networkTopologyAsString: String = {
private lazy val networkTopologyAsString: String = {
val hostPortsList = hostAndPorts.map(_._2).sortBy(hostPort => {
val host = hostPort.split(":")(0)
hostToMinPartition(host)
@@ -316,7 +320,7 @@ case class NetworkManager(numTasks: Int,

// Create a string representation of the partitionsByExecutor map
// e.g. executor1=partition1,partition2:executor2=partition3,partition4
lazy val partitionsByExecutorAsString: String = {
private lazy val partitionsByExecutorAsString: String = {
val executorList = partitionsByExecutor.map { case (executor, partitionList) =>
executor + "=" + partitionList.mkString(",")
}
@@ -338,7 +342,7 @@ case class NetworkManager(numTasks: Int,
Await.result(networkCommunicationThread, Duration(timeout, SECONDS))
}

def waitForAllTasksToReport(): Unit = {
private def waitForAllTasksToReport(): Unit = {
if (useBarrierExecutionMode) {
log.info(s"driver using barrier execution mode for $numTasks tasks...")

@@ -411,7 +415,7 @@ case class NetworkManager(numTasks: Int,
}
}

def sendDataToExecutors(lightGBMNetworkTopology: String, partitionsByExecutor: String): Unit = {
private def sendDataToExecutors(lightGBMNetworkTopology: String, partitionsByExecutor: String): Unit = {
// TODO optimize and not send for bulk mode helpers
// Send aggregated network information back to all tasks and helper tasks on executors
val count = hostAndPorts.length + loadOnlyHostAndPorts.length
@@ -431,7 +435,7 @@ case class NetworkManager(numTasks: Int,
})
}

def closeConnections(): Unit = {
private def closeConnections(): Unit = {
log.info("driver closing all sockets and server socket")
hostAndPorts.foreach(_._1.close())
driverServerSocket.close()
46 changes: 41 additions & 5 deletions website/docs/features/lightgbm/about.md
@@ -15,7 +15,7 @@ GPU enabled decision tree algorithms for ranking, classification, and
many other machine learning tasks. LightGBM is part of Microsoft's
[DMTK](http://github.com/microsoft/dmtk) project.

### Advantages of LightGBM
### Advantages of LightGBM through SynapseML

- **Composability**: LightGBM models can be incorporated into existing
SparkML Pipelines, and used for batch, streaming, and serving
@@ -126,6 +126,38 @@ using `saveNativeModel()`. Additionally, they're fully compatible with [PMML](ht
can be converted to PMML format through the
[JPMML-SparkML-LightGBM](https://github.com/alipay/jpmml-sparkml-lightgbm) plugin.

#### Dynamic Allocation Limitations
The native LightGBM library has a *distributed mode* that allows the algorithm to work over multiple *machines*. SynapseML
uses this mode to call LightGBM from Spark. SynapseML first gathers all the Spark executor networking information, passes it to LightGBM, and then
waits for LightGBM to complete its work. However, the native LightGBM implementation assumes that the network topology stays constant over the course of a single
training or scoring session. This is how the native LightGBM distributed mode was designed; it isn't a limitation of SynapseML itself.

Dynamic compute changes can therefore cause problems for LightGBM if the set of Spark executors changes during data processing. Spark can naturally
take advantage of cluster autoscaling and can dynamically replace a failed executor with another, but LightGBM can't
handle these networking changes. Large datasets are especially affected, since they're more likely to trigger executor scaling
or to have a single executor fail during a processing pass.

If you're experiencing problems with LightGBM through SynapseML due to executor changes (for example, occasional task failures or networking hangs),
there are several options:
1. In the Spark platform, turn off any autoscaling on the cluster you have provisioned.
2. Set *numTasks* manually to a smaller value so that fewer executors are used, reducing the probability of a single executor failing (see the sketch after this list).
3. Turn off dynamic executor scaling with configuration in a notebook cell. In Synapse and Fabric, you can use:

```python
%%configure
{
"conf":
{
"spark.dynamicAllocation.enabled": "false"
}
}
```
Note: setting any custom configuration can affect cluster startup time if your compute platform takes advantage of "live pools"
to improve notebook performance.
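
For option 2, here's a minimal sketch in Scala. The column names are hypothetical and `setNumTasks` is assumed to be the setter for the *numTasks* parameter; capping it limits how many Spark tasks, and therefore executors, take part in training:

```scala
import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

// Cap the number of training tasks so fewer executors participate,
// which lowers the chance that one of them changes or fails mid-training.
val classifier = new LightGBMClassifier()
  .setLabelCol("label")        // hypothetical label column
  .setFeaturesCol("features")  // hypothetical assembled feature vector
  .setNumTasks(8)              // fewer tasks than the cluster could scale to
```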

If you still have problems, consider splitting your data into smaller segments using *numBatches*. Splitting into multiple
batches increases total processing time, but can improve reliability.
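
A similar sketch for *numBatches* (same hypothetical columns; `setNumBatches` is assumed to be the corresponding setter). Each batch runs as its own, shorter distributed training pass, so an executor change affects only that batch:

```scala
// Split training into 4 sequential batches; each batch is a shorter
// distributed session, which is less likely to hit an executor change.
val batchedClassifier = new LightGBMClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumBatches(4)
```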

### Data Transfer Mode

SynapseML must pass data from Spark partitions to LightGBM native Datasets before turning over control to
@@ -211,11 +243,14 @@ any parameter that affects bin boundaries and reusing the same estimator, you sh

### Barrier Execution Mode

By default LightGBM uses regular spark paradigm for launching tasks and communicates with the driver to coordinate task execution.
By default LightGBM uses the regular Spark paradigm for launching tasks and communicates with the driver to coordinate task execution.
The driver thread aggregates all task host:port information and then communicates the full list back to the workers in order for NetworkInit to be called.
This procedure requires the driver to know how many tasks there are, and a mismatch between the expected number of tasks and the actual number causes the initialization to deadlock.
To avoid this issue, use the `UseBarrierExecutionMode` flag, to use Apache Spark's `barrier()` stage to ensure all tasks execute at the same time.
Barrier execution mode simplifies the logic to aggregate `host:port` information across all tasks.
This procedure requires the driver to know how many tasks there are, and a mismatch between the expected number of tasks and the actual number causes
the initialization to deadlock.

If you're experiencing network issues, you can try Spark's *barrier* execution mode. SynapseML provides a `UseBarrierExecutionMode` flag
that uses Apache Spark's `barrier()` stage to ensure all tasks execute at the same time.
Barrier execution mode changes the logic so that `host:port` information is aggregated across all tasks in a synchronized way.
To use it in Scala, you can call setUseBarrierExecutionMode(true), for example:

val lgbm = new LightGBMClassifier()
@@ -224,3 +259,4 @@ To use it in scala, you can call setUseBarrierExecutionMode(true), for example:
.setUseBarrierExecutionMode(true)
...
<train classifier>
Note: barrier execution mode can also cause issues of its own, so use it only if needed.
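
For reference, a fuller sketch of the same pattern (hypothetical column names, and `train` is assumed to be a pre-existing training DataFrame):

```scala
import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

// Launch all LightGBM training tasks inside a single barrier() stage so they
// start together and can exchange host:port information in lockstep.
val lgbm = new LightGBMClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setUseBarrierExecutionMode(true)

val model = lgbm.fit(train)  // `train` is an assumed, pre-existing DataFrame
```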
