Skip to content

Commit

Permalink
Initial support for Micrometer Observation (#3845)
Browse files Browse the repository at this point in the history
* Initial support for Micrometer Observation

* Add respective Observation dependencies
* Refactor an `AbstractMessageHandler` logic for potential Observation hooks
* Introduce an `ObservationPropagationChannelInterceptor` to propagate
an `Observation` from one thread to another through message channels

* Adds an example of propagation

* Fixed the user code and receiving spans

* * Clean up for Tracing unit test
* Make `micrometer-observation` as an `api` dep - non-optional for direct API usage

Co-authored-by: Marcin Grzejszczak <[email protected]>
  • Loading branch information
artembilan and marcingrzejszczak authored Aug 16, 2022
1 parent 78fa297 commit 2bfcb32
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 38 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ project('spring-integration-core') {
exclude group: 'org.springframework'
}
api 'io.projectreactor:reactor-core'
api 'io.micrometer:micrometer-observation'

optionalApi 'com.fasterxml.jackson.core:jackson-databind'
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
Expand All @@ -512,6 +513,8 @@ project('spring-integration-core') {

testImplementation "org.aspectj:aspectjweaver:$aspectjVersion"
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-test'
}

dokkaHtmlPartial {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.channel.interceptor;

import org.springframework.aop.support.AopUtils;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;

import io.micrometer.common.lang.Nullable;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* The {@link org.springframework.messaging.support.ExecutorChannelInterceptor}
* implementation responsible for an {@link Observation} propagation from one message
* flow's thread to another through the {@link MessageChannel}s involved in the flow.
* Opens a new {@link Observation.Scope} on another thread and cleans up it in the end.
*
* @author Artem Bilan
*
* @since 6.0
*/
public class ObservationPropagationChannelInterceptor extends ThreadStatePropagationChannelInterceptor<Observation> {

private final ThreadLocal<Observation.Scope> scopes = new ThreadLocal<>();

private final ObservationRegistry observationRegistry;

public ObservationPropagationChannelInterceptor(ObservationRegistry observationRegistry) {
Assert.notNull(observationRegistry, "'observationRegistry' must noty be null");
this.observationRegistry = observationRegistry;
}

@Override
@Nullable
protected Observation obtainPropagatingContext(Message<?> message, MessageChannel channel) {
if (!DirectChannel.class.isAssignableFrom(AopUtils.getTargetClass(channel))) {
return this.observationRegistry.getCurrentObservation();
}
return null;
}

@Override
protected void populatePropagatedContext(@Nullable Observation state, Message<?> message, MessageChannel channel) {
if (state != null) {
Observation.Scope scope = state.openScope();
this.scopes.set(scope);
}
}

@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
Observation.Scope scope = this.scopes.get();
if (scope != null && scope == this.observationRegistry.getCurrentObservationScope()) {
scope.close();
this.scopes.remove();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ExecutorChannelInterceptor;

import io.micrometer.common.lang.Nullable;

/**
* The {@link ExecutorChannelInterceptor} implementation responsible for
* the {@link Thread} (any?) state propagation from one message flow's thread to another
Expand All @@ -47,16 +49,16 @@
*
* @author Artem Bilan
* @author Gary Russell
*
* @since 4.2
*/
public abstract class ThreadStatePropagationChannelInterceptor<S>
implements ExecutorChannelInterceptor {
public abstract class ThreadStatePropagationChannelInterceptor<S> implements ExecutorChannelInterceptor {

@Override
public final Message<?> preSend(Message<?> message, MessageChannel channel) {
S threadContext = obtainPropagatingContext(message, channel);
if (threadContext != null) {
return new MessageWithThreadState<S>(message, threadContext);
return new MessageWithThreadState<>(message, threadContext);
}
else {
return message;
Expand All @@ -80,15 +82,10 @@ public final Message<?> beforeHandle(Message<?> message, MessageChannel channel,
return postReceive(message, channel);
}

@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler,
Exception ex) {
// No-op
}

@Nullable
protected abstract S obtainPropagatingContext(Message<?> message, MessageChannel channel);

protected abstract void populatePropagatedContext(S state, Message<?> message, MessageChannel channel);
protected abstract void populatePropagatedContext(@Nullable S state, Message<?> message, MessageChannel channel);


private static final class MessageWithThreadState<S> implements Message<Object>, MessageDecorator {
Expand Down Expand Up @@ -129,4 +126,3 @@ public String toString() {
}

}

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,31 +39,42 @@ public abstract class AbstractMessageHandler extends MessageHandlerSupport

@Override // NOSONAR
public void handleMessage(Message<?> message) {
Message<?> messageToUse = message;
Assert.notNull(messageToUse, "Message must not be null");
Assert.notNull(message, "Message must not be null");
if (isLoggingEnabled() && this.logger.isDebugEnabled()) {
this.logger.debug(this + " received message: " + messageToUse);
this.logger.debug(this + " received message: " + message);
}
SampleFacade sample = null;
MetricsCaptor metricsCaptor = getMetricsCaptor();
if (metricsCaptor != null) {
sample = metricsCaptor.start();
handleWithMetrics(message, metricsCaptor);
}
else {
doHandleMessage(message);
}
}

private void handleWithMetrics(Message<?> message, MetricsCaptor metricsCaptor) {
SampleFacade sample = metricsCaptor.start();
try {
doHandleMessage(message);
sample.stop(sendTimer());
}
catch (Exception ex) {
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
throw ex;
}
}

private void doHandleMessage(Message<?> message) {
Message<?> messageToUse = message;
try {
if (shouldTrack()) {
messageToUse = MessageHistory.write(messageToUse, this, getMessageBuilderFactory());
}
handleMessageInternal(messageToUse);
if (sample != null) {
sample.stop(sendTimer());
}
}
catch (Exception e) {
if (sample != null) {
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
}
catch (Exception ex) {
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(messageToUse,
() -> "error occurred in message handler [" + this + "]", e);
() -> "error occurred in message handler [" + this + "]", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Provides classes to support of Micrometer Observation API.
*/

@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.integration.support.management.observation;
Loading

0 comments on commit 2bfcb32

Please sign in to comment.