diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt index 3f69497e7d..d402becbfe 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommands.kt @@ -20,6 +20,7 @@ import io.lettuce.core.ExperimentalLettuceCoroutinesApi import io.lettuce.core.output.CommandOutput import io.lettuce.core.protocol.CommandArgs import io.lettuce.core.protocol.ProtocolKeyword +import kotlinx.coroutines.flow.Flow /** * Coroutine executed commands for basic commands. @@ -135,7 +136,7 @@ interface BaseRedisCoroutinesCommands { * @param response type. * @return the command response. */ - suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput): T? + fun dispatch(type: ProtocolKeyword, output: CommandOutput): Flow /** * Dispatch a command to the Redis Server. Please note the command output type must fit to the command response. @@ -146,7 +147,7 @@ interface BaseRedisCoroutinesCommands { * @param response type. * @return the command response. */ - suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): T? + fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow /** * diff --git a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt index c5fb756a0b..c27395f85b 100644 --- a/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/api/coroutines/BaseRedisCoroutinesCommandsImpl.kt @@ -21,6 +21,7 @@ import io.lettuce.core.api.reactive.BaseRedisReactiveCommands import io.lettuce.core.output.CommandOutput import io.lettuce.core.protocol.CommandArgs import io.lettuce.core.protocol.ProtocolKeyword +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull @@ -62,9 +63,9 @@ internal class BaseRedisCoroutinesCommandsImpl(private val ops override suspend fun waitForReplication(replicas: Int, timeout: Long): Long? = ops.waitForReplication(replicas, timeout).awaitFirstOrNull() - override suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput): T? = ops.dispatch(type, output).awaitFirstOrNull() + override fun dispatch(type: ProtocolKeyword, output: CommandOutput): Flow = ops.dispatch(type, output).asFlow() - override suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): T? = ops.dispatch(type, output, args).awaitFirstOrNull() + override fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow = ops.dispatch(type, output, args).asFlow() override fun isOpen(): Boolean = ops.isOpen diff --git a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt index bed620618c..903b8762f4 100644 --- a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt +++ b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommands.kt @@ -22,6 +22,7 @@ import io.lettuce.core.output.CommandOutput import io.lettuce.core.protocol.CommandArgs import io.lettuce.core.protocol.ProtocolKeyword import java.net.SocketAddress +import kotlinx.coroutines.flow.Flow /** * Coroutine executed commands for Redis Sentinel. @@ -189,7 +190,7 @@ interface RedisSentinelCoroutinesCommands { * @return the command response. * @since 6.0.2 */ - suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput): T? + fun dispatch(type: ProtocolKeyword, output: CommandOutput): Flow /** * Dispatch a command to the Redis Server. Please note the command output type must fit to the command response. @@ -201,7 +202,7 @@ interface RedisSentinelCoroutinesCommands { * @return the command response. * @since 6.0.2 */ - suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): T? + fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow /** * diff --git a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt index 8c55afbeb5..30044486aa 100644 --- a/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt +++ b/src/main/kotlin/io/lettuce/core/sentinel/api/coroutines/RedisSentinelCoroutinesCommandsImpl.kt @@ -22,6 +22,7 @@ import io.lettuce.core.output.CommandOutput import io.lettuce.core.protocol.CommandArgs import io.lettuce.core.protocol.ProtocolKeyword import io.lettuce.core.sentinel.api.reactive.RedisSentinelReactiveCommands +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull @@ -75,9 +76,9 @@ internal class RedisSentinelCoroutinesCommandsImpl(private val override suspend fun ping(): String = ops.ping().awaitLast() - override suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput): T? = ops.dispatch(type, output).awaitFirstOrNull() + override fun dispatch(type: ProtocolKeyword, output: CommandOutput): Flow = ops.dispatch(type, output).asFlow() - override suspend fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): T? = ops.dispatch(type, output, args).awaitFirstOrNull() + override fun dispatch(type: ProtocolKeyword, output: CommandOutput, args: CommandArgs): Flow = ops.dispatch(type, output, args).asFlow() override fun isOpen(): Boolean = ops.isOpen diff --git a/src/test/java/io/lettuce/apigenerator/KotlinCompilationUnitFactory.java b/src/test/java/io/lettuce/apigenerator/KotlinCompilationUnitFactory.java index dd88155d41..4aee5ddf48 100644 --- a/src/test/java/io/lettuce/apigenerator/KotlinCompilationUnitFactory.java +++ b/src/test/java/io/lettuce/apigenerator/KotlinCompilationUnitFactory.java @@ -54,7 +54,7 @@ class KotlinCompilationUnitFactory { private static final Set SKIP_IMPORTS = LettuceSets.unmodifiableSet("java.util.List", "java.util.Set", "java.util.Map"); private static final Set NON_SUSPENDABLE_METHODS = LettuceSets.unmodifiableSet("isOpen", "flushCommands", "setAutoFlushCommands"); private static final Set SKIP_METHODS = LettuceSets.unmodifiableSet("BaseRedisCommands.reset", "getStatefulConnection"); - private static final Set FLOW_METHODS = LettuceSets.unmodifiableSet("geohash", "georadius", "georadiusbymember", + private static final Set FLOW_METHODS = LettuceSets.unmodifiableSet("dispatch", "geohash", "georadius", "georadiusbymember", "hgetall", "hkeys", "hmget", "hvals", "keys", "mget", "sdiff", "sinter", "smembers", "smismember", "sort", "srandmember", "sunion", "xclaim", "xpending", "xrange", "xread", "xreadgroup", "xrevrange", "zinter", "zinterWithScores", "zpopmax", "zpopmin", "zrange", "zrangeWithScores", "zrangebylex", "zrangebyscore", "zrangebyscoreWithScores", "zrevrange", "zrevrangeWithScores", "zrevrangebylex", @@ -173,7 +173,7 @@ public void visit(MethodDeclaration method, Object arg) { .append(extractAnnotations(method)) .append(contains(NON_SUSPENDABLE_METHODS, method) || isFlowable(method) ? "" : "suspend ") .append("fun ") - .append(method.getTypeParameters().isNonEmpty() ? extractTypeParams(method.getTypeParameters(), null).concat(" ") : "") + .append(method.getTypeParameters().isNonEmpty() ? extractTypeParams(method.getTypeParameters(), "Any").concat(" ") : "") .append(method.getNameAsString()) .append("(") .append(extractParameters(method)) @@ -218,7 +218,7 @@ private boolean isCollection(Type type) { } private boolean isFlowable(MethodDeclaration method) { - return contains(FLOW_METHODS, method) && (isCollection(method.getType()) || method.getType().asString().startsWith("Map<")); + return contains(FLOW_METHODS, method); } private String toKotlinType(Type type, boolean isFlowable, boolean isForceNonNullable) { @@ -246,7 +246,8 @@ private String toKotlinType(Type type, boolean isFlowable, boolean isForceNonNul .asString() .replace("List", "Flow") .replace("Set", "Flow") - .replace("Map", "Flow"); + .replace("Map", "Flow") + .replace("T", "Flow"); } else { fixedType = type .asString()