Add rate limiter primitives #235

Merged · 25 commits · Nov 15, 2024 · Changes from 12 commits
146 changes: 146 additions & 0 deletions core/src/main/scala/ox/resilience/GenericRateLimiter.scala
@@ -0,0 +1,146 @@
package ox.resilience

import java.util.concurrent.Semaphore
import GenericRateLimiter.*
import ox.resilience.GenericRateLimiter.Strategy.Blocking
import ox.*

/** Rate limiter which allows passing a configuration value to the execution. This can include both runtime and compile-time information,
  * allowing for customization of return types and runtime behavior. If the only behavior needed is to block or drop operations, the
  * `RateLimiter` class provides a simpler interface.
  */
case class GenericRateLimiter[Returns[_[_]] <: Strategy[_]](
    executor: Executor[Returns],
    algorithm: RateLimiterAlgorithm
):

  import GenericRateLimiter.Strategy.given

  /** Limits the rate of execution of the given operation with a custom Result type.
    */
  def apply[T, Result[_]](operation: => T)(using Returns[Result]): Result[T] =
    executor.schedule(algorithm, operation)
Member:
is the schedule/execute distinction needed? can't it be combined in a single method call?

Contributor Author:
Technically yes, but it seems to me better organized that way. If updating is now done by the GenericRateLimiter, I think we need to pass a semaphore to allow locking and unlocking of the updater, so we would need both.
    executor.execute(algorithm, operation)
  end apply
end GenericRateLimiter
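
For context, a minimal usage sketch of the API as defined in this diff (computeResult is a hypothetical operation; the executors are defined below). The executor fixes the family of allowed strategies, and the given Strategy value fixes the result type:

val limiter = GenericRateLimiter(GenericRateLimiter.Executor.BlockOrDrop(), algorithm)

// Block: the result type is Id[Int] = Int
val a: Int = limiter(computeResult)(using GenericRateLimiter.Strategy.Block())

// Drop: the result type is Option[Int]
val b: Option[Int] = limiter(computeResult)(using GenericRateLimiter.Strategy.Drop())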

object GenericRateLimiter:

  type Id[A] = A

  /** Describes the execution strategy that must be used by the rate limiter for a given operation. It allows the encoding of return
    * types and custom runtime behavior.
    */
  sealed trait Strategy[F[*]]:
    def run[T](operation: => T): F[T]

  object Strategy:
    sealed trait Blocking[F[*]] extends Strategy[F]
    sealed trait Dropping[F[*]] extends Strategy[F]
    sealed trait BlockOrDrop[F[*]] extends Strategy[F]

    case class Block() extends Blocking[Id] with BlockOrDrop[Id]:
      def run[T](operation: => T): T = operation

    case class Drop() extends Dropping[Option] with BlockOrDrop[Option]:
      def run[T](operation: => T): Option[T] = Some(operation)

    given Blocking[Id] = Block()
    given Dropping[Option] = Drop()
  end Strategy

  /** Determines the policy to apply when the rate limiter is full. The executor is responsible for managing the inner state of the
    * algorithm employed. In particular, it must ensure that operations are executed only if allowed and that the algorithm is updated.
    */
  trait Executor[Returns[_[_]] <: Strategy[_]]:

    /** Performs any tasks needed to delay the operation or alter the execution mode. Usually, this will involve using the `acquire` or
      * `tryAcquire` methods from the algorithm and taking care of updating it.
      */
    def schedule[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using Returns[Result]): Unit

    /** Executes the operation and returns the expected result depending on the strategy. It might perform scheduling tasks if they are
      * not independent from the execution.
      */
    def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using Returns[Result]): Result[T]

    /** Runs the operation and returns the result using the given strategy.
      */
    def run[T, Result[_]](operation: => T)(using cfg: Returns[Result]): Result[T] =
      cfg.run(operation).asInstanceOf[Result[T]]

  end Executor

  object Executor:
    /** Blocks rejected operations until the rate limiter is ready to accept them.
      */
    case class Block() extends Executor[Strategy.Blocking]:

      val updateLock = new Semaphore(0)
Member:
is the update lock needed? we're always starting update as a background process in a fork, no? and updating only from that fork

Contributor Author:
I don't think there is a way to avoid two semaphores: one is needed to block and unblock the updater, so that all performed updates are really needed. The other one, in this case, is there to avoid race conditions when giving permits and to avoid giving more than one.

Although if we just let the updater run in the background whether it's needed or not, it would simplify the code, also for downstream users implementing their own algorithms. What do you think?

Member:
Ah ... I thought the updater is always run in the background. What's the scenario for not running it in the background?

Member:
But this simplification sounds good, RateLimiter needs the Ox capability anyway
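
A sketch of the simplification discussed above (not the code in this diff): run the updater unconditionally in a background fork, so neither the start/stop semaphore nor the update lock is needed. It assumes an enclosing Ox scope, which the discussion notes RateLimiter needs anyway; a production version would park rather than poll when getNextTime() returns 0.

def startUpdater(algorithm: RateLimiterAlgorithm)(using Ox): Unit =
  val _ = forkUser:
    while true do
      val waitTime = algorithm.getNextTime() // nanos until the next update is useful
      if waitTime > 0 then
        Thread.sleep(waitTime / 1_000_000, (waitTime % 1_000_000).toInt)
      algorithm.update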


      val schedule = new Semaphore(1)

      def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.Blocking[Result]): Result[T] =
        cfg.run(operation)

      def schedule[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using Strategy.Blocking[Result[*]]): Unit =
        if !algorithm.tryAcquire then
          // starts the scheduler if it's not already running
          if schedule.tryAcquire() then
            supervised:
              val _ = forkUser:
                runScheduler(algorithm)
              ()
          algorithm.acquire

      private def runScheduler(algorithm: RateLimiterAlgorithm): Unit =
        val waitTime = algorithm.getNextTime()
        algorithm.update
        if waitTime > 0 then
          val millis = waitTime / 1000000
          val nanos = waitTime % 1000000
          Thread.sleep(millis, nanos.toInt)
          runScheduler(algorithm)
        else schedule.release()
      end runScheduler

    end Block

    /** Drops rejected operations.
      */
    case class Drop() extends Executor[Strategy.Dropping]:

      def schedule[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using Strategy.Dropping[Result[*]]): Unit =
        ()

      def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.Dropping[Result[*]]): Result[T] =
        algorithm.update
        if algorithm.tryAcquire then cfg.run(operation)
        else None.asInstanceOf[Result[T]]
Member:
I'm not sure we're not trying to be overly flexible here. Drop, on the one hand, seems to work with any result type, but in practice requires an Option (because of the case here). Maybe the executor should simply have a fixed return type (Block - identity, Drop - Option). Would we lose any flexibility then?

Contributor Author:
I think the main problem then is integrating them easily with GenericRateLimiter. If we are going for a fixed return type, I would put all the logic inside RateLimiter, because otherwise it's just wrapping of logic.

In the recent push I've deleted the GRL and Executor classes and moved the updating logic to RateLimiter. If any user wants to customize how the algorithm is manipulated, the easiest way would be to create their own interface. I've also updated the docs and tests.

    end Drop
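
A sketch of the fixed-return-type alternative discussed above, with the logic folded directly into the rate limiter (SimpleRateLimiter is a hypothetical name, not part of this diff): Block returns the plain result type, Drop returns Option.

class SimpleRateLimiter(algorithm: RateLimiterAlgorithm):
  // blocks until a permit is available, then runs the operation: result type T
  def runBlocking[T](operation: => T): T =
    algorithm.acquire
    operation

  // runs the operation only if a permit is immediately available: result type Option[T]
  def runOrDrop[T](operation: => T): Option[T] =
    if algorithm.tryAcquire then Some(operation) else None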

    /** Blocks rejected operations until the rate limiter is ready to accept them, or drops them, depending on the chosen strategy.
      */
    case class BlockOrDrop() extends Executor[Strategy.BlockOrDrop]:
Member:
this hybrid strategy looks weird ... maybe we should only pass the strategies as a parameter, instead of parametrizing the whole rate limiter with it?

Contributor Author:
I'm not sure what you're thinking of exactly. Something like this would be equivalent but would also accept "bad" strategies.

case class GenericRateLimiter {
  def apply[T, Result[_], Returns[_[_]] <: Strategy[_]](
    operation: => T
  )(using Returns[Result]): Result[T]
}

Member:
Bad in what sense? We determine the strategy at RateLimiter's call site, no?

Contributor Author:
At the RateLimiter level there wouldn't be any problem, but the point of having GenericRateLimiter is to allow customization through passing a Strategy, possibly one customized by a user. The user could use a strategy for which there is no corresponding executor. Parametrizing the GRL makes any Strategy not extending the Returns type a compile error.

It should be possible to pass the executor directly, although depending on the use it might create problems, e.g., if the user creates a custom executor with internal state and doesn't reuse the same executor in different calls to the rate limiter, or if different executors need some common internal state. It would also make it more difficult to pass a parameter customizing executor behaviour if there is some internal state that needs to be shared.

Member:
Ah, I see the problem. But this BlockOrDrop is fishy anyway. In its update call, you only call blockExecutor.update. Shouldn't the executor be somehow shared? What if the user calls .runBlocking and .runOrDrop interchangeably? It would be good to have a test which checks such combinations. And this either needs simplification, or good docs on why it's done this way.

Contributor Author:
Actually, the BlockOrDrop executor at the moment just redirects to the appropriate executor depending on the strategy. I can expand the tests to cover more of the behaviour, but I am not really sure what kind of simplifications you have in mind. For this particular executor, I don't see any need for shared state (after simplifying, there will be no internal state in any case). The internal state needed to check whether an operation can be accepted is always in the RateLimiterAlgorithm, while the executor should only be concerned with how this information is used.

This will disappear after simplifying the updating, so the following is not important, but it might provide context. BlockOrDrop only called the block updater because the drop updater didn't do anything. A common method of updating rate limiters is on each received call, so that no background threads are involved. The problem is that this only works for the drop executor, while the blocking one needs some kind of queueing mechanism and thus background updating. Although in the case of fair blocking, it might introduce unfairness into BlockOrDrop if there is no shared state.
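
A hypothetical sketch of the interleaving test the reviewer asks for, written against the RateLimiter facade defined later in this diff (the exact assertions are assumed):

import scala.concurrent.duration.*

// mix blocking and dropping calls against one limiter: both draw
// from the same FixedRate window of 2 permits per second
val limiter = RateLimiter(RateLimiterAlgorithm.FixedRate(2, 1.second))
val a = limiter.runBlocking("first")  // takes permit 1
val b = limiter.runOrDrop("second")   // takes permit 2: Some("second")
val c = limiter.runOrDrop("third")    // window exhausted: expected None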


      val blockExecutor = Block()
      val dropExecutor = Drop()

      def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.BlockOrDrop[Result]): Result[T] =
        cfg match
          case cfg: Strategy.Block =>
            blockExecutor.execute(algorithm, operation)(using cfg.asInstanceOf[Strategy.Blocking[Result]])
          case cfg: Strategy.Drop =>
            dropExecutor.execute(algorithm, operation)(using cfg)

      def schedule[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using Strategy.BlockOrDrop[Result]): Unit =
        implicitly[Strategy.BlockOrDrop[Result]] match
          case cfg: Strategy.Block =>
            blockExecutor.schedule(algorithm, operation)(using cfg.asInstanceOf[Strategy.Blocking[Result]])
          case cfg: Strategy.Drop =>
            dropExecutor.schedule(algorithm, operation)(using cfg)
    end BlockOrDrop

  end Executor
end GenericRateLimiter
57 changes: 57 additions & 0 deletions core/src/main/scala/ox/resilience/RateLimiter.scala
@@ -0,0 +1,57 @@
package ox.resilience

import scala.concurrent.duration.*
import ox.*

/** Rate limiter with a customizable algorithm. It allows choosing between blocking or dropping an operation.
  */
case class RateLimiter(
    algorithm: RateLimiterAlgorithm
):
  import GenericRateLimiter.*

  private val rateLimiter =
    supervised:
      GenericRateLimiter(Executor.BlockOrDrop(), algorithm)

  /** Blocks the operation until the rate limiter allows it.
    */
  def runBlocking[T](operation: => T): T = rateLimiter(operation)(using Strategy.Block())

  /** Drops the operation if not allowed by the rate limiter, returning `None`.
    */
  def runOrDrop[T](operation: => T): Option[T] = rateLimiter(operation)(using Strategy.Drop())

end RateLimiter

object RateLimiter:

  def leakyBucket(
      capacity: Int,
      leakInterval: FiniteDuration
  ): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.LeakyBucket(capacity, leakInterval))
  end leakyBucket

  def tokenBucket(
      maxTokens: Int,
      refillInterval: FiniteDuration
  ): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.TokenBucket(maxTokens, refillInterval))
  end tokenBucket

  def fixedRate(
      maxRequests: Int,
      windowSize: FiniteDuration
  ): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.FixedRate(maxRequests, windowSize))
  end fixedRate

  def slidingWindow(
      maxRequests: Int,
      windowSize: FiniteDuration
  ): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.SlidingWindow(maxRequests, windowSize))
  end slidingWindow

end RateLimiter
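
For context, a usage sketch of the factory methods above; fetchQuote is a hypothetical operation returning a String:

import scala.concurrent.duration.*

val limiter = RateLimiter.tokenBucket(maxTokens = 10, refillInterval = 100.millis)
val eager: String         = limiter.runBlocking(fetchQuote()) // waits for a token if needed
val maybe: Option[String] = limiter.runOrDrop(fetchQuote())   // None when no token is available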
189 changes: 189 additions & 0 deletions core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala
@@ -0,0 +1,189 @@
package ox.resilience

import ox.*
import ox.resilience.RateLimiterAlgorithm.*
import scala.concurrent.duration.*
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.*
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.Semaphore
import java.util.{LinkedList, Queue}

/** Determines the algorithm to use for the rate limiter.
  */
trait RateLimiterAlgorithm:

  /** Acquires a permit to execute the operation. This method should block until a permit is available.
    */
  def acquire: Unit

  /** Tries to acquire a permit to execute the operation. This method should not block.
    */
  def tryAcquire: Boolean

  /** Updates the internal state of the rate limiter to check whether new operations can be accepted.
    */
  def update: Unit

  /** Returns the time until the next operation can be accepted, to be used by the `GenericRateLimiter.Executor`. It should return 0 only
    * if there is no need to reschedule an update in the future. It should not modify internal state.
    */
  def getNextTime(): Long

end RateLimiterAlgorithm
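
To illustrate the contract, a minimal hypothetical implementation (not part of this diff): each call to update refills all permits, giving an effective rate of `rate` operations per update interval.

import java.util.concurrent.Semaphore
import scala.concurrent.duration.FiniteDuration

case class RefillOnUpdate(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
  private val semaphore = new Semaphore(rate)

  def acquire: Unit = semaphore.acquire()          // blocks until a permit is free
  def tryAcquire: Boolean = semaphore.tryAcquire() // non-blocking check

  def update: Unit =                               // reset to full capacity
    semaphore.drainPermits()
    semaphore.release(rate)

  def getNextTime(): Long = per.toNanos            // always re-check after one interval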

object RateLimiterAlgorithm:
  /** Fixed rate algorithm
    */
  case class FixedRate(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
    private val lastUpdate = new AtomicLong(System.nanoTime())
    private val semaphore = new Semaphore(rate)
    val lock = new java.util.concurrent.locks.ReentrantLock()

    def acquire: Unit =
      semaphore.acquire()

    def tryAcquire: Boolean =
      semaphore.tryAcquire()

    def getNextTime(): Long =
      val waitTime = lastUpdate.get() + per.toNanos - System.nanoTime()
      val q = semaphore.getQueueLength()
      if waitTime > 0 then waitTime
      else if q > 0 then per.toNanos
Member:
why, if q > 0, is the wait time `per`?

Contributor Author:
It hanged before, but I think it might be better to return an Option[Long] here, so we can differentiate between no updating (None), updating only once (Some(0L)) and continuing to update.

Member:
but do we ever stop updating, if it's done in a background process?

Contributor Author:
It could be possible to only schedule when needed: for example, if no calls surpass the rate for 10 minutes and it updates each minute, we could update only after 10 minutes, when the rate is surpassed. But if there is a thread anyway, instead of starting one only when needed, I don't think we gain much. Probably better to just always schedule.

Member:
yeah, I think that optimization wouldn't save much. Let's simplify and schedule always

Contributor Author:
Great then :)
      else 0L

    def update: Unit =
      val now = System.nanoTime()
      lastUpdate.updateAndGet { time =>
Member:
according to the docs, updateAndGet should be side-effect free - here, we're manipulating the semaphores

Contributor Author:
Yes. I think we should also move the whole updating mechanism to GenericRateLimiter, so that update doesn't need to be thread-safe, and avoid some atomic references in the algorithms' implementations
        if time + per.toNanos < now then
          semaphore.drainPermits()
          semaphore.release(rate)
          now
        else time
      }
      ()
    end update

  end FixedRate
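
Regarding the side-effect concern above, a hedged sketch of one way out (using the `lock` field that FixedRate already declares but never uses): do the check-and-refill under the lock and avoid updateAndGet entirely.

    def update: Unit =
      lock.lock()
      try
        val now = System.nanoTime()
        if lastUpdate.get() + per.toNanos < now then
          semaphore.drainPermits() // side effects happen outside any atomic update function
          semaphore.release(rate)
          lastUpdate.set(now)
      finally lock.unlock()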

  /** Sliding window algorithm
    */
  case class SlidingWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
    private val log = new AtomicReference[Queue[Long]](new LinkedList[Long]())
    private val semaphore = new Semaphore(rate)

    def acquire: Unit =
      semaphore.acquire()
      val now = System.nanoTime()
      log.updateAndGet { q =>
        q.add(now)
        q
      }
      ()
    end acquire

    def tryAcquire: Boolean =
      if semaphore.tryAcquire() then
        val now = System.nanoTime()
        log.updateAndGet { q =>
          q.add(now)
          q
        }
        true
      else false

    def getNextTime(): Long =
      val furtherLog = log.get().peek()
      if null eq furtherLog then
        if semaphore.getQueueLength() > 0 then per.toNanos
        else 0L
      else
        val waitTime = log.get().peek() + per.toNanos - System.nanoTime()
        val q = semaphore.getQueueLength()
        if waitTime > 0 then waitTime
        else if q > 0 then
          update
          getNextTime()
        else 0L
      end if
    end getNextTime

    def update: Unit =
      val now = System.nanoTime()
      while semaphore.availablePermits() < rate && log
          .updateAndGet { q =>
            if q.peek() < now - per.toNanos then
              q.poll()
              semaphore.release()
              q
            else q
          }
          .peek() < now - per.toNanos
      do ()
      end while
    end update

  end SlidingWindow

  /** Token bucket algorithm
    */
  case class TokenBucket(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
    private val refillInterval = per.toNanos
    private val lastRefillTime = new AtomicLong(System.nanoTime())
    private val semaphore = new Semaphore(1)

    def acquire: Unit =
      semaphore.acquire()

    def tryAcquire: Boolean =
      semaphore.tryAcquire()

    def getNextTime(): Long =
      val waitTime = lastRefillTime.get() + refillInterval - System.nanoTime()
      val q = semaphore.getQueueLength()
      if waitTime > 0 then waitTime
      else if q > 0 then refillInterval
      else 0L

    def update: Unit =
      val now = System.nanoTime()
      val elapsed = now - lastRefillTime.get()
      val newTokens = elapsed / refillInterval
      lastRefillTime.set(newTokens * refillInterval + lastRefillTime.get())
      semaphore.release(newTokens.toInt)

  end TokenBucket
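
Worked example of the refill arithmetic above: with per = 100 ms and 250 ms elapsed since lastRefillTime, newTokens = 250 / 100 = 2, so two permits are released, and lastRefillTime advances by exactly 2 × 100 ms rather than to now; the leftover 50 ms still counts toward the next token. Note that in this version the bucket is uncapped, since Semaphore.release simply adds permits.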

  /** Leaky bucket algorithm
    */
  case class LeakyBucket(capacity: Int, leakRate: FiniteDuration) extends RateLimiterAlgorithm:
    private val leakInterval = leakRate.toNanos
    private val lastLeakTime = new AtomicLong(System.nanoTime())
    private val semaphore = new Semaphore(capacity)

    def acquire: Unit =
      semaphore.acquire()

    def tryAcquire: Boolean =
      semaphore.tryAcquire()

    def getNextTime(): Long =
      val waitTime = lastLeakTime.get() + leakInterval - System.nanoTime()
      val q = semaphore.getQueueLength()
      if waitTime > 0 then waitTime
      else if q > 0 then leakInterval
      else 0L

    def update: Unit =
      val now = System.nanoTime()
      val lastLeak = lastLeakTime.get()
      val elapsed = now - lastLeak
      val leaking = elapsed / leakInterval
      val newTime = leaking * leakInterval + lastLeak
      semaphore.release(leaking.toInt)
      lastLeakTime.set(newTime)
    end update

  end LeakyBucket
end RateLimiterAlgorithm