Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support NettyConnector & RequestEntityProcessing.BUFFERED #5306

Merged
merged 1 commit into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.netty.connector.internal.JerseyChunkedInput;
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;

/**
* Netty connector implementation.
Expand Down Expand Up @@ -391,27 +391,34 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
}
};
ch.closeFuture().addListener(closeListener);
if (jerseyRequest.getLengthLong() == -1) {
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
} else {
nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());

final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch);
switch (entityWriter.getType()) {
case CHUNKED:
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
break;
case PRESET:
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
break;
// case DELAYED:
// // Set later after the entity is "written"
// break;
}

// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
entityWriter.writeAndFlush(nettyRequest);

final JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ch);
jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
return jerseyChunkedInput;
return entityWriter.getOutputStream();
}
});

if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
ch.write(new HttpChunkedInput(jerseyChunkedInput));
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
ch.write(jerseyChunkedInput);
entityWriter.write(entityWriter.getChunkedInput());
}

executorService.execute(new Runnable() {
Expand All @@ -422,19 +429,28 @@ public void run() {

try {
jerseyRequest.writeEntity();

if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
replaceHeaders(jerseyRequest, nettyRequest.headers()); // WriterInterceptor changes
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
entityWriter.flush();
}

} catch (IOException e) {
responseDone.completeExceptionally(e);
}
}
});

ch.flush();
if (entityWriter.getType() != NettyEntityWriter.Type.DELAYED) {
entityWriter.flush();
}
} else {
// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
}

} catch (InterruptedException e) {
} catch (IOException | InterruptedException e) {
responseDone.completeExceptionally(e);
}
}
Expand Down Expand Up @@ -508,4 +524,11 @@ private static HttpHeaders setHeaders(ClientRequest jerseyRequest, HttpHeaders h
}
return headers;
}

private static HttpHeaders replaceHeaders(ClientRequest jerseyRequest, HttpHeaders headers) {
for (final Map.Entry<String, List<String>> e : jerseyRequest.getStringHeaders().entrySet()) {
headers.set(e.getKey(), e.getValue());
}
return headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.netty.connector.internal;

import io.netty.channel.Channel;
import io.netty.handler.stream.ChunkedInput;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.RequestEntityProcessing;

import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;

/**
* The Entity Writer is used to write entity in Netty. One implementation is delayed,
* so that the complete message length can be set to Content-Length header.
*/
public interface NettyEntityWriter {

/**
* Type of the entity writer. {@code CHUNKED} is used for chunked data. {@code PRESET} is for buffered data, but the
* content length was pre-set by the customer. {@code DELAYED} is for buffered data where the content-length is unknown.
* The headers must not be written before the entity is provided by MessageBodyWriter to know the exact length.
*/
enum Type {
CHUNKED,
PRESET,
DELAYED
}

/**
* Writes the Object to the channel
* @param object object to be written
*/
void write(Object object);

/**
* Writes the Object to the channel and flush.
* @param object object to be written
*/
void writeAndFlush(Object object);

/**
* Flushes the writen objects. Can throw IOException.
* @throws IOException
*/
void flush() throws IOException;

/**
* Get the netty Chunked Input to be written.
* @return The Chunked input instance
*/
ChunkedInput getChunkedInput();

/**
* Get the {@link OutputStream} used to write an entity
* @return the OutputStream to write an entity
*/
OutputStream getOutputStream();

/**
* Get the length of the entity written to the {@link OutputStream}
* @return
*/
long getLength();

/**
* Return Type of
* @return
*/
Type getType();

static NettyEntityWriter getInstance(ClientRequest clientRequest, Channel channel) {
final long lengthLong = clientRequest.getLengthLong();
final RequestEntityProcessing entityProcessing = clientRequest.resolveProperty(
ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);

if ((entityProcessing == null && lengthLong == -1) || entityProcessing == RequestEntityProcessing.CHUNKED) {
return new DirectEntityWriter(channel, Type.CHUNKED);
} else if (lengthLong != -1) {
return new DirectEntityWriter(channel, Type.PRESET);
} else {
return new DelayedEntityWriter(channel, Type.DELAYED);
}
}

class DirectEntityWriter implements NettyEntityWriter {
private final Channel channel;
private final JerseyChunkedInput stream;
private final Type type;

public DirectEntityWriter(Channel channel, Type type) {
this.channel = channel;
stream = new JerseyChunkedInput(channel);
this.type = type;
}

@Override
public void write(Object object) {
channel.write(object);
}

@Override
public void writeAndFlush(Object object) {
channel.writeAndFlush(object);
}

@Override
public void flush() {
channel.flush();
}

@Override
public ChunkedInput getChunkedInput() {
return stream;
}

@Override
public OutputStream getOutputStream() {
return stream;
}

@Override
public long getLength() {
return stream.progress();
}

@Override
public Type getType() {
return type;
}
}

class DelayedEntityWriter implements NettyEntityWriter {
private final List<Runnable> delayedOps;
private final DirectEntityWriter writer;
private final DelayedOutputStream outputStream;

private boolean flushed = false;
private boolean closed = false;

public DelayedEntityWriter(Channel channel, Type type) {
this.writer = new DirectEntityWriter(channel, type);
this.delayedOps = new LinkedList<>();
this.outputStream = new DelayedOutputStream();
}


@Override
public void write(Object object) {
if (!flushed) {
delayedOps.add(() -> writer.write(object));
} else {
writer.write(object);
}
}

@Override
public void writeAndFlush(Object object) {
if (!flushed) {
delayedOps.add(() -> writer.writeAndFlush(object));
} else {
writer.writeAndFlush(object);
}
}

@Override
public void flush() throws IOException {
_flush();
if (!closed) {
closed = true;
writer.getOutputStream().close(); // Jersey automatically closes DelayedOutputStream not this one!
}
writer.flush();
}

private void _flush() throws IOException {
if (!flushed) {
flushed = true;
for (Runnable runnable : delayedOps) {
runnable.run();
}

if (outputStream.b != null) {
writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len);
}
}
}

@Override
public ChunkedInput getChunkedInput() {
return writer.getChunkedInput();
}

@Override
public OutputStream getOutputStream() {
return outputStream;
}


@Override
public long getLength() {
return outputStream.len - outputStream.off;
}

@Override
public Type getType() {
return writer.getType();
}

private class DelayedOutputStream extends OutputStream {
private byte[] b;
private int off;
private int len;

@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) (b & 0xFF)}, 0, 1);
}

@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!flushed && this.b == null) {
this.b = b;
this.off = off;
this.len = len;
} else {
DelayedEntityWriter.this._flush();
writer.getOutputStream().write(b, off, len);
}
}
}
}
}
Loading