Skip to content

Commit

Permalink
Adapt access log to the delayed last flush operation (#3280)
Browse files Browse the repository at this point in the history
This is in addition to #3271
  • Loading branch information
violetagg authored Jun 4, 2024
1 parent 61b6583 commit 7fab480
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,13 @@
import io.netty.handler.codec.http.cookie.Cookie;
import reactor.netty.ReactorNetty;
import reactor.netty.http.server.ConnectionInformation;
import reactor.netty.internal.util.MapUtils;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -57,6 +59,21 @@ abstract class AbstractAccessLogArgProvider<SELF extends AbstractAccessLogArgPro
this.remoteAddress = remoteAddress;
}

AbstractAccessLogArgProvider(AbstractAccessLogArgProvider<?> copy) {
this.remoteAddress = copy.remoteAddress;
this.connectionInfo = copy.connectionInfo;
this.zonedDateTime = copy.zonedDateTime;
this.accessDateTime = copy.accessDateTime;
this.method = copy.method;
this.uri = copy.uri;
this.protocol = copy.protocol;
this.chunked = copy.chunked;
this.contentLength = copy.contentLength;
this.startTime = copy.startTime;
this.cookies = new HashMap<>(MapUtils.calculateInitialCapacity(copy.cookies.size()));
this.cookies.putAll(copy.cookies);
}

@Override
@Nullable
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,12 @@ final class AccessLogArgProviderH1 extends AbstractAccessLogArgProvider<AccessLo
super(remoteAddress);
}

AccessLogArgProviderH1(AccessLogArgProviderH1 copy) {
super(copy);
this.request = copy.request;
this.response = copy.response;
}

AccessLogArgProviderH1 request(HttpServerRequest request) {
this.request = Objects.requireNonNull(request, "request");
onRequest();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,9 @@
*/
final class AccessLogHandlerH1 extends BaseAccessLogHandler {

static final boolean LAST_FLUSH_WHEN_NO_READ = Boolean.parseBoolean(
System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false"));

AccessLogArgProviderH1 accessLogArgProvider;

AccessLogHandlerH1(@Nullable Function<AccessLogArgProvider, AccessLog> accessLog) {
Expand All @@ -60,6 +63,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (accessLogArgProvider == null) {
accessLogArgProvider = new AccessLogArgProviderH1(ctx.channel().remoteAddress());
}
else {
accessLogArgProvider.clear();
}

ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops instanceof HttpServerRequest) {
Expand All @@ -75,14 +81,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
if (msg instanceof LastHttpContent) {
accessLogArgProvider.increaseContentLength(((LastHttpContent) msg).content().readableBytes());
AccessLogArgProviderH1 copy = LAST_FLUSH_WHEN_NO_READ ? new AccessLogArgProviderH1(accessLogArgProvider) : null;
ctx.write(msg, promise.unvoid())
.addListener(future -> {
if (future.isSuccess()) {
AccessLog log = accessLog.apply(accessLogArgProvider);
if (log != null) {
log.log();
}
accessLogArgProvider.clear();
AccessLog log = copy != null ? accessLog.apply(copy) : accessLog.apply(accessLogArgProvider);
if (log != null) {
log.log();
}
}
});
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.netty.BaseHttpTest;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.LogTracker;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
Expand Down Expand Up @@ -233,16 +234,18 @@ void httpGetRespondsSendObject(HttpProtocol protocol) throws Exception {
assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1);
}

@Test
void httpPipeliningGetRespondsSendMono() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void httpPipeliningGetRespondsSendMono(boolean enableAccessLog) throws Exception {
String oldValue = System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false");
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", "true");
try {
String message = "\"GET /1 HTTP/1.1\" 200 1024";
try (LogTracker logTracker = new LogTracker("reactor.netty.http.server.AccessLog", 16, message)) {
CountDownLatch latch = new CountDownLatch(64);
EventsRecorder recorder = new EventsRecorder(latch);
disposableServer = createServer(recorder, HttpProtocol.HTTP11,
r -> r.get("/1", (req, res) -> res.sendString(Mono.just(REPEAT).delayElement(Duration.ofMillis(10))
.doOnEach(recorder).doOnCancel(recorder))));
.doOnEach(recorder).doOnCancel(recorder))), enableAccessLog);

Connection client =
TcpClient.create()
Expand Down Expand Up @@ -278,22 +281,29 @@ void httpPipeliningGetRespondsSendMono() throws Exception {
assertThat(recorder.fullResponseIsSent.get()).isEqualTo(16);
assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(16);
assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(16);

if (enableAccessLog) {
assertThat(logTracker.latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(logTracker.actualMessages).hasSize(16);
}
}
finally {
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", oldValue);
}
}

@Test
void httpPipeliningGetRespondsSendObject() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void httpPipeliningGetRespondsSendObject(boolean enableAccessLog) throws Exception {
String oldValue = System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false");
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", "true");
try {
String message = "\"GET /1 HTTP/1.1\" 200 1024";
try (LogTracker logTracker = new LogTracker("reactor.netty.http.server.AccessLog", 16, message)) {
CountDownLatch latch = new CountDownLatch(64);
EventsRecorder recorder = new EventsRecorder(latch);
disposableServer = createServer(recorder, HttpProtocol.HTTP11,
r -> r.get("/1", (req, res) -> res.sendObject(Unpooled.wrappedBuffer(REPEAT.getBytes(Charset.defaultCharset())))
.then().doOnEach(recorder).doOnCancel(recorder)));
.then().doOnEach(recorder).doOnCancel(recorder)), enableAccessLog);

Connection client =
TcpClient.create()
Expand Down Expand Up @@ -329,6 +339,11 @@ void httpPipeliningGetRespondsSendObject() throws Exception {
assertThat(recorder.fullResponseIsSent.get()).isEqualTo(16);
assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(16);
assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(16);

if (enableAccessLog) {
assertThat(logTracker.latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(logTracker.actualMessages).hasSize(16);
}
}
finally {
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", oldValue);
Expand Down Expand Up @@ -470,6 +485,10 @@ void httpPostRespondsSendObject(HttpProtocol protocol) throws Exception {
}

static DisposableServer createServer(EventsRecorder recorder, HttpProtocol protocol, Consumer<? super HttpServerRoutes> routes) {
return createServer(recorder, protocol, routes, false);
}

static DisposableServer createServer(EventsRecorder recorder, HttpProtocol protocol, Consumer<? super HttpServerRoutes> routes, boolean enableAccessLog) {
return createServer()
.protocol(protocol)
.doOnChannelInit((obs, ch, addr) -> {
Expand All @@ -483,6 +502,7 @@ static DisposableServer createServer(EventsRecorder recorder, HttpProtocol proto
conn.channel().pipeline().addBefore(HttpTrafficHandler, "eventsRecorderHandler", new EventsRecorderHandler(recorder));
}
})
.accessLog(enableAccessLog)
.route(routes)
.bindNow();
}
Expand Down

0 comments on commit 7fab480

Please sign in to comment.