Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve GOAWAY handling #3492

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2024 Christoph Läubrich and others.
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Christoph Läubrich - initial API and implementation
*******************************************************************************/
package org.eclipse.tycho.p2maven.transport;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import java.util.Map;

public interface Headers extends AutoCloseable {

String ENCODING_IDENTITY = "identity";
String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
String HEADER_CONTENT_ENCODING = "Content-Encoding";
String ENCODING_GZIP = "gzip";
String ETAG_HEADER = "ETag";
String LAST_MODIFIED_HEADER = "Last-Modified";
String EXPIRES_HEADER = "Expires";
String CACHE_CONTROL_HEADER = "Cache-Control";
String MAX_AGE_DIRECTIVE = "max-age";
String MUST_REVALIDATE_DIRECTIVE = "must-revalidate";

int statusCode() throws IOException;

Map<String, List<String>> headers();

@Override
void close();

URI getURI();

String getHeader(String header);

long getLastModified();

default void checkResponseCode() throws FileNotFoundException, IOException {
int code = statusCode();
if (code >= HttpURLConnection.HTTP_BAD_REQUEST) {
if (code == HttpURLConnection.HTTP_NOT_FOUND || code == HttpURLConnection.HTTP_GONE) {
throw new FileNotFoundException(getURI().toString());
} else {
throw new java.io.IOException("Server returned HTTP code: " + code + " for URL " + getURI().toString());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package org.eclipse.tycho.p2maven.transport;

import java.io.IOException;
import java.io.InputStream;

import org.eclipse.tycho.p2maven.transport.Response.ResponseConsumer;

public interface HttpTransport {

void setHeader(String key, String value);

Response<InputStream> get() throws IOException;
<T> T get(ResponseConsumer<T> consumer) throws IOException;

Response<Void> head() throws IOException;
Headers head() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
Expand Down Expand Up @@ -42,6 +43,7 @@
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.eclipse.tycho.p2maven.helper.ProxyHelper;
import org.eclipse.tycho.p2maven.transport.Response.ResponseConsumer;

/**
* A transport using Java11 HttpClient
Expand Down Expand Up @@ -77,7 +79,7 @@ public class Java11HttpTransportFactory implements HttpTransportFactory, Initial
@Override
public HttpTransport createTransport(URI uri) {
Java11HttpTransport transport = new Java11HttpTransport(client, clientHttp1, HttpRequest.newBuilder().uri(uri),
logger);
uri, logger);
authenticator.preemtiveAuth((k, v) -> transport.setHeader(k, v), uri);
return transport;
}
Expand All @@ -88,11 +90,13 @@ private static final class Java11HttpTransport implements HttpTransport {
private HttpClient client;
private Logger logger;
private HttpClient clientHttp1;
private URI uri;

public Java11HttpTransport(HttpClient client, HttpClient clientHttp1, Builder builder, Logger logger) {
public Java11HttpTransport(HttpClient client, HttpClient clientHttp1, Builder builder, URI uri, Logger logger) {
this.client = client;
this.clientHttp1 = clientHttp1;
this.builder = builder;
this.uri = uri;
this.logger = logger;
}

Expand All @@ -102,78 +106,106 @@ public void setHeader(String key, String value) {
}

@Override
public Response<InputStream> get() throws IOException {
public <T> T get(ResponseConsumer<T> consumer) throws IOException {
try {
HttpResponse<InputStream> response = performGet();
return new ResponseImplementation<>(response) {

@Override
public void close() {
if (response.version() == Version.HTTP_1_1) {
// discard any remaining data and close the stream to return the connection to
// the pool..
try (InputStream stream = body()) {
int discarded = 0;
while (discarded < MAX_DISCARD) {
int read = stream.read(DUMMY_BUFFER);
if (read < 0) {
break;
}
discarded += read;
}
} catch (IOException e) {
// don't care...
}
} else {
// just closing should be enough to signal to the framework...
try (InputStream stream = body()) {
} catch (IOException e) {
// don't care...
}
}
try {
return performGet(consumer, client);
} catch (IOException e) {
if (isGoaway(e)) {
logger.info("Received GOAWAY from server " + uri.getHost() + " will retry with Http/1...");
TimeUnit.SECONDS.sleep(1);
return performGet(consumer, clientHttp1);
}
};
throw e;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

private HttpResponse<InputStream> performGet() throws IOException, InterruptedException {
private <T> T performGet(ResponseConsumer<T> consumer, HttpClient httpClient)
throws IOException, InterruptedException {
HttpRequest request = builder.GET().timeout(Duration.ofSeconds(TIMEOUT_SECONDS)).build();
try {
return client.send(request, BodyHandlers.ofInputStream());
} catch (IOException e) {
if (isGoaway(e)) {
logger.warn("Received GOAWAY from server " + request.uri().getHost()
+ " will retry after one second with Http/1...");
TimeUnit.SECONDS.sleep(1);
return clientHttp1.send(request, BodyHandlers.ofInputStream());
HttpResponse<InputStream> response = httpClient.send(request, BodyHandlers.ofInputStream());
try (ResponseImplementation<InputStream> implementation = new ResponseImplementation<>(response) {

@Override
public void close() {
if (response.version() == Version.HTTP_1_1) {
// discard any remaining data and close the stream to return the connection to
// the pool..
try (InputStream stream = response.body()) {
int discarded = 0;
while (discarded < MAX_DISCARD) {
int read = stream.read(DUMMY_BUFFER);
if (read < 0) {
break;
}
discarded += read;
}
} catch (IOException e) {
// don't care...
}
} else {
// just closing should be enough to signal to the framework...
try (InputStream stream = response.body()) {
} catch (IOException e) {
// don't care...
}
}
}
throw e;

@Override
public void transferTo(OutputStream outputStream, ContentEncoding transportEncoding)
throws IOException {
transportEncoding.decode(response.body()).transferTo(outputStream);
}
}) {
return consumer.handleResponse(implementation);
}
}

@Override
public Response<Void> head() throws IOException {
public Response head() throws IOException {
try {
HttpResponse<Void> response = client.send(
builder.method("HEAD", BodyPublishers.noBody()).timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
.build(),
BodyHandlers.discarding());
return new ResponseImplementation<>(response) {
@Override
public void close() {
// nothing...
try {
return doHead(client);
} catch (IOException e) {
if (isGoaway(e)) {
logger.debug("Received GOAWAY from server " + uri.getHost()
+ " will retry with Http/1...");
TimeUnit.SECONDS.sleep(1);
return doHead(clientHttp1);
}
};
throw e;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

private Response doHead(HttpClient httpClient) throws IOException, InterruptedException {
HttpResponse<Void> response = httpClient.send(
builder.method("HEAD", BodyPublishers.noBody()).timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
.build(),
BodyHandlers.discarding());
return new ResponseImplementation<>(response) {
@Override
public void close() {
// nothing...
}

@Override
public void transferTo(OutputStream outputStream, ContentEncoding transportEncoding)
throws IOException {
throw new IOException("HEAD returns no body");
}
};
}

}

private static abstract class ResponseImplementation<T> implements Response<T> {
private static abstract class ResponseImplementation<T> implements Response {
private final HttpResponse<T> response;

private ResponseImplementation(HttpResponse<T> response) {
Expand All @@ -190,11 +222,6 @@ public Map<String, List<String>> headers() {
return response.headers().map();
}

@Override
public T body() {
return response.body();
}

@Override
public String getHeader(String header) {
return response.headers().firstValue(header).orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,32 @@
*******************************************************************************/
package org.eclipse.tycho.p2maven.transport;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import java.util.Map;

public interface Response<T> extends AutoCloseable {

int statusCode() throws IOException;

Map<String, List<String>> headers();

@Override
void close();

T body() throws IOException;

URI getURI();
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

public interface Response extends Headers {

default void transferTo(OutputStream outputStream) throws IOException {
String encoding = getHeader(Headers.HEADER_CONTENT_ENCODING);
if (Headers.ENCODING_GZIP.equals(encoding)) {
transferTo(outputStream, GZIPInputStream::new);
} else if (encoding == null || encoding.isEmpty() || Headers.ENCODING_IDENTITY.equals(encoding)) {
transferTo(outputStream, stream -> stream);
} else {
throw new IOException("Unknown content encoding: " + encoding);
}
}

String getHeader(String header);
void transferTo(OutputStream outputStream, ContentEncoding transportEncoding) throws IOException;

long getLastModified();
interface ContentEncoding {
InputStream decode(InputStream raw) throws IOException;
}

default void checkResponseCode() throws FileNotFoundException, IOException {
int code = statusCode();
if (code >= HttpURLConnection.HTTP_BAD_REQUEST) {
if (code == HttpURLConnection.HTTP_NOT_FOUND || code == HttpURLConnection.HTTP_GONE) {
throw new FileNotFoundException(getURI().toString());
} else {
throw new java.io.IOException("Server returned HTTP code: " + code + " for URL " + getURI().toString());
}
}
interface ResponseConsumer<T> {
T handleResponse(Response response) throws IOException;
}

}
Loading
Loading