diff --git a/CHANGELOG.md b/CHANGELOG.md index 80b45f5e..262718e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Re.this Changelog +## 0.1.8 + +* Improved pub/sub flow, added common event handler. + ## 0.1.7 * Supported android targets (androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86). diff --git a/api/re.this.api b/api/re.this.api index c4268c0f..d4b51eca 100644 --- a/api/re.this.api +++ b/api/re.this.api @@ -8,7 +8,7 @@ public final class eu/vendeli/rethis/ReThis { public final fun execute (Ljava/util/List;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun execute$default (Leu/vendeli/rethis/ReThis;Ljava/util/List;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun getProtocol ()Leu/vendeli/rethis/types/core/RespVer; - public final fun getSubscriptions ()Ljava/util/Map; + public final fun getSubscriptions ()Leu/vendeli/rethis/types/core/ActiveSubscriptions; public final fun isDisconnected ()Z public final fun pipeline (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun reconnect ()V @@ -190,9 +190,8 @@ public final class eu/vendeli/rethis/commands/ListCommandsKt { } public final class eu/vendeli/rethis/commands/PubSubCommandsKt { - public static final fun pSubscribe (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun pSubscribe (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun pSubscribe (Leu/vendeli/rethis/ReThis;[Leu/vendeli/rethis/types/common/ChannelSubscription;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun pSubscribe$default (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun pUnsubscribe (Leu/vendeli/rethis/ReThis;[Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun pubSubChannels (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun pubSubChannels$default (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; @@ -203,13 +202,11 @@ public final class eu/vendeli/rethis/commands/PubSubCommandsKt { public static final fun pubSubShardNumSub (Leu/vendeli/rethis/ReThis;[Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun publish (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun sPublish (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun sSubscribe (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun sSubscribe (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun sSubscribe (Leu/vendeli/rethis/ReThis;[Leu/vendeli/rethis/types/common/ChannelSubscription;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun sSubscribe$default (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sUnsubscribe (Leu/vendeli/rethis/ReThis;[Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun subscribe (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun subscribe (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun subscribe (Leu/vendeli/rethis/ReThis;[Leu/vendeli/rethis/types/common/ChannelSubscription;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun subscribe$default (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun unsubscribe (Leu/vendeli/rethis/ReThis;[Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -346,10 +343,8 @@ public final class eu/vendeli/rethis/commands/TransactionCommandsKt { } public final class eu/vendeli/rethis/types/common/ChannelSubscription { - public fun (Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;)V - public synthetic fun (Ljava/lang/String;Leu/vendeli/rethis/types/core/ReThisExceptionHandler;Leu/vendeli/rethis/types/core/SubscriptionHandler;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/lang/String;Leu/vendeli/rethis/types/core/SubscriptionHandler;)V public final fun getChannel ()Ljava/lang/String; - public final fun getExceptionHandler ()Leu/vendeli/rethis/types/core/ReThisExceptionHandler; public final fun getHandler ()Leu/vendeli/rethis/types/core/SubscriptionHandler; } @@ -642,6 +637,15 @@ public final class eu/vendeli/rethis/types/common/ZPopResult { public fun toString ()Ljava/lang/String; } +public final class eu/vendeli/rethis/types/core/ActiveSubscriptions { + public fun ()V + public final fun getSize ()I + public final fun isActive (Ljava/lang/String;)Z + public final fun setEventHandler (Leu/vendeli/rethis/types/core/SubscriptionEventHandler;)V + public final fun unsubscribe (Ljava/lang/String;)Z + public final fun unsubscribeAll ()Z +} + public abstract class eu/vendeli/rethis/types/core/Address { } @@ -910,10 +914,6 @@ public final class eu/vendeli/rethis/types/core/RType$Raw : eu/vendeli/rethis/ty public fun getValue ()[B } -public abstract interface class eu/vendeli/rethis/types/core/ReThisExceptionHandler { - public abstract fun handle (Ljava/lang/Exception;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; -} - public abstract class eu/vendeli/rethis/types/core/RespVer { public synthetic fun (ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun getLiteral ()I @@ -933,6 +933,12 @@ public final class eu/vendeli/rethis/types/core/RespVer$V3 : eu/vendeli/rethis/t public fun toString ()Ljava/lang/String; } +public abstract interface class eu/vendeli/rethis/types/core/SubscriptionEventHandler { + public abstract fun onException (Ljava/lang/String;Ljava/lang/Exception;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun onSubscribe (Ljava/lang/String;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun onUnsubscribe (Ljava/lang/String;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public abstract interface class eu/vendeli/rethis/types/core/SubscriptionHandler { public abstract fun onMessage (Leu/vendeli/rethis/ReThis;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/api/re.this.klib.api b/api/re.this.klib.api index 04545404..ed296956 100644 --- a/api/re.this.klib.api +++ b/api/re.this.klib.api @@ -58,16 +58,18 @@ final enum class eu.vendeli.rethis.types.options/ZAggregate : kotlin/Enum // eu.vendeli.rethis.types.options/ZAggregate.values|values#static(){}[0] } -abstract fun interface eu.vendeli.rethis.types.core/ReThisExceptionHandler { // eu.vendeli.rethis.types.core/ReThisExceptionHandler|null[0] - abstract suspend fun handle(kotlin/Exception) // eu.vendeli.rethis.types.core/ReThisExceptionHandler.handle|handle(kotlin.Exception){}[0] -} - abstract fun interface eu.vendeli.rethis.types.core/SubscriptionHandler { // eu.vendeli.rethis.types.core/SubscriptionHandler|null[0] abstract suspend fun onMessage(eu.vendeli.rethis/ReThis, kotlin/String) // eu.vendeli.rethis.types.core/SubscriptionHandler.onMessage|onMessage(eu.vendeli.rethis.ReThis;kotlin.String){}[0] } abstract interface eu.vendeli.rethis.types.core/Argument // eu.vendeli.rethis.types.core/Argument|null[0] +abstract interface eu.vendeli.rethis.types.core/SubscriptionEventHandler { // eu.vendeli.rethis.types.core/SubscriptionEventHandler|null[0] + abstract suspend fun onException(kotlin/String, kotlin/Exception) // eu.vendeli.rethis.types.core/SubscriptionEventHandler.onException|onException(kotlin.String;kotlin.Exception){}[0] + abstract suspend fun onSubscribe(kotlin/String, kotlin/Long) // eu.vendeli.rethis.types.core/SubscriptionEventHandler.onSubscribe|onSubscribe(kotlin.String;kotlin.Long){}[0] + abstract suspend fun onUnsubscribe(kotlin/String, kotlin/Long) // eu.vendeli.rethis.types.core/SubscriptionEventHandler.onUnsubscribe|onUnsubscribe(kotlin.String;kotlin.Long){}[0] +} + abstract interface eu.vendeli.rethis.types.core/VaryingArgument { // eu.vendeli.rethis.types.core/VaryingArgument|null[0] abstract val data // eu.vendeli.rethis.types.core/VaryingArgument.data|{}data[0] abstract fun (): kotlin.collections/List // eu.vendeli.rethis.types.core/VaryingArgument.data.|(){}[0] @@ -90,12 +92,10 @@ final class <#A: kotlin/Any?> eu.vendeli.rethis.types.common/ScanResult { // eu. } final class eu.vendeli.rethis.types.common/ChannelSubscription { // eu.vendeli.rethis.types.common/ChannelSubscription|null[0] - constructor (kotlin/String, eu.vendeli.rethis.types.core/ReThisExceptionHandler? = ..., eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.types.common/ChannelSubscription.|(kotlin.String;eu.vendeli.rethis.types.core.ReThisExceptionHandler?;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] + constructor (kotlin/String, eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.types.common/ChannelSubscription.|(kotlin.String;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] final val channel // eu.vendeli.rethis.types.common/ChannelSubscription.channel|{}channel[0] final fun (): kotlin/String // eu.vendeli.rethis.types.common/ChannelSubscription.channel.|(){}[0] - final val exceptionHandler // eu.vendeli.rethis.types.common/ChannelSubscription.exceptionHandler|{}exceptionHandler[0] - final fun (): eu.vendeli.rethis.types.core/ReThisExceptionHandler? // eu.vendeli.rethis.types.common/ChannelSubscription.exceptionHandler.|(){}[0] final val handler // eu.vendeli.rethis.types.common/ChannelSubscription.handler|{}handler[0] final fun (): eu.vendeli.rethis.types.core/SubscriptionHandler // eu.vendeli.rethis.types.common/ChannelSubscription.handler.|(){}[0] } @@ -413,6 +413,18 @@ final class eu.vendeli.rethis.types.common/ZPopResult { // eu.vendeli.rethis.typ final fun toString(): kotlin/String // eu.vendeli.rethis.types.common/ZPopResult.toString|toString(){}[0] } +final class eu.vendeli.rethis.types.core/ActiveSubscriptions { // eu.vendeli.rethis.types.core/ActiveSubscriptions|null[0] + constructor () // eu.vendeli.rethis.types.core/ActiveSubscriptions.|(){}[0] + + final val size // eu.vendeli.rethis.types.core/ActiveSubscriptions.size|{}size[0] + final fun (): kotlin/Int // eu.vendeli.rethis.types.core/ActiveSubscriptions.size.|(){}[0] + + final fun isActive(kotlin/String): kotlin/Boolean // eu.vendeli.rethis.types.core/ActiveSubscriptions.isActive|isActive(kotlin.String){}[0] + final fun setEventHandler(eu.vendeli.rethis.types.core/SubscriptionEventHandler) // eu.vendeli.rethis.types.core/ActiveSubscriptions.setEventHandler|setEventHandler(eu.vendeli.rethis.types.core.SubscriptionEventHandler){}[0] + final fun unsubscribe(kotlin/String): kotlin/Boolean // eu.vendeli.rethis.types.core/ActiveSubscriptions.unsubscribe|unsubscribe(kotlin.String){}[0] + final fun unsubscribeAll(): kotlin/Boolean // eu.vendeli.rethis.types.core/ActiveSubscriptions.unsubscribeAll|unsubscribeAll(){}[0] +} + final class eu.vendeli.rethis.types.core/AuthConfiguration { // eu.vendeli.rethis.types.core/AuthConfiguration|null[0] constructor (kotlin/String, kotlin/String? = ...) // eu.vendeli.rethis.types.core/AuthConfiguration.|(kotlin.String;kotlin.String?){}[0] @@ -780,7 +792,7 @@ final class eu.vendeli.rethis/ReThis { // eu.vendeli.rethis/ReThis|null[0] final val protocol // eu.vendeli.rethis/ReThis.protocol|{}protocol[0] final fun (): eu.vendeli.rethis.types.core/RespVer // eu.vendeli.rethis/ReThis.protocol.|(){}[0] final val subscriptions // eu.vendeli.rethis/ReThis.subscriptions|{}subscriptions[0] - final fun (): kotlin.collections/Map // eu.vendeli.rethis/ReThis.subscriptions.|(){}[0] + final fun (): eu.vendeli.rethis.types.core/ActiveSubscriptions // eu.vendeli.rethis/ReThis.subscriptions.|(){}[0] final fun disconnect() // eu.vendeli.rethis/ReThis.disconnect|disconnect(){}[0] final fun reconnect() // eu.vendeli.rethis/ReThis.reconnect|reconnect(){}[0] @@ -1802,7 +1814,7 @@ final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pExpireA final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pExpireAt(kotlin/String, kotlinx.datetime/Instant, eu.vendeli.rethis.types.options/UpdateStrategyOption? = ...): kotlin/Boolean // eu.vendeli.rethis.commands/pExpireAt|pExpireAt@eu.vendeli.rethis.ReThis(kotlin.String;kotlinx.datetime.Instant;eu.vendeli.rethis.types.options.UpdateStrategyOption?){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pExpireTime(kotlin/String): kotlin/Long? // eu.vendeli.rethis.commands/pExpireTime|pExpireTime@eu.vendeli.rethis.ReThis(kotlin.String){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pSubscribe(kotlin/Array...) // eu.vendeli.rethis.commands/pSubscribe|pSubscribe@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] -final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pSubscribe(kotlin/String, eu.vendeli.rethis.types.core/ReThisExceptionHandler? = ..., eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.commands/pSubscribe|pSubscribe@eu.vendeli.rethis.ReThis(kotlin.String;eu.vendeli.rethis.types.core.ReThisExceptionHandler?;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] +final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pSubscribe(kotlin/String, eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.commands/pSubscribe|pSubscribe@eu.vendeli.rethis.ReThis(kotlin.String;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pTTL(kotlin/String): kotlin/Long? // eu.vendeli.rethis.commands/pTTL|pTTL@eu.vendeli.rethis.ReThis(kotlin.String){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/pUnsubscribe(kotlin/Array...): eu.vendeli.rethis.types.core/RType // eu.vendeli.rethis.commands/pUnsubscribe|pUnsubscribe@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/persist(kotlin/String): kotlin/Boolean // eu.vendeli.rethis.commands/persist|persist@eu.vendeli.rethis.ReThis(kotlin.String){}[0] @@ -1844,7 +1856,7 @@ final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sRandMem final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sRem(kotlin/String, kotlin/Array...): kotlin/Long // eu.vendeli.rethis.commands/sRem|sRem@eu.vendeli.rethis.ReThis(kotlin.String;kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sScan(kotlin/String, kotlin/Long, kotlin/Array...): eu.vendeli.rethis.types.common/ScanResult // eu.vendeli.rethis.commands/sScan|sScan@eu.vendeli.rethis.ReThis(kotlin.String;kotlin.Long;kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sSubscribe(kotlin/Array...) // eu.vendeli.rethis.commands/sSubscribe|sSubscribe@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] -final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sSubscribe(kotlin/String, eu.vendeli.rethis.types.core/ReThisExceptionHandler? = ..., eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.commands/sSubscribe|sSubscribe@eu.vendeli.rethis.ReThis(kotlin.String;eu.vendeli.rethis.types.core.ReThisExceptionHandler?;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] +final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sSubscribe(kotlin/String, eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.commands/sSubscribe|sSubscribe@eu.vendeli.rethis.ReThis(kotlin.String;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sUnion(kotlin/Array...): kotlin.collections/Set // eu.vendeli.rethis.commands/sUnion|sUnion@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sUnionStore(kotlin/String, kotlin/Array...): kotlin/Long // eu.vendeli.rethis.commands/sUnionStore|sUnionStore@eu.vendeli.rethis.ReThis(kotlin.String;kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sUnsubscribe(kotlin/Array...): eu.vendeli.rethis.types.core/RType // eu.vendeli.rethis.commands/sUnsubscribe|sUnsubscribe@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] @@ -1862,7 +1874,7 @@ final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sort(kot final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/sortRo(kotlin/String, kotlin/Array...): kotlin.collections/List // eu.vendeli.rethis.commands/sortRo|sortRo@eu.vendeli.rethis.ReThis(kotlin.String;kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/strlen(kotlin/String): kotlin/Long // eu.vendeli.rethis.commands/strlen|strlen@eu.vendeli.rethis.ReThis(kotlin.String){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/subscribe(kotlin/Array...) // eu.vendeli.rethis.commands/subscribe|subscribe@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] -final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/subscribe(kotlin/String, eu.vendeli.rethis.types.core/ReThisExceptionHandler? = ..., eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.commands/subscribe|subscribe@eu.vendeli.rethis.ReThis(kotlin.String;eu.vendeli.rethis.types.core.ReThisExceptionHandler?;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] +final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/subscribe(kotlin/String, eu.vendeli.rethis.types.core/SubscriptionHandler) // eu.vendeli.rethis.commands/subscribe|subscribe@eu.vendeli.rethis.ReThis(kotlin.String;eu.vendeli.rethis.types.core.SubscriptionHandler){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/touch(kotlin/Array...): kotlin/Long // eu.vendeli.rethis.commands/touch|touch@eu.vendeli.rethis.ReThis(kotlin.Array...){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/ttl(kotlin/String): kotlin/Long? // eu.vendeli.rethis.commands/ttl|ttl@eu.vendeli.rethis.ReThis(kotlin.String){}[0] final suspend fun (eu.vendeli.rethis/ReThis).eu.vendeli.rethis.commands/type(kotlin/String): kotlin/String? // eu.vendeli.rethis.commands/type|type@eu.vendeli.rethis.ReThis(kotlin.String){}[0] diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index be4ed8da..8b70663d 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -1,7 +1,7 @@ plugins { kotlin("jvm") - kotlin("plugin.allopen") version "2.0.20" - id("org.jetbrains.kotlinx.benchmark") version "0.4.11" + kotlin("plugin.allopen") version "2.0.21" + id("org.jetbrains.kotlinx.benchmark") version "0.4.12" } repositories { @@ -9,7 +9,7 @@ repositories { } dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-benchmark-runtime:0.4.11") + implementation("org.jetbrains.kotlinx:kotlinx-benchmark-runtime:0.4.12") implementation(project(":")) implementation("redis.clients:jedis:5.2.0") implementation("io.lettuce:lettuce-core:6.4.0.RELEASE") diff --git a/build.gradle.kts b/build.gradle.kts index f3411640..5284d1a0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -23,41 +23,35 @@ repositories { configureKotlin { sourceSets { - commonMain { - dependencies { - implementation(libs.ktor.network) - implementation(libs.kotlinx.io.core) + commonMain.dependencies { + implementation(libs.ktor.network) + implementation(libs.kotlinx.io.core) - api(libs.ktor.network.tls) - api(libs.bignum) - api(libs.coroutines.core) - api(libs.kotlinx.datetime) - } + api(libs.ktor.network.tls) + api(libs.bignum) + api(libs.coroutines.core) + api(libs.kotlinx.datetime) } - jvmTest { - dependencies { - implementation(libs.kotlin.reflect) - implementation(libs.test.kotest.junit5) - implementation(libs.test.kotest.assertions) - implementation(libs.logback) - implementation("com.redis:testcontainers-redis:1.7.0") { - exclude("commons-io", "commons-io") - exclude("org.apache.commons", "commons-compress") - exclude("com.fasterxml.woodstox", "woodstox-core") - } - implementation("commons-io:commons-io:2.14.0") - implementation("org.apache.commons:commons-compress:1.26.0") - implementation("com.fasterxml.woodstox:woodstox-core:6.5.0") + jvmTest.dependencies { + implementation(libs.kotlin.reflect) + implementation(libs.test.kotest.junit5) + implementation(libs.test.kotest.assertions) + implementation(libs.logback) + implementation("com.redis:testcontainers-redis:1.7.0") { + exclude("commons-io", "commons-io") + exclude("org.apache.commons", "commons-compress") + exclude("com.fasterxml.woodstox", "woodstox-core") } + implementation("commons-io:commons-io:2.17.0") + implementation("org.apache.commons:commons-compress:1.27.1") + implementation("com.fasterxml.woodstox:woodstox-core:7.0.0") } } } buildscript { - dependencies { - classpath(libs.dokka.base) - } + dependencies.classpath(libs.dokka.base) } tasks { @@ -74,10 +68,8 @@ tasks { } } -apiValidation { - @OptIn(ExperimentalBCVApi::class) - klib.enabled = true -} +@OptIn(ExperimentalBCVApi::class) +apiValidation.klib.enabled = true detekt { buildUponDefaultConfig = true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0c5d191f..7a122fda 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,21 +1,21 @@ [versions] ktor = "3.0.0" -kotlin = "2.0.20" -coroutines = "1.8.1" +kotlin = "2.0.21" +coroutines = "1.9.0" io = "0.5.4" -datetime = "0.6.0" +datetime = "0.6.1" bignum = "0.3.10" dokka = "1.9.20" kotlinter = "4.4.1" -deteKT = "1.23.6" +deteKT = "1.23.7" publisher = "0.29.0" binvalid = "0.16.3" kover = "0.8.3" kotest = "5.9.1" -logback = "1.5.8" +logback = "1.5.10" [libraries] kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "io" } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 249e5832..a4b76b95 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e7de7ca0..df97d72b 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,7 @@ -#Sat Sep 21 16:36:53 MSK 2024 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 1b6c7873..f5feea6d 100755 --- a/gradlew +++ b/gradlew @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# SPDX-License-Identifier: Apache-2.0 +# ############################################################################## # @@ -55,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,13 +82,12 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,22 +134,29 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -193,11 +201,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ @@ -205,6 +217,12 @@ set -- \ org.gradle.wrapper.GradleWrapperMain \ "$@" +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + # Use "xargs" to parse quoted args. # # With -n1 it outputs one arg per line, with the quotes and backslashes removed. diff --git a/gradlew.bat b/gradlew.bat index 107acd32..9b42019c 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,89 +1,94 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt index 166af4c7..b8ac3539 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/ReThis.kt @@ -13,7 +13,6 @@ import eu.vendeli.rethis.utils.writeRedisValue import io.ktor.network.sockets.* import io.ktor.util.logging.* import io.ktor.utils.io.* -import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.currentCoroutineContext import kotlinx.io.Buffer @@ -33,7 +32,6 @@ class ReThis( internal val logger = KtorSimpleLogger("eu.vendeli.rethis.ReThis") internal val cfg: ClientConfiguration = ClientConfiguration().apply(configurator) internal val rootJob = SupervisorJob() - internal val subscriptionHandlers = mutableMapOf() internal val connectionPool by lazy { ConnectionPool(this, address.socket).also { it.prepare() } } init { @@ -50,7 +48,7 @@ class ReThis( } } - val subscriptions: Map get() = subscriptionHandlers + val subscriptions = ActiveSubscriptions() val isDisconnected: Boolean get() = connectionPool.isEmpty fun disconnect() = connectionPool.disconnect() diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/annotations/ReThisInternal.kt b/src/commonMain/kotlin/eu/vendeli/rethis/annotations/ReThisInternal.kt index e03f5bf1..0257e5f7 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/annotations/ReThisInternal.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/annotations/ReThisInternal.kt @@ -7,7 +7,8 @@ package eu.vendeli.rethis.annotations */ @RequiresOptIn( level = RequiresOptIn.Level.WARNING, - message = "This API is internal in ReThis. It could be removed or changed without notice.", + message = "This API is internal in ReThis. " + + "It could be removed or changed without notice. Use may break internal logic, use with caution.", ) @MustBeDocumented @Target( diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/commands/PubSubCommands.kt b/src/commonMain/kotlin/eu/vendeli/rethis/commands/PubSubCommands.kt index addf60be..0134244f 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/commands/PubSubCommands.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/commands/PubSubCommands.kt @@ -8,20 +8,18 @@ import eu.vendeli.rethis.utils.registerSubscription import eu.vendeli.rethis.utils.writeArg suspend fun ReThis.pSubscribe(vararg subscription: ChannelSubscription) = subscription.forEach { - pSubscribe(it.channel, it.exceptionHandler, it.handler) + pSubscribe(it.channel, it.handler) } suspend fun ReThis.pSubscribe( pattern: String, - exceptionHandler: ReThisExceptionHandler? = null, handler: SubscriptionHandler, ) { registerSubscription( regCommand = "PSUBSCRIBE", unRegCommand = "PUNSUBSCRIBE", - exHandler = exceptionHandler, target = pattern, - messageMarker = "message", + messageMarker = "pmessage", handler = handler, ) } @@ -81,12 +79,16 @@ suspend fun ReThis.pubSubShardNumSub(vararg channel: String): List() + internal var eventHandler: SubscriptionEventHandler? = null + + val size get() = jobs.size + + fun unsubscribe(id: String): Boolean = jobs[id]?.let { + it.cancel() + jobs.remove(id) + it.isCancelled + } ?: false + + fun unsubscribeAll(): Boolean { + jobs.forEach { unsubscribe(it.key) } + jobs.clear() + return jobs.isEmpty() + } + + fun setEventHandler(eventHandler: SubscriptionEventHandler) { + this.eventHandler = eventHandler + } + + fun isActive(id: String): Boolean = jobs[id]?.isActive ?: false +} diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt index c00e2197..119c17b8 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ConnectionPool.kt @@ -81,9 +81,7 @@ internal class ConnectionPool( @OptIn(ExperimentalCoroutinesApi::class) fun disconnect() = runBlocking { logger.debug("Disconnecting from Redis") - while (!connections.isEmpty) { - connections.receive().socket.close() - } + while (!connections.isEmpty) { connections.receive().socket.close() } } } diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ReThisExceptionHandler.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ReThisExceptionHandler.kt deleted file mode 100644 index a24004b8..00000000 --- a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/ReThisExceptionHandler.kt +++ /dev/null @@ -1,5 +0,0 @@ -package eu.vendeli.rethis.types.core - -fun interface ReThisExceptionHandler { - suspend fun handle(ex: Exception) -} diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/types/core/SubscriptionEventHandler.kt b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/SubscriptionEventHandler.kt new file mode 100644 index 00000000..0f0be33e --- /dev/null +++ b/src/commonMain/kotlin/eu/vendeli/rethis/types/core/SubscriptionEventHandler.kt @@ -0,0 +1,7 @@ +package eu.vendeli.rethis.types.core + +interface SubscriptionEventHandler { + suspend fun onSubscribe(id: String, subscribedChannels: Long) + suspend fun onUnsubscribe(id: String, subscribedChannels: Long) + suspend fun onException(id: String, ex: Exception) +} diff --git a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt index a71739ee..5c43c107 100644 --- a/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt +++ b/src/commonMain/kotlin/eu/vendeli/rethis/utils/CommonUtils.kt @@ -17,10 +17,16 @@ internal inline fun Any.cast(): T = this as T @Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE") internal inline fun Any.safeCast(): T? = this as? T +@Suppress("NOTHING_TO_INLINE") +private inline fun String?.isEqTo(other: String) = if (this != null) { + compareTo(other.lowercase()) == 0 +} else { + false +} + internal suspend inline fun ReThis.registerSubscription( regCommand: String, unRegCommand: String, - exHandler: ReThisExceptionHandler? = null, target: String, messageMarker: String, handler: SubscriptionHandler, @@ -38,23 +44,38 @@ internal suspend inline fun ReThis.registerSubscription( val input = if (msg is Push) msg.value else msg.safeCast()?.value logger.debug("Handling event in $target channel subscription") - input - ?.takeIf { - it.first().value == messageMarker && it.getOrNull(1)?.value == target - }?.also { + val inputType = input?.firstOrNull()?.value?.safeCast() + when { + inputType.isEqTo(regCommand) -> { + val targetCh = input?.getOrNull(1)?.unwrap() ?: target + val subscribers = input?.lastOrNull()?.unwrap() ?: 0L + subscriptions.eventHandler?.onSubscribe(targetCh, subscribers) + } + + inputType.isEqTo(unRegCommand) -> { + val targetCh = input?.getOrNull(1)?.unwrap()?: target + val subscribers = input?.lastOrNull()?.unwrap() ?: 0L + subscriptions.eventHandler?.onUnsubscribe(targetCh, subscribers) + subscriptions.unsubscribe(targetCh) + } + + inputType == messageMarker && input.getOrNull(1)?.value == target -> { handler.onMessage(this@registerSubscription, input.last().unwrap() ?: "") } + } delay(1) } } catch (e: Exception) { - logger.debug("Caught exception in $target channel handler") - exHandler?.handle(e) + logger.error("Caught exception in $target channel handler") + subscriptions.eventHandler?.onException(target, e) } finally { conn.output.writeBuffer(bufferValues(listOf(unRegCommand.toArg(), target.toArg()), cfg.charset)) conn.output.flush() connectionPool.release(conn) + subscriptions.unsubscribe(target) } } - subscriptionHandlers[target] = handlerJob + + subscriptions.jobs[target] = handlerJob } diff --git a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/PubSubCommandTest.kt b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/PubSubCommandTest.kt index f151e936..b5783822 100644 --- a/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/PubSubCommandTest.kt +++ b/src/jvmTest/kotlin/eu/vendeli/rethis/tests/commands/PubSubCommandTest.kt @@ -1,21 +1,25 @@ package eu.vendeli.rethis.tests.commands +import eu.vendeli.rethis.ReThisException import eu.vendeli.rethis.ReThisTestCtx import eu.vendeli.rethis.commands.* +import eu.vendeli.rethis.exception import eu.vendeli.rethis.types.common.PubSubNumEntry import eu.vendeli.rethis.types.core.BulkString import eu.vendeli.rethis.types.core.Int64 import eu.vendeli.rethis.types.core.Push +import eu.vendeli.rethis.types.core.SubscriptionEventHandler import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.shouldBe -import kotlinx.coroutines.cancelAndJoin +import io.kotest.matchers.throwable.shouldHaveMessage +import io.kotest.matchers.types.shouldBeTypeOf +import io.kotest.matchers.types.shouldNotBeTypeOf import kotlinx.coroutines.delay class PubSubCommandTest : ReThisTestCtx() { - private suspend fun clearSubs() { - client.subscriptions.entries.forEach { - it.value.cancelAndJoin() - } + @BeforeEach + suspend fun clearSubs() { + client.subscriptions.unsubscribeAll() } @Test @@ -23,7 +27,6 @@ class PubSubCommandTest : ReThisTestCtx() { client.subscribe("testChannel") { _, _ -> println("test") } delay(100) client.publish("testChannel", "testMessage") shouldBe 1L - clearSubs() } @Test @@ -31,7 +34,6 @@ class PubSubCommandTest : ReThisTestCtx() { client.subscribe("testChannel2") { _, _ -> println("test") } delay(100) client.pubSubChannels() shouldBe listOf("testChannel2") - clearSubs() } @Test @@ -39,7 +41,6 @@ class PubSubCommandTest : ReThisTestCtx() { client.pSubscribe("testP*") { _, _ -> println("test") } delay(100) client.pubSubNumPat() shouldBe 1L - clearSubs() } @Test @@ -100,7 +101,7 @@ class PubSubCommandTest : ReThisTestCtx() { client.pSubscribe("testPattern") { _, m -> println(m) } - client.subscriptions["testPattern"].shouldNotBeNull() + client.subscriptions.isActive("testPattern") shouldBe true } @Test @@ -108,7 +109,7 @@ class PubSubCommandTest : ReThisTestCtx() { client.sSubscribe("testShardChannel") { _, m -> println(m) } - client.subscriptions["testShardChannel"].shouldNotBeNull() + client.subscriptions.isActive("testShardChannel") shouldBe true } @Test @@ -116,6 +117,63 @@ class PubSubCommandTest : ReThisTestCtx() { client.subscribe("testChannel") { _, m -> println(m) } - client.subscriptions["testChannel"].shouldNotBeNull() + client.subscriptions.isActive("testChannel") shouldBe true + } + + @Test + suspend fun `test unsubscription command`() { + client.subscribe("testChannel") { _, m -> + println(m) + } + client.subscriptions.isActive("testChannel") shouldBe true + + client.subscriptions.unsubscribe("testChannel") shouldBe true + client.subscriptions.isActive("testChannel") shouldBe false + client.subscriptions.size shouldBe 0 + } + + @Test + suspend fun `test subscription evenHandler`() { + var onSub = 0 + var onUnsub = 0 + var caughtEx: Exception? = null + + client.subscriptions.setEventHandler( + object : SubscriptionEventHandler { + override suspend fun onSubscribe(id: String, subscribedChannels: Long) { + println("-- id $id count: $subscribedChannels") + onSub++ + } + + override suspend fun onUnsubscribe(id: String, subscribedChannels: Long) { + println("!-- id $id count: $subscribedChannels") + onUnsub++ + } + + override suspend fun onException(id: String, ex: Exception) { + caughtEx = ex + } + + }, + ) + client.subscribe("testChannel") { _, m -> + println("-----------$m") + } + client.subscriptions.isActive("testChannel") shouldBe true + + client.pSubscribe("testCh*"){ _, m -> + exception { "test" } + } + client.subscriptions.isActive("testCh*") shouldBe true + + client.unsubscribe("testChannel") + client.subscriptions.isActive("testChannel") shouldBe false + client.publish("testChannel", "test") + + delay(100) + + onSub shouldBe 2 + onUnsub shouldBe 0 + caughtEx.shouldNotBeNull().shouldBeTypeOf().shouldHaveMessage("test") } } diff --git a/src/jvmTest/resources/logback-test.xml b/src/jvmTest/resources/logback-test.xml index 71af66fd..79b69533 100644 --- a/src/jvmTest/resources/logback-test.xml +++ b/src/jvmTest/resources/logback-test.xml @@ -1,7 +1,7 @@ - %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + %d{HH:mm:ss.SSS} [%t] %highlight([%level]) %logger{36} - %msg%n