Skip to content

Commit

Permalink
Implementing Netty-based GRPC Client Transport. Server-side will be i…
Browse files Browse the repository at this point in the history
…mplemented in a follow-up CL.

Overview:
- NettyClientTransport - this is the entry point into the client transport. This creates streams as well as the NettyClientHandler

- NettyClientStream - client stream implementation. This sends commands to the NettyClientHandler via commands.  Callbacks on the stream are made directly from the NettyClientHandler in the channel thread context.

- NettyClientHandler - A Netty HTTP/2 handler that acts as a bridge between the NettyClientStreams and Netty's HTTP/2 processing.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=70970028
  • Loading branch information
nathanmittler authored and ejona86 committed Jan 8, 2015
1 parent 0359907 commit cc7cdb1
Show file tree
Hide file tree
Showing 20 changed files with 1,774 additions and 110 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/com/google/net/stubby/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
public class Status {

public static final Status OK = new Status(Transport.Code.OK);
public static final Status CANCELLED = new Status(Transport.Code.CANCELLED);

public static Status fromThrowable(Throwable t) {
for (Throwable cause : Throwables.getCausalChain(t)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.google.net.stubby.newtransport;


/**
* Extension of {@link Stream} to support client-side termination semantics.
*/
public interface ClientStream extends Stream {

/**
* Used to abnormally terminate the stream. Any internally buffered messages are dropped. After
* this is called, no further messages may be sent and no further {@link StreamListener} callbacks
* (with the exception of onClosed) will be invoked for this stream. Any frames received for this
* stream after returning from this method will be discarded.
* Used to abnormally terminate the stream. After calling this method, no further messages will be
* sent or received, however it may still be possible to receive buffered messages for a brief
* period until {@link StreamListener#closed} is called.
*/
void cancel();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.google.net.stubby.newtransport;


/** Pre-configured factory for created {@link ClientTransport} instances. */
public interface ClientTransportFactory {
/** Create an unstarted transport for exclusive use. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import com.google.protobuf.ByteString;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
Expand All @@ -26,6 +27,7 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
private boolean inFrame;
private byte currentFlags;
private int currentLength = LENGTH_NOT_SET;
private boolean statusDelivered;

public Deframer(Framer target) {
this.target = target;
Expand All @@ -34,8 +36,12 @@ public Deframer(Framer target) {
@Override
public void deliverFrame(F frame, boolean endOfStream) {
int remaining = internalDeliverFrame(frame);
if (endOfStream && remaining > 0) {
target.writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
if (endOfStream) {
if (remaining > 0) {
writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
} else if (!statusDelivered) {
writeStatus(Status.OK);
}
}
}

Expand Down Expand Up @@ -91,8 +97,8 @@ private int internalDeliverFrame(F frame) {
// deal with out-of-order tags etc.
Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
try {
target.writeContext(contextValue.getKey(),
contextValue.getValue().newInput(), currentLength);
ByteString value = contextValue.getValue();
target.writeContext(contextValue.getKey(), value.newInput(), value.size());
} finally {
currentLength = LENGTH_NOT_SET;
inFrame = false;
Expand All @@ -104,10 +110,9 @@ private int internalDeliverFrame(F frame) {
try {
if (code == null) {
// Log for unknown code
target.writeStatus(
new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
writeStatus(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
} else {
target.writeStatus(new Status(code));
writeStatus(new Status(code));
}
} finally {
currentLength = LENGTH_NOT_SET;
Expand All @@ -121,7 +126,7 @@ private int internalDeliverFrame(F frame) {
}
} catch (IOException ioe) {
Status status = new Status(Transport.Code.UNKNOWN, ioe);
target.writeStatus(status);
writeStatus(status);
throw status.asRuntimeException();
}
}
Expand All @@ -148,6 +153,11 @@ private boolean ensure(InputStream input, int len) throws IOException {
return (input.available() >= len);
}

private void writeStatus(Status status) {
target.writeStatus(status);
statusDelivered = true;
}

/**
* Return a message of {@code len} bytes than can be read from the buffer. If sufficient
* bytes are unavailable then buffer the available bytes and return null.
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/java/com/google/net/stubby/newtransport/Framer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
public interface Framer {

/**
* Sink implemented by the transport layer to receive frames and forward them to their
* destination
* Sink implemented by the transport layer to receive frames and forward them to their destination
*/
public interface Sink<T> {
/**
* Deliver a frame via the transport.
*
* @param frame the contents of the frame to deliver
* @param endOfStream whether the frame is the last one for the GRPC stream
*/
Expand Down Expand Up @@ -47,12 +47,20 @@ public interface Sink<T> {
public void flush();

/**
* Flushes and closes the framer and releases any buffers.
* Indicates whether or not this {@link Framer} has been closed via a call to either
* {@link #close()} or {@link #dispose()}.
*/
public boolean isClosed();

/**
* Flushes and closes the framer and releases any buffers. After the {@link Framer} is closed or
* disposed, additional calls to this method will have no affect.
*/
public void close();

/**
* Closes the framer and releases any buffers, but does not flush.
* Closes the framer and releases any buffers, but does not flush. After the {@link Framer} is
* closed or disposed, additional calls to this method will have no affect.
*/
public void dispose();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.google.net.stubby.newtransport;

/**
* Constants for GRPC-over-HTTP (or HTTP/2)
*/
public final class HttpUtil {
/**
* The Content-Type header name. Defined here since it is not explicitly defined by the HTTP/2
* spec.
*/
public static final String CONTENT_TYPE_HEADER = "content-type";

/**
* Content-Type used for GRPC-over-HTTP/2.
*/
public static final String CONTENT_TYPE_PROTORPC = "application/protorpc";

/**
* The HTTP method used for GRPC requests.
*/
public static final String HTTP_METHOD = "POST";

private HttpUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public MessageFramer(Sink<ByteBuffer> sink, int maxFrameSize) {
* Sets whether compression is encouraged.
*/
public void setAllowCompression(boolean enable) {
verifyNotClosed();
framer.setAllowCompression(enable);
}

Expand All @@ -76,11 +77,13 @@ public void setAllowCompression(boolean enable) {
* @see java.util.zip.Deflater#setLevel
*/
public void setCompressionLevel(int level) {
verifyNotClosed();
framer.setCompressionLevel(level);
}

@Override
public void writePayload(InputStream message, int messageLength) {
verifyNotClosed();
try {
scratch.clear();
scratch.put(GrpcFramingUtil.PAYLOAD_FRAME);
Expand All @@ -98,6 +101,7 @@ public void writePayload(InputStream message, int messageLength) {

@Override
public void writeContext(String key, InputStream message, int messageLen) {
verifyNotClosed();
try {
scratch.clear();
scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME);
Expand Down Expand Up @@ -132,6 +136,7 @@ public void writeContext(String key, InputStream message, int messageLen) {

@Override
public void writeStatus(Status status) {
verifyNotClosed();
short code = (short) status.getCode().getNumber();
scratch.clear();
scratch.put(GrpcFramingUtil.STATUS_FRAME);
Expand All @@ -144,14 +149,22 @@ public void writeStatus(Status status) {

@Override
public void flush() {
verifyNotClosed();
framer.flush();
}

@Override
public boolean isClosed() {
return framer == null;
}

@Override
public void close() {
// TODO(user): Returning buffer to a pool would go here
framer.close();
framer = null;
if (!isClosed()) {
// TODO(user): Returning buffer to a pool would go here
framer.close();
framer = null;
}
}

@Override
Expand All @@ -160,6 +173,12 @@ public void dispose() {
framer = null;
}

private void verifyNotClosed() {
if (isClosed()) {
throw new IllegalStateException("Framer already closed");
}
}

/**
* Write a raw VarInt32 to the buffer
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package com.google.net.stubby.newtransport;


/**
* Utility functions for transport layer framing.
*
* Within a given transport frame we reserve the first byte to indicate the
* type of compression used for the contents of the transport frame.
* <p>Within a given transport frame we reserve the first byte to indicate the type of compression
* used for the contents of the transport frame.
*/
public class TransportFrameUtil {
public final class TransportFrameUtil {

// Compression modes (lowest order 3 bits of frame flags)
public static final byte NO_COMPRESS_FLAG = 0x0;
public static final byte FLATE_FLAG = 0x1;
public static final byte FLATE_FLAG = 0x1;
public static final byte COMPRESSION_FLAG_MASK = 0x7;

public static boolean isNotCompressed(int b) {
Expand All @@ -20,4 +21,6 @@ public static boolean isNotCompressed(int b) {
public static boolean isFlateCompressed(int b) {
return ((b & COMPRESSION_FLAG_MASK) == FLATE_FLAG);
}

private TransportFrameUtil() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.google.net.stubby.newtransport.netty;

import com.google.net.stubby.newtransport.Deframer;
import com.google.net.stubby.newtransport.Framer;
import com.google.net.stubby.newtransport.TransportFrameUtil;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteOrder;

/**
* Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call
*/
public class ByteBufDeframer extends Deframer<ByteBuf> {

private final CompositeByteBuf buffer;

public ByteBufDeframer(Framer target) {
this(UnpooledByteBufAllocator.DEFAULT, target);
}

public ByteBufDeframer(ByteBufAllocator alloc, Framer target) {
super(target);
buffer = alloc.compositeBuffer();
}

public void dispose() {
// Remove the components from the composite buffer. This should set the reference
// count on all buffers to zero.
buffer.removeComponents(0, buffer.numComponents());

// Release the composite buffer
buffer.release();
}

@Override
protected DataInputStream prefix(ByteBuf frame) throws IOException {
buffer.addComponent(frame);
buffer.writerIndex(buffer.writerIndex() + frame.writerIndex() - frame.readerIndex());
return new DataInputStream(new ByteBufInputStream(buffer));
}

@Override
protected int consolidate() {
buffer.consolidate();
return buffer.readableBytes();
}

@Override
protected ByteBuf decompress(ByteBuf frame) throws IOException {
frame = frame.order(ByteOrder.BIG_ENDIAN);
int compressionType = frame.readUnsignedByte();
int frameLength = frame.readUnsignedMedium();
if (frameLength != frame.readableBytes()) {
throw new IllegalArgumentException("GRPC and buffer lengths misaligned. Frame length="
+ frameLength + ", readableBytes=" + frame.readableBytes());
}
if (TransportFrameUtil.isNotCompressed(compressionType)) {
// Need to retain the frame as we may be holding it over channel events
frame.retain();
return frame;
}
throw new IOException("Unknown compression type " + compressionType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.google.net.stubby.newtransport.netty;

import com.google.common.base.Preconditions;

/**
* Command sent from a Netty client stream to the handler to cancel the stream.
*/
class CancelStreamCommand {
private final NettyClientStream stream;

CancelStreamCommand(NettyClientStream stream) {
this.stream = Preconditions.checkNotNull(stream, "stream");
}

NettyClientStream stream() {
return stream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.google.net.stubby.newtransport.netty;

import com.google.common.base.Preconditions;
import com.google.net.stubby.MethodDescriptor;

/**
* A command to create a new stream. This is created by {@link NettyClientStream} and passed to the
* {@link NettyClientHandler} for processing in the Channel thread.
*/
class CreateStreamCommand {
final MethodDescriptor<?, ?> method;
final NettyClientStream stream;

CreateStreamCommand(MethodDescriptor<?, ?> method, NettyClientStream stream) {
this.method = Preconditions.checkNotNull(method, "method");
this.stream = Preconditions.checkNotNull(stream, "stream");
}

MethodDescriptor<?, ?> method() {
return method;
}

NettyClientStream stream() {
return stream;
}
}
Loading

0 comments on commit cc7cdb1

Please sign in to comment.