Add rate limiter primitives #235

Merged · 25 commits · Nov 15, 2024
Changes from 14 commits
112 changes: 112 additions & 0 deletions core/src/main/scala/ox/resilience/GenericRateLimiter.scala
@@ -0,0 +1,112 @@
package ox.resilience

import GenericRateLimiter.*
import ox.*

/** Rate limiter which allows passing a configuration value to the execution. This can include both runtime and compile-time
  * information, allowing for customization of return types and runtime behavior. If the only behavior needed is to block or drop
  * operations, the `RateLimiter` class provides a simpler interface.
  */
case class GenericRateLimiter[Returns[_[_]] <: Strategy[_]](
    executor: Executor[Returns],
    algorithm: RateLimiterAlgorithm
)(using Ox):

  import GenericRateLimiter.Strategy.given

  // starts a background fork which periodically updates the algorithm's internal state
  val _ =
    fork:
      update()

  /** Limits the rate of execution of the given operation, with a custom `Result` type.
    */
  def apply[T, Result[_]](operation: => T)(using Returns[Result]): Result[T] =
    executor.execute(algorithm, operation)
  end apply

  // sleeps until the next update is due, updates the algorithm and recurses
  private def update(): Unit =
    val waitTime = algorithm.getNextUpdate
    val millis = waitTime / 1000000
    val nanos = waitTime % 1000000
    Thread.sleep(millis, nanos.toInt)
    algorithm.update
    update()
  end update
end GenericRateLimiter
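
As a usage sketch (not part of the diff; it assumes a supervised scope and the FixedRate algorithm added later in this PR), the strategy given at the call site selects the result type:

import ox.*
import ox.resilience.*
import scala.concurrent.duration.*

@main def genericLimiterDemo(): Unit =
  supervised:
    val limiter = GenericRateLimiter(
      GenericRateLimiter.Executor.BlockOrDrop(),
      RateLimiterAlgorithm.FixedRate(2, 1.second)
    )
    // Block(): the result type is Id[Int], i.e. Int
    val a: Int = limiter(1 + 1)(using GenericRateLimiter.Strategy.Block())
    // Drop(): the result type is Option[Int]
    val b: Option[Int] = limiter(1 + 1)(using GenericRateLimiter.Strategy.Drop())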

object GenericRateLimiter:

  type Id[A] = A

  /** Describes the execution strategy that must be used by the rate limiter for a given operation. It allows encoding return types and
    * custom runtime behavior.
    */
  trait Strategy[F[*]]:
    def run[T](operation: => T): F[T]

  object Strategy:
    sealed trait Blocking[F[*]] extends Strategy[F]
    sealed trait Dropping[F[*]] extends Strategy[F]
    sealed trait BlockOrDrop[F[*]] extends Strategy[F]

    case class Block() extends Blocking[Id] with BlockOrDrop[Id]:
      def run[T](operation: => T): T = operation

    case class Drop() extends Dropping[Option] with BlockOrDrop[Option]:
      def run[T](operation: => T): Option[T] = Some(operation)

    given Blocking[Id] = Block()
    given Dropping[Option] = Drop()
  end Strategy
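
To illustrate the encoding (a hypothetical extension, not part of this PR; the names Failing and AsTry are made up), a user-defined strategy only needs to fix the result-type constructor and implement run. Note that an executor accepting it must also exist, which is discussed further down:

import scala.util.Try

// Hypothetical: a strategy capturing failures as Try[T]. It compiles against the
// Strategy trait, but no executor in this PR accepts it - a custom Executor
// parametrized with Failing would be required.
sealed trait Failing[F[*]] extends GenericRateLimiter.Strategy[F]
case class AsTry() extends Failing[Try]:
  def run[T](operation: => T): Try[T] = Try(operation)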

  /** Determines the policy to apply when the rate limiter is full. The executor is responsible for managing the inner state of the
    * algorithm employed. In particular, it must ensure that operations are executed only if allowed and that the algorithm is updated.
    */
  trait Executor[Returns[_[_]] <: Strategy[_]]:
    /** Executes the operation and returns the expected result depending on the strategy. It might perform scheduling tasks if they are
      * not independent from the execution.
      */
    def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using Returns[Result]): Result[T]

    /** Runs the operation and returns the result using the given strategy.
      */
    def run[T, Result[_]](operation: => T)(using cfg: Returns[Result]): Result[T] =
      cfg.run(operation).asInstanceOf[Result[T]]

  end Executor

  object Executor:
    /** Blocks rejected operations until the rate limiter is ready to accept them.
      */
    case class Block() extends Executor[Strategy.Blocking]:
      def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.Blocking[Result]): Result[T] =
        algorithm.acquire
        run(operation)

    end Block

    /** Drops rejected operations, returning `None`.
      */
    case class Drop() extends Executor[Strategy.Dropping]:
      def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.Dropping[Result]): Result[T] =
        if algorithm.tryAcquire then cfg.run(operation)
        else None.asInstanceOf[Result[T]]

Member: I'm not sure we aren't trying to be overly flexible here. Drop on one hand seems to work with any result type, but in practice requires an Option (because of the case here). Maybe the executor should simply have a fixed return type (Block - identity, Drop - Option). Would we lose any flexibility then?

Contributor Author: I think the main problem then is integrating them easily with GenericRateLimiter. If we go for fixed return types, I would put all the logic inside RateLimiter, because otherwise it's just wrapping of logic.

In the recent push I've deleted the GRL and Executor classes and moved the updating logic to RateLimiter. If a user wants to customize how the algorithm is manipulated, the easiest way would be to create their own interface. I've also updated the docs and tests.

    end Drop

    /** Blocks rejected operations until the rate limiter is ready to accept them, or drops them, depending on the chosen strategy.
      */
    case class BlockOrDrop() extends Executor[Strategy.BlockOrDrop]:

Member: this hybrid strategy looks weird ... maybe we should only pass the strategies as a parameter, instead of parametrizing the whole rate limiter with it?

Contributor Author: I'm not sure what you're thinking of exactly. Something like this would be equivalent, but would also accept "bad" strategies:

case class GenericRateLimiter {
  def apply[T, Result[_], Returns[_[_]] <: Strategy[_]](
    operation: => T
  )(using Returns[Result]): Result[T]
}

Member: Bad in what sense? We determine the strategy at RateLimiter's call site, no?

Contributor Author: At the RateLimiter level there wouldn't be any problem, but the point of having GenericRateLimiter is to allow customization by passing a Strategy, possibly one defined by the user. The user could then use a strategy for which there is no corresponding executor. Parametrizing the GRL makes any Strategy not extending the Returns type a compile error.

It should be possible to pass the executor directly, although depending on the use it might create problems, e.g., if the user creates a custom executor with internal state and doesn't reuse the same executor across different calls to the rate limiter, or if different executors need some common internal state. It would also make it more difficult to pass a parameter customizing executor behaviour if there is internal state that needs to be shared.

Member: Ah, I see the problem. But this BlockOrDrop is fishy anyway. In its update call, you only call blockExecutor.update. Shouldn't the executor be somehow shared? What if the user calls .runBlocking and .runOrDrop interchangeably? It would be good to have a test which checks such combinations. And this either needs simplification, or good docs on why it's done this way.

Contributor Author: Actually, the BlockOrDrop executor at the moment just redirects to the appropriate executor depending on the strategy. I can expand the tests to cover more of the behaviour, but I'm not sure what kind of simplifications you have in mind. For this particular executor I don't see any need for shared state (after simplifying there will be no internal state in any case). The internal state that decides whether an operation can be accepted always lives in the RateLimiterAlgorithm, while the executor is only concerned with how that information is used.

This will disappear after simplifying the updating, so the following is not important, but it might provide context. BlockOrDrop only called the block updater because the drop updater didn't do anything. A common way to update rate limiters is on each incoming call, so no background threads are involved. The problem is that this only works for the drop executor, while the blocking one needs some kind of queueing mechanism and thus background updating. Although, in the case of fair blocking, it might introduce unfairness to BlockOrDrop if there is no shared state.


      val blockExecutor = Block()
      val dropExecutor = Drop()

      // delegates to the appropriate executor, depending on the strategy instance
      def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.BlockOrDrop[Result]): Result[T] =
        cfg match
          case cfg: Strategy.Block =>
            blockExecutor.execute(algorithm, operation)(using cfg.asInstanceOf[Strategy.Blocking[Result]])
          case cfg: Strategy.Drop =>
            dropExecutor.execute(algorithm, operation)(using cfg)
    end BlockOrDrop

  end Executor
end GenericRateLimiter
69 changes: 69 additions & 0 deletions core/src/main/scala/ox/resilience/RateLimiter.scala
@@ -0,0 +1,69 @@
package ox.resilience

import scala.concurrent.duration.FiniteDuration
import ox.Ox

/** Rate limiter with a customizable algorithm. It allows choosing between blocking and dropping an incoming operation.
  */
case class RateLimiter(
    algorithm: RateLimiterAlgorithm
)(using Ox):
  import GenericRateLimiter.*

  private val rateLimiter =
    GenericRateLimiter(Executor.BlockOrDrop(), algorithm)

  /** Blocks the operation until the rate limiter allows it.
    */
  def runBlocking[T](operation: => T): T = rateLimiter(operation)(using Strategy.Block())

  /** Drops the operation, returning `None`, if the rate limiter does not allow it.
    */
  def runOrDrop[T](operation: => T): Option[T] = rateLimiter(operation)(using Strategy.Drop())

end RateLimiter

object RateLimiter:

  /** Rate limiter with the fixed rate algorithm, which can drop or block an operation if it is not allowed to run.
    *
    * @param maxRequests
    *   Maximum number of requests per consecutive window
    * @param windowSize
    *   Interval of time that has to pass before the rate limiter is reset
    */
  def fixedRate(
      maxRequests: Int,
      windowSize: FiniteDuration
  )(using Ox): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.FixedRate(maxRequests, windowSize))
  end fixedRate

  /** Rate limiter with the sliding window algorithm, which can drop or block an operation if it is not allowed to run.
    *
    * @param maxRequests
    *   Maximum number of requests in any window of time
    * @param windowSize
    *   Size of the window
    */
  def slidingWindow(
      maxRequests: Int,
      windowSize: FiniteDuration
  )(using Ox): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.SlidingWindow(maxRequests, windowSize))
  end slidingWindow

  /** Rate limiter with the token/leaky bucket algorithm, which can drop or block an operation if it is not allowed to run.
    *
    * @param maxTokens
    *   Maximum capacity of tokens in the algorithm
    * @param refillInterval
    *   Interval of time after which a token is added
    */
  def bucket(
      maxTokens: Int,
      refillInterval: FiniteDuration
  )(using Ox): RateLimiter =
    RateLimiter(RateLimiterAlgorithm.Bucket(maxTokens, refillInterval))
  end bucket
end RateLimiter
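
A usage sketch for the factories above (not part of the diff; it assumes a supervised scope, since the constructors require a given Ox):

import ox.*
import ox.resilience.*
import scala.concurrent.duration.*

@main def rateLimiterDemo(): Unit =
  supervised:
    val limiter = RateLimiter.fixedRate(maxRequests = 2, windowSize = 1.second)
    limiter.runBlocking(println("always runs, possibly after waiting"))
    val r: Option[Int] = limiter.runOrDrop(42) // None when no permit is available
    println(r)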
150 changes: 150 additions & 0 deletions core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala
@@ -0,0 +1,150 @@
package ox.resilience

import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.Semaphore
import java.util.{LinkedList, Queue}

/** Determines the algorithm to use for the rate limiter.
  */
trait RateLimiterAlgorithm:

  /** Acquires a permit to execute the operation. This method should block until a permit is available.
    */
  final def acquire: Unit =
    acquire(1)

  /** Acquires the given number of permits to execute the operation. This method should block until the permits are available.
    */
  def acquire(permits: Int): Unit

  /** Tries to acquire a permit to execute the operation. This method should not block.
    */
  final def tryAcquire: Boolean =
    tryAcquire(1)

  /** Tries to acquire the given number of permits to execute the operation. This method should not block.
    */
  def tryAcquire(permits: Int): Boolean

  /** Updates the internal state of the rate limiter to check whether new operations can be accepted.
    */
  def update: Unit

  /** Returns the time in nanoseconds that needs to elapse until the next update. It should not modify the internal state.
    */
  def getNextUpdate: Long

end RateLimiterAlgorithm
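
As an illustration of this contract (a hypothetical example, not part of the diff), the simplest conforming implementation grants every permit and pushes the next update arbitrarily far into the future:

// Hypothetical "unlimited" algorithm: every acquire succeeds immediately, and the
// background updater effectively sleeps forever.
object Unlimited extends RateLimiterAlgorithm:
  def acquire(permits: Int): Unit = ()
  def tryAcquire(permits: Int): Boolean = true
  def update: Unit = ()
  def getNextUpdate: Long = Long.MaxValue // the next update is arbitrarily far away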

object RateLimiterAlgorithm:
  /** Fixed rate algorithm. It allows starting at most `rate` operations in consecutive segments of duration `per`.
    */
  case class FixedRate(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
    private val lastUpdate = new AtomicLong(System.nanoTime())
    private val semaphore = new Semaphore(rate)

    def acquire(permits: Int): Unit =
      semaphore.acquire(permits)

    def tryAcquire(permits: Int): Boolean =
      semaphore.tryAcquire(permits)

    def getNextUpdate: Long =
      val waitTime = lastUpdate.get() + per.toNanos - System.nanoTime()
      if waitTime > 0 then waitTime else 0L

    def update: Unit =
      val now = System.nanoTime()
      lastUpdate.set(now)
      // restore the full `rate` of permits for the new window
      semaphore.release(rate - semaphore.availablePermits())
    end update

  end FixedRate

  /** Sliding window algorithm. It allows starting at most `rate` operations in the span of `per` before the current time.
    */
  case class SlidingWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
    // stores the timestamp and the number of permits acquired after calling acquire or tryAcquire successfully
    private val log = new AtomicReference[Queue[(Long, Int)]](new LinkedList[(Long, Int)]())
    private val semaphore = new Semaphore(rate)

    def acquire(permits: Int): Unit =
      semaphore.acquire(permits)
      // adds a timestamp to the log
      val now = System.nanoTime()
      log.updateAndGet { q =>
        q.add((now, permits))
        q
      }
      ()
    end acquire

    def tryAcquire(permits: Int): Boolean =
      if semaphore.tryAcquire(permits) then
        // adds a timestamp to the log
        val now = System.nanoTime()
        log.updateAndGet { q =>
          q.add((now, permits))

Member: shouldn't we use an immutable data structure here, as updateAndGet can be called multiple times?

Contributor Author: Done!

          q
        }
        true
      else false

    def getNextUpdate: Long =
      if log.get().size() == 0 then
        // no logs, so no need to update until `per` has passed
        per.toNanos
      else
        // the oldest log entry provides the next update point
        val waitTime = log.get().peek()._1 + per.toNanos - System.nanoTime()
        if waitTime > 0 then waitTime else 0L
    end getNextUpdate

    def update: Unit =
      val now = System.nanoTime()
      // retrieving the current queue, to append to it later if some elements were added concurrently
      val q = log.getAndUpdate(_ => new LinkedList[(Long, Int)]())

Member: here the log becomes empty for some time, allowing operations to be started, even if that would exceed the rate limit?

Contributor Author: Operations are started only if the semaphore allows it, so the log is unrelated to that. Once the log is processed, permits are restored and the semaphore allows new operations, depending on how many are restored.

      // remove records older than the window size
      while semaphore.availablePermits() < rate && q.peek()._1 + per.toNanos < now
      do
        val (_, permits) = q.poll()
        semaphore.release(permits)
      // merge old records with the ones concurrently added
      val _ = log.updateAndGet(q2 =>
        val qBefore = q
        while q2.size() > 0
        do
          qBefore.add(q2.poll())
          ()
        qBefore
      )
    end update

  end SlidingWindow

  /** Token/leaky bucket algorithm. It adds a token, allowing a new operation to start, every `per`, up to a maximum of `rate` tokens.
    */
  case class Bucket(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
    private val refillInterval = per.toNanos
    private val lastRefillTime = new AtomicLong(System.nanoTime())
    private val semaphore = new Semaphore(1)

    def acquire(permits: Int): Unit =
      semaphore.acquire(permits)

    def tryAcquire(permits: Int): Boolean =
      semaphore.tryAcquire(permits)

    def getNextUpdate: Long =
      val waitTime = lastRefillTime.get() + refillInterval - System.nanoTime()
      if waitTime > 0 then waitTime else 0L

    def update: Unit =
      val now = System.nanoTime()
      lastRefillTime.set(now)
      // add a single token, unless the bucket is at capacity
      if semaphore.availablePermits() < rate then semaphore.release()

Member: so the difference with FixedRate is that we always release 1 permit?

Contributor Author: That's right, and permits accumulate if not used.


  end Bucket
end RateLimiterAlgorithm
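
A behavior sketch of the accumulation mentioned above (hypothetical timing, not part of the diff):

import ox.*
import ox.resilience.*
import scala.concurrent.duration.*

@main def bucketDemo(): Unit =
  supervised:
    val limiter = RateLimiter.bucket(maxTokens = 3, refillInterval = 1.second)
    Thread.sleep(3500) // idle: tokens accumulate up to the maximum
    (1 to 3).foreach(i => println(limiter.runOrDrop(i))) // likely Some(1), Some(2), Some(3)
    println(limiter.runOrDrop(4)) // likely None - the bucket is drained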