Skip to content

Commit

Permalink
Revert "Close channel on handshake error with old version (elastic#56989
Browse files Browse the repository at this point in the history
)"

This reverts commit c81a189.
  • Loading branch information
DaveCTurner committed May 21, 2020
1 parent c81a189 commit 99f7115
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public int internalDecode(ReleasableBytesReference reference, Consumer<Object> f
} else {
totalNetworkSize = messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE;

Header header = readHeader(version, messageLength, reference);
Header header = readHeader(messageLength, reference);
bytesConsumed += headerBytesToRead;
if (header.isCompressed()) {
decompressor = new TransportDecompressor(recycler);
Expand Down Expand Up @@ -166,8 +166,7 @@ private int headerBytesToRead(BytesReference reference) {
}
}

// exposed for use in tests
static Header readHeader(Version version, int networkMessageSize, BytesReference bytesReference) throws IOException {
private Header readHeader(int networkMessageSize, BytesReference bytesReference) throws IOException {
try (StreamInput streamInput = bytesReference.streamInput()) {
streamInput.skip(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
long requestId = streamInput.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
try {
handshaker.handleHandshake(transportChannel, requestId, stream);
} catch (Exception e) {
if (Version.CURRENT.isCompatible(header.getVersion())) {
sendErrorResponse(action, transportChannel, e);
} else {
logger.warn(new ParameterizedMessage(
"could not send error response to handshake received on [{}] using wire format version [{}], closing channel",
channel, header.getVersion()), e);
channel.close();
}
sendErrorResponse(action, transportChannel, e);
}
} else {
final TransportChannel transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,25 @@

package org.elasticsearch.transport;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -177,82 +165,4 @@ public TestResponse read(StreamInput in) throws IOException {
assertEquals(responseValue, responseCaptor.get().value);
}
}

public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exception {
// Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
// v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
// successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
// response. However if the two nodes are from the same major version then we do guarantee compatibility of error responses.

final Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), version);
final long requestId = randomNonNegativeLong();
final Header requestHeader = new Header(between(0, 100), requestId,
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
requestHeader.headers = Tuple.tuple(org.elasticsearch.common.collect.Map.of(), org.elasticsearch.common.collect.Map.of());
handler.inboundMessage(channel, requestMessage);

final BytesReference responseBytesReference = channel.getMessageCaptor().get();
final Header responseHeader = InboundDecoder.readHeader(remoteVersion, responseBytesReference.length(), responseBytesReference);
assertTrue(responseHeader.isResponse());
assertTrue(responseHeader.isError());
}


public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws Exception {
// Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
// v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
// successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
// response so we must just close the connection on an error. To avoid the failure disappearing into a black hole we at least log
// it.

final MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"expected message",
InboundHandler.class.getCanonicalName(),
Level.WARN,
"could not send error response to handshake"));
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
Loggers.addAppender(inboundHandlerLogger, mockAppender);

try {
final AtomicBoolean isClosed = new AtomicBoolean();
channel.addCloseListener(ActionListener.wrap(() -> assertTrue(isClosed.compareAndSet(false, true))));

final Version remoteVersion = Version.fromId(randomIntBetween(0, version.minimumCompatibilityVersion().id - 1));
final long requestId = randomNonNegativeLong();
final Header requestHeader = new Header(between(0, 100), requestId,
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
requestHeader.headers = Tuple.tuple(org.elasticsearch.common.collect.Map.of(), org.elasticsearch.common.collect.Map.of());
handler.inboundMessage(channel, requestMessage);
assertTrue(isClosed.get());
assertNull(channel.getMessageCaptor().get());
mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(inboundHandlerLogger, mockAppender);
mockAppender.stop();
}
}

private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) {
return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { }) {
@Override
public StreamInput openOrGetStreamInput() {
final StreamInput streamInput = new InputStreamStreamInput(new InputStream() {
@Override
public int read() {
throw new ElasticsearchException("unreadable handshake");
}
});
streamInput.setVersion(remoteVersion);
return streamInput;
}
};
}

}

0 comments on commit 99f7115

Please sign in to comment.