Skip to content

Commit

Permalink
Extract message serialization from TcpTransport (elastic#37034)
Browse files Browse the repository at this point in the history
This commit introduces a NetworkMessage class. This class has two
subclasses - InboundMessage and OutboundMessage. These messages can
be serialized and deserialized independent of the transport. This allows
more granular testing. Additionally, the serialization mechanism is now
a simple Supplier. This builds the framework to eventually move the
serialization of transport messages to the network thread. This is the
one serialization component that is not currently performed on the
network thread (transport deserialization and http serialization and
deserialization are all on the network thread).
  • Loading branch information
Tim-Brooks authored Jan 21, 2019
1 parent f516d68 commit 21838d7
Show file tree
Hide file tree
Showing 18 changed files with 1,104 additions and 400 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ public WriteOperation createWriteOperation(SocketChannelContext context, Object
return new FlushReadyWrite(context, (ByteBuffer[]) message, listener);
}

@Override
public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
assert writeOperation instanceof FlushReadyWrite : "Write operation must be flush ready";
return Collections.singletonList((FlushReadyWrite) writeOperation);
}

@Override
public List<FlushOperation> pollFlushOperations() {
return EMPTY_LIST;
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundMessage;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportSettings;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -111,13 +111,9 @@ public ExceptionThrowingNetty4Transport(
}

@Override
protected String handleRequest(TcpChannel channel, String profileName,
StreamInput stream, long requestId, int messageLengthBytes, Version version,
InetSocketAddress remoteAddress, byte status) throws IOException {
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
remoteAddress, status);
protected void handleRequest(TcpChannel channel, InboundMessage.RequestMessage request, int messageLengthBytes) throws IOException {
super.handleRequest(channel, request, messageLengthBytes);
channelProfileName = TransportSettings.DEFAULT_PROFILE;
return action;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundMessage;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportSettings;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -113,13 +113,9 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
}

@Override
protected String handleRequest(TcpChannel channel, String profileName,
StreamInput stream, long requestId, int messageLengthBytes, Version version,
InetSocketAddress remoteAddress, byte status) throws IOException {
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
remoteAddress, status);
protected void handleRequest(TcpChannel channel, InboundMessage.RequestMessage request, int messageLengthBytes) throws IOException {
super.handleRequest(channel, request, messageLengthBytes);
channelProfileName = TransportSettings.DEFAULT_PROFILE;
return action;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* written to this stream. If compression is enabled, the proper EOS bytes will be written at that point.
* The underlying {@link BytesReference} will be returned.
*
* {@link CompressibleBytesOutputStream#close()} should be called when the bytes are no longer needed and
* can be safely released.
* {@link CompressibleBytesOutputStream#close()} will NOT close the underlying stream. The byte stream passed
* in the constructor must be closed individually.
*/
final class CompressibleBytesOutputStream extends StreamOutput {

Expand Down Expand Up @@ -92,12 +92,9 @@ public void flush() throws IOException {

@Override
public void close() throws IOException {
if (stream == bytesStreamOutput) {
assert shouldCompress == false : "If the streams are the same we should not be compressing";
IOUtils.close(stream);
} else {
if (stream != bytesStreamOutput) {
assert shouldCompress : "If the streams are different we should be compressing";
IOUtils.close(stream, bytesStreamOutput);
IOUtils.close(stream);
}
}

Expand Down
168 changes: 168 additions & 0 deletions server/src/main/java/org/elasticsearch/transport/InboundMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

public abstract class InboundMessage extends NetworkMessage implements Closeable {

private final StreamInput streamInput;

InboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, StreamInput streamInput) {
super(threadContext, version, status, requestId);
this.streamInput = streamInput;
}

StreamInput getStreamInput() {
return streamInput;
}

static class Reader {

private final Version version;
private final NamedWriteableRegistry namedWriteableRegistry;
private final ThreadContext threadContext;

Reader(Version version, NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
this.version = version;
this.namedWriteableRegistry = namedWriteableRegistry;
this.threadContext = threadContext;
}

InboundMessage deserialize(BytesReference reference) throws IOException {
int messageLengthBytes = reference.length();
final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
StreamInput streamInput = reference.streamInput();
boolean success = false;
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
long requestId = streamInput.readLong();
byte status = streamInput.readByte();
Version remoteVersion = Version.fromId(streamInput.readInt());
final boolean isHandshake = TransportStatus.isHandshake(status);
ensureVersionCompatibility(remoteVersion, version, isHandshake);
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && streamInput.available() > 0) {
Compressor compressor;
try {
final int bytesConsumed = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE;
compressor = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed));
} catch (NotCompressedException ex) {
int maxToRead = Math.min(reference.length(), 10);
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [")
.append(maxToRead).append("] content bytes out of [").append(reference.length())
.append("] readable bytes with message size [").append(messageLengthBytes).append("] ").append("] are [");
for (int i = 0; i < maxToRead; i++) {
sb.append(reference.get(i)).append(",");
}
sb.append("]");
throw new IllegalStateException(sb.toString());
}
streamInput = compressor.streamInput(streamInput);
}
streamInput = new NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry);
streamInput.setVersion(remoteVersion);

threadContext.readHeaders(streamInput);

InboundMessage message;
if (TransportStatus.isRequest(status)) {
final Set<String> features;
if (remoteVersion.onOrAfter(Version.V_6_3_0)) {
features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(streamInput.readStringArray())));
} else {
features = Collections.emptySet();
}
final String action = streamInput.readString();
message = new RequestMessage(threadContext, remoteVersion, status, requestId, action, features, streamInput);
} else {
message = new ResponseMessage(threadContext, remoteVersion, status, requestId, streamInput);
}
success = true;
return message;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(streamInput);
}
}
}
}

@Override
public void close() throws IOException {
streamInput.close();
}

private static void ensureVersionCompatibility(Version version, Version currentVersion, boolean isHandshake) {
// for handshakes we are compatible with N-2 since otherwise we can't figure out our initial version
// since we are compatible with N-1 and N+1 so we always send our minCompatVersion as the initial version in the
// handshake. This looks odd but it's required to establish the connection correctly we check for real compatibility
// once the connection is established
final Version compatibilityVersion = isHandshake ? currentVersion.minimumCompatibilityVersion() : currentVersion;
if (version.isCompatible(compatibilityVersion) == false) {
final Version minCompatibilityVersion = isHandshake ? compatibilityVersion : compatibilityVersion.minimumCompatibilityVersion();
String msg = "Received " + (isHandshake ? "handshake " : "") + "message from unsupported version: [";
throw new IllegalStateException(msg + version + "] minimal compatible version is: [" + minCompatibilityVersion + "]");
}
}

public static class RequestMessage extends InboundMessage {

private final String actionName;
private final Set<String> features;

RequestMessage(ThreadContext threadContext, Version version, byte status, long requestId, String actionName, Set<String> features,
StreamInput streamInput) {
super(threadContext, version, status, requestId, streamInput);
this.actionName = actionName;
this.features = features;
}

String getActionName() {
return actionName;
}

Set<String> getFeatures() {
return features;
}
}

public static class ResponseMessage extends InboundMessage {

ResponseMessage(ThreadContext threadContext, Version version, byte status, long requestId, StreamInput streamInput) {
super(threadContext, version, status, requestId, streamInput);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.common.util.concurrent.ThreadContext;

/**
* Represents a transport message sent over the network. Subclasses implement serialization and
* deserialization.
*/
public abstract class NetworkMessage {

protected final Version version;
protected final ThreadContext threadContext;
protected final ThreadContext.StoredContext storedContext;
protected final long requestId;
protected final byte status;

NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId) {
this.threadContext = threadContext;
storedContext = threadContext.stashContext();
storedContext.restore();
this.version = version;
this.requestId = requestId;
this.status = status;
}

public Version getVersion() {
return version;
}

public long getRequestId() {
return requestId;
}

boolean isCompress() {
return TransportStatus.isCompress(status);
}

ThreadContext.StoredContext getStoredContext() {
return storedContext;
}

boolean isResponse() {
return TransportStatus.isRequest(status) == false;
}

boolean isRequest() {
return TransportStatus.isRequest(status);
}

boolean isHandshake() {
return TransportStatus.isHandshake(status);
}

boolean isError() {
return TransportStatus.isError(status);
}
}
Loading

0 comments on commit 21838d7

Please sign in to comment.