Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a fairness parameter #564

Merged
merged 21 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ Licensed under the MIT License (see LICENSE)
This software contains portions of code derived from http-client
https://github.com/snoyberg/http-client
Copyright (c) 2013 Michael Snoyman
Licensed under MIT (see licenses/LICENSE_http-client)
Licensed under MIT (see licenses/LICENSE_http-client)

This software contains portions of code derived from cats-effect
https://github.com/typelevel/cats-effect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does Typelevel really need to explicitly license code from different projects to each other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure, but especially since keypool and cats-effect are licensed differently I thought so (I would defer to @rossabaker's wisdom here)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I didn't write it, and I copy it into a new project, I credit it, even if I'm a maintainer on both projects. It's probably not strictly necessary in this case, but once we play fast and loose with attribution, it's a slippery slope toward working for Microsoft1.

Footnotes

  1. fuckers

Copyright (c) 2020-2023 The Typelevel Cats-effect Project Developers
Licensed under Apache License 2.0 (see licenses/LICENSE_cats-effect)
34 changes: 34 additions & 0 deletions core/src/main/scala/org/typelevel/keypool/Fairness.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2019 Typelevel
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package org.typelevel.keypool

/**
* Fairness defines the order in which pending requests acquire a connection from the pool.
*
* Lifo will process requests in last-in-first-out order. Fifo will process requests in
* first-in-first-out order.
*/
sealed trait Fairness extends Product with Serializable
object Fairness {
case object Lifo extends Fairness
case object Fifo extends Fairness
}
13 changes: 10 additions & 3 deletions core/src/main/scala/org/typelevel/keypool/KeyPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package org.typelevel.keypool
import cats._
import cats.effect.kernel._
import cats.effect.kernel.syntax.spawn._
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._
import org.typelevel.keypool.internal._
Expand Down Expand Up @@ -67,7 +66,7 @@ object KeyPool {
private[keypool] val kpMaxPerKey: A => Int,
private[keypool] val kpMaxIdle: Int,
private[keypool] val kpMaxTotal: Int,
private[keypool] val kpMaxTotalSem: Semaphore[F],
private[keypool] val kpMaxTotalSem: RequestSemaphore[F],
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
) extends KeyPool[F, A, B] {

Expand Down Expand Up @@ -322,6 +321,7 @@ object KeyPool {
val kpMaxPerKey: A => Int,
val kpMaxIdle: Int,
val kpMaxTotal: Int,
val fairness: Fairness,
val onReaperException: Throwable => F[Unit]
) {
private def copy(
Expand All @@ -332,6 +332,7 @@ object KeyPool {
kpMaxPerKey: A => Int = this.kpMaxPerKey,
kpMaxIdle: Int = this.kpMaxIdle,
kpMaxTotal: Int = this.kpMaxTotal,
fairness: Fairness = this.fairness,
onReaperException: Throwable => F[Unit] = this.onReaperException
): Builder[F, A, B] = new Builder[F, A, B](
kpRes,
Expand All @@ -341,6 +342,7 @@ object KeyPool {
kpMaxPerKey,
kpMaxIdle,
kpMaxTotal,
fairness,
onReaperException
)

Expand Down Expand Up @@ -370,6 +372,9 @@ object KeyPool {
def withMaxTotal(total: Int): Builder[F, A, B] =
copy(kpMaxTotal = total)

def withFairness(fairness: Fairness): Builder[F, A, B] =
copy(fairness = fairness)

def withOnReaperException(f: Throwable => F[Unit]): Builder[F, A, B] =
copy(onReaperException = f)

Expand All @@ -380,7 +385,7 @@ object KeyPool {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
kpMaxTotalSem <- Resource.eval(RequestSemaphore[F](fairness, kpMaxTotal))
_ <- (idleTimeAllowedInPool, durationBetweenEvictionRuns) match {
case (fdI: FiniteDuration, fdE: FiniteDuration) if fdE >= 0.seconds =>
val idleNanos = 0.seconds.max(fdI)
Expand Down Expand Up @@ -414,6 +419,7 @@ object KeyPool {
Defaults.maxPerKey,
Defaults.maxIdle,
Defaults.maxTotal,
Defaults.fairness,
Defaults.onReaperException[F]
)

Expand All @@ -430,6 +436,7 @@ object KeyPool {
def maxPerKey[K](k: K): Int = Function.const(100)(k)
val maxIdle = 100
val maxTotal = 100
val fairness = Fairness.Fifo
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
Function.const(Applicative[F].unit)(t)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import cats._
import cats.syntax.all._
import cats.effect.kernel._
import cats.effect.kernel.syntax.spawn._
import cats.effect.std.Semaphore
import scala.concurrent.duration._
import org.typelevel.keypool.internal._

@deprecated("use KeyPool.Builder", "0.4.7")
final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
Expand Down Expand Up @@ -91,7 +91,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
kpMaxTotalSem <- Resource.eval(RequestSemaphore[F](Fairness.Fifo, kpMaxTotal))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/org/typelevel/keypool/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object Pool {
val durationBetweenEvictionRuns: Duration,
val kpMaxIdle: Int,
val kpMaxTotal: Int,
val fairness: Fairness,
val onReaperException: Throwable => F[Unit]
) {
private def copy(
Expand All @@ -86,6 +87,7 @@ object Pool {
durationBetweenEvictionRuns: Duration = this.durationBetweenEvictionRuns,
kpMaxIdle: Int = this.kpMaxIdle,
kpMaxTotal: Int = this.kpMaxTotal,
fairness: Fairness = this.fairness,
onReaperException: Throwable => F[Unit] = this.onReaperException
): Builder[F, B] = new Builder[F, B](
kpRes,
Expand All @@ -94,6 +96,7 @@ object Pool {
durationBetweenEvictionRuns,
kpMaxIdle,
kpMaxTotal,
fairness,
onReaperException
)

Expand All @@ -120,6 +123,9 @@ object Pool {
def withMaxTotal(total: Int): Builder[F, B] =
copy(kpMaxTotal = total)

def withFairness(fairness: Fairness): Builder[F, B] =
copy(fairness = fairness)

def withOnReaperException(f: Throwable => F[Unit]): Builder[F, B] =
copy(onReaperException = f)

Expand All @@ -132,6 +138,7 @@ object Pool {
kpMaxPerKey = _ => kpMaxTotal,
kpMaxIdle = kpMaxIdle,
kpMaxTotal = kpMaxTotal,
fairness = fairness,
onReaperException = onReaperException
)

Expand All @@ -155,6 +162,7 @@ object Pool {
Defaults.durationBetweenEvictionRuns,
Defaults.maxIdle,
Defaults.maxTotal,
Defaults.fairness,
Defaults.onReaperException[F]
)

Expand All @@ -170,6 +178,7 @@ object Pool {
val durationBetweenEvictionRuns = 5.seconds
val maxIdle = 100
val maxTotal = 100
val fairness = Fairness.Fifo
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
Function.const(Applicative[F].unit)(t)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2019 Typelevel
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package org.typelevel.keypool.internal

import cats.syntax.all._
import cats.effect.kernel.syntax.all._
import cats.effect.kernel._
import scala.collection.immutable.{Queue => ScalaQueue}
import scala.annotation.nowarn

import org.typelevel.keypool.Fairness

/**
* RequestSemaphore moderates access to pooled connections by setting the number of permits
* available to the total number of connections. This is a custom semaphore implementation that only
* provides the `permit` operation. Additionally it takes a [[Fairness]] parameter, used to toggle
* the order in which requests acquire a permit.
*
* Derived from cats-effect MiniSemaphore
* https://github.com/typelevel/cats-effect/blob/v3.5.4/kernel/shared/src/main/scala/cats/effect/kernel/MiniSemaphore.scala#L29
*/
private[keypool] abstract class RequestSemaphore[F[_]] {
samspills marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a nit but I'm struggling to understand what Request stands for in terms of Semaphore.

Additionally it takes a [[Fairness]] parameter, used to toggle the order in which requests acquire a permit.

Maybe it's more likely DirectionalSemaphore but it reveals implementation details though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I struggled a lot with naming here. It's a RequestSemaphore in the sense that it is the Semaphore that controls how Requests for connections are processed through keypool. It's used in the take function and is what blocks on taking a connection from the pool if one isn't available.

def permit: Resource[F, Unit]
}

private[keypool] object RequestSemaphore {
private trait BackingQueue[CC[_], A] {
def cleanup(fa: CC[A], elem: A): CC[A]
def offer(fa: CC[A], elem: A): CC[A]
def take(fa: CC[A]): (CC[A], A)
def nonEmpty(fa: CC[A]): Boolean
}

private implicit def listQueue[A <: AnyRef]: BackingQueue[List, A] = new BackingQueue[List, A] {
def cleanup(fa: List[A], elem: A) = fa.filterNot(_ eq elem)
def offer(fa: List[A], elem: A) = elem :: fa
def take(fa: List[A]) = (fa.tail, fa.head)
def nonEmpty(fa: List[A]) = fa.nonEmpty
}

private implicit def queueQueue[A <: AnyRef]: BackingQueue[ScalaQueue, A] =
new BackingQueue[ScalaQueue, A] {
def cleanup(fa: ScalaQueue[A], elem: A) = fa.filterNot(_ eq elem)
def offer(fa: ScalaQueue[A], elem: A) = fa :+ elem
def take(fa: ScalaQueue[A]) = (fa.tail, fa.head)
def nonEmpty(fa: ScalaQueue[A]) = fa.nonEmpty
}

private case class State[CC[_], A](waiting: CC[A], permits: Int)(implicit
@nowarn ev: BackingQueue[CC, A]
)

def apply[F[_]](fairness: Fairness, numPermits: Int)(implicit
F: Concurrent[F]
): F[RequestSemaphore[F]] = {

def require(condition: Boolean, errorMessage: => String): F[Unit] =
if (condition) F.unit else new IllegalArgumentException(errorMessage).raiseError[F, Unit]

require(numPermits >= 0, s"numPermits must be nonnegative, was: $numPermits") *> {
fairness match {
case Fairness.Fifo =>
F.ref(State(ScalaQueue.empty[Deferred[F, Unit]], numPermits)).map(semaphore(_))
case Fairness.Lifo =>
F.ref(State(List.empty[Deferred[F, Unit]], numPermits)).map(semaphore(_))
}
}
}

private def semaphore[F[_], CC[_]](
state: Ref[F, State[CC, Deferred[F, Unit]]]
)(implicit
F: GenConcurrent[F, _],
B: BackingQueue[CC, Deferred[F, Unit]]
): RequestSemaphore[F] = {
new RequestSemaphore[F] {
private def acquire: F[Unit] =
F.deferred[Unit].flatMap { wait =>
val cleanup = state.update { case s @ State(waiting, permits) =>
if (B.nonEmpty(waiting))
State(B.cleanup(waiting, wait), permits)
else s
}

state.flatModifyFull { case (poll, State(waiting, permits)) =>
if (permits == 0) {
State(B.offer(waiting, wait), permits) -> poll(wait.get).onCancel(cleanup)
} else
State(waiting, permits - 1) -> F.unit
}
}

private def release: F[Unit] =
state.flatModify { case State(waiting, permits) =>
if (B.nonEmpty(waiting)) {
val (rest, next) = B.take(waiting)
State(rest, permits) -> next.complete(()).void
} else
State(waiting, permits + 1) -> F.unit
}

def permit: Resource[F, Unit] =
Resource.makeFull((poll: Poll[F]) => poll(acquire))(_ => release)
}
}
}
59 changes: 59 additions & 0 deletions core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,65 @@ class KeyPoolSpec extends CatsEffectSuite {
}
}

test("requests served in FIFO order by default") {
TestControl.executeEmbed {
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
nothing
)
.withMaxTotal(1)
.build
.use { pool =>
for {
ref <- IO.ref(List.empty[Int])
f1 <- reqAction(pool, ref, 1).start <* IO.sleep(1.milli)
f2 <- reqAction(pool, ref, 2).start <* IO.sleep(1.milli)
f3 <- reqAction(pool, ref, 3).start <* IO.sleep(1.milli)
f4 <- reqAction(pool, ref, 4).start <* IO.sleep(1.milli)
_ <- f1.cancel
_ <- f2.join *> f3.join *> f4.join
order <- ref.get
} yield assertEquals(order, List(1, 2, 3, 4))
}
}
}

test("requests served in LIFO order if fairness is false") {
TestControl.executeEmbed {
KeyPool
.Builder(
(i: Int) => Ref.of[IO, Int](i),
nothing
)
.withMaxTotal(1)
.withFairness(Fairness.Lifo)
.build
.use { pool =>
for {
ref <- IO.ref(List.empty[Int])
f1 <- reqAction(pool, ref, 1).start <* IO.sleep(1.milli)
f2 <- reqAction(pool, ref, 2).start <* IO.sleep(1.milli)
f3 <- reqAction(pool, ref, 3).start <* IO.sleep(1.milli)
f4 <- reqAction(pool, ref, 4).start <* IO.sleep(1.milli)
_ <- f1.cancel
_ <- f2.join *> f3.join *> f4.join
order <- ref.get
} yield assertEquals(order, List(1, 4, 3, 2))
}
}
}

private def reqAction(
pool: KeyPool[IO, Int, Ref[IO, Int]],
ref: Ref[IO, List[Int]],
id: Int
) =
if (id == 1)
pool.take(1).use(_ => ref.update(l => l :+ id) *> IO.never)
else
pool.take(1).use(_ => ref.update(l => l :+ id))

private def nothing(ref: Ref[IO, Int]): IO[Unit] =
ref.get.void

Expand Down
Loading