From 136fe37955ffbb1a4d27a0d067f42c364c364829 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 4 Jun 2024 16:35:38 +0300 Subject: [PATCH] Adapt access log to the delayed last flush operation This is in addition to #3271 --- .../logging/AbstractAccessLogArgProvider.java | 19 +++++++++- .../logging/AccessLogArgProviderH1.java | 8 +++- .../server/logging/AccessLogHandlerH1.java | 18 ++++++--- .../HttpServerOutboundCompleteTest.java | 38 ++++++++++++++----- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AbstractAccessLogArgProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AbstractAccessLogArgProvider.java index fd4e99d55b..a12b37482a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AbstractAccessLogArgProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AbstractAccessLogArgProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,13 @@ import io.netty.handler.codec.http.cookie.Cookie; import reactor.netty.ReactorNetty; import reactor.netty.http.server.ConnectionInformation; +import reactor.netty.internal.util.MapUtils; import reactor.util.annotation.Nullable; import java.net.SocketAddress; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -57,6 +59,21 @@ abstract class AbstractAccessLogArgProvider copy) { + this.remoteAddress = copy.remoteAddress; + this.connectionInfo = copy.connectionInfo; + this.zonedDateTime = copy.zonedDateTime; + this.accessDateTime = copy.accessDateTime; + this.method = copy.method; + this.uri = copy.uri; + this.protocol = copy.protocol; + this.chunked = copy.chunked; + this.contentLength = copy.contentLength; + this.startTime = copy.startTime; + this.cookies = new HashMap<>(MapUtils.calculateInitialCapacity(copy.cookies.size())); + this.cookies.putAll(copy.cookies); + } + @Override @Nullable @Deprecated diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogArgProviderH1.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogArgProviderH1.java index aa43fb27ec..e362a9948f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogArgProviderH1.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/logging/AccessLogArgProviderH1.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,12 @@ final class AccessLogArgProviderH1 extends AbstractAccessLogArgProvider accessLog) { @@ -60,6 +63,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (accessLogArgProvider == null) { accessLogArgProvider = new AccessLogArgProviderH1(ctx.channel().remoteAddress()); } + else { + accessLogArgProvider.clear(); + } ChannelOperations ops = ChannelOperations.get(ctx.channel()); if (ops instanceof HttpServerRequest) { @@ -75,14 +81,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } if (msg instanceof LastHttpContent) { accessLogArgProvider.increaseContentLength(((LastHttpContent) msg).content().readableBytes()); + AccessLogArgProviderH1 copy = LAST_FLUSH_WHEN_NO_READ ? new AccessLogArgProviderH1(accessLogArgProvider) : null; ctx.write(msg, promise.unvoid()) .addListener(future -> { if (future.isSuccess()) { - AccessLog log = accessLog.apply(accessLogArgProvider); - if (log != null) { - log.log(); - } - accessLogArgProvider.clear(); + AccessLog log = copy != null ? accessLog.apply(copy) : accessLog.apply(accessLogArgProvider); + if (log != null) { + log.log(); + } } }); return; diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java index 2b680d64ab..befc600734 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java @@ -27,9 +27,9 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -37,6 +37,7 @@ import reactor.netty.BaseHttpTest; import reactor.netty.Connection; import reactor.netty.DisposableServer; +import reactor.netty.LogTracker; import reactor.netty.http.HttpProtocol; import reactor.netty.http.client.HttpClient; import reactor.netty.tcp.TcpClient; @@ -233,16 +234,18 @@ void httpGetRespondsSendObject(HttpProtocol protocol) throws Exception { assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); } - @Test - void httpPipeliningGetRespondsSendMono() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void httpPipeliningGetRespondsSendMono(boolean enableAccessLog) throws Exception { String oldValue = System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false"); System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", "true"); - try { + String message = "\"GET /1 HTTP/1.1\" 200 1024"; + try (LogTracker logTracker = new LogTracker("reactor.netty.http.server.AccessLog", 16, message)) { CountDownLatch latch = new CountDownLatch(64); EventsRecorder recorder = new EventsRecorder(latch); disposableServer = createServer(recorder, HttpProtocol.HTTP11, r -> r.get("/1", (req, res) -> res.sendString(Mono.just(REPEAT).delayElement(Duration.ofMillis(10)) - .doOnEach(recorder).doOnCancel(recorder)))); + .doOnEach(recorder).doOnCancel(recorder))), enableAccessLog); Connection client = TcpClient.create() @@ -278,22 +281,29 @@ void httpPipeliningGetRespondsSendMono() throws Exception { assertThat(recorder.fullResponseIsSent.get()).isEqualTo(16); assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(16); assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(16); + + if (enableAccessLog) { + assertThat(logTracker.latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(logTracker.actualMessages).hasSize(16); + } } finally { System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", oldValue); } } - @Test - void httpPipeliningGetRespondsSendObject() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void httpPipeliningGetRespondsSendObject(boolean enableAccessLog) throws Exception { String oldValue = System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false"); System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", "true"); - try { + String message = "\"GET /1 HTTP/1.1\" 200 1024"; + try (LogTracker logTracker = new LogTracker("reactor.netty.http.server.AccessLog", 16, message)) { CountDownLatch latch = new CountDownLatch(64); EventsRecorder recorder = new EventsRecorder(latch); disposableServer = createServer(recorder, HttpProtocol.HTTP11, r -> r.get("/1", (req, res) -> res.sendObject(Unpooled.wrappedBuffer(REPEAT.getBytes(Charset.defaultCharset()))) - .then().doOnEach(recorder).doOnCancel(recorder))); + .then().doOnEach(recorder).doOnCancel(recorder)), enableAccessLog); Connection client = TcpClient.create() @@ -329,6 +339,11 @@ void httpPipeliningGetRespondsSendObject() throws Exception { assertThat(recorder.fullResponseIsSent.get()).isEqualTo(16); assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(16); assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(16); + + if (enableAccessLog) { + assertThat(logTracker.latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(logTracker.actualMessages).hasSize(16); + } } finally { System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", oldValue); @@ -470,6 +485,10 @@ void httpPostRespondsSendObject(HttpProtocol protocol) throws Exception { } static DisposableServer createServer(EventsRecorder recorder, HttpProtocol protocol, Consumer routes) { + return createServer(recorder, protocol, routes, false); + } + + static DisposableServer createServer(EventsRecorder recorder, HttpProtocol protocol, Consumer routes, boolean enableAccessLog) { return createServer() .protocol(protocol) .doOnChannelInit((obs, ch, addr) -> { @@ -483,6 +502,7 @@ static DisposableServer createServer(EventsRecorder recorder, HttpProtocol proto conn.channel().pipeline().addBefore(HttpTrafficHandler, "eventsRecorderHandler", new EventsRecorderHandler(recorder)); } }) + .accessLog(enableAccessLog) .route(routes) .bindNow(); }