
Add rate limiter primitives #235

Merged · 25 commits · Nov 15, 2024

Conversation

pablf (Contributor) commented Oct 19, 2024:

Implements a customizable rate limiter. The behaviour depends on a RateLimiterConfig built from a BlockingPolicy and a RateLimiterAlgorithm. BlockingPolicy should deal exclusively with the response to rejected operations, while RateLimiterAlgorithm must control only whether an operation can be accepted.

Currently, there are two blocking policies: Block and Drop.

  • Block: If the algorithm is blocked, new operations are queued, so that when the algorithm becomes unblocked, these operations are processed first.
  • Drop: Operations passed to the rate limiter while the algorithm is blocked are discarded.

There are also 4 algorithm implementations: fixed rate, sliding window, leaky bucket and token bucket.

Both BlockingPolicy and RateLimiterAlgorithm present an interface (which I hope is not confusing) that makes it very easy to implement new behaviour. If the implementation guidelines are followed, things like throttling operations, or blocking a given number of operations and discarding the rest, should be very easy to build.
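For illustration, the rough shape of these two interfaces (names and signatures here are a sketch, not the exact definitions in this PR):

trait RateLimiterAlgorithm:
  /** Controls only whether an operation can be accepted right now. */
  def tryAcquire: Boolean

trait BlockingPolicy:
  /** Deals only with rejected operations: queue them (Block), discard them
    * (Drop), or something custom such as throttling. */
  def onRejected[T](operation: => T): Option[T]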

Tests cover all 8 combinations, and also check the behaviour in a concurrent context.

/claim #120
fixes #120

):
  /** Limits the rate of execution of the given operation */
  def apply[T](operation: => T): Option[T] =
adamw (Member):

do I understand correctly that the result is None when the limit is exceeded and the policy is Drop, and Some(_) when the limit is not exceeded, or when it is exceeded but the policy is Block?

If so, I think we'd have to split this into two operations: RateLimiter.runBlocking(t: T): T and RateLimiter.runOrDrop(t: T): Option[T]. There are definitely scenarios for both policies, but the most basic use-case is to run an operation and block if the limit is exceeded. If you knew the policy was Block, you'd always have to .get the returned Option, which is bad.
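At the call site, the split reads like this (rateLimiter and computeAnswer are hypothetical):

// Block: no Option to unwrap; the call parks until a permit is available.
val result: Int = rateLimiter.runBlocking(computeAnswer())
// Drop: None signals that the operation was rejected.
val maybe: Option[Int] = rateLimiter.runOrDrop(computeAnswer())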

pablf (Contributor, Author) commented Oct 21, 2024:

Thanks for the review! That was the intended behaviour. The problem with splitting is that it would make new BlockingPolicy implementations difficult to write. For example, users might want a custom policy where the operation is just slowed down, or one that blocks the first operation and drops the rest. A possibility would be to keep a single apply by making RateLimiter generic:

RateLimiter[F[_]]:
  def apply[T](operation: => T): F[T]

I also think it would be a good idea to allow running with a particular configuration, so the final API would be the following. Actually, we could use dependent types:

RateLimiter(config: RateLimiterConfig):
  def apply[T](operation: => T): config.F[T]
  def apply[T](operation: => T, opCfg: Cfg): config.F[T]

This would allow a custom BlockingPolicy to implement blocking or dropping (or something different like throttling) behaviour per operation:

rateLimiter(operation, CustomCfg.block())
rateLimiter(operation, CustomCfg.drop())

The only disadvantage might be verbosity, but I believe the possibility of custom implementations outweighs it. What are your thoughts, before I proceed?

adamw (Member):

This could work, but I'm afraid it would be too complicated. You're right that we might lose some flexibility, but the main goal is to address the most common use-cases - and those have to be served well. To be honest, cases such as slowing down the first operation / dropping the rest seem quite specialised, and it would be totally acceptable for them to require writing some custom code. That is, you could reuse the algorithm part, but everything around it would need to be written by hand.

So I'd opt for a simple interface (no dependent / higher-kinded types) solving the common case, while providing building blocks for implementing more advanced use-cases.

if config.blockingPolicy.isUnblocked then
  if config.algorithm.isUnblocked then
    if config.isReady then
      config.acceptOperation
adamw (Member):

I didn't dive into the implementation yet, but isn't this race'y? That is, if two threads concurrently proceed through the three if-s, they could both call .acceptOperation, even if this would exceed the limit? It feels like accepting should be an atomic operation, which might fail (due to other threads exceeding the limit).
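For comparison, a check-then-act sequence like the one above can be collapsed into a single atomic step, e.g. with a semaphore (a sketch, not this PR's code):

import java.util.concurrent.Semaphore

// tryAcquire is a single atomic operation, so two threads can never both
// succeed on the last permit - unlike three separate if-checks followed by
// a call to acceptOperation.
final class AtomicAccept(limit: Int):
  private val semaphore = new Semaphore(limit)
  def tryAcceptOperation: Boolean = semaphore.tryAcquire()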

scope.shutdown()
scope.join().discard
// join might have been interrupted
try f(using capability)
adamw (Member):

was this reformatted by accident? doesn't look fine, maybe we need braces

adamw (Member) commented Oct 21, 2024:

Thanks for the PR! I left some initial comments. Once these are resolved I'll do a more thorough review.

One thing that's missing and that we'd definitely need is some documentation (in doc/utils), describing and showing how to use the API

pablf (Contributor, Author) commented Oct 22, 2024:

@adamw I've addressed the comments and added documentation. I've split the API into RateLimiter, with runBlocking and runOrDrop returning the appropriate types, and GenericRateLimiter, allowing for custom implementations of behaviour. GenericRateLimiter is more complex than in the first commit, but it is also more flexible, and none of its complexity leaks into RateLimiter. It might be better to put both GenericRateLimiter and RateLimiterAlgorithm in an internal package, given that it's possible to use RateLimiter without either.

def apply[T, Result[_]](operation: => T)(using Returns[Result]): Result[T] =
  val future = executor.add(algorithm, operation)
  executor.execute(algorithm, operation)
  Await.result(future, Duration.Inf)
adamw (Member):

I'm afraid we can't simply use futures. One, because it introduces an unnecessary asynchronous barrier on the "happy path" (when the rate limiter is open, the operation should just go through on the same thread). Two, because it disrupts the stack trace in case of an exception - either losing context or adding an additional layer of exceptions.

So we want to run the operations on the same thread from which they are called, optionally blocking if necessary.
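A minimal sketch of that shape, assuming a semaphore guards the permits (all names illustrative):

import java.util.concurrent.Semaphore

// The operation runs inline on the caller's thread: acquire() parks the
// (virtual) thread when the limit is exceeded, there is no async boundary,
// and exceptions keep their original stack trace. Permit replenishment
// (e.g. a background refill) is omitted here.
final class InlineLimiter(limit: Int):
  private val semaphore = new Semaphore(limit)
  def runBlocking[T](operation: => T): T =
    semaphore.acquire()
    operation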

pablf (Contributor, Author):

I've changed the apply method so that it executes the operation on the same thread. I'm not sure it's possible to avoid Future, or some synchronization, in the implementation of the blocking algorithm, because ideally blocked operations would be executed before those scheduled later. If we checked whether the queue was full before checking whether the rate limiter is open, we would need a lock or similar to make this atomic. The dropping implementation is now free of Futures.

adamw (Member):

Hm, yes, good point. Let's delegate this to the VT scheduler, and document it properly: we don't guarantee that the first operation to become blocked will be the first to execute; fairness is up to virtual threads.

pablf (Contributor, Author):

I've added a fairness parameter for the blocking executor and the RateLimiter API, defaulting to false. There is still some use of Futures, which I think is unavoidable, as they take care of updating the rate limiter.

It's tricky to avoid them, because if you just call acquire on the Semaphore, you block the thread and can't update the rate limiter. And it would be worse to have a parallel thread taking care of this. Currently, the internal state is updated, when possible, while trying to acquire the rate limiter, so Future appears only when scheduling updates of the internal state while the rate limiter is blocked.

Regardless of fairness, the operation is always executed on the same thread that called the rate limiter.

adamw (Member):

But what kind of updates do you need to do when the RateLimiter is blocked? I don't want to keep any kind of internal queue; the queue of blocked virtual threads should be enough. I might be missing something, but implementing a rate limiter doesn't seem to require that.

We shouldn't use Futures. If a background process is required, it should be implemented using Ox's forks and a structured concurrency scope. Using Futures means threads have an indefinite lifetime, defeating the whole purpose of the Ox project.

Fairness refers to whether threads are given a "fair" chance to complete once blocked. But we want to delegate this to the VT scheduler as well.

pablf (Contributor, Author) commented Oct 30, 2024:

I see. The updates would be to the number of permits held by the semaphore, not related to fairness or queues. This is also possible with some kind of "smart" polling. If we have 10 operations to run and we just acquire the underlying semaphore, they will stay blocked forever, because no new permits will ever be added. So either a parallel process checks the next update time, schedules for it, and lets the threads acquire the semaphore in arbitrary order (the current implementation for unfair), or, instead of acquiring the semaphore, we just "try" to acquire it, and if that fails, all threads sleep until the next update, repeating this until they acquire successfully. We wouldn't actually be blocking through Semaphore.acquire. Some of these 10 operations might run after unblocking, and the others would continue polling.

Maybe there is some other way, but I don't see it at the moment. Is this what you're referring to?

Regarding the fair version, maybe there is an implementation along these lines that avoids Futures. Would you like to keep a fair version in addition, or just do away with it?

adamw (Member):

So I think the RateLimiter will have to be created in a supervised scope, and create a fork which manages the replenishing of the permits? Maybe through an actor?

Another axis is whether we take into account the start time of the operation, or the completion time. So if you say "2 operations per second", does it mean that (A) two operations might start per second, or (B) two operations might run concurrently in any given second? But I guess that's the responsibility of the various algorithms. It will need to be documented well, though.

As for fairness, let's go with the simplest possible solution that is compatible with structured concurrency and the rest of the Ox project. So let's not implement a fair flag for now, instead delegating to the JVM scheduler.

pablf (Contributor, Author):

I've made some modifications, and now there's only the unfair approach, with the state updated inside a supervised scope. The updating process is started by the first call to the rate limiter that is not able to pass. This way, there shouldn't be any overhead when the rate limiter is not blocked.

I'm not sure whether this is the right place to start the fork, so I've explored the alternative of calling a modified updating method from the constructor of RateLimiter or GenericRateLimiter, which would block until the executor signals the need to unblock. That approach hung indefinitely when eventually calling ox.sleep, and I'm not sure if this is the expected behaviour when calling the method from the body of the case class (always inside a fork).

Currently, the algorithm only checks the start time of the operation, but the other version should be possible to implement by modifying the current one.

adamw (Member):

I'm not sure I understand how you got the deadlock, can you show it in code?

My idea was to start the whole RateLimiter inside a concurrency scope (requiring an Ox parameter), and there, as part of the construction process, start a fork which would handle the replenishing of permits.
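A sketch of that construction pattern with Ox's scopes and forks; apart from the Ox primitives (fork, sleep, the Ox capability), all names here are assumptions:

import ox.*
import scala.concurrent.duration.FiniteDuration

// Sketch: creation requires an enclosing scope (the using Ox parameter).
// Construction starts a fork that periodically replenishes permits; the fork
// ends together with the scope, so no thread outlives its owner.
def rateLimiter(algorithm: RateLimiterAlgorithm, per: FiniteDuration)(using Ox): RateLimiter =
  fork {
    while true do
      sleep(per)
      algorithm.update // the algorithm's update method, as in this PR
  }.discard
  new RateLimiter(algorithm)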

pablf (Contributor, Author):

I've tried reproducing the deadlock again, and surprisingly it works now. I don't remember doing anything differently, so I'm not sure what caused the issue... From some debugging, it seems it was caused by the Thread.sleep. Probably it was similar to what I've written now, but with some small bug provoking it. I'll update the branch to the correct version now.


def update: Unit =
  val now = System.nanoTime()
  lastUpdate.updateAndGet { time =>
adamw (Member):

according to the docs, updateAndGet should be side-effect free - here, we're manipulating the semaphores

pablf (Contributor, Author):

Yes. I think we should also move the entire updating mechanism to GenericRateLimiter, so that update doesn't need to be thread-safe, and so we can avoid some atomic references in the algorithm implementations.
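For illustration, once update is only ever invoked from a single background fork, it no longer needs to be thread-safe at all, and the semaphore manipulation moves out of any atomic update function (a sketch; per, rate and semaphore are assumed fields):

// Plain state suffices when a single fork calls update: there is no
// updateAndGet whose function could be retried while performing side effects.
private var lastUpdate: Long = System.nanoTime()

def update: Unit =
  lastUpdate = System.nanoTime()
  // refill the window to capacity, fixed-rate style; concurrent tryAcquire
  // calls may make the released count approximate, which is fine for a tick
  semaphore.release(rate - semaphore.availablePermits())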


/** Blocks rejected operations until the rate limiter is ready to accept them, or drops them, depending on the chosen strategy. */
case class BlockOrDrop() extends Executor[Strategy.BlockOrDrop]:
adamw (Member):

this hybrid strategy looks weird ... maybe we should only pass the strategies as a parameter, instead of parametrizing the whole rate limiter with it?

pablf (Contributor, Author):

I'm not sure what you're thinking of exactly. Something like this would be equivalent, but would also accept "bad" strategies:

case class GenericRateLimiter {
  def apply[T, Result[_], Returns[_[_]] <: Strategy[_]](
    operation: => T
  )(using Returns[Result]): Result[T]
}

adamw (Member):

Bad in what sense? We determine the strategy at RateLimiter's call site, no?

pablf (Contributor, Author):

At the RateLimiter level there wouldn't be any problem, but the point of having GenericRateLimiter is to allow customization by passing a Strategy, possibly defined by the user. The user could use a strategy for which there is no corresponding executor. Parametrizing the GRL makes any Strategy not extending the Returns type a compile error.

It should be possible to pass the executor directly, although depending on the use it might create problems, e.g., if the user creates a custom executor with internal state and doesn't reuse the same executor across different calls to the rate limiter, or if different executors need some common internal state. It would also make it more difficult to pass a parameter customizing executor behaviour if there is internal state that needs to be shared.

adamw (Member):

Ah, I see the problem. But this BlockOrDrop is fishy anyway. In its update call, you only call blockExecutor.update. Shouldn't the executor be somehow shared? What if the user calls .runBlocking and .runOrDrop interchangeably? It would be good to have a test which checks such combinations. And this either needs simplification, or good docs explaining why it's done this way.

pablf (Contributor, Author):

Actually, the BlockOrDrop executor at the moment just redirects to the appropriate executor depending on the strategy. I can expand the tests for more coverage of the behaviour, but I'm not really sure what kind of simplifications you have in mind. For this particular executor, I don't see any need for shared state (after simplifying, there will be no internal state in any case). The internal state that decides whether an operation can be accepted always lives in the RateLimiterAlgorithm, while the executor should only be concerned with how that information is used.

This will disappear after simplifying the updating, so the following is not important, but it might provide context. BlockOrDrop only called the block updater because the drop updater didn't do anything. A common way to update rate limiters is on each call they receive, so no background threads are involved. The problem is that this only works for the drop executor, while the blocking one needs some kind of queueing mechanism, and thus background updating. Although, in the case of fair blocking, it might introduce unfairness into BlockOrDrop if there is no shared state.

/** Limits the rate of execution of the given operation with a custom Result type */
def apply[T, Result[_]](operation: => T)(using Returns[Result]): Result[T] =
  executor.schedule(algorithm, operation)
adamw (Member):

is the schedule/execute distinction needed? can't it be combined in a single method call?

pablf (Contributor, Author):

Technically yes, but it seems better organized this way to me. If updating is now done by the GenericRateLimiter, I think we need to pass a semaphore to allow locking and unlocking of the updater, so we would need both.

val waitTime = lastUpdate.get() + per.toNanos - System.nanoTime()
val q = semaphore.getQueueLength()
if waitTime > 0 then waitTime
else if q > 0 then per.toNanos
adamw (Member):

why, if q > 0, is the wait time per?

pablf (Contributor, Author):

It hung before, but I think it might be better to return an Option[Long] here, so we can differentiate between not updating (None), updating only once (Some(0L)), and continuing to update.

adamw (Member):

but do we ever stop updating, if it's done in a background process?

pablf (Contributor, Author):

It would be possible to schedule only when needed. For example, if there are no calls exceeding the rate for 10 minutes, and it updates each minute, we could update only after 10 minutes, when the rate is exceeded. But if there is a thread running anyway, instead of starting one only when needed, I don't think we gain much. Probably better to just always schedule.

adamw (Member):

yeah, I think that optimization wouldn't save much. Let's simplify and schedule always.

pablf (Contributor, Author):

Great then :)

adamw (Member) commented Nov 6, 2024:

I think the implementation could use some cleanup and a review of correctness (esp. when it comes to concurrency) after all the changes. It takes a lot of time to review, and there are still some significant issues.

*/
case class Block() extends Executor[Strategy.Blocking]:

  val updateLock = new Semaphore(0)
adamw (Member):

is the update lock needed? we're always starting update as a background process in a fork, no? and updating only from that fork

pablf (Contributor, Author):

I don't think there is a way to avoid two semaphores: one is needed to block and unblock the updater, so that all performed updates are really needed. The other, in this case, avoids race conditions when giving permits, and ensures we never give more than 1.

Although, if we just let the updater run in the background whether it's needed or not, it would simplify the code, also for downstream users implementing their own algorithm. What do you think?

adamw (Member):

Ah ... I thought the updater is always run in the background. What's the scenario for not running it in the background?

adamw (Member):

But this simplification sounds good, RateLimiter needs the Ox capability anyway

pablf (Contributor, Author) commented Nov 6, 2024:

Thank you for your time reviewing this! I think that some of these changes might simplify the code.

pablf (Contributor, Author) commented Nov 9, 2024:

I've separated the updating mechanism from the executors, added details and examples to the docs about the use of Strategies and Executors and how to customize algorithms and executors, and reviewed the algorithm API. There is also a new test at the end of RateLimiterTest.scala that checks the behaviour of concurrently calling a blocked rate limiter through both the runOrDrop and runBlocking methods.

Finally, I've merged the token bucket and leaky bucket implementations into one, as they were very similar, and added the possibility of acquiring an arbitrary number of permits to the RateLimiterAlgorithm API. This is useful if downstream users want to vary the number of permits acquired per operation, for example by the size of the data to be processed.

All this should be independent of the final shape of the GenericRateLimiter or Executor interface, so we can modify those independently of these changes. I think the current interface strikes a nice balance between compile-time and runtime customization of behaviour on one side, and complexity on the other; once users create their custom executors, they can hide any generic details using a type alias, for example. If they don't need custom executors or strategies, custom algorithms can be used directly with the simpler RateLimiter interface. But please let me know of any changes you see fit, either in the code or the docs!
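The multi-permit extension mentioned above might look roughly like this (a sketch; the merged signatures may differ):

trait RateLimiterAlgorithm:
  /** Acquire a single permit, blocking if necessary. */
  def acquire(): Unit = acquire(1)
  /** Acquire `permits` permits at once - e.g. sized by the data an operation will process. */
  def acquire(permits: Int): Unit
  /** Non-blocking variants, returning whether acquisition succeeded. */
  def tryAcquire(): Boolean = tryAcquire(1)
  def tryAcquire(permits: Int): Boolean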

def update: Unit =
  val now = System.nanoTime()
  // retrieving current queue to append it later if some elements were added concurrently
  val q = log.getAndUpdate(_ => new LinkedList[(Long, Int)]())
adamw (Member):

here the log becomes empty for some time, allowing operations to be started, even if that would exceed the rate limit?

pablf (Contributor, Author):

Operations are started if the semaphore allows it, so the log is unrelated to that. Once the log is processed, permits will be restored, and the semaphore will allow new operations depending on how many are restored.

// adds timestamp to log
val now = System.nanoTime()
log.updateAndGet { q =>
  q.add((now, permits))
adamw (Member):

shouldn't we use an immutable data structure here, as updateAndGet can be called multiple times?

pablf (Contributor, Author):

Done!
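For reference, a sketch of the immutable version (field name assumed). updateAndGet may re-run its function under contention, so the function must be pure; an immutable Queue makes re-running harmless:

import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.Queue

// The log holds (timestamp, permits) pairs; the update function builds a new
// queue instead of mutating shared state, so a CAS retry simply recomputes it.
val log = new AtomicReference(Queue.empty[(Long, Int)])

def addToLog(now: Long, permits: Int): Unit =
  log.updateAndGet(q => q.enqueue((now, permits)))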

def update: Unit =
  val now = System.nanoTime()
  lastRefillTime.set(now)
  if semaphore.availablePermits() < rate then semaphore.release()
adamw (Member):

so the difference with FixedRate is that we always release 1 permit?

pablf (Contributor, Author):

That's it, and permits accumulate if not used.
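To spell out the contrast (a sketch; semaphore and rate are assumed fields):

// Fixed rate: each interval resets the window back to full capacity.
def fixedRateUpdate(): Unit =
  semaphore.release(rate - semaphore.availablePermits())

// Token bucket: each interval adds a single permit, capped at `rate`, so
// capacity unused in one interval carries over to later ones.
def tokenBucketUpdate(): Unit =
  if semaphore.availablePermits() < rate then semaphore.release()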

case class Drop() extends Executor[Strategy.Dropping]:
  def execute[T, Result[*]](algorithm: RateLimiterAlgorithm, operation: => T)(using cfg: Strategy.Dropping[Result[*]]): Result[T] =
    if algorithm.tryAcquire then cfg.run(operation)
    else None.asInstanceOf[Result[T]]
adamw (Member):

I'm not sure we're not trying to be overly flexible here. Drop, on one hand, seems to work with any result type, but in practice requires an Option (because of the cast here). Maybe the executor should simply have a fixed return type (Block - identity, Drop - Option). Would we lose any flexibility then?

pablf (Contributor, Author):

I think the main problem then is integrating them easily with GenericRateLimiter. If we're going for a fixed return type, I would put all the logic inside RateLimiter, because otherwise it's just wrapping of logic.

In the recent push I've deleted the GRL and Executor classes and moved the updating logic into RateLimiter. If any user wants to customize how the algorithm is manipulated, the easiest way is to create their own interface. I've also updated the docs and tests.

q.dequeueOption match
  case None => q
  case Some((head, tail)) =>
    if semaphore.availablePermits() < rate && head._1 + per.toNanos < now then
adamw (Member):

is it at all possible that head._1 + per.toNanos < now (the oldest entry should be released), but there are more permits available than rate? In other words, is the first part of this condition necessary?

pablf (Contributor, Author):

Actually no, thank you. I've just changed it!
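After that change, one pruning step of the sliding window reads roughly like this (a sketch; field names assumed):

// An entry expires purely by time: once the oldest timestamp has left the
// window, its permits are restored - the available-permits check was redundant.
q.dequeueOption match
  case Some(((timestamp, permits), tail)) if timestamp + per.toNanos < now =>
    semaphore.release(permits)
    tail
  case _ => q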

adamw merged commit 6bdd3d0 into softwaremill:master on Nov 15, 2024 · 5 checks passed
adamw (Member) commented Nov 15, 2024:

I added some final polishing, and we are done - thanks! :)

pablf (Contributor, Author) commented Nov 15, 2024:

Great! Thank you for your time reviewing this, really appreciate it!

Successfully merging this pull request may close: Rate control primitives? (#120) · 2 participants