diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 4d6e7585cf2..9ddac89c0f3 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -19,6 +19,7 @@ import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; + /** * Constants */ @@ -246,6 +247,15 @@ public class Constants { public static final String HEARTBEAT_KEY = "heartbeat"; + public static final int HEARTBEAT_TICK = 3; + + public static final long LEAST_HEARTBEAT_TICK = 1000; + + /** + * ticks per wheel. Currently only contains two tasks, so 16 locations are enough + */ + public static final int TICKS_PER_WHEEL = 16; + public static final String HEARTBEAT_TIMEOUT_KEY = "heartbeat.timeout"; public static final String CONNECT_TIMEOUT_KEY = "connect.timeout"; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java index a54cc70b9b4..7801ce06a47 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java @@ -367,6 +367,11 @@ public Set stop() { return worker.unprocessedTimeouts(); } + @Override + public boolean isStop() { + return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this); + } + @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java index 9e87059124f..881fab95a57 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/Timer.java @@ -45,4 +45,11 @@ public interface Timer { * this method */ Set stop(); + + /** + * the timer is stop + * + * @return true for stop + */ + boolean isStop(); } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java new file mode 100644 index 00000000000..a2d6a2b8ec0 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.remoting.exchange.support.header; + +import org.apache.dubbo.common.timer.Timeout; +import org.apache.dubbo.common.timer.Timer; +import org.apache.dubbo.common.timer.TimerTask; +import org.apache.dubbo.remoting.Channel; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +/** + * AbstractTimerTask + */ +public abstract class AbstractTimerTask implements TimerTask { + + protected final ChannelProvider channelProvider; + + protected final Long tick; + + protected AbstractTimerTask(ChannelProvider channelProvider, Long tick) { + if (channelProvider == null || tick == null) { + throw new IllegalArgumentException(); + } + this.tick = tick; + this.channelProvider = channelProvider; + } + + protected Long lastRead(Channel channel) { + return (Long) channel.getAttribute( + HeaderExchangeHandler.KEY_READ_TIMESTAMP); + } + + protected Long lastWrite(Channel channel) { + return (Long) channel.getAttribute( + HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); + } + + protected Long now() { + return System.currentTimeMillis(); + } + + protected void reput(Timeout timeout, Long tick) { + if (timeout == null || tick == null) { + throw new IllegalArgumentException(); + } + Timer timer = timeout.timer(); + if (timer.isStop()) { + return; + } + if (timeout.isCancelled()) { + return; + } + timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS); + } + + @Override + public void run(Timeout timeout) throws Exception { + Collection c = channelProvider.getChannels(); + for (Channel channel : c) { + if (channel.isClosed()) { + continue; + } + doTask(channel); + } + reput(timeout, tick); + } + + protected abstract void doTask(Channel channel); + + interface ChannelProvider { + Collection getChannels(); + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index feac1bff7d6..66cf55ceaee 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -18,10 +18,7 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; -import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.common.timer.HashedWheelTimer; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.Client; import org.apache.dubbo.remoting.RemotingException; @@ -31,10 +28,7 @@ import org.apache.dubbo.remoting.exchange.ResponseFuture; import java.net.InetSocketAddress; -import java.util.Collection; import java.util.Collections; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -42,17 +36,14 @@ */ public class HeaderExchangeClient implements ExchangeClient { - private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class); - - private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); private final Client client; private final ExchangeChannel channel; - // heartbeat timer - private ScheduledFuture heartbeatTimer; // heartbeat(ms), default value is 0 , won't execute a heartbeat. private int heartbeat; private int heartbeatTimeout; + private HashedWheelTimer heartbeatTimer; + public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); @@ -65,7 +56,12 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } + if (needHeartbeat) { + long heartbeatTick = calcLeastTick(heartbeat); + + // use heartbeatTick as every tick. + heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } } @@ -181,37 +177,40 @@ public boolean hasAttribute(String key) { } private void startHeartbeatTimer() { - stopHeartbeatTimer(); - if (heartbeat > 0) { - heartbeatTimer = scheduled.scheduleWithFixedDelay( - new HeartBeatTask(new HeartBeatTask.ChannelProvider() { - @Override - public Collection getChannels() { - return Collections.singletonList(HeaderExchangeClient.this); - } - }, heartbeat, heartbeatTimeout), - heartbeat, heartbeat, TimeUnit.MILLISECONDS); - } + AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); + + long heartbeatTick = calcLeastTick(heartbeat); + long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout); + HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); + ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); + + // init task and start timer. + heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); + heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); } private void stopHeartbeatTimer() { - if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { - try { - heartbeatTimer.cancel(true); - scheduled.purge(); - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn(e.getMessage(), e); - } - } + if (heartbeatTimer != null) { + heartbeatTimer.stop(); + heartbeatTimer = null; } - heartbeatTimer = null; } private void doClose() { stopHeartbeatTimer(); } + /** + * Each interval cannot be less than 1000ms. + */ + private long calcLeastTick(int time) { + if (time / Constants.HEARTBEAT_TICK <= 0) { + return Constants.LEAST_HEARTBEAT_TICK; + } else { + return time / Constants.HEARTBEAT_TICK; + } + } + @Override public String toString() { return "HeaderExchangeClient [channel=" + channel + "]"; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java index 8eceb182e4b..d091b8f45a7 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java @@ -21,7 +21,7 @@ import org.apache.dubbo.common.Version; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.timer.HashedWheelTimer; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; @@ -34,9 +34,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,18 +44,14 @@ public class HeaderExchangeServer implements ExchangeServer { protected final Logger logger = LoggerFactory.getLogger(getClass()); - private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, - new NamedThreadFactory( - "dubbo-remoting-server-heartbeat", - true)); private final Server server; - // heartbeat timer - private ScheduledFuture heartbeatTimer; // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat. private int heartbeat; private int heartbeatTimeout; private AtomicBoolean closed = new AtomicBoolean(false); + private HashedWheelTimer heartbeatTimer; + public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); @@ -69,6 +62,10 @@ public HeaderExchangeServer(Server server) { if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } + + long heartbeatTick = calcLeastTick(heartbeat); + // use heartbeatTick as every tick. + heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } @@ -153,11 +150,6 @@ private void doClose() { return; } stopHeartbeatTimer(); - try { - scheduled.shutdown(); - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } } @Override @@ -223,6 +215,12 @@ public void reset(URL url) { if (h != heartbeat || t != heartbeatTimeout) { heartbeat = h; heartbeatTimeout = t; + + stopHeartbeatTimer(); + + long heartbeatTick = calcLeastTick(heartbeat); + // use heartbeatTick as every tick. + heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); startHeartbeatTimer(); } } @@ -254,29 +252,32 @@ public void send(Object message, boolean sent) throws RemotingException { } private void startHeartbeatTimer() { - stopHeartbeatTimer(); - if (heartbeat > 0) { - heartbeatTimer = scheduled.scheduleWithFixedDelay( - new HeartBeatTask(new HeartBeatTask.ChannelProvider() { - @Override - public Collection getChannels() { - return Collections.unmodifiableCollection( - HeaderExchangeServer.this.getChannels()); - } - }, heartbeat, heartbeatTimeout), - heartbeat, heartbeat, TimeUnit.MILLISECONDS); + AbstractTimerTask.ChannelProvider cp = () -> Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); + + long heartbeatTick = calcLeastTick(heartbeat); + long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout); + HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); + ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); + + // init task and start timer. + heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); + heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); + } + + /** + * Each interval cannot be less than 1000ms. + */ + private long calcLeastTick(int time) { + if (time / Constants.HEARTBEAT_TICK <= 0) { + return Constants.LEAST_HEARTBEAT_TICK; + } else { + return time / Constants.HEARTBEAT_TICK; } } private void stopHeartbeatTimer() { - try { - ScheduledFuture timer = heartbeatTimer; - if (timer != null && !timer.isCancelled()) { - timer.cancel(true); - } - } catch (Throwable t) { - logger.warn(t.getMessage(), t); - } finally { + if (heartbeatTimer != null) { + heartbeatTimer.stop(); heartbeatTimer = null; } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTask.java deleted file mode 100644 index 97bb1811ad8..00000000000 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTask.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dubbo.remoting.exchange.support.header; - -import org.apache.dubbo.common.Version; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.remoting.Channel; -import org.apache.dubbo.remoting.Client; -import org.apache.dubbo.remoting.exchange.Request; - -import java.util.Collection; - -final class HeartBeatTask implements Runnable { - - private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); - - private ChannelProvider channelProvider; - - private int heartbeat; - - private int heartbeatTimeout; - - HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { - this.channelProvider = provider; - this.heartbeat = heartbeat; - this.heartbeatTimeout = heartbeatTimeout; - } - - @Override - public void run() { - try { - long now = System.currentTimeMillis(); - for (Channel channel : channelProvider.getChannels()) { - if (channel.isClosed()) { - continue; - } - try { - Long lastRead = (Long) channel.getAttribute( - HeaderExchangeHandler.KEY_READ_TIMESTAMP); - Long lastWrite = (Long) channel.getAttribute( - HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); - if ((lastRead != null && now - lastRead > heartbeat) - || (lastWrite != null && now - lastWrite > heartbeat)) { - Request req = new Request(); - req.setVersion(Version.getProtocolVersion()); - req.setTwoWay(true); - req.setEvent(Request.HEARTBEAT_EVENT); - channel.send(req); - if (logger.isDebugEnabled()) { - logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() - + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); - } - } - if (lastRead != null && now - lastRead > heartbeatTimeout) { - logger.warn("Close channel " + channel - + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); - if (channel instanceof Client) { - try { - ((Client) channel).reconnect(); - } catch (Exception e) { - //do nothing - } - } else { - channel.close(); - } - } - } catch (Throwable t) { - logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); - } - } - } catch (Throwable t) { - logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); - } - } - - interface ChannelProvider { - Collection getChannels(); - } - -} - diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java new file mode 100644 index 00000000000..a229cca4c00 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.remoting.exchange.support.header; + +import org.apache.dubbo.common.Version; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.remoting.exchange.Request; + +/** + * HeartbeatTimerTask + */ +public class HeartbeatTimerTask extends AbstractTimerTask { + + private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class); + + private final int heartbeat; + + protected HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) { + super(channelProvider, heartbeatTick); + this.heartbeat = heartbeat; + } + + @Override + protected void doTask(Channel channel) { + try { + Long lastRead = lastRead(channel); + Long lastWrite = lastWrite(channel); + if ((lastRead != null && now() - lastRead > heartbeat) + || (lastWrite != null && now() - lastWrite > heartbeat)) { + Request req = new Request(); + req.setVersion(Version.getProtocolVersion()); + req.setTwoWay(true); + req.setEvent(Request.HEARTBEAT_EVENT); + channel.send(req); + if (logger.isDebugEnabled()) { + logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); + } + } + } catch (Throwable t) { + logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java new file mode 100644 index 00000000000..f6ee5c698d8 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.remoting.exchange.support.header; + +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.remoting.Client; + +/** + * ReconnectTimerTask + */ +public class ReconnectTimerTask extends AbstractTimerTask { + + private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class); + + private final int heartbeatTimeout; + + protected ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) { + super(channelProvider, heartbeatTimeoutTick); + this.heartbeatTimeout = heartbeatTimeout1; + } + + @Override + protected void doTask(Channel channel) { + try { + Long lastRead = lastRead(channel); + Long now = now(); + if (lastRead != null && now - lastRead > heartbeatTimeout) { + logger.warn("Close channel " + channel + + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); + if (channel instanceof Client) { + try { + ((Client) channel).reconnect(); + } catch (Exception e) { + //do nothing + } + } else { + channel.close(); + } + } + } catch (Throwable t) { + logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java index 5909a586663..b80a569f626 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java @@ -19,32 +19,30 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.HashedWheelTimer; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.exchange.Request; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; public class HeartBeatTaskTest { private URL url = URL.valueOf("dubbo://localhost:20880"); private MockChannel channel; - private HeartBeatTask task; + + private HeartbeatTimerTask heartbeatTimerTask; + private HashedWheelTimer heartbeatTimer; @Before public void setup() throws Exception { - task = new HeartBeatTask(new HeartBeatTask.ChannelProvider() { - - public Collection getChannels() { - return Collections.singletonList(channel); - } - }, 1000, 1000 * 3); + long tickDuration = 1000; + heartbeatTimer = new HashedWheelTimer(tickDuration / Constants.HEARTBEAT_TICK, TimeUnit.MILLISECONDS); channel = new MockChannel() { @@ -53,17 +51,24 @@ public URL getUrl() { return url; } }; + + AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(channel); + heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_TICK, (int) tickDuration); } @Test public void testHeartBeat() throws Exception { + long now = System.currentTimeMillis(); + url = url.addParameter(Constants.DUBBO_VERSION_KEY, "2.1.1"); channel.setAttribute( - HeaderExchangeHandler.KEY_READ_TIMESTAMP, System.currentTimeMillis()); + HeaderExchangeHandler.KEY_READ_TIMESTAMP, now); channel.setAttribute( - HeaderExchangeHandler.KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); + HeaderExchangeHandler.KEY_WRITE_TIMESTAMP, now); + + heartbeatTimer.newTimeout(heartbeatTimerTask, 250, TimeUnit.MILLISECONDS); + Thread.sleep(2000L); - task.run(); List objects = channel.getSentObjects(); Assert.assertTrue(objects.size() > 0); Object obj = objects.get(0); diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java index 867854a3a16..7db1fb5743d 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java @@ -29,9 +29,8 @@ import org.apache.dubbo.remoting.exchange.ExchangeServer; import org.apache.dubbo.remoting.exchange.Exchangers; import org.apache.dubbo.remoting.transport.dispatcher.FakeChannelHandlers; - -import org.junit.Assert; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CompletableFuture; @@ -66,6 +65,11 @@ public void testServerHeartbeat() throws Exception { FakeChannelHandlers.setTestingChannelHandlers(); serverURL = serverURL.removeParameter(Constants.HEARTBEAT_KEY); + + // Let the client not reply to the heartbeat, and turn off automatic reconnect to simulate the client dropped. + serverURL = serverURL.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000); + serverURL = serverURL.addParameter(Constants.RECONNECT_KEY, false); + client = Exchangers.connect(serverURL); Thread.sleep(10000); Assert.assertTrue(handler.disconnectCount > 0); diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java index a2cff1dcc2b..3d4e52d022f 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/NettyClientToServerTest.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.remoting.transport.netty; +import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.exchange.ExchangeChannel; @@ -29,11 +30,17 @@ public class NettyClientToServerTest extends ClientToServerTest { protected ExchangeServer newServer(int port, Replier receiver) throws RemotingException { - return Exchangers.bind(URL.valueOf("exchange://localhost:" + port + "?server=netty3"), receiver); + // add heartbeat cycle to avoid unstable ut. + URL url = URL.valueOf("exchange://localhost:" + port + "?server=netty3"); + url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000); + return Exchangers.bind(url, receiver); } protected ExchangeChannel newClient(int port) throws RemotingException { - return Exchangers.connect(URL.valueOf("exchange://localhost:" + port + "?client=netty3&timeout=3000")); + // add heartbeat cycle to avoid unstable ut. + URL url = URL.valueOf("exchange://localhost:" + port + "?client=netty3&timeout=3000"); + url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000); + return Exchangers.connect(url); } } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java index af20985e051..1d3d5132d8a 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientToServerTest.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.remoting.transport.netty4; +import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.exchange.ExchangeChannel; @@ -29,11 +30,17 @@ public class NettyClientToServerTest extends ClientToServerTest { protected ExchangeServer newServer(int port, Replier receiver) throws RemotingException { - return Exchangers.bind(URL.valueOf("exchange://localhost:" + port + "?server=netty4"), receiver); + // add heartbeat cycle to avoid unstable ut. + URL url = URL.valueOf("exchange://localhost:" + port + "?server=netty4"); + url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000); + return Exchangers.bind(url, receiver); } protected ExchangeChannel newClient(int port) throws RemotingException { - return Exchangers.connect(URL.valueOf("exchange://localhost:" + port + "?client=netty4&timeout=3000")); + // add heartbeat cycle to avoid unstable ut. + URL url = URL.valueOf("exchange://localhost:" + port + "?client=netty4&timeout=3000"); + url = url.addParameter(Constants.HEARTBEAT_KEY, 600 * 1000); + return Exchangers.connect(url); } } \ No newline at end of file