From 5bf23bff676ff810c1be0c98bd73872afb34202a Mon Sep 17 00:00:00 2001 From: caojiajun Date: Wed, 8 Nov 2023 15:10:21 +0800 Subject: [PATCH] camellia-redis-proxy optimize error reply in sharding-pubsub (#160) --- .../camellia/redis/proxy/reply/ErrorReply.java | 1 + .../redis/proxy/upstream/utils/PubSubUtils.java | 17 +++++++++++++---- update-en.md | 12 ++++++++++++ update-zh.md | 12 ++++++++++++ 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java index d9a5b4248..22dfe2998 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/reply/ErrorReply.java @@ -36,6 +36,7 @@ public class ErrorReply implements Reply { public static final ErrorReply REDIS_CLUSTER_ASK_ERROR = new ErrorReply("ERR redis cluster ASK error"); public static final ErrorReply SYNTAX_ERROR = new ErrorReply(Utils.syntaxError); public static final ErrorReply TOO_MANY_CLIENTS = new ErrorReply("ERR max number of clients reached"); + public static final ErrorReply COMMAND_MOVED_BY_UPSTREAM_SERVER = new ErrorReply("ERR command moved by upstream server"); private static final char MARKER = Marker.ErrorReply.getMarker(); private final String error; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/utils/PubSubUtils.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/utils/PubSubUtils.java index c1927fd1d..f1fcd1189 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/utils/PubSubUtils.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/utils/PubSubUtils.java @@ -7,13 +7,10 @@ import com.netease.nim.camellia.redis.proxy.monitor.ProxyMonitorCollector; import com.netease.nim.camellia.redis.proxy.monitor.UpstreamFailMonitor; import com.netease.nim.camellia.redis.proxy.netty.ChannelInfo; +import com.netease.nim.camellia.redis.proxy.reply.*; import com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnection; import com.netease.nim.camellia.redis.proxy.plugin.converter.KeyConverter; import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; -import com.netease.nim.camellia.redis.proxy.reply.BulkReply; -import com.netease.nim.camellia.redis.proxy.reply.IntegerReply; -import com.netease.nim.camellia.redis.proxy.reply.MultiBulkReply; -import com.netease.nim.camellia.redis.proxy.reply.Reply; import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector; import com.netease.nim.camellia.redis.proxy.util.Utils; @@ -40,6 +37,12 @@ private static void sendByBindClient(Resource resource, RedisConnection connecti CompletableFuture completableFuture = new CompletableFuture<>(); futures.add(completableFuture); completableFuture.thenAccept(reply -> { + if (reply instanceof ErrorReply) { + String error = ((ErrorReply) reply).getError(); + if (error != null && error.startsWith("MOVED")) { + reply = ErrorReply.COMMAND_MOVED_BY_UPSTREAM_SERVER; + } + } //parse reply must before send reply to connection SubscribeCount subscribeCount = tryGetSubscribeChannelCount(reply); future.complete(reply); @@ -66,6 +69,12 @@ private static void sendByBindClient(Resource resource, RedisConnection connecti if (connection.queueSize() < 8 && connection.isValid()) { sendByBindClient(resource, connection, taskQueue, null, null, false, redisCommand); } + if (reply instanceof ErrorReply) { + String error = ((ErrorReply) reply).getError(); + if (error != null && error.startsWith("MOVED")) { + reply = ErrorReply.COMMAND_MOVED_BY_UPSTREAM_SERVER; + } + } //parse reply must before send reply to connection SubscribeCount subscribeCount = tryGetSubscribeChannelCount(reply); taskQueue.reply(redisCommand, reply, false); diff --git a/update-en.md b/update-en.md index 8842ecd11..5ec296526 100644 --- a/update-en.md +++ b/update-en.md @@ -1,5 +1,17 @@ [中文版](update-zh.md) + +# 1.2.20(2023/11/xx) +### add +* none + +### update +* camellia-redis-proxy optimize error reply in sharding-pubsub + +### fix +* none + + # 1.2.19(2023/11/07) ### add * provide camellia-redis-proxy-nacos-bootstrap, so Operations Engineer can use nacos to manager camellia-redis-proxy cluster without java development diff --git a/update-zh.md b/update-zh.md index 316d2f6f3..5c355bd15 100644 --- a/update-zh.md +++ b/update-zh.md @@ -1,5 +1,17 @@ [ENGLISH](update-en.md) + +# 1.2.20(2023/11/xx) +### 新增 +* 无 + +### 更新 +* camellia-redis-proxy,当使用shard-pubsub命令时,后端可能返回`MOVED XXX`错误信息,proxy应该重写error信息,避免上游的ip被暴露给客户端 + +### fix +* 无 + + # 1.2.19(2023/11/07) ### 新增 * 新增camellia-redis-proxy-nacos-bootstrap,从而方便运维人员不需要进行java开发即可使用nacos管理proxy集群