From 8a7486cea2d768708342bf77645547ddee86e610 Mon Sep 17 00:00:00 2001 From: vendelieu Date: Sun, 6 Oct 2024 04:44:41 +0300 Subject: [PATCH] add benchmark --- README.md | 15 +++- benchmarks/build.gradle.kts | 23 +++++ .../rethis/benchmarks/JedisBenchmark.kt | 37 ++++++++ .../rethis/benchmarks/KredsBenchmark.kt | 58 +++++++++++++ .../rethis/benchmarks/RethisBenchmark.kt | 47 ++++++++++ settings.gradle.kts | 8 ++ .../kotlin/eu/vendeli/rethis/ReThis.kt | 12 +-- .../rethis/types/core/ConnectionPool.kt | 4 +- .../eu/vendeli/rethis/utils/CommonUtils.kt | 2 +- .../eu/vendeli/rethis/utils/RequestUtils.kt | 4 +- .../eu/vendeli/rethis/utils/ResponseUtils.kt | 87 +++++++------------ .../tests/commands/TransactionCommandTest.kt | 8 +- 12 files changed, 232 insertions(+), 73 deletions(-) create mode 100644 benchmarks/build.gradle.kts create mode 100644 benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/JedisBenchmark.kt create mode 100644 benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/KredsBenchmark.kt create mode 100644 benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/RethisBenchmark.kt diff --git a/README.md b/README.md index ff950d0c..3e609698 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,19 @@ dependencies { } ``` +# Comparison + +```toml +main summary: +Benchmark Mode Cnt Score Error Units +JedisBenchmark.jedisGet thrpt 5 61059.312 ± 12766.457 ops/s +JedisBenchmark.jedisSet thrpt 5 63010.876 ± 3385.390 ops/s +KredsBenchmark.kredsGet thrpt 4 827647.673 ± 216763.090 ops/s +KredsBenchmark.kredsSet thrpt 4 843548.354 ± 163456.680 ops/s +RethisBenchmark.rethisGet thrpt 5 1186328.857 ± 717051.132 ops/s +RethisBenchmark.rethisSet thrpt 5 1176619.151 ± 1747469.858 ops/s +``` + # Usage ### Connecting to Redis @@ -97,7 +110,7 @@ transaction functionality also takes into account fail-state cases and gracefull Also you can execute Redis commands using the execute method: ```kotlin -val result = client.execute(listOf("SET", "key", "value")) +val result = client.execute(listOf("SET".toArg(), "key".toArg(), "value".toArg())) ``` # Documentation diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts new file mode 100644 index 00000000..69160259 --- /dev/null +++ b/benchmarks/build.gradle.kts @@ -0,0 +1,23 @@ +plugins { + kotlin("jvm") + kotlin("plugin.allopen") version "2.0.20" + id("org.jetbrains.kotlinx.benchmark") version "0.4.11" +} + +repositories { + mavenCentral() +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-benchmark-runtime:0.4.11") + implementation(project(":")) + implementation(libs.testcontainer.redis) + implementation("redis.clients:jedis:5.2.0") + implementation("io.github.crackthecodeabhi:kreds:0.9.1") +} + +allOpen.annotation("org.openjdk.jmh.annotations.State") + +benchmark { + targets.register("main") +} diff --git a/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/JedisBenchmark.kt b/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/JedisBenchmark.kt new file mode 100644 index 00000000..fb206742 --- /dev/null +++ b/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/JedisBenchmark.kt @@ -0,0 +1,37 @@ +package eu.vendeli.rethis.benchmarks + +import kotlinx.benchmark.* +import org.openjdk.jmh.annotations.Timeout +import redis.clients.jedis.JedisPooled +import java.util.concurrent.TimeUnit + +@BenchmarkMode(Mode.Throughput) +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Timeout(time = 10, timeUnit = TimeUnit.SECONDS) +class JedisBenchmark { + private lateinit var jedis: JedisPooled + + @Setup + fun setup() { + jedis = JedisPooled("localhost", 6379) + jedis.ping() + } + + @TearDown + fun tearDown() { + jedis.close() + } + + @Benchmark + fun jedisSet(bh: Blackhole) { + + bh.consume(jedis.set("key", "value")) + } + + @Benchmark + fun jedisGet(bh: Blackhole) { + bh.consume(jedis.get("key")) + } +} diff --git a/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/KredsBenchmark.kt b/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/KredsBenchmark.kt new file mode 100644 index 00000000..33df271e --- /dev/null +++ b/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/KredsBenchmark.kt @@ -0,0 +1,58 @@ +package eu.vendeli.rethis.benchmarks + +import io.github.crackthecodeabhi.kreds.connection.Endpoint +import io.github.crackthecodeabhi.kreds.connection.KredsClient +import io.github.crackthecodeabhi.kreds.connection.newClient +import kotlinx.benchmark.Benchmark +import kotlinx.benchmark.Blackhole +import kotlinx.benchmark.Setup +import kotlinx.benchmark.TearDown +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.openjdk.jmh.annotations.* +import java.util.concurrent.TimeUnit + +@DelicateCoroutinesApi +@BenchmarkMode(Mode.Throughput) +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Timeout(time = 10, timeUnit = TimeUnit.SECONDS) +class KredsBenchmark { + private lateinit var kreds: KredsClient + + @Setup + fun setup() { + kreds = newClient(Endpoint("localhost", 6379)) + GlobalScope.launch { +// kreds.use { it.ping("test") } + kreds.ping("test") + } + } + + @TearDown + fun tearDown() { + kreds.close() + } + + @Benchmark + fun kredsSet(bh: Blackhole) { + GlobalScope.launch { + bh.consume( + kreds.set("key", "value"), +// kreds.use { it.set("key", "value") }, + ) + } + } + + @Benchmark + fun kredsGet(bh: Blackhole) { + GlobalScope.launch { + bh.consume( +// kreds.use { it.get("key") } + kreds.get("key"), + ) + } + } +} diff --git a/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/RethisBenchmark.kt b/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/RethisBenchmark.kt new file mode 100644 index 00000000..f2ae14d8 --- /dev/null +++ b/benchmarks/src/main/kotlin/eu/vendeli/rethis/benchmarks/RethisBenchmark.kt @@ -0,0 +1,47 @@ +package eu.vendeli.rethis.benchmarks + +import eu.vendeli.rethis.ReThis +import eu.vendeli.rethis.commands.get +import eu.vendeli.rethis.commands.ping +import eu.vendeli.rethis.commands.set +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole +import java.util.concurrent.TimeUnit + +@DelicateCoroutinesApi +@BenchmarkMode(Mode.Throughput) +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Timeout(time = 10, timeUnit = TimeUnit.SECONDS) +class RethisBenchmark { + private lateinit var rethis: ReThis + + @Setup + fun setup() { + rethis = ReThis("localhost", 6379) + GlobalScope.launch { rethis.ping("test") } + } + + @TearDown + fun tearDown() { + rethis.disconnect() + } + + @Benchmark + fun rethisSet(bh: Blackhole) { + GlobalScope.launch { + bh.consume(rethis.runCatching { set("key", "value") }) + } + } + + @Benchmark + fun rethisGet(bh: Blackhole) { + GlobalScope.launch { + bh.consume(rethis.runCatching { get("key") }) + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c9f9e5dc..93c632ea 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,2 +1,10 @@ rootProject.name = "re.this" +include("benchmarks") + +pluginManagement { + repositories { + gradlePluginPortal() + mavenCentral() + } +} diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt index 8b4e79f8..5144a786 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt @@ -71,12 +71,12 @@ class ReThis( if (ctxConn != null) { ctxConn!!.output.writeBuffer(pipelinedPayload) ctxConn!!.output.flush() - requests.forEach { _ -> responses.add(ctxConn!!.input.readRedisMessage()) } + requests.forEach { _ -> responses.add(ctxConn!!.input.readRedisMessage(cfg.charset)) } } else { connectionPool.use { connection -> connection.output.writeBuffer(pipelinedPayload) connection.output.flush() - requests.forEach { _ -> responses.add(connection.input.readRedisMessage()) } + requests.forEach { _ -> responses.add(connection.input.readRedisMessage(cfg.charset)) } } } requests.clear() @@ -89,7 +89,7 @@ class ReThis( logger.debug("Started transaction") conn.output.writeBuffer(bufferValues(listOf("MULTI".toArg()), cfg.charset)) conn.output.flush() - require(conn.input.readRedisMessage().value == "OK") + require(conn.input.readRedisMessage(cfg.charset).value == "OK") var e: Throwable? = null coLaunch(currentCoroutineContext() + CoLocalConn(conn)) { @@ -98,7 +98,7 @@ class ReThis( e?.also { conn.output.writeBuffer(bufferValues(listOf("DISCARD".toArg()), cfg.charset)) conn.output.flush() - require(conn.input.readRedisMessage().value == "OK") + require(conn.input.readRedisMessage(cfg.charset).value == "OK") logger.error("Transaction canceled", it) return@use emptyList() } @@ -106,7 +106,7 @@ class ReThis( logger.debug("Transaction completed") conn.output.writeBuffer(bufferValues(listOf("EXEC".toArg()), cfg.charset)) conn.output.flush() - conn.input.readRedisMessage().unwrapList() + conn.input.readRedisMessage(cfg.charset).unwrapList() } @ReThisInternal @@ -137,6 +137,6 @@ class ReThis( logger.trace("Executing request with such payload $payload") output.writeBuffer(bufferValues(payload, cfg.charset)) output.flush() - return input.readRedisMessage(raw) + return input.readRedisMessage(cfg.charset, raw) } } diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index 78a5cc87..fb9cd344 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -17,7 +17,7 @@ import kotlin.contracts.InvocationKind import kotlin.contracts.contract internal class ConnectionPool( - private val client: ReThis, + internal val client: ReThis, private val address: SocketAddress, ) { internal val logger = KtorSimpleLogger("eu.vendeli.rethis.ConnectionPool") @@ -55,7 +55,7 @@ internal class ConnectionPool( conn.output.writeBuffer(reqBuffer) conn.output.flush() repeat(requests) { - logger.trace("Connection establishment response: " + conn.input.readRedisMessage()) + logger.trace("Connection establishment response: " + conn.input.readRedisMessage(client.cfg.charset)) } return conn diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt index 8bc283ec..a71739ee 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt @@ -34,7 +34,7 @@ internal suspend inline fun ReThis.registerSubscription( while (isActive) { conn.input.awaitContent() - val msg = conn.input.readRedisMessage() + val msg = conn.input.readRedisMessage(cfg.charset) val input = if (msg is Push) msg.value else msg.safeCast()?.value logger.debug("Handling event in $target channel subscription") diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/RequestUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/RequestUtils.kt index f8b28fcf..dff0d264 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/RequestUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/RequestUtils.kt @@ -24,8 +24,8 @@ internal fun Sink.writeRedisValue( is Array<*> -> writeArrayValue(data, charset) is StringArg -> writeByteArray(data.value.toByteArray(charset)) - is LongArg -> writeByteArray(data.value.toString(10).toByteArray(charset)) - is IntArg -> writeByteArray(data.value.toString(10).toByteArray(charset)) + is LongArg -> writeByteArray(data.value.toString().toByteArray(charset)) + is IntArg -> writeByteArray(data.value.toString().toByteArray(charset)) is DoubleArg -> writeByteArray(data.value.toString().toByteArray(charset)) is BaArg -> writeByteArray(data.value) } diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt index 778e9061..65b4a07c 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/ResponseUtils.kt @@ -3,21 +3,14 @@ package eu.vendeli.rethis.utils import com.ionspin.kotlin.bignum.integer.BigInteger import eu.vendeli.rethis.exception import eu.vendeli.rethis.types.core.* -import eu.vendeli.rethis.utils.Const.DEFAULT_REDIS_BUFFER_SIZE -import eu.vendeli.rethis.utils.Const.DEFAULT_REDIS_POOL_CAPACITY import io.ktor.utils.io.* -import io.ktor.utils.io.pool.* -import kotlinx.io.readByteArray +import io.ktor.utils.io.charsets.* +import io.ktor.utils.io.core.* +import kotlinx.io.Buffer -internal object RedisByteArrayPool : DefaultPool(DEFAULT_REDIS_POOL_CAPACITY) { - override fun produceInstance(): ByteArray = ByteArray(DEFAULT_REDIS_BUFFER_SIZE) - - override fun clearInstance(instance: ByteArray): ByteArray = instance.apply { fill(0) } -} - -internal suspend fun ByteReadChannel.readRedisMessage(rawOnly: Boolean = false): RType { +internal suspend fun ByteReadChannel.readRedisMessage(charset: Charset, rawOnly: Boolean = false): RType { val type = RespCode.fromCode(readByte()) // Read the type byte (e.g., +, -, :, $, *) - val line = readCRLFLine() // Read until CRLF for simple types + val line = readCRLFLine(charset) if (rawOnly) line.toIntOrNull()?.let { return RType.Raw(readByteArray(it)) @@ -31,17 +24,17 @@ internal suspend fun ByteReadChannel.readRedisMessage(rawOnly: Boolean = false): RespCode.INTEGER -> Int64(line.toLongOrNull() ?: exception { "Invalid number format: $line" }) RespCode.BULK -> { - val size = line.toInt() // Parse the size from the bulk string header ($) + val size = line.toLong() // Parse the size from the bulk string header ($) if (size < 0) return RType.Null // Handle null bulk string (`$-1`) - val content = readRemaining(size.toLong()).readByteArray() // Read the specified size of bytes + val content = readRemaining(size).readText(charset) // Read the specified size of bytes readShort() // Skip CRLF after the bulk string - BulkString(content.decodeToString()) + BulkString(content) } RespCode.ARRAY -> { val arraySize = line.toInt() if (arraySize < 0) return RType.Null // Handle null array (`*-1`) - val elements = List(arraySize) { readRedisMessage() } // Recursively read each array element + val elements = List(arraySize) { readRedisMessage(charset, rawOnly) } // Recursively read each array element RArray(elements) } @@ -62,21 +55,21 @@ internal suspend fun ByteReadChannel.readRedisMessage(rawOnly: Boolean = false): } RespCode.BULK_ERROR -> { - val size = line.toInt() // Parse the size from the bulk error header + val size = line.toLong() // Parse the size from the bulk error header if (size < 0) exception { "Invalid bulk error size: $size" } - val content = readRemaining(size.toLong()).readByteArray() // Read the error content + val content = readRemaining(size) // Read the error content readShort() // Skip CRLF after the bulk error - RType.Error(content.decodeToString()) + RType.Error(content.readText(charset)) } RespCode.VERBATIM_STRING -> { - val size = line.toInt() + val size = line.toLong() if (size < 0) return RType.Null // Handle null verbatim string - val content = readRemaining(size.toLong()).readByteArray() + val content = readRemaining(size).readText(charset) readShort() // Skip CRLF - val encoding = content.decodeToString(0, 3) // First 3 bytes are encoding - val data = content.decodeToString(4, size - 4) // Skip encoding and colon (:) - VerbatimString(encoding, data) + val encoding = content.subSequence(0, 3) // First 3 bytes are encoding + val data = content.subSequence(4, size.toInt() - 4) // Skip encoding and colon (:) + VerbatimString(encoding.toString(), data.toString()) } RespCode.MAP -> { @@ -84,8 +77,8 @@ internal suspend fun ByteReadChannel.readRedisMessage(rawOnly: Boolean = false): if (mapSize < 0) return RType.Null // Handle null map val resultMap = mutableMapOf() repeat(mapSize) { - val key = readRedisMessage() as RPrimitive - val value = readRedisMessage() + val key = readRedisMessage(charset, rawOnly) as RPrimitive + val value = readRedisMessage(charset, rawOnly) resultMap[key] = value } RMap(resultMap) @@ -96,7 +89,7 @@ internal suspend fun ByteReadChannel.readRedisMessage(rawOnly: Boolean = false): if (setSize < 0) return RType.Null // Handle null set val resultSet = mutableSetOf() repeat(setSize) { - resultSet.add(readRedisMessage() as RPrimitive) + resultSet.add(readRedisMessage(charset, rawOnly) as RPrimitive) } RSet(resultSet) } @@ -104,43 +97,23 @@ internal suspend fun ByteReadChannel.readRedisMessage(rawOnly: Boolean = false): RespCode.PUSH -> { val pushSize = line.toInt() if (pushSize < 0) return RType.Null // Handle null push message - val elements = List(pushSize) { readRedisMessage() as RPrimitive } + val elements = List(pushSize) { readRedisMessage(charset, rawOnly) as RPrimitive } Push(elements) } } } -internal suspend fun ByteReadChannel.readCRLFLine(): String { - val result = StringBuilder() - var lastByte: Byte - var secondLastByte: Byte = 0 - var bytesRead = 0 - - RedisByteArrayPool.useInstance { buffer -> - while (true) { - lastByte = readByte() - if (secondLastByte == '\r'.code.toByte() && lastByte == '\n'.code.toByte()) { - break // End of line found - } - if (bytesRead < buffer.size) { - buffer[bytesRead] = lastByte // Store byte in buffer - bytesRead++ - } else { - // Resize buffer dynamically instead of throwing an exception - result.append(buffer.decodeToString(0, bytesRead)) - bytesRead = 0 - } - secondLastByte = lastByte - } - - // Skip the last CR byte - bytesRead-- // Exclude the CR - // Convert the buffer to a string - result.append(buffer.decodeToString(0, bytesRead)) +internal suspend fun ByteReadChannel.readCRLFLine(charset: Charset): String { + val buffer = Buffer() + while (true) { + val byte = readByte() + buffer.writeByte(byte) + if (byte == '\n'.code.toByte() && buffer.size > 1 && buffer[buffer.size - 2] == '\r'.code.toByte()) { + break // End of line found + } } - - return result.toString() + return buffer.readText(charset, (buffer.size - 2).toInt()) } internal inline fun RType.unwrapRespIndMap(): Map? = diff --git a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/TransactionCommandTest.kt b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/TransactionCommandTest.kt index 1b540e0d..d36dde1f 100644 --- a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/TransactionCommandTest.kt +++ b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/TransactionCommandTest.kt @@ -24,19 +24,19 @@ class TransactionCommandTest : ReThisTestCtx() { conn.output.writeBuffer(bufferValues(listOf("MULTI".toArg()), Charsets.UTF_8)) conn.output.flush() - conn.input.readRedisMessage() shouldBe PlainString("OK") + conn.input.readRedisMessage(Charsets.UTF_8) shouldBe PlainString("OK") conn.output.writeBuffer(bufferValues(listOf("SET".toArg(), "test3".toArg(), "testv3".toArg()), Charsets.UTF_8)) conn.output.flush() - conn.input.readRedisMessage() shouldBe PlainString("QUEUED") + conn.input.readRedisMessage(Charsets.UTF_8) shouldBe PlainString("QUEUED") conn.output.writeBuffer(bufferValues(listOf("SET".toArg(), "test4".toArg(), "testv4".toArg()), Charsets.UTF_8)) conn.output.flush() - conn.input.readRedisMessage() shouldBe PlainString("QUEUED") + conn.input.readRedisMessage(Charsets.UTF_8) shouldBe PlainString("QUEUED") conn.output.writeBuffer(bufferValues(listOf("EXEC".toArg()), Charsets.UTF_8)) conn.output.flush() - conn.input.readRedisMessage() shouldBe RArray(listOf(PlainString("OK"), PlainString("OK"))) + conn.input.readRedisMessage(Charsets.UTF_8) shouldBe RArray(listOf(PlainString("OK"), PlainString("OK"))) } @Test