Skip to content

Commit

Permalink
Improve GOAWAY handling
Browse files Browse the repository at this point in the history
Currently GOAWAY is detected/handled only in the connection phase but
sometimes can happen in the transfer phase as well.

This now wraps the whole request inside a block that is retried if a
GOAWAY is detected.
  • Loading branch information
laeubi committed Feb 9, 2024
1 parent 500c41e commit deb31c7
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2024 Christoph Läubrich and others.
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Christoph Läubrich - initial API and implementation
*******************************************************************************/
package org.eclipse.tycho.p2maven.transport;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import java.util.Map;

public interface Headers extends AutoCloseable {

String ENCODING_IDENTITY = "identity";
String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
String HEADER_CONTENT_ENCODING = "Content-Encoding";
String ENCODING_GZIP = "gzip";
String ETAG_HEADER = "ETag";
String LAST_MODIFIED_HEADER = "Last-Modified";
String EXPIRES_HEADER = "Expires";
String CACHE_CONTROL_HEADER = "Cache-Control";
String MAX_AGE_DIRECTIVE = "max-age";
String MUST_REVALIDATE_DIRECTIVE = "must-revalidate";

int statusCode() throws IOException;

Map<String, List<String>> headers();

@Override
void close();

URI getURI();

String getHeader(String header);

long getLastModified();

default void checkResponseCode() throws FileNotFoundException, IOException {
int code = statusCode();
if (code >= HttpURLConnection.HTTP_BAD_REQUEST) {
if (code == HttpURLConnection.HTTP_NOT_FOUND || code == HttpURLConnection.HTTP_GONE) {
throw new FileNotFoundException(getURI().toString());
} else {
throw new java.io.IOException("Server returned HTTP code: " + code + " for URL " + getURI().toString());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.eclipse.tycho.p2maven.transport;

import java.io.IOException;
import java.io.InputStream;

import org.eclipse.tycho.p2maven.transport.Response.ResponseConsumer;

public interface HttpTransport {

void setHeader(String key, String value);

Response<InputStream> get() throws IOException;
<T> T get(ResponseConsumer<T> consumer) throws IOException;

Response<Void> head() throws IOException;
Headers head() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
Expand Down Expand Up @@ -42,6 +43,7 @@
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.eclipse.tycho.p2maven.helper.ProxyHelper;
import org.eclipse.tycho.p2maven.transport.Response.ResponseConsumer;

/**
* A transport using Java11 HttpClient
Expand Down Expand Up @@ -77,7 +79,7 @@ public class Java11HttpTransportFactory implements HttpTransportFactory, Initial
@Override
public HttpTransport createTransport(URI uri) {
Java11HttpTransport transport = new Java11HttpTransport(client, clientHttp1, HttpRequest.newBuilder().uri(uri),
logger);
uri, logger);
authenticator.preemtiveAuth((k, v) -> transport.setHeader(k, v), uri);
return transport;
}
Expand All @@ -88,11 +90,13 @@ private static final class Java11HttpTransport implements HttpTransport {
private HttpClient client;
private Logger logger;
private HttpClient clientHttp1;
private URI uri;

public Java11HttpTransport(HttpClient client, HttpClient clientHttp1, Builder builder, Logger logger) {
public Java11HttpTransport(HttpClient client, HttpClient clientHttp1, Builder builder, URI uri, Logger logger) {
this.client = client;
this.clientHttp1 = clientHttp1;
this.builder = builder;
this.uri = uri;
this.logger = logger;
}

Expand All @@ -102,78 +106,106 @@ public void setHeader(String key, String value) {
}

@Override
public Response<InputStream> get() throws IOException {
public <T> T get(ResponseConsumer<T> consumer) throws IOException {
try {
HttpResponse<InputStream> response = performGet();
return new ResponseImplementation<>(response) {

@Override
public void close() {
if (response.version() == Version.HTTP_1_1) {
// discard any remaining data and close the stream to return the connection to
// the pool..
try (InputStream stream = body()) {
int discarded = 0;
while (discarded < MAX_DISCARD) {
int read = stream.read(DUMMY_BUFFER);
if (read < 0) {
break;
}
discarded += read;
}
} catch (IOException e) {
// don't care...
}
} else {
// just closing should be enough to signal to the framework...
try (InputStream stream = body()) {
} catch (IOException e) {
// don't care...
}
}
try {
return performGet(consumer, client);
} catch (IOException e) {
if (isGoaway(e)) {
logger.info("Received GOAWAY from server " + uri.getHost() + " will retry with Http/1...");
TimeUnit.SECONDS.sleep(1);
return performGet(consumer, clientHttp1);
}
};
throw e;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

private HttpResponse<InputStream> performGet() throws IOException, InterruptedException {
private <T> T performGet(ResponseConsumer<T> consumer, HttpClient httpClient)
throws IOException, InterruptedException {
HttpRequest request = builder.GET().timeout(Duration.ofSeconds(TIMEOUT_SECONDS)).build();
try {
return client.send(request, BodyHandlers.ofInputStream());
} catch (IOException e) {
if (isGoaway(e)) {
logger.warn("Received GOAWAY from server " + request.uri().getHost()
+ " will retry after one second with Http/1...");
TimeUnit.SECONDS.sleep(1);
return clientHttp1.send(request, BodyHandlers.ofInputStream());
HttpResponse<InputStream> response = httpClient.send(request, BodyHandlers.ofInputStream());
try (ResponseImplementation<InputStream> implementation = new ResponseImplementation<>(response) {

@Override
public void close() {
if (response.version() == Version.HTTP_1_1) {
// discard any remaining data and close the stream to return the connection to
// the pool..
try (InputStream stream = response.body()) {
int discarded = 0;
while (discarded < MAX_DISCARD) {
int read = stream.read(DUMMY_BUFFER);
if (read < 0) {
break;
}
discarded += read;
}
} catch (IOException e) {
// don't care...
}
} else {
// just closing should be enough to signal to the framework...
try (InputStream stream = response.body()) {
} catch (IOException e) {
// don't care...
}
}
}
throw e;

@Override
public void transferTo(OutputStream outputStream, ContentEncoding transportEncoding)
throws IOException {
transportEncoding.decode(response.body()).transferTo(outputStream);
}
}) {
return consumer.handleResponse(implementation);
}
}

@Override
public Response<Void> head() throws IOException {
public Response head() throws IOException {
try {
HttpResponse<Void> response = client.send(
builder.method("HEAD", BodyPublishers.noBody()).timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
.build(),
BodyHandlers.discarding());
return new ResponseImplementation<>(response) {
@Override
public void close() {
// nothing...
try {
return doHead(client);
} catch (IOException e) {
if (isGoaway(e)) {
logger.debug("Received GOAWAY from server " + uri.getHost()
+ " will retry with Http/1...");
TimeUnit.SECONDS.sleep(1);
return doHead(clientHttp1);
}
};
throw e;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

private Response doHead(HttpClient httpClient) throws IOException, InterruptedException {
HttpResponse<Void> response = httpClient.send(
builder.method("HEAD", BodyPublishers.noBody()).timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
.build(),
BodyHandlers.discarding());
return new ResponseImplementation<>(response) {
@Override
public void close() {
// nothing...
}

@Override
public void transferTo(OutputStream outputStream, ContentEncoding transportEncoding)
throws IOException {
throw new IOException("HEAD returns no body");
}
};
}

}

private static abstract class ResponseImplementation<T> implements Response<T> {
private static abstract class ResponseImplementation<T> implements Response {
private final HttpResponse<T> response;

private ResponseImplementation(HttpResponse<T> response) {
Expand All @@ -190,11 +222,6 @@ public Map<String, List<String>> headers() {
return response.headers().map();
}

@Override
public T body() {
return response.body();
}

@Override
public String getHeader(String header) {
return response.headers().firstValue(header).orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,32 @@
*******************************************************************************/
package org.eclipse.tycho.p2maven.transport;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import java.util.Map;

public interface Response<T> extends AutoCloseable {

int statusCode() throws IOException;

Map<String, List<String>> headers();

@Override
void close();

T body() throws IOException;

URI getURI();
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

public interface Response extends Headers {

default void transferTo(OutputStream outputStream) throws IOException {
String encoding = getHeader(Headers.HEADER_CONTENT_ENCODING);
if (Headers.ENCODING_GZIP.equals(encoding)) {
transferTo(outputStream, GZIPInputStream::new);
} else if (encoding == null || encoding.isEmpty() || Headers.ENCODING_IDENTITY.equals(encoding)) {
transferTo(outputStream, stream -> stream);
} else {
throw new IOException("Unknown content encoding: " + encoding);
}
}

String getHeader(String header);
void transferTo(OutputStream outputStream, ContentEncoding transportEncoding) throws IOException;

long getLastModified();
interface ContentEncoding {
InputStream decode(InputStream raw) throws IOException;
}

default void checkResponseCode() throws FileNotFoundException, IOException {
int code = statusCode();
if (code >= HttpURLConnection.HTTP_BAD_REQUEST) {
if (code == HttpURLConnection.HTTP_NOT_FOUND || code == HttpURLConnection.HTTP_GONE) {
throw new FileNotFoundException(getURI().toString());
} else {
throw new java.io.IOException("Server returned HTTP code: " + code + " for URL " + getURI().toString());
}
}
interface ResponseConsumer<T> {
T handleResponse(Response response) throws IOException;
}

}
Loading

0 comments on commit deb31c7

Please sign in to comment.