Skip to content

Commit

Permalink
Remove hedged rpc call support, implement the logic in MaterRegistry …
Browse files Browse the repository at this point in the history
…directly
  • Loading branch information
Apache9 committed Apr 27, 2020
1 parent 84f2e95 commit da9980a
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 617 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -47,6 +47,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
Expand All @@ -60,6 +61,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
Expand Down Expand Up @@ -512,13 +514,6 @@ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}

@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
// Check HedgedRpcChannel implementation for detailed comments.
throw new UnsupportedOperationException("Hedging not supported for this implementation.");
}

private static class AbstractRpcChannel {

protected final InetSocketAddress addr;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -82,19 +76,6 @@ protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOEx
return new NettyRpcConnection(this, remoteId);
}

@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Set<InetSocketAddress> addresses = new HashSet<>();
for (ServerName sn: sns) {
addresses.add(createAddr(sn));
}
return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
hedgedRpcFanOut);
}

@Override
protected void closeInternal() {
if (shutdownGroupWhenClose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

Expand Down Expand Up @@ -82,16 +82,6 @@ BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTim
RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
throws IOException;

/**
* Creates a channel that can hedge request to multiple underlying channels.
* @param sns Set of servers for underlying channels.
* @param user user for the connection.
* @param rpcTimeout rpc timeout to use.
* @return A hedging rpc channel for this rpc client instance.
*/
RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
throws IOException;

/**
* Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,
Expand Down
Loading

0 comments on commit da9980a

Please sign in to comment.