From 118ad9201bd902dcc71f98b9dc341dee47593f20 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 31 Dec 2024 15:55:28 +0200 Subject: [PATCH] [test] CancelReceiverHandlerTest: Move the check for ByteBuf.release to channelReadComplete --- .../netty/CancelReceiverHandlerTest.java | 44 ++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/reactor-netty-core/src/testFixtures/java/reactor/netty/CancelReceiverHandlerTest.java b/reactor-netty-core/src/testFixtures/java/reactor/netty/CancelReceiverHandlerTest.java index e27dc850c2..8ca8037952 100644 --- a/reactor-netty-core/src/testFixtures/java/reactor/netty/CancelReceiverHandlerTest.java +++ b/reactor-netty-core/src/testFixtures/java/reactor/netty/CancelReceiverHandlerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2023-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,8 @@ import reactor.util.Logger; import reactor.util.Loggers; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,13 +49,18 @@ public final class CancelReceiverHandlerTest extends ChannelInboundHandlerAdapte */ final Runnable cancelAction; + /** + * List with incoming message body parts which are expected to be all released. + */ + private final List buffers; + /** * Flag set to true when the cancel action has already been invoked. */ private final AtomicBoolean cancelled = new AtomicBoolean(); /** - * Latch initialized with the number of incoming message body parts which are exected to be all released. + * Latch initialized with the number of incoming message body parts which are expected to be all released. */ private final CountDownLatch expectedReleaseCount; @@ -75,28 +82,43 @@ public CancelReceiverHandlerTest(Runnable cancelAction) { * message buffers are all released. */ public CancelReceiverHandlerTest(Runnable cancelAction, int expectedReleaseCount) { + this.buffers = new ArrayList<>(expectedReleaseCount); this.cancelAction = cancelAction; this.expectedReleaseCount = new CountDownLatch(expectedReleaseCount); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - // If the incoming message is a kind of a buffer, calls the cancel action. + // If the incoming message is ByteBuf/ByteBufHolder, calls the cancel action. ByteBuf buf = (msg instanceof ByteBufHolder) ? ((ByteBufHolder) msg).content() : ((msg instanceof ByteBuf) ? (ByteBuf) msg : null); - if (buf != null && cancelled.compareAndSet(false, true)) { - log.debug("Executing cancel action"); - cancelAction.run(); + if (buf != null) { + if (cancelled.compareAndSet(false, true)) { + log.debug("Executing cancel action"); + cancelAction.run(); + } + if (!(buf instanceof EmptyByteBuf)) { + buffers.add(buf); + } } + ctx.fireChannelRead(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { try { - ctx.fireChannelRead(msg); + ctx.fireChannelReadComplete(); } finally { - if (buf != null && !(buf instanceof EmptyByteBuf) && buf.refCnt() == 0) { - expectedReleaseCount.countDown(); - if (expectedReleaseCount.getCount() == 0) { - log.debug("All received messages have been released."); + for (int i = 0; i < buffers.size(); i++) { + ByteBuf buf = buffers.get(i); + if (buf.refCnt() == 0) { + buffers.remove(i); + expectedReleaseCount.countDown(); + if (expectedReleaseCount.getCount() == 0) { + log.debug("All received messages have been released."); + } } } }