Skip to content

Commit

Permalink
revert some pool changes
Browse files Browse the repository at this point in the history
  • Loading branch information
vendelieu committed Dec 15, 2024
1 parent 65c61c5 commit 1779db0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 35 deletions.
28 changes: 8 additions & 20 deletions src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -125,39 +125,30 @@ class ReThis(

@ReThisInternal
suspend fun execute(payload: List<Argument>, rawResponse: Boolean = false): RType =
handleRequest(payload) {
readRedisMessage(cfg.charset, rawResponse)
} ?: RType.Null
handleRequest(payload)?.input?.readRedisMessage(cfg.charset, rawResponse) ?: RType.Null

@ReThisInternal
@JvmName("executeSimple")
internal suspend inline fun <reified T> execute(
payload: List<Argument>,
): T? = handleRequest(payload) {
processRedisSimpleResponse(cfg.charset)
}
): T? = handleRequest(payload)?.input?.processRedisSimpleResponse(cfg.charset)

@ReThisInternal
@JvmName("executeList")
internal suspend inline fun <reified T> execute(
payload: List<Argument>,
isCollectionResponse: Boolean = false,
): List<T>? = handleRequest(payload) {
processRedisListResponse(cfg.charset)
}
): List<T>? = handleRequest(payload)?.input?.processRedisListResponse(cfg.charset)

@ReThisInternal
@JvmName("executeMap")
internal suspend inline fun <reified K : Any, reified V> execute(
payload: List<Argument>,
): Map<K, V?>? = handleRequest(payload) {
processRedisMapResponse(cfg.charset)
}
): Map<K, V?>? = handleRequest(payload)?.input?.processRedisMapResponse(cfg.charset)

private suspend fun <T> handleRequest(
private suspend fun handleRequest(
payload: List<Argument>,
responseHandle: suspend ByteReadChannel.() -> T,
): T? {
): Connection? {
val currentCoCtx = currentCoroutineContext()
val coLocalConn = currentCoCtx[CoLocalConn]
val coPipeline = currentCoCtx[CoPipelineCtx]
Expand All @@ -168,14 +159,11 @@ class ReThis(
}

coLocalConn != null -> {
coLocalConn.connection
.sendRequest(payload)
.input
.responseHandle()
coLocalConn.connection.sendRequest(payload)
}

else -> connectionPool.use { connection ->
connection.sendRequest(payload).input.responseHandle()
connection.sendRequest(payload)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ data class AuthConfiguration(

@ConfigurationDSL
data class PoolConfiguration(
var minConnections: Int = 5,
var maxConnections: Int = 50,
var poolSize: Int = 50,
var dispatcher: CoroutineDispatcher = Dispatchers.IO,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class ConnectionPool(
internal val isEmpty: Boolean get() = connections.isEmpty

private val job = SupervisorJob(client.rootJob)
private val connections = Channel<Connection>(client.cfg.poolConfiguration.maxConnections)
private val connections = Channel<Connection>(client.cfg.poolConfiguration.poolSize)
private val selector = SelectorManager(client.cfg.poolConfiguration.dispatcher + job)

internal suspend fun createConn(): Connection {
Expand Down Expand Up @@ -78,24 +78,14 @@ internal class ConnectionPool(
@Suppress("OPT_IN_USAGE")
fun prepare() = GlobalScope.launch {
logger.info("Filling ConnectionPool with connections")
repeat(client.cfg.poolConfiguration.minConnections) {
repeat(client.cfg.poolConfiguration.poolSize) {
client.coLaunch { connections.trySend(createConn()) }
}
}

suspend fun acquire(): Connection {
val conn = connections.tryReceive().getOrNull() ?: createConn()

if (conn.input.isClosedForRead) {
conn.socket.close()
return acquire()
}

return conn
}
suspend fun acquire(): Connection = connections.receive()

suspend fun release(connection: Connection) {
reset(connection)
connections.send(connection)
}

Expand Down

0 comments on commit 1779db0

Please sign in to comment.