Skip to content

Commit

Permalink
feat: Refactoring actor message scheduling policies
Browse files Browse the repository at this point in the history
  • Loading branch information
yankun1992 committed Sep 13, 2024
1 parent ebecdac commit f95c09c
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 134 deletions.
21 changes: 0 additions & 21 deletions core/src/cc/otavia/core/actor/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import cc.otavia.core.actor.Actor.*
import cc.otavia.core.address.Address
import cc.otavia.core.message.*
import cc.otavia.core.reactor.Reactor
import cc.otavia.core.stack.*
import cc.otavia.core.system.ActorSystem
import cc.otavia.core.timer.Timer

Expand Down Expand Up @@ -78,14 +77,6 @@ trait Actor[+M <: Call] {

def maxFetchPerRunning: Int = system.defaultMaxFetchPerRunning

def niceAsk: Int = NICE_ASK

def niceReply: Int = NICE_REPLY

def niceNotice: Int = NICE_NOTICE

def niceEvent: Int = NICE_EVENT

def nice: Int = 8

/** user actor override this to control whether restart when occur exception */
Expand Down Expand Up @@ -165,16 +156,4 @@ object Actor {
val ASK_TYPE: MessageType = 1
val REPLY_TYPE: MessageType = 2

private val NICE_MESSAGE_DEFAULT = 16
private val NICE_MESSAGE = SystemPropertyUtil.getInt("cc.otavia.core.actor.nice.message", NICE_MESSAGE_DEFAULT)

private val NICE_ASK = SystemPropertyUtil.getInt("cc.otavia.core.actor.nice.ask", NICE_MESSAGE)

private val NICE_REPLY = SystemPropertyUtil.getInt("cc.otavia.core.actor.nice.reply", NICE_MESSAGE)

private val NICE_NOTICE = SystemPropertyUtil.getInt("cc.otavia.core.actor.nice.notice", NICE_MESSAGE)

private val NICE_EVENT_DEFAULT = 32
private val NICE_EVENT = SystemPropertyUtil.getInt("cc.otavia.core.actor.nice.notice", NICE_EVENT_DEFAULT)

}
2 changes: 1 addition & 1 deletion core/src/cc/otavia/core/actor/MessageOf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package cc.otavia.core.actor

import cc.otavia.core.message.Call

/** Message type of a actor can receive */
/** Message type of actor can receive */
type MessageOf[A <: Actor[?]] <: Call = A match {
case Actor[m] => m
}
20 changes: 11 additions & 9 deletions core/src/cc/otavia/core/channel/AbstractChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import cc.otavia.core.message.*
import cc.otavia.core.slf4a.Logger
import cc.otavia.core.stack.*
import cc.otavia.core.stack.helper.ChannelFutureState
import cc.otavia.core.system.{ActorSystem, ActorThread}
import cc.otavia.core.system.{ActorHouse, ActorSystem, ActorThread}

import java.net.SocketAddress
import java.nio.ByteBuffer
Expand All @@ -42,8 +42,8 @@ abstract class AbstractChannel(val system: ActorSystem) extends Channel, Channel

import AbstractChannel.*

protected val logger: Logger = Logger.getLogger(getClass, system)
private var actor: ChannelsActor[?] = _
protected val logger: Logger = Logger.getLogger(getClass, system)
private var actorHouse: ActorHouse = _

private var channelId: Int = -1
private var pipe: ChannelPipelineImpl = _
Expand All @@ -69,8 +69,6 @@ abstract class AbstractChannel(val system: ActorSystem) extends Channel, Channel

protected var outboundQueue: mutable.ArrayDeque[AdaptiveBufferOffset | FileRegion] = mutable.ArrayDeque.empty

protected var mountedThread: ActorThread = _

// initial channel state on constructing
created = true
registering = false
Expand All @@ -92,8 +90,12 @@ abstract class AbstractChannel(val system: ActorSystem) extends Channel, Channel
closed = false
closing = false

private def actor: ChannelsActor[?] = actorHouse.actor.asInstanceOf[ChannelsActor[?]]

override final def isMounted: Boolean = mounted

protected def mountedThread: ActorThread = actorHouse.manager.thread

// impl ChannelInflight

/** Set inbound message barrier function
Expand Down Expand Up @@ -270,10 +272,8 @@ abstract class AbstractChannel(val system: ActorSystem) extends Channel, Channel

final private[core] def mount(channelsActor: ChannelsActor[?]): Unit = {
assert(!mounted, s"The channel $this has been mounted already, you can't mount it twice!")
actor = channelsActor
val thread = ActorThread.currentThread()
mountedThread = thread
channelId = executor.generateChannelId()
actorHouse = channelsActor.context.asInstanceOf[ActorHouse]
channelId = channelsActor.generateChannelId()
pipe = newChannelPipeline()
mounted = true
}
Expand Down Expand Up @@ -482,6 +482,8 @@ abstract class AbstractChannel(val system: ActorSystem) extends Channel, Channel
if (resize) outboundQueue.clearAndShrink()
}

override final def flushable(): Boolean = outboundQueue.nonEmpty

private[core] def bindTransport(local: SocketAddress, channelPromise: ChannelPromise): Unit

// format: off
Expand Down
2 changes: 0 additions & 2 deletions core/src/cc/otavia/core/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ trait Channel extends ChannelAddress {

final def writeAndFlush(msg: AnyRef, msgId: Long): Unit = pipeline.writeAndFlush(msg, msgId)

final def flush(): this.type = { pipeline.flush(); this }

final def sendOutboundEvent(event: AnyRef): Unit = pipeline.sendOutboundEvent(event)

final def assertExecutor(): Unit =
Expand Down
16 changes: 11 additions & 5 deletions core/src/cc/otavia/core/channel/ChannelAddress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@

package cc.otavia.core.channel

import cc.otavia.core.actor.{Actor, ChannelsActor}
import cc.otavia.core.actor.Actor
import cc.otavia.core.channel.message.ReadPlan
import cc.otavia.core.reactor.Reactor
import cc.otavia.core.stack.ChannelFuture
import cc.otavia.core.stack.helper.ChannelFutureState
import cc.otavia.core.stack.{ChannelFuture, Future}
import cc.otavia.core.system.ActorSystem

import java.io.File
import java.net.SocketAddress
import java.nio.channels.FileChannel
import java.nio.file.attribute.FileAttribute
import java.nio.file.{FileAlreadyExistsException, OpenOption, Path}
import java.nio.file.{OpenOption, Path}
import scala.language.unsafeNulls

/** An interface for [[Channel]] used in [[Actor]] */
Expand Down Expand Up @@ -269,7 +268,7 @@ trait ChannelAddress {
this
}

/** actor send ask message to channel, in underlying, it call channel.write
/** actor send ask message to channel, in underlying, it calls channel.write
* @param value
* ask message which need this [[Channel]] relpy a message.
* @param future
Expand All @@ -286,4 +285,11 @@ trait ChannelAddress {

def batchNotice(notices: Seq[AnyRef]): Unit

final def flush(): this.type = {
pipeline.flush()
this
}

def flushable(): Boolean

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package cc.otavia.core.channel
import cc.otavia.core.actor.ChannelsActor
import cc.otavia.core.channel.message.ReadPlan
import cc.otavia.core.reactor.Reactor
import cc.otavia.core.stack.{ChannelFuture, Future}
import cc.otavia.core.stack.ChannelFuture
import cc.otavia.core.system.ActorSystem

import java.io.File
Expand Down
151 changes: 76 additions & 75 deletions core/src/cc/otavia/core/system/ActorHouse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ final private[core] class ActorHouse(val manager: HouseManager) extends ActorCon
private val exceptionMailbox: MailBox = new MailBox(this)
private val eventMailbox: MailBox = new MailBox(this)

private var tmpAskCursor: Nextable = _
private var tmpNoticeCursor: Nextable = _

private[system] val status: AtomicInteger = new AtomicInteger(CREATED)

@volatile private var preHouse: ActorHouse = _
Expand Down Expand Up @@ -166,70 +169,15 @@ final private[core] class ActorHouse(val manager: HouseManager) extends ActorCon

def run(): Unit = {
if (status.compareAndSet(SCHEDULED, RUNNING)) {
if (replyMailbox.size() > dweller.niceReply * 2) dispatchReplies(dweller.niceReply * 2)
else {
if (replyMailbox.nonEmpty) { dispatchReplies(dweller.niceReply) }
if (exceptionMailbox.nonEmpty) { dispatchExceptions(dweller.niceReply) }
if (eventMailbox.nonEmpty) { dispatchEvents(dweller.niceEvent) }
if (!dweller.inBarrier && noticeMailbox.nonEmpty) {
if (dweller.batchable) {
var cursor = noticeMailbox.getChain(dweller.maxBatchSize)
val buf = ActorThread.threadBuffer[Notice]
while (cursor != null) {
val envelope = cursor.asInstanceOf[Envelope[?]]
cursor = envelope.next
envelope.deChain()
val notice = envelope.message.asInstanceOf[Notice]
if (dweller.batchNoticeFilter(notice)) {
buf.addOne(notice)
// TODO: recycle envelope
} else {
if (buf.nonEmpty) handleBatchNotice(buf)
dweller.receiveNotice(envelope)
runLaterTasks()
}
}
if (buf.nonEmpty) handleBatchNotice(buf)
} else {
var cursor = noticeMailbox.getChain(dweller.niceNotice)
while (cursor != null) {
val msg = cursor
cursor = msg.next
msg.deChain()
dweller.receiveNotice(msg.asInstanceOf[Envelope[?]])
runLaterTasks()
}
}
}
if (!dweller.inBarrier && askMailbox.nonEmpty) {
if (dweller.batchable) {
var cursor = askMailbox.getChain(dweller.maxBatchSize)
val buf = ActorThread.threadBuffer[Envelope[Ask[?]]]
while (cursor != null) {
val envelope = cursor.asInstanceOf[Envelope[Ask[?]]]
cursor = envelope.next
envelope.deChain()
val ask = envelope.message
if (dweller.batchAskFilter(ask)) buf.addOne(envelope)
else {
if (buf.nonEmpty) handleBatchAsk(buf)
dweller.receiveAsk(envelope)
runLaterTasks()
}
}
if (buf.nonEmpty) handleBatchAsk(buf)
} else {
var cursor = askMailbox.getChain(dweller.niceAsk)
while (cursor != null) {
val msg = cursor
cursor = msg.next
msg.deChain()
dweller.receiveAsk(msg.asInstanceOf[Envelope[Ask[?]]])
runLaterTasks()
}
}
}
}
if (replyMailbox.nonEmpty) dispatchReplies()
if (exceptionMailbox.nonEmpty) dispatchExceptions()

if (!dweller.inBarrier && askMailbox.nonEmpty) dispatchAsks()
if (!dweller.inBarrier && noticeMailbox.nonEmpty) dispatchNotices()

if (eventMailbox.nonEmpty) dispatchEvents()

runLaterTasks()

completeRunning()
}
Expand All @@ -251,51 +199,104 @@ final private[core] class ActorHouse(val manager: HouseManager) extends ActorCon
}
}

private def dispatchReplies(size: Int): Unit = {
var cursor = replyMailbox.getChain(size)
private def dispatchAsks(): Unit = if (dweller.batchable) dispatchBatchAsks() else dispatchAsks0()

private def dispatchAsks0(): Unit = {
if (tmpAskCursor == null) tmpAskCursor = askMailbox.getAll
while (tmpAskCursor != null && !dweller.inBarrier) {
val msg = tmpAskCursor
tmpAskCursor = msg.next
msg.deChain()
dweller.receiveAsk(msg.asInstanceOf[Envelope[Ask[?]]])
}
}

private def dispatchBatchAsks(): Unit = {
if (tmpAskCursor == null) tmpAskCursor = askMailbox.getAll
val buf = ActorThread.threadBuffer[Envelope[Ask[?]]]
while (tmpAskCursor != null && !dweller.inBarrier) {
val envelope = tmpAskCursor.asInstanceOf[Envelope[Ask[?]]]
tmpAskCursor = envelope.next
envelope.deChain()
val ask = envelope.message
if (dweller.batchAskFilter(ask)) buf.addOne(envelope)
else {
if (buf.nonEmpty) handleBatchAsk(buf)
dweller.receiveAsk(envelope)
}
}
if (buf.nonEmpty) handleBatchAsk(buf)
}

private def dispatchNotices(): Unit = if (dweller.batchable) dispatchBatchNotices() else dispatchNotices0()

private def dispatchNotices0(): Unit = {
if (tmpNoticeCursor == null) tmpNoticeCursor = noticeMailbox.getAll
while (tmpNoticeCursor != null && !dweller.inBarrier) {
val msg = tmpNoticeCursor
tmpNoticeCursor = msg.next
msg.deChain()
dweller.receiveNotice(msg.asInstanceOf[Envelope[?]])
}
}

private def dispatchBatchNotices(): Unit = {
if (tmpNoticeCursor == null) tmpNoticeCursor = noticeMailbox.getAll
val buf = ActorThread.threadBuffer[Notice]
while (tmpNoticeCursor != null && !dweller.inBarrier) {
val envelope = tmpNoticeCursor.asInstanceOf[Envelope[?]]
tmpNoticeCursor = envelope.next
envelope.deChain()
val notice = envelope.message.asInstanceOf[Notice]
if (dweller.batchNoticeFilter(notice)) buf.addOne(notice)
else {
if (buf.nonEmpty) handleBatchNotice(buf)
dweller.receiveNotice(envelope)
}
}
if (buf.nonEmpty) handleBatchNotice(buf)
}

private def dispatchReplies(): Unit = {
var cursor = replyMailbox.getAll
while (cursor != null) {
val msg = cursor
cursor = msg.next
msg.deChain()
dweller.receiveReply(msg.asInstanceOf[Envelope[?]])
runLaterTasks()
}
}

private def dispatchExceptions(size: Int): Unit = {
var cursor = exceptionMailbox.getChain(size)
private def dispatchExceptions(): Unit = {
var cursor = exceptionMailbox.getAll
while (cursor != null) {
val msg = cursor
cursor = msg.next
msg.deChain()
dweller.receiveExceptionReply(msg.asInstanceOf[Envelope[?]])
runLaterTasks()
}
}

private def dispatchEvents(size: Int): Unit = {
var cursor = eventMailbox.getChain(size)
private def dispatchEvents(): Unit = {
var cursor = eventMailbox.getAll
while (cursor != null) {
val msg = cursor.asInstanceOf[Event]
cursor = msg.next
msg.deChain()
dweller.receiveEvent(msg)
runLaterTasks()
}
}

private def handleBatchNotice(buf: mutable.ArrayBuffer[Notice]): Unit = {
val notices = buf.toSeq
buf.clear()
dweller.receiveBatchNotice(notices)
runLaterTasks()
}

private def handleBatchAsk(buf: mutable.ArrayBuffer[Envelope[Ask[?]]]): Unit = {
val asks = buf.toSeq
buf.clear()
dweller.receiveBatchAsk(asks)
runLaterTasks()
}

private def runLaterTasks(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/cc/otavia/core/system/ActorSystemImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ final private[core] class ActorSystemImpl(val name: String, val actorThreadFacto

override def defaultMaxFetchPerRunning: Int = ???

override def defaultMaxBatchSize: Int = 16
override def defaultMaxBatchSize: Int = 100000

// format: off
override def buildActor[A <: Actor[? <: Call]](factory: ActorFactory[A], num: Int = 1,
Expand Down
Loading

0 comments on commit f95c09c

Please sign in to comment.