Skip to content

Commit

Permalink
Merge pull request #4619 from shawkins/consumeLines
Browse files Browse the repository at this point in the history
fix #4201: moving consumeLines out of the clients
  • Loading branch information
manusa authored Nov 30, 2022
2 parents 009974f + b2456ca commit b743b2f
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,6 @@ public DerivedClientBuilder newBuilder() {
return this.builder.copy(this);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer) {
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request,
AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -76,29 +75,6 @@ public DerivedClientBuilder newBuilder() {
return builder.copy(this);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest originalRequest, AsyncBody.Consumer<String> consumer) {
final var request = toStandardHttpRequest(originalRequest);
final var future = new JettyAsyncResponseListener(request) {

final StringBuilder builder = new StringBuilder();

@Override
protected void onContent(ByteBuffer content) throws Exception {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
consumer.consume(builder.toString(), this);
builder.setLength(0);
} else {
builder.append(c);
}
}
}
}.listen(newRequest(request));
return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest originalRequest, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,6 @@ public DerivedClientBuilder newBuilder() {
return new OkHttpClientBuilderImpl(httpClient.newBuilder(), this.factory, this.config);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(
HttpRequest request, AsyncBody.Consumer<String> consumer) {
Function<BufferedSource, AsyncBody> handler = s -> new OkHttpAsyncBody<String>(consumer, s) {
@Override
protected String process(BufferedSource source) throws IOException {
// this should probably be strict instead
// when non-strict if no newline is present, this will create a truncated string from
// what is available. However as strict it will be blocking.
return source.readUtf8Line();
}
};
return sendAsync(request, handler);
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(
HttpRequest request, AsyncBody.Consumer<List<ByteBuffer>> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -184,41 +182,6 @@ void testAsyncBody() throws Exception {
assertEquals(byteCount, consumed.get(10, TimeUnit.SECONDS));
}

@Test
void testConsumeLines() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello\nworld\nlines\n").always();

ArrayList<String> strings = new ArrayList<>();
CompletableFuture<Void> consumed = new CompletableFuture<>();

CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeLines(
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
.build(),
(value, asyncBody) -> {
strings.add(value);
asyncBody.consume();
});

responseFuture.whenComplete((r, t) -> {
if (t != null) {
consumed.completeExceptionally(t);
}
if (r != null) {
r.body().consume();
r.body().done().whenComplete((v, ex) -> {
if (ex != null) {
consumed.completeExceptionally(ex);
} else {
consumed.complete(null);
}
});
}
});

consumed.get(10, TimeUnit.SECONDS);
assertEquals(Arrays.asList("hello", "world", "lines"), strings);
}

@DisplayName("Supported response body types")
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.fabric8.kubernetes.client.RequestConfig;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;

import java.io.BufferedReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -179,16 +178,6 @@ default <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Cl
return HttpResponse.SupportedResponses.from(type).sendAsync(request, this);
}

/**
* Send a request and consume the lines of the response body using the same logic as {@link BufferedReader} to
* break up the lines.
*
* @param request the HttpRequest to send
* @param consumer the response body consumer
* @return the future which will be ready after the headers have been read
*/
CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, AsyncBody.Consumer<String> consumer);

/**
* Send a request and consume the bytes of the resulting response body
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

public abstract class AbstractAsyncBodyTest {

Expand All @@ -50,72 +45,6 @@ static void afterAll() {

protected abstract HttpClient.Factory getHttpClientFactory();

@Test
@DisplayName("Lines are processed and consumed only after the consume() invocation")
public void consumeLinesProcessedAfterConsume() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/consume-lines")
.andReturn(200, "This is the response body\n")
.always();
final StringBuffer responseText = new StringBuffer();
final HttpResponse<AsyncBody> asyncBodyResponse = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(),
(value, asyncBody) -> {
responseText.append(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
assertThat(responseText).isEmpty();
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(responseText).contains("This is the response body");
}
}

@Test
@DisplayName("Lines are not processed when cancel() invocation")
public void consumeLinesNotProcessedIfCancelled() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/cancel")
.andReturn(200, "This would be the response body")
.always();
final StringBuffer responseText = new StringBuffer();
final HttpResponse<AsyncBody> asyncBodyResponse = client
.consumeLines(client.newHttpRequestBuilder()
.uri(server.url("/cancel")).build(), (value, asyncBody) -> {
responseText.append(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncBodyResponse.body().cancel();
asyncBodyResponse.body().consume();
final CompletableFuture<Void> doneFuture = asyncBodyResponse.body().done();
assertThrows(CancellationException.class, () -> doneFuture.get(10L, TimeUnit.SECONDS));
assertThat(responseText).isEmpty();
}
}

@Test
@DisplayName("Lines are processed completely")
public void consumeLinesProcessesAllLines() throws Exception {
try (final HttpClient client = getHttpClientFactory().newBuilder().build()) {
server.expect().withPath("/consume-lines")
.andReturn(200, "This is the response body\nWith\nMultiple\n lines\n")
.always();
final List<String> receivedLines = new ArrayList<>();
final HttpResponse<AsyncBody> asyncBodyResponse = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(),
(value, asyncBody) -> {
receivedLines.add(value);
asyncBody.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(receivedLines).containsExactly("This is the response body", "With", "Multiple", " lines");
}
}

@Test
@DisplayName("Bytes are processed and consumed only after the consume() invocation")
public void consumeBytesProcessedAfterConsume() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,41 +115,6 @@ public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpRespons
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines")
public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception {
// Given
server.expect().withPath("/intercepted-url").andReturn(200, "This works\n").once();
final HttpClient.Builder builder = getHttpClientFactory().newBuilder()
.addOrReplaceInterceptor("test", new Interceptor() {
@Override
public CompletableFuture<Boolean> afterFailure(BasicBuilder builder, HttpResponse<?> response) {
builder.uri(URI.create(server.url("/intercepted-url")));
return CompletableFuture.completedFuture(true);
}
});
final CompletableFuture<String> result = new CompletableFuture<>();
// When
try (HttpClient client = builder.build()) {
final HttpResponse<AsyncBody> asyncR = client.consumeLines(
client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> {
result.complete(s);
ab.consume();
})
.get(10L, TimeUnit.SECONDS);
asyncR.body().consume();
asyncR.body().done().whenComplete((v, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
result.complete(null);
}
});
// Then
assertThat(result.get(10L, TimeUnit.SECONDS)).isEqualTo("This works");
}
}

@Test
@DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes")
public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -66,8 +68,18 @@ public WatchHTTPManager(final HttpClient client,
protected synchronized void start(URL url, Map<String, String> headers) {
HttpRequest.Builder builder = client.newHttpRequestBuilder().url(url);
headers.forEach(builder::header);
call = client.consumeLines(builder.build(), (s, a) -> {
onMessage(s);
StringBuffer buffer = new StringBuffer();
call = client.consumeBytes(builder.build(), (b, a) -> {
for (ByteBuffer content : b) {
for (char c : StandardCharsets.UTF_8.decode(content).array()) {
if (c == '\n') {
onMessage(buffer.toString());
buffer.setLength(0);
} else {
buffer.append(c);
}
}
}
a.consume();
});
call.whenComplete((response, t) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void testReconnectOnException() throws MalformedURLException, InterruptedExcepti
BaseOperation baseOperation = Mockito.mock(BaseOperation.class);
Mockito.when(baseOperation.getNamespacedUrl()).thenReturn(new URL("http://localhost"));
CompletableFuture<HttpResponse<AsyncBody>> future = new CompletableFuture<>();
Mockito.when(client.consumeLines(Mockito.any(), Mockito.any())).thenReturn(future);
Mockito.when(client.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future);

CountDownLatch reconnect = new CountDownLatch(1);
WatchHTTPManager<HasMetadata, KubernetesResourceList<HasMetadata>> watch = new WatchHTTPManager(client,
Expand Down
Loading

0 comments on commit b743b2f

Please sign in to comment.