diff --git a/pom.xml b/pom.xml
index 63f2f82..aae7e41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,10 +213,11 @@
test
- org.springframework.boot
- spring-boot-starter-data-jpa
- 3.1.2
+ net.logstash.logback
+ logstash-logback-encoder
+ 7.4
+
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/SignalHubPushServiceApplication.java b/src/main/java/it/pagopa/interop/signalhub/push/service/SignalHubPushServiceApplication.java
index e8a5e86..4f279d0 100644
--- a/src/main/java/it/pagopa/interop/signalhub/push/service/SignalHubPushServiceApplication.java
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/SignalHubPushServiceApplication.java
@@ -3,7 +3,6 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.cache.annotation.EnableCaching;
import org.springframework.data.r2dbc.config.EnableR2dbcAuditing;
@Slf4j
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/config/ReactorConfiguration.java b/src/main/java/it/pagopa/interop/signalhub/push/service/config/ReactorConfiguration.java
new file mode 100644
index 0000000..37ca5cc
--- /dev/null
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/config/ReactorConfiguration.java
@@ -0,0 +1,20 @@
+package it.pagopa.interop.signalhub.push.service.config;
+
+
+import it.pagopa.interop.signalhub.push.service.logging.ContextLifter;
+import it.pagopa.interop.signalhub.push.service.logging.MdcContextLifter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import reactor.core.publisher.Hooks;
+import reactor.core.publisher.Operators;
+
+@Configuration
+public class ReactorConfiguration {
+
+
+ @Bean
+ public void contextLifterConfiguration() {
+ Hooks.onEachOperator(MdcContextLifter.class.getSimpleName(),
+ Operators.lift((sc, sub) -> new ContextLifter<>(sub)));
+ }
+}
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/config/SignalHubPushConfig.java b/src/main/java/it/pagopa/interop/signalhub/push/service/config/SignalHubPushConfig.java
index c49c24f..72e78eb 100644
--- a/src/main/java/it/pagopa/interop/signalhub/push/service/config/SignalHubPushConfig.java
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/config/SignalHubPushConfig.java
@@ -13,4 +13,5 @@
public class SignalHubPushConfig {
private String id;
private String audience;
+ private String headerTraceIdKey;
}
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/exception/PDNDErrorWebExceptionHandler.java b/src/main/java/it/pagopa/interop/signalhub/push/service/exception/PDNDErrorWebExceptionHandler.java
index 2a24e48..2992795 100644
--- a/src/main/java/it/pagopa/interop/signalhub/push/service/exception/PDNDErrorWebExceptionHandler.java
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/exception/PDNDErrorWebExceptionHandler.java
@@ -10,10 +10,13 @@
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
+import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import reactor.util.annotation.NonNull;
+import java.util.Collections;
+
@Slf4j
@Configuration
@@ -36,6 +39,7 @@ public Mono handle(@NonNull ServerWebExchange serverWebExchange, @NonNull
dataBuffer = bufferFactory.wrap(this.helper.generateFallbackProblem().getBytes());
}
serverWebExchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
+
return serverWebExchange.getResponse().writeWith(Mono.just(dataBuffer));
}
}
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/exception/RestExceptionHandler.java b/src/main/java/it/pagopa/interop/signalhub/push/service/exception/RestExceptionHandler.java
index 1e9c55d..988923d 100644
--- a/src/main/java/it/pagopa/interop/signalhub/push/service/exception/RestExceptionHandler.java
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/exception/RestExceptionHandler.java
@@ -23,7 +23,6 @@
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
@ControllerAdvice
-
public class RestExceptionHandler {
@Autowired
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/filter/JWTFilter.java b/src/main/java/it/pagopa/interop/signalhub/push/service/filter/JWTFilter.java
index f414aae..dcdd8b2 100644
--- a/src/main/java/it/pagopa/interop/signalhub/push/service/filter/JWTFilter.java
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/filter/JWTFilter.java
@@ -9,8 +9,12 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
@@ -32,10 +36,10 @@
import static it.pagopa.interop.signalhub.push.service.exception.ExceptionTypeEnum.*;
@Profile("!test")
-@Slf4j
@AllArgsConstructor
@Configuration
public class JWTFilter implements WebFilter {
+ private static final Logger log = LoggerFactory.getLogger(JWTFilter.class);
private final JWTConverter jwtConverter;
private final PrincipalAgreementValidator principalAgreementValidator;
private final ReactiveAuthenticationManager reactiveAuthManager;
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/filter/TraceIdFilter.java b/src/main/java/it/pagopa/interop/signalhub/push/service/filter/TraceIdFilter.java
new file mode 100644
index 0000000..ff9ef37
--- /dev/null
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/filter/TraceIdFilter.java
@@ -0,0 +1,42 @@
+package it.pagopa.interop.signalhub.push.service.filter;
+
+import it.pagopa.interop.signalhub.push.service.config.SignalHubPushConfig;
+import lombok.AllArgsConstructor;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
+import org.springframework.http.HttpHeaders;
+import org.springframework.stereotype.Component;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilter;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import static it.pagopa.interop.signalhub.push.service.utils.Const.TRACE_ID_KEY;
+
+
+@Component
+@Order(Ordered.HIGHEST_PRECEDENCE)
+@AllArgsConstructor
+public class TraceIdFilter implements WebFilter {
+ private final SignalHubPushConfig cfg;
+
+
+ @Override
+ public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
+ HttpHeaders headers = exchange.getRequest().getHeaders();
+ var traceId = UUID.randomUUID().toString();
+ if (headers.containsKey(cfg.getHeaderTraceIdKey())) {
+ traceId = headers.getFirst(cfg.getHeaderTraceIdKey());
+ }
+ exchange.getResponse().getHeaders()
+ .putIfAbsent(cfg.getHeaderTraceIdKey(), Collections.singletonList(traceId));
+
+
+ String finalTraceId = traceId;
+ return chain.filter(exchange).contextWrite(Context.of(TRACE_ID_KEY, finalTraceId));
+ }
+}
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/logging/ContextLifter.java b/src/main/java/it/pagopa/interop/signalhub/push/service/logging/ContextLifter.java
new file mode 100644
index 0000000..92d5e08
--- /dev/null
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/logging/ContextLifter.java
@@ -0,0 +1,55 @@
+package it.pagopa.interop.signalhub.push.service.logging;
+
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.util.context.Context;
+
+public class ContextLifter implements CoreSubscriber {
+ private final CoreSubscriber actualSubscriber;
+ private final Context context;
+
+ public ContextLifter(CoreSubscriber actualSubscriber) {
+ this.actualSubscriber = actualSubscriber;
+ this.context = actualSubscriber.currentContext();
+ }
+
+ @Override
+ public Context currentContext() {
+ return context;
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ actualSubscriber.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(T t) {
+ MdcContextLifter.setContextToMdc(context);
+ try {
+ actualSubscriber.onNext(t);
+ } finally {
+ MdcContextLifter.clearMdc();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ MdcContextLifter.setContextToMdc(context);
+ try {
+ actualSubscriber.onError(throwable);
+ } finally {
+ MdcContextLifter.clearMdc();
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ MdcContextLifter.setContextToMdc(context);
+ try {
+ actualSubscriber.onComplete();
+ } finally {
+ MdcContextLifter.clearMdc();
+ }
+ }
+}
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/logging/MdcContextLifter.java b/src/main/java/it/pagopa/interop/signalhub/push/service/logging/MdcContextLifter.java
new file mode 100644
index 0000000..ecaad79
--- /dev/null
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/logging/MdcContextLifter.java
@@ -0,0 +1,40 @@
+package it.pagopa.interop.signalhub.push.service.logging;
+
+import org.slf4j.MDC;
+import reactor.core.publisher.Signal;
+import reactor.util.context.Context;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static it.pagopa.interop.signalhub.push.service.utils.Const.TRACE_ID_KEY;
+
+public class MdcContextLifter implements Consumer> {
+
+ @Override
+ public void accept(Signal> signal) {
+ if (!signal.isOnComplete() && !signal.isOnError()) {
+ Optional> context = signal.getContextView().stream()
+ .filter(cxt -> cxt.getKey().equals(TRACE_ID_KEY))
+ .findFirst();
+
+ context.ifPresent(ctx -> MDC.put(TRACE_ID_KEY, (String)ctx.getValue()));
+ } else {
+ MDC.clear();
+ }
+ }
+
+ public static void setContextToMdc(Context context) {
+ context.stream().forEach(entry -> {
+ if (entry.getKey().equals(TRACE_ID_KEY)){
+ MDC.put(TRACE_ID_KEY, (String) entry.getValue());
+ }
+ });
+ }
+
+ public static void clearMdc(){
+ MDC.clear();
+ }
+
+}
diff --git a/src/main/java/it/pagopa/interop/signalhub/push/service/utils/Const.java b/src/main/java/it/pagopa/interop/signalhub/push/service/utils/Const.java
index 0e4eb99..a8820f5 100644
--- a/src/main/java/it/pagopa/interop/signalhub/push/service/utils/Const.java
+++ b/src/main/java/it/pagopa/interop/signalhub/push/service/utils/Const.java
@@ -4,5 +4,6 @@ public class Const {
public static final String ORGANIZATION_ID_VALUE = "x_organization_id";
public static final String STATE_PUBLISHED = "PUBLISHED";
public static final String SEED_UPDATE = "seedUpdate";
+ public static final String TRACE_ID_KEY = "traceId";
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 3a04e68..4c1c104 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,4 +1,7 @@
logging.level.root=INFO
+logging.level.org.springframework.cloud.sleuth=INFO
+logging.pattern.level="%2p [%X{traceId:-}]"
+
spring.main.web-application-type=reactive
management.endpoint.health.show-details=always
@@ -18,6 +21,7 @@ pdnd.client.event.endpoint-url=https://api.uat.interop.pagopa.it/1.0
application.eservice.push.id=6b14c622-dad2-44ea-82bc-2dd4010364d5
application.eservice.push.audience=interop-signalhub-push-signal
+application.eservice.push.header-trace-id-key=X-Trace-Id
database.name=signal-hub
database.host=${DATABASE_READER_HOST:localhost}
diff --git a/src/test/java/it/pagopa/interop/signalhub/push/service/repository/EServiceRepositoryTest.java b/src/test/java/it/pagopa/interop/signalhub/push/service/repository/EServiceRepositoryTest.java
index 7af3b3d..89a0aea 100644
--- a/src/test/java/it/pagopa/interop/signalhub/push/service/repository/EServiceRepositoryTest.java
+++ b/src/test/java/it/pagopa/interop/signalhub/push/service/repository/EServiceRepositoryTest.java
@@ -2,14 +2,11 @@
import it.pagopa.interop.signalhub.push.service.config.BaseTest;
import it.pagopa.interop.signalhub.push.service.entities.EService;
-import org.junit.jupiter.api.AfterEach;
+
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import java.sql.Timestamp;
-import java.time.Instant;
import java.util.List;
@@ -17,10 +14,9 @@ class EServiceRepositoryTest extends BaseTest.WithR2DBC {
private static final String correctEservice = "BC-eservice";
private static final String correctProducer = "BC-producer";
+ private static final String correctDescriptor = "BC-descriptor";
private static final String correctState = "PUBLISHED";
private static final String incorrectState = "ACTIVE";
- private EService entitySaved;
-
@Autowired
private EServiceRepository eServiceRepository;
@@ -33,6 +29,7 @@ void whenFindOrganizationWithBadlyParamThenReturnNull(){
correctProducer,
incorrectState).collectList().block();
+ Assertions.assertNotNull(entity);
Assertions.assertTrue(entity.isEmpty());
}
@@ -45,17 +42,19 @@ void whenFindOrganizationWithCorrectParamThenReturnEntity(){
correctProducer,
correctState).collectList().block();
+ Assertions.assertNotNull(entity);
Assertions.assertFalse(entity.isEmpty());
- //Assertions.assertEquals(entitySaved, entity.get(0));
+ Assertions.assertEquals(getEntity(), entity.get(0));
}
private EService getEntity(){
- entitySaved = new EService();
+ EService entitySaved = new EService();
entitySaved.setEserviceId(correctEservice);
entitySaved.setProducerId(correctProducer);
- entitySaved.setDescriptorId("1234");
+ entitySaved.setDescriptorId(correctDescriptor);
entitySaved.setState(correctState);
+ entitySaved.setEventId(12L);
return entitySaved;
}