Skip to content

Commit

Permalink
feat: actor load balance model.
Browse files Browse the repository at this point in the history
  • Loading branch information
yankun1992 committed Sep 7, 2024
1 parent 66ba399 commit ebecdac
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 51 deletions.
3 changes: 0 additions & 3 deletions core/src/cc/otavia/core/actor/AbstractActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ private[core] abstract class AbstractActor[M <: Call] extends FutureDispatcher w

private var currentSendMessageId: Long = Long.MinValue

private[core] var instances: Int = 1
private[core] var instanceIndex: Int = 0

final private[core] def stackEndRate: Float =
if (revAsks != 0) sendAsks.toFloat / revAsks.toFloat else Float.MaxValue

Expand Down
27 changes: 17 additions & 10 deletions core/src/cc/otavia/core/actor/ActorContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,26 @@

package cc.otavia.core.actor

import cc.otavia.core.actor.Actor
import cc.otavia.core.address.Address
import cc.otavia.core.message.Message
import cc.otavia.core.system.ActorSystem

/** [[Actor]] content info, the content info is create by actor system when a [[Actor]] instance is creating by actor
/** [[Actor]] content info, the content info is created by actor system when a [[Actor]] instance is creating by actor
* system, and inject to [[Actor]] instance by [[Actor.setCtx]]
*
* @param system
* actor system
* @param address
* physical address
* @param actorId
* id distributed by actor system
*/
final case class ActorContext(system: ActorSystem, address: Address[? <: Message], actorId: Long)
trait ActorContext {

/** actor system */
def system: ActorSystem

/** physical address of the binding actor. */
def address: Address[? <: Message]

/** id distributed by actor system */
def actorId: Long

def isLoadBalance: Boolean

def mountedThreadId: Int

}
7 changes: 4 additions & 3 deletions core/src/cc/otavia/core/address/RobinAddress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import cc.otavia.core.actor.AbstractActor
import cc.otavia.core.message.*
import cc.otavia.core.stack.MessageFuture

class RobinAddress[M <: Call](val underlying: Array[ActorAddress[M]]) extends ProxyAddress[M] {
class RobinAddress[M <: Call](val underlying: Array[ActorAddress[M]], val isLB: Boolean = false)
extends ProxyAddress[M] {

private var noticeCursor: Int = 0
private var askCursor: Int = 0
Expand Down Expand Up @@ -51,8 +52,8 @@ class RobinAddress[M <: Call](val underlying: Array[ActorAddress[M]]) extends Pr
}

final private def getAddress(using sender: AbstractActor[? <: Call]): Address[M] = {
if (sender.instances == underlying.length) {
underlying(sender.instanceIndex)
if (sender.context.isLoadBalance && isLB) {
underlying(sender.context.mountedThreadId)
} else {
val index = askCursor % underlying.length
askCursor += 1
Expand Down
2 changes: 2 additions & 0 deletions core/src/cc/otavia/core/reactor/IoExecutionContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ trait IoExecutionContext {
*/
def canBlock: Boolean

def canNotBlock: Boolean = !canBlock

/** Returns the amount of time left until the scheduled task with the closest deadline should run. */
def delayNanos(currentTimeNanos: Long): Long = 0

Expand Down
25 changes: 20 additions & 5 deletions core/src/cc/otavia/core/system/ActorHouse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import cc.otavia.core.message.*
import cc.otavia.core.system.ActorHouse.*
import cc.otavia.core.util.Nextable

import java.lang.management.*
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.language.unsafeNulls
Expand All @@ -34,11 +33,15 @@ import scala.language.unsafeNulls
* @tparam M
* the message type of the mounted actor instance can handle
*/
final private[core] class ActorHouse(val manager: HouseManager) {
final private[core] class ActorHouse(val manager: HouseManager) extends ActorContext {

private var dweller: AbstractActor[? <: Call] = _
private var actorAddress: ActorAddress[Call] = _
private var dwellerId: Long = -1
private var atp: Int = 0

private var isLB: Boolean = false

private val noticeMailbox: MailBox = new MailBox(this)
private val askMailbox: MailBox = new MailBox(this)
private val replyMailbox: MailBox = new MailBox(this)
Expand All @@ -52,6 +55,8 @@ final private[core] class ActorHouse(val manager: HouseManager) {

@volatile private var _inHighPriorityQueue: Boolean = false

override def mountedThreadId: Int = manager.thread.index

def highPriority: Boolean = (replyMailbox.size() > HIGH_PRIORITY_REPLY_SIZE) ||
(eventMailbox.size() > HIGH_PRIORITY_EVENT_SIZE) || (dweller.stackEndRate < 0.6)

Expand Down Expand Up @@ -92,9 +97,19 @@ final private[core] class ActorHouse(val manager: HouseManager) {
case _ => throw new IllegalStateException("")
}

def setActorId(id: Long): Unit = dwellerId = id

def setLB(boolean: Boolean): Unit = isLB = boolean

override def isLoadBalance: Boolean = isLB

def actor: AbstractActor[? <: Call] = this.dweller

def system: ActorSystem = manager.thread.system
override def system: ActorSystem = manager.system

override def address: ActorAddress[_ <: Message] = actorAddress

override def actorId: Long = dwellerId

def actorType: Int = atp

Expand Down Expand Up @@ -304,11 +319,11 @@ final private[core] class ActorHouse(val manager: HouseManager) {
address
}

private[core] def createUntypedAddress(): ActorAddress[?] = {
private[core] def createUntypedAddress(): Unit = {
val address = new ActorAddress[Call](this)
if (actor.isInstanceOf[AutoCleanable])
manager.thread.registerAddressRef(address)
address
actorAddress = address
}

override def toString: String = s"events=${eventMailbox.size()}, notices=${noticeMailbox.size()}, " +
Expand Down
35 changes: 15 additions & 20 deletions core/src/cc/otavia/core/system/ActorSystemImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,23 @@

package cc.otavia.core.system

import cc.otavia.buffer.BufferAllocator
import cc.otavia.buffer.pool.{DirectPooledPageAllocator, HeapPooledPageAllocator}
import cc.otavia.common.SystemPropertyUtil
import cc.otavia.core.actor.*
import cc.otavia.core.address.*
import cc.otavia.core.cache.ThreadLocal
import cc.otavia.core.channel.ChannelFactory
import cc.otavia.core.ioc.{BeanDefinition, BeanManager, Module}
import cc.otavia.core.message.Call
import cc.otavia.core.slf4a.{LogLevel, Logger}
import cc.otavia.core.slf4a.Logger
import cc.otavia.core.system.monitor.{ReactorMonitor, SystemMonitor, SystemMonitorTask, ThreadMonitor}
import cc.otavia.core.timer.{Timeout, Timer, TimerImpl}
import cc.otavia.core.transport.TransportFactory

import java.io.File
import java.lang.management.{ManagementFactory, MemoryMXBean}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.{MILLISECONDS, MINUTES, TimeUnit}
import scala.concurrent.duration.MILLISECONDS
import scala.language.unsafeNulls

final private[core] class ActorSystemImpl(val name: String, val actorThreadFactory: ActorThreadFactory)
Expand Down Expand Up @@ -148,20 +141,17 @@ final private[core] class ActorSystemImpl(val name: String, val actorThreadFacto
address match
case addr: ActorAddress[?] => addr.house.mount()
case robinAddress: RobinAddress[?] =>
robinAddress.underlying.foreach { addr =>
addr.asInstanceOf[ActorAddress[?]].house.mount()
}
robinAddress.underlying.foreach { addr => addr.house.mount() }
}

private def setActorContext(actor: AbstractActor[?], thread: ActorThread): ActorAddress[?] = {
private def setActorContext(actor: AbstractActor[?], thread: ActorThread, lb: Boolean = false): ActorAddress[?] = {
val house = thread.createActorHouse()
house.setActor(actor)
val address = house.createUntypedAddress()
val context = ActorContext(this, address, generator.getAndIncrement())
house.setActorId(generator.getAndIncrement())
house.setLB(lb)
actor.setCtx(house)

actor.setCtx(context)

address
house.address
}

private def createActor(factory: ActorFactory[?], num: Int): (Address[?], Class[?]) = {
Expand All @@ -172,6 +162,13 @@ final private[core] class ActorSystemImpl(val name: String, val actorThreadFacto
val address = setActorContext(actor, thread)
logger.debug(s"Created actor $actor")
(address, actor.getClass)
} else if (num == pool.size) {
val address = pool.workers.map { thread =>
val actor = factory.create().asInstanceOf[AbstractActor[? <: Call]]
setActorContext(actor, thread, true)
}
val clz = address.head.house.actor.getClass
(new RobinAddress[Call](address.asInstanceOf[Array[ActorAddress[Call]]], true), clz)
} else if (num > 1) {
val range = (0 until num).toArray
val actors = range.map(_ => factory.create().asInstanceOf[AbstractActor[? <: Call]])
Expand All @@ -180,8 +177,6 @@ final private[core] class ActorSystemImpl(val name: String, val actorThreadFacto
val address = range.map { index =>
val actor = actors(index)
val thread = threads(index)
actor.instances = range.length
actor.instanceIndex = index
setActorContext(actor, thread)
}
logger.debug(s"Created actors ${actors.mkString("Array(", ", ", ")")}")
Expand Down
6 changes: 6 additions & 0 deletions core/src/cc/otavia/core/system/ActorThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ final class ActorThread(private[core] val system: ActorSystem) extends Thread()

private[core] def createActorHouse(): ActorHouse = {
val house = new ActorHouse(manager)
house.createUntypedAddress()
house
}

Expand Down Expand Up @@ -192,10 +193,15 @@ final class ActorThread(private[core] val system: ActorSystem) extends Thread()

override def run(): Unit = {
status = ST_RUNNING

val ioCtx = new IoExecutionContext {
override def canBlock: Boolean = !manager.runnable && refSet.isEmpty && eventQueue.isEmpty

override def canNotBlock: Boolean = manager.runnable || !eventQueue.isEmpty || refSet.nonEmpty
}

prepared()

while (!confirmShutdown()) {
// run current thread tasks
val stops = if (refSet.isEmpty) 0 else this.stopActors() // stop and release died actor.
Expand Down
17 changes: 9 additions & 8 deletions core/src/cc/otavia/core/system/HouseManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import cc.otavia.core.system.monitor.HouseManagerMonitor
import scala.collection.mutable
import scala.language.unsafeNulls

class HouseManager(val thread: ActorThread) {
final class HouseManager(val thread: ActorThread) {

private val logger = Logger.getLogger(getClass, thread.system)

Expand All @@ -50,12 +50,14 @@ class HouseManager(val thread: ActorThread) {

private var currentRunning: Actor[?] = _

def system: ActorSystem = thread.system

private[core] def currentRunningActor: Actor[?] = currentRunning

def laterTasks: mutable.ArrayDeque[Runnable] = thread.laterTasks

def runnable: Boolean =
actorQueue.nonEmpty || mountingQueue.nonEmpty || channelsActorQueue.nonEmpty // || serverActorQueue.nonEmpty
actorQueue.nonEmpty || channelsActorQueue.nonEmpty || mountingQueue.nonEmpty // || serverActorQueue.nonEmpty

def mount(house: ActorHouse): Unit = {
mountingQueue.enqueue(house)
Expand Down Expand Up @@ -87,7 +89,7 @@ class HouseManager(val thread: ActorThread) {
}
}

final private def adjustPriority(queue: PriorityHouseQueue, house: ActorHouse): Unit = {
private def adjustPriority(queue: PriorityHouseQueue, house: ActorHouse): Unit = {
if (queue.adjust(house)) {
queue.enqueue(house)
}
Expand All @@ -99,7 +101,7 @@ class HouseManager(val thread: ActorThread) {
* true if run some [[ActorHouse]], otherwise false.
*/
def run(): Boolean = {
runningStart = System.nanoTime()
// runningStart = System.nanoTime()

var success = false

Expand All @@ -113,19 +115,18 @@ class HouseManager(val thread: ActorThread) {
val house = mountingQueue.dequeue()
if (house != null) {
currentRunning = house.actor
if (house.actorType == CHANNELS_ACTOR) thread.prepared()
house.doMount()
currentRunning = null
success = true
}
}

runningStart = Long.MaxValue
// runningStart = Long.MaxValue

success
}

final private def run0(houseQueue: HouseQueue): Boolean = {
private def run0(houseQueue: HouseQueue): Boolean = {
val house = houseQueue.dequeue()
if (house != null) {
currentRunning = house.actor
Expand Down Expand Up @@ -160,7 +161,7 @@ class HouseManager(val thread: ActorThread) {
}

/** Steal running by other [[ActorThread]] */
final private def runSteal(): Boolean = {
private def runSteal(): Boolean = {
if (actorQueue.available) {
val house = actorQueue.dequeue()
if (house != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,9 @@ final class NioHandler(val selectorProvider: SelectorProvider, val selectStrateg
@throws[IOException]
private def selectNow(): Int = selector.selectNow()

@throws[IOException]
private def select(context: IoExecutionContext): Unit = {
try {
if (!context.canBlock) {
if (context.canNotBlock) {
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
Expand Down

0 comments on commit ebecdac

Please sign in to comment.