From 61368bbae0f35e67f2a172e9d401ee2ee92c324a Mon Sep 17 00:00:00 2001
From: Artem Bilan <artem.bilan@broadcom.com>
Date: Tue, 17 Dec 2024 11:35:09 -0500
Subject: [PATCH] GH-3686: Fix observation scope closure in the
 `KafkaMLContainer`

Fixes: https://github.com/spring-projects/spring-kafka/issues/3686

According to our investigation around the `try-with-resource`,
it looks like the resource is already closed when we reach the `catch` block.

* Rework  `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener()`
to `observation.openScope()` before the `try` and close it manually in the `finally` block
* Verify `RecordInterceptor.failure()` has a scope in the `ObservationTests`
---
 .../KafkaMessageListenerContainer.java        |  8 +++--
 .../support/micrometer/ObservationTests.java  | 29 +++++++++++++++++--
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index de258bac48..09947227b7 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -2750,7 +2750,6 @@ private void pauseForNackSleep() {
 		 * @throws Error an error.
 		 */
 		@Nullable
-		@SuppressWarnings("try")
 		private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
 				Iterator<ConsumerRecord<K, V>> iterator) {
 
@@ -2763,7 +2762,9 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
 					this.observationRegistry);
 
 			observation.start();
-			try (Observation.Scope ignored = observation.openScope()) {
+			Observation.Scope observationScope = observation.openScope();
+			// We cannot use 'try-with-resource' because the resource is closed just before catch block
+			try {
 				invokeOnMessage(cRecord);
 				successTimer(sample, cRecord);
 				recordInterceptAfter(cRecord, null);
@@ -2802,6 +2803,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
 				if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
 					observation.stop();
 				}
+				observationScope.close();
 			}
 			return null;
 		}
@@ -4020,6 +4022,6 @@ private static class StopAfterFenceException extends KafkaException {
 
 	}
 
-	private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { };
+	private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
 
 }
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
index 0a48bf63fd..a020bf0363 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
@@ -32,6 +32,7 @@
 import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
 import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
 import io.micrometer.core.tck.MeterRegistryAssert;
+import io.micrometer.observation.Observation;
 import io.micrometer.observation.ObservationHandler;
 import io.micrometer.observation.ObservationRegistry;
 import io.micrometer.observation.tck.TestObservationRegistry;
@@ -45,6 +46,7 @@
 import io.micrometer.tracing.test.simple.SimpleSpan;
 import io.micrometer.tracing.test.simple.SimpleTracer;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -74,6 +76,7 @@
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.listener.RecordInterceptor;
 import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
 import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -356,7 +359,7 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri
 	@Test
 	void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
 			@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
-			@Autowired KafkaListenerEndpointRegistry endpointRegistry)
+			@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config)
 					throws ExecutionException, InterruptedException, TimeoutException {
 
 		runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
@@ -372,6 +375,8 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
 		assertThat(span.getError().getCause())
 				.isInstanceOf(IllegalStateException.class)
 				.hasMessage("obs4 run time exception");
+
+		assertThat(config.scopeInFailureReference.get()).isNotNull();
 	}
 
 	@Test
@@ -445,6 +450,8 @@ public static class Config {
 
 		KafkaAdmin mockAdmin = mock(KafkaAdmin.class);
 
+		AtomicReference<Observation.Scope> scopeInFailureReference = new AtomicReference<>();
+
 		@Bean
 		KafkaAdmin admin(EmbeddedKafkaBroker broker) {
 			String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString());
@@ -512,7 +519,7 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
 
 		@Bean
 		ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
-				ConsumerFactory<Integer, String> cf) {
+				ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
 
 			ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
 					new ConcurrentKafkaListenerContainerFactory<>();
@@ -522,6 +529,24 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
 				if (container.getListenerId().equals("obs3")) {
 					container.setKafkaAdmin(this.mockAdmin);
 				}
+				if (container.getListenerId().equals("obs4")) {
+					container.setRecordInterceptor(new RecordInterceptor<>() {
+
+						@Override
+						public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
+								Consumer<Integer, String> consumer) {
+
+							return record;
+						}
+
+						@Override
+						public void failure(ConsumerRecord<Integer, String> record, Exception exception,
+								Consumer<Integer, String> consumer) {
+
+							Config.this.scopeInFailureReference.set(observationRegistry.getCurrentObservationScope());
+						}
+					});
+				}
 			});
 			return factory;
 		}