From bcc6150d8e45499014ddd9053c6d54054b609118 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Tue, 30 Oct 2018 17:09:23 +0800 Subject: [PATCH] polish the code for pull request 2658: Optimize heartbeat and reconnect task (#2709) --- .../org/apache/dubbo/common/Constants.java | 12 ++++- .../support/header/AbstractTimerTask.java | 27 +++++------ .../support/header/HeaderExchangeClient.java | 22 ++++----- .../support/header/HeaderExchangeServer.java | 47 +++++++++---------- .../support/header/HeartbeatTimerTask.java | 5 +- .../support/header/ReconnectTimerTask.java | 6 +-- .../support/header/HeartBeatTaskTest.java | 4 +- 7 files changed, 64 insertions(+), 59 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 9ddac89c0f3a..18ad19f5246e 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -247,9 +247,17 @@ public class Constants { public static final String HEARTBEAT_KEY = "heartbeat"; - public static final int HEARTBEAT_TICK = 3; + /** + * Every heartbeat duration / HEATBEAT_CHECK_TICK, check if a heartbeat should be sent. Every heartbeat timeout + * duration / HEATBEAT_CHECK_TICK, check if a connection should be closed on server side, and if reconnect on + * client side + */ + public static final int HEARTBEAT_CHECK_TICK = 3; - public static final long LEAST_HEARTBEAT_TICK = 1000; + /** + * the least heartbeat during is 1000 ms. + */ + public static final long LEAST_HEARTBEAT_DURATION = 1000; /** * ticks per wheel. Currently only contains two tasks, so 16 locations are enough diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java index a2d6a2b8ec05..003af243d860 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java @@ -30,11 +30,11 @@ */ public abstract class AbstractTimerTask implements TimerTask { - protected final ChannelProvider channelProvider; + private final ChannelProvider channelProvider; - protected final Long tick; + private final Long tick; - protected AbstractTimerTask(ChannelProvider channelProvider, Long tick) { + AbstractTimerTask(ChannelProvider channelProvider, Long tick) { if (channelProvider == null || tick == null) { throw new IllegalArgumentException(); } @@ -42,31 +42,28 @@ protected AbstractTimerTask(ChannelProvider channelProvider, Long tick) { this.channelProvider = channelProvider; } - protected Long lastRead(Channel channel) { - return (Long) channel.getAttribute( - HeaderExchangeHandler.KEY_READ_TIMESTAMP); + static Long lastRead(Channel channel) { + return (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP); } - protected Long lastWrite(Channel channel) { - return (Long) channel.getAttribute( - HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); + static Long lastWrite(Channel channel) { + return (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); } - protected Long now() { + static Long now() { return System.currentTimeMillis(); } - protected void reput(Timeout timeout, Long tick) { + private void reput(Timeout timeout, Long tick) { if (timeout == null || tick == null) { throw new IllegalArgumentException(); } + Timer timer = timeout.timer(); - if (timer.isStop()) { - return; - } - if (timeout.isCancelled()) { + if (timer.isStop() || timeout.isCancelled()) { return; } + timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 66cf55ceaeed..bfd2ece11c98 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -51,17 +51,17 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) { this.client = client; this.channel = new HeaderExchangeChannel(client); String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); - this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); + + this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && + dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (needHeartbeat) { - long heartbeatTick = calcLeastTick(heartbeat); - - // use heartbeatTick as every tick. - heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); + long tickDuration = calculateLeastDuration(heartbeat); + heartbeatTimer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } } @@ -179,8 +179,8 @@ public boolean hasAttribute(String key) { private void startHeartbeatTimer() { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); - long heartbeatTick = calcLeastTick(heartbeat); - long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout); + long heartbeatTick = calculateLeastDuration(heartbeat); + long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout); HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); @@ -203,11 +203,11 @@ private void doClose() { /** * Each interval cannot be less than 1000ms. */ - private long calcLeastTick(int time) { - if (time / Constants.HEARTBEAT_TICK <= 0) { - return Constants.LEAST_HEARTBEAT_TICK; + private long calculateLeastDuration(int time) { + if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) { + return Constants.LEAST_HEARTBEAT_DURATION; } else { - return time / Constants.HEARTBEAT_TICK; + return time / Constants.HEARTBEAT_CHECK_TICK; } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java index d091b8f45a75..bc7e3a72d823 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java @@ -33,10 +33,11 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.unmodifiableCollection; + /** * ExchangeServerImpl */ @@ -63,9 +64,6 @@ public HeaderExchangeServer(Server server) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } - long heartbeatTick = calcLeastTick(heartbeat); - // use heartbeatTick as every tick. - heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } @@ -217,10 +215,6 @@ public void reset(URL url) { heartbeatTimeout = t; stopHeartbeatTimer(); - - long heartbeatTick = calcLeastTick(heartbeat); - // use heartbeatTick as every tick. - heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } } @@ -238,7 +232,8 @@ public void reset(org.apache.dubbo.common.Parameters parameters) { @Override public void send(Object message) throws RemotingException { if (closed.get()) { - throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!"); + throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + + ", cause: The server " + getLocalAddress() + " is closed!"); } server.send(message); } @@ -246,16 +241,31 @@ public void send(Object message) throws RemotingException { @Override public void send(Object message, boolean sent) throws RemotingException { if (closed.get()) { - throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!"); + throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + + ", cause: The server " + getLocalAddress() + " is closed!"); } server.send(message, sent); } + /** + * Each interval cannot be less than 1000ms. + */ + private long calculateLeastDuration(int time) { + if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) { + return Constants.LEAST_HEARTBEAT_DURATION; + } else { + return time / Constants.HEARTBEAT_CHECK_TICK; + } + } + private void startHeartbeatTimer() { - AbstractTimerTask.ChannelProvider cp = () -> Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); + long tickDuration = calculateLeastDuration(heartbeat); + heartbeatTimer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); - long heartbeatTick = calcLeastTick(heartbeat); - long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout); + AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); + + long heartbeatTick = calculateLeastDuration(heartbeat); + long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout); HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); @@ -264,17 +274,6 @@ private void startHeartbeatTimer() { heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); } - /** - * Each interval cannot be less than 1000ms. - */ - private long calcLeastTick(int time) { - if (time / Constants.HEARTBEAT_TICK <= 0) { - return Constants.LEAST_HEARTBEAT_TICK; - } else { - return time / Constants.HEARTBEAT_TICK; - } - } - private void stopHeartbeatTimer() { if (heartbeatTimer != null) { heartbeatTimer.stop(); diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java index a229cca4c00a..cbe01f85063c 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java @@ -32,7 +32,7 @@ public class HeartbeatTimerTask extends AbstractTimerTask { private final int heartbeat; - protected HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) { + HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) { super(channelProvider, heartbeatTick); this.heartbeat = heartbeat; } @@ -51,7 +51,8 @@ protected void doTask(Channel channel) { channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() - + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); + + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + + heartbeat + "ms"); } } } catch (Throwable t) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java index f6ee5c698d83..dccbe5332cd6 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java @@ -31,7 +31,7 @@ public class ReconnectTimerTask extends AbstractTimerTask { private final int heartbeatTimeout; - protected ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) { + ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) { super(channelProvider, heartbeatTimeoutTick); this.heartbeatTimeout = heartbeatTimeout1; } @@ -42,8 +42,8 @@ protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { - logger.warn("Close channel " + channel - + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); + logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + + heartbeatTimeout + "ms"); if (channel instanceof Client) { try { ((Client) channel).reconnect(); diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java index b80a569f6268..b302b555f5b3 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java @@ -42,7 +42,7 @@ public class HeartBeatTaskTest { @Before public void setup() throws Exception { long tickDuration = 1000; - heartbeatTimer = new HashedWheelTimer(tickDuration / Constants.HEARTBEAT_TICK, TimeUnit.MILLISECONDS); + heartbeatTimer = new HashedWheelTimer(tickDuration / Constants.HEARTBEAT_CHECK_TICK, TimeUnit.MILLISECONDS); channel = new MockChannel() { @@ -53,7 +53,7 @@ public URL getUrl() { }; AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(channel); - heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_TICK, (int) tickDuration); + heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_CHECK_TICK, (int) tickDuration); } @Test