Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize heartbeat and reconnect task. #2658

Merged
merged 4 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;

/**
* Constants
*/
Expand Down Expand Up @@ -246,6 +247,15 @@ public class Constants {

public static final String HEARTBEAT_KEY = "heartbeat";

public static final int HEARTBEAT_TICK = 3;

public static final long LEAST_HEARTBEAT_TICK = 1000;

/**
* ticks per wheel. Currently only contains two tasks, so 16 locations are enough
*/
public static final int TICKS_PER_WHEEL = 16;

public static final String HEARTBEAT_TIMEOUT_KEY = "heartbeat.timeout";

public static final String CONNECT_TIMEOUT_KEY = "connect.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ public Set<Timeout> stop() {
return worker.unprocessedTimeouts();
}

@Override
public boolean isStop() {
return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
}

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ public interface Timer {
* this method
*/
Set<Timeout> stop();

/**
* the timer is stop
*
* @return true for stop
*/
boolean isStop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.remoting.exchange.support.header;

import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.remoting.Channel;

import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
* AbstractTimerTask
*/
public abstract class AbstractTimerTask implements TimerTask {

protected final ChannelProvider channelProvider;

protected final Long tick;

protected AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
if (channelProvider == null || tick == null) {
throw new IllegalArgumentException();
}
this.tick = tick;
this.channelProvider = channelProvider;
}

protected Long lastRead(Channel channel) {
return (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
}

protected Long lastWrite(Channel channel) {
return (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
}

protected Long now() {
return System.currentTimeMillis();
}

protected void reput(Timeout timeout, Long tick) {
if (timeout == null || tick == null) {
throw new IllegalArgumentException();
}
Timer timer = timeout.timer();
if (timer.isStop()) {
return;
}
if (timeout.isCancelled()) {
return;
}
timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
Collection<Channel> c = channelProvider.getChannels();
for (Channel channel : c) {
if (channel.isClosed()) {
continue;
}
doTask(channel);
}
reput(timeout, tick);
}

protected abstract void doTask(Channel channel);

interface ChannelProvider {
Collection<Channel> getChannels();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.RemotingException;
Expand All @@ -31,28 +28,22 @@
import org.apache.dubbo.remoting.exchange.ResponseFuture;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* DefaultMessageClient
*/
public class HeaderExchangeClient implements ExchangeClient {

private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);

private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
private final Client client;
private final ExchangeChannel channel;
// heartbeat timer
private ScheduledFuture<?> heartbeatTimer;
// heartbeat(ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;

private HashedWheelTimer heartbeatTimer;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
Expand All @@ -65,7 +56,12 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
long heartbeatTick = calcLeastTick(heartbeat);

// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
Expand Down Expand Up @@ -181,37 +177,40 @@ public boolean hasAttribute(String key) {
}

private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calcLeastTick(heartbeat);
long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

private void stopHeartbeatTimer() {
if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
try {
heartbeatTimer.cancel(true);
scheduled.purge();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(e.getMessage(), e);
}
}
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
}
heartbeatTimer = null;
}

private void doClose() {
stopHeartbeatTimer();
}

/**
* Each interval cannot be less than 1000ms.
*/
private long calcLeastTick(int time) {
if (time / Constants.HEARTBEAT_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_TICK;
} else {
return time / Constants.HEARTBEAT_TICK;
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public String toString() {
return "HeaderExchangeClient [channel=" + channel + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
Expand All @@ -34,9 +34,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -47,18 +44,14 @@ public class HeaderExchangeServer implements ExchangeServer {

protected final Logger logger = LoggerFactory.getLogger(getClass());

private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
new NamedThreadFactory(
"dubbo-remoting-server-heartbeat",
true));
private final Server server;
// heartbeat timer
private ScheduledFuture<?> heartbeatTimer;
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private HashedWheelTimer heartbeatTimer;

public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
Expand All @@ -69,6 +62,10 @@ public HeaderExchangeServer(Server server) {
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}

long heartbeatTick = calcLeastTick(heartbeat);
// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}

Expand Down Expand Up @@ -153,11 +150,6 @@ private void doClose() {
return;
}
stopHeartbeatTimer();
try {
scheduled.shutdown();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}

@Override
Expand Down Expand Up @@ -223,6 +215,12 @@ public void reset(URL url) {
if (h != heartbeat || t != heartbeatTimeout) {
heartbeat = h;
heartbeatTimeout = t;

stopHeartbeatTimer();

long heartbeatTick = calcLeastTick(heartbeat);
// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
Expand Down Expand Up @@ -254,29 +252,32 @@ public void send(Object message, boolean sent) throws RemotingException {
}

private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
AbstractTimerTask.ChannelProvider cp = () -> Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long heartbeatTick = calcLeastTick(heartbeat);
long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

/**
* Each interval cannot be less than 1000ms.
*/
private long calcLeastTick(int time) {
if (time / Constants.HEARTBEAT_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_TICK;
} else {
return time / Constants.HEARTBEAT_TICK;
}
}

private void stopHeartbeatTimer() {
try {
ScheduledFuture<?> timer = heartbeatTimer;
if (timer != null && !timer.isCancelled()) {
timer.cancel(true);
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
} finally {
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
}
}
Expand Down
Loading