Skip to content

Commit

Permalink
[CELEBORN-1568] Support worker retries in MiniCluster
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

### Why are the changes needed?
https://github.com/apache/celeborn/actions/runs/10417785546/job/28852691241#step:4:5804

Now the worker retry logic, the first time of sleep is 2000, the second time is 4000000, and the third time is 8000000000 milliseconds. It is estimated that it will be difficult to complete the retry.

```scala
Thread.sleep(math.pow(2000, workerStartRetry).toInt)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

Closes #2692 from cxzl25/CELEBORN-1568.

Authored-by: sychen <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 973e31e)
Signed-off-by: mingji <[email protected]>
  • Loading branch information
cxzl25 authored and FMX committed Aug 20, 2024
1 parent 34c6aea commit b5ac49f
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy
import java.io.IOException
import java.net.BindException
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Lock, ReentrantLock}

import scala.collection.mutable
Expand Down Expand Up @@ -202,12 +203,12 @@ trait MiniClusterFeature extends Logging {
logError(s"cannot start worker $i, reached to max retrying", ex)
throw ex
} else {
Thread.sleep(math.pow(2000, workerStartRetry).toInt)
TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
}
}
}
})
workerThread.setName(s"worker ${i} starter thread")
workerThread.setName(s"worker $i starter thread")
workerThread
}
threads.foreach(_.start())
Expand All @@ -228,7 +229,7 @@ trait MiniClusterFeature extends Logging {
workerInfos.foreach { case (worker, _) => assert(worker.registered.get()) }
allWorkersStarted = true
} catch {
case ex: Exception =>
case ex: Throwable =>
logError("all workers haven't been started retrying", ex)
Thread.sleep(5000)
workersWaitingTime += 5000
Expand Down

0 comments on commit b5ac49f

Please sign in to comment.