Skip to content

Commit

Permalink
Let coroutines dispatch-method be flowable #1567
Browse files Browse the repository at this point in the history
  • Loading branch information
sokomishalov authored and mp911de committed Jan 8, 2021
1 parent ee3e70f commit d79fc24
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.lettuce.core.ExperimentalLettuceCoroutinesApi
import io.lettuce.core.output.CommandOutput
import io.lettuce.core.protocol.CommandArgs
import io.lettuce.core.protocol.ProtocolKeyword
import kotlinx.coroutines.flow.Flow

/**
* Coroutine executed commands for basic commands.
Expand Down Expand Up @@ -135,7 +136,7 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
* @param <T> response type.
* @return the command response.
*/
suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): T?
fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): Flow<T>

/**
* Dispatch a command to the Redis Server. Please note the command output type must fit to the command response.
Expand All @@ -146,7 +147,7 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
* @param <T> response type.
* @return the command response.
*/
suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): T?
fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): Flow<T>

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.lettuce.core.api.reactive.BaseRedisReactiveCommands
import io.lettuce.core.output.CommandOutput
import io.lettuce.core.protocol.CommandArgs
import io.lettuce.core.protocol.ProtocolKeyword
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
Expand Down Expand Up @@ -62,9 +63,9 @@ internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(private val ops

override suspend fun waitForReplication(replicas: Int, timeout: Long): Long? = ops.waitForReplication(replicas, timeout).awaitFirstOrNull()

override suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): T? = ops.dispatch<T>(type, output).awaitFirstOrNull()
override fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): Flow<T> = ops.dispatch<T>(type, output).asFlow()

override suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): T? = ops.dispatch<T>(type, output, args).awaitFirstOrNull()
override fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): Flow<T> = ops.dispatch<T>(type, output, args).asFlow()

override fun isOpen(): Boolean = ops.isOpen

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.lettuce.core.output.CommandOutput
import io.lettuce.core.protocol.CommandArgs
import io.lettuce.core.protocol.ProtocolKeyword
import java.net.SocketAddress
import kotlinx.coroutines.flow.Flow

/**
* Coroutine executed commands for Redis Sentinel.
Expand Down Expand Up @@ -189,7 +190,7 @@ interface RedisSentinelCoroutinesCommands<K : Any, V : Any> {
* @return the command response.
* @since 6.0.2
*/
suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): T?
fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): Flow<T>

/**
* Dispatch a command to the Redis Server. Please note the command output type must fit to the command response.
Expand All @@ -201,7 +202,7 @@ interface RedisSentinelCoroutinesCommands<K : Any, V : Any> {
* @return the command response.
* @since 6.0.2
*/
suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): T?
fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): Flow<T>

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.lettuce.core.output.CommandOutput
import io.lettuce.core.protocol.CommandArgs
import io.lettuce.core.protocol.ProtocolKeyword
import io.lettuce.core.sentinel.api.reactive.RedisSentinelReactiveCommands
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
Expand Down Expand Up @@ -75,9 +76,9 @@ internal class RedisSentinelCoroutinesCommandsImpl<K : Any, V : Any>(private val

override suspend fun ping(): String = ops.ping().awaitLast()

override suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): T? = ops.dispatch<T>(type, output).awaitFirstOrNull()
override fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>): Flow<T> = ops.dispatch<T>(type, output).asFlow()

override suspend fun <T> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): T? = ops.dispatch<T>(type, output, args).awaitFirstOrNull()
override fun <T : Any> dispatch(type: ProtocolKeyword, output: CommandOutput<K, V, T>, args: CommandArgs<K, V>): Flow<T> = ops.dispatch<T>(type, output, args).asFlow()

override fun isOpen(): Boolean = ops.isOpen

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class KotlinCompilationUnitFactory {
private static final Set<String> SKIP_IMPORTS = LettuceSets.unmodifiableSet("java.util.List", "java.util.Set", "java.util.Map");
private static final Set<String> NON_SUSPENDABLE_METHODS = LettuceSets.unmodifiableSet("isOpen", "flushCommands", "setAutoFlushCommands");
private static final Set<String> SKIP_METHODS = LettuceSets.unmodifiableSet("BaseRedisCommands.reset", "getStatefulConnection");
private static final Set<String> FLOW_METHODS = LettuceSets.unmodifiableSet("geohash", "georadius", "georadiusbymember",
private static final Set<String> FLOW_METHODS = LettuceSets.unmodifiableSet("dispatch", "geohash", "georadius", "georadiusbymember",
"hgetall", "hkeys", "hmget", "hvals", "keys", "mget", "sdiff", "sinter", "smembers", "smismember", "sort", "srandmember", "sunion",
"xclaim", "xpending", "xrange", "xread", "xreadgroup", "xrevrange", "zinter", "zinterWithScores", "zpopmax", "zpopmin", "zrange",
"zrangeWithScores", "zrangebylex", "zrangebyscore", "zrangebyscoreWithScores", "zrevrange", "zrevrangeWithScores", "zrevrangebylex",
Expand Down Expand Up @@ -173,7 +173,7 @@ public void visit(MethodDeclaration method, Object arg) {
.append(extractAnnotations(method))
.append(contains(NON_SUSPENDABLE_METHODS, method) || isFlowable(method) ? "" : "suspend ")
.append("fun ")
.append(method.getTypeParameters().isNonEmpty() ? extractTypeParams(method.getTypeParameters(), null).concat(" ") : "")
.append(method.getTypeParameters().isNonEmpty() ? extractTypeParams(method.getTypeParameters(), "Any").concat(" ") : "")
.append(method.getNameAsString())
.append("(")
.append(extractParameters(method))
Expand Down Expand Up @@ -218,7 +218,7 @@ private boolean isCollection(Type type) {
}

private boolean isFlowable(MethodDeclaration method) {
return contains(FLOW_METHODS, method) && (isCollection(method.getType()) || method.getType().asString().startsWith("Map<"));
return contains(FLOW_METHODS, method);
}

private String toKotlinType(Type type, boolean isFlowable, boolean isForceNonNullable) {
Expand Down Expand Up @@ -246,7 +246,8 @@ private String toKotlinType(Type type, boolean isFlowable, boolean isForceNonNul
.asString()
.replace("List", "Flow")
.replace("Set", "Flow")
.replace("Map", "Flow");
.replace("Map", "Flow")
.replace("T", "Flow<T>");
} else {
fixedType = type
.asString()
Expand Down

0 comments on commit d79fc24

Please sign in to comment.