Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass DiscoveryNode to initiateChannel #32958

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -222,7 +223,8 @@ protected ChannelHandler getClientChannelInitializer() {
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");

@Override
protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
InetSocketAddress address = node.getAddress().address();
ChannelFuture channelFuture = bootstrap.connect(address);
Channel channel = channelFuture.channel();
if (channel == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
Expand Down Expand Up @@ -82,7 +83,8 @@ protected NioTcpServerChannel bind(String name, InetSocketAddress address) throw
}

@Override
protected NioTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
InetSocketAddress address = node.getAddress().address();
NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory);
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
try {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
connectionFutures.add(connectFuture);
TcpChannel channel = initiateChannel(node.getAddress().address(), connectFuture);
TcpChannel channel = initiateChannel(node, connectFuture);
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
channels.add(channel);
} catch (Exception e) {
Expand Down Expand Up @@ -856,12 +856,12 @@ protected void serverAcceptedChannel(TcpChannel channel) {
/**
* Initiate a single tcp socket channel.
*
* @param address address for the initiated connection
* @param node for the initiated connection
* @param connectListener listener to be called when connection complete
* @return the pending connection
* @throws IOException if an I/O exception occurs while opening the channel
*/
protected abstract TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException;
protected abstract TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException;

/**
* Called to tear down internal resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce
}

@Override
protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
protected FakeChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
return new FakeChannel(messageCaptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.cli.SuppressForbidden;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -162,7 +163,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx

@Override
@SuppressForbidden(reason = "real socket for mocking remote connections")
protected MockChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
protected MockChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
InetSocketAddress address = node.getAddress().address();
final MockSocket socket = new MockSocket();
final MockChannel channel = new MockChannel(socket, address, "none");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -85,7 +86,8 @@ protected MockServerChannel bind(String name, InetSocketAddress address) throws
}

@Override
protected MockSocketChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
protected MockSocketChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
InetSocketAddress address = node.getAddress().address();
MockSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory);
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel;
Expand Down