Add support to add headers to event externalization #855

Closed
StephanePaulus opened this issue Oct 4, 2024 · 9 comments
Labels: in: event publication registry, meta: waiting for feedback, type: enhancement

StephanePaulus commented Oct 4, 2024

Currently, the method used to send Kafka messages is CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

This does not allow sending extra custom headers.

Currently, the only way to make this possible is to override the bean from KafkaEventExternalizerConfiguration with a custom one, something like this:

@Bean
DelegatingEventExternalizer kafkaEventExternalizer(
    EventExternalizationConfiguration configuration,
    KafkaOperations<Object, Object> operations,
    BeanFactory factory) {

  log.debug("Registering domain event externalization to Kafka…");

  var context = new StandardEvaluationContext();
  context.setBeanResolver(new BeanFactoryResolver(factory));

  return new DelegatingEventExternalizer(configuration, (target, payload) -> {
    var routing = BrokerRouting.of(target, context);
    if (payload instanceof KafkaEvent kafkaEvent) {
      return operations.send(new ProducerRecord<>(
          routing.getTarget(),
          null,
          routing.getKey(payload),
          kafkaEvent.data(),
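          // Convert the event's header map (String -> byte[]) into Kafka headers.
          // RecordHeader is org.apache.kafka.common.header.internals.RecordHeader;
          // the <Header> type witness keeps the list typed as List<Header>.
          kafkaEvent.customKafkaHeaders().entrySet().stream()
              .<Header>map(entry -> new RecordHeader(entry.getKey(), entry.getValue()))
              .toList()));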
    } else {
      return operations.send(routing.getTarget(), routing.getKey(payload), payload);
    }
  });
}

I had to create the headers in this bean to avoid dealing with a custom serialiser for the Kafka Header object.

odrotbohm changed the title from "Add Kafka Headers support" to "Add support to add headers to event externalization" on Oct 8, 2024
odrotbohm added the labels "in: event publication registry" and "type: enhancement" on Oct 8, 2024
@odrotbohm
Member

This is a great idea. I'm playing with a prototype that exposes the following configuration API:

EventExternalizationConfiguration.defaults(…)
  .headers(MyType.class, it -> Map.of(…)) // map from event type to headers
  .build();

The implementations of DelegatingEventExternalizer can then pick those up via configuration.getHeadersFor(…). I've deployed a preview of that under the version 1.3.0-GH-855-SNAPSHOT. Would you mind giving this a spin?

odrotbohm added the "meta: waiting for feedback" label on Oct 8, 2024
@StephanePaulus
Author

I just had some time to try it out, and the code throws an exception because MessageHeaders is immutable.

java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: MessageHeaders is immutable
	at org.springframework.util.concurrent.FutureUtils.lambda$toSupplier$0(FutureUtils.java:78)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: MessageHeaders is immutable
	at org.springframework.messaging.MessageHeaders.put(MessageHeaders.java:274)
	at org.springframework.modulith.events.kafka.KafkaEventExternalizerConfiguration.lambda$kafkaEventExternalizer$0(KafkaEventExternalizerConfiguration.java:69)
	at org.springframework.modulith.events.support.DelegatingEventExternalizer.externalize(DelegatingEventExternalizer.java:77)
	at org.springframework.modulith.events.support.EventExternalizationSupport.externalize(EventExternalizationSupport.java:87)
	at org.springframework.modulith.events.support.DelegatingEventExternalizer.externalize(DelegatingEventExternalizer.java:65)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:771)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:380)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:771)
	at org.springframework.modulith.events.support.CompletionRegisteringAdvisor$CompletionRegisteringMethodInterceptor.invoke(CompletionRegisteringAdvisor.java:171)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:771)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:114)
	at org.springframework.util.concurrent.FutureUtils.lambda$toSupplier$0(FutureUtils.java:74)
	... 5 common frames omitted
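
The failure mode in the trace can be reproduced with plain JDK collections, which may help when reasoning about the fix. The sketch below is only an analogy: `ImmutableHeadersSketch`, `withExtraHeaders` and the header names are hypothetical and not part of Spring Modulith or Spring Messaging. It shows that calling `put` on a read-only map throws `UnsupportedOperationException`, and that the usual way around it is to assemble a new header map rather than mutate the existing one. With Spring Messaging the equivalent is typically to collect all headers first and then build the message, for example through `MessageBuilder`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the Spring Modulith implementation.
final class ImmutableHeadersSketch {

  /** Returns a new read-only map with the extra entries merged in; the input is left untouched. */
  static Map<String, Object> withExtraHeaders(Map<String, Object> existing, Map<String, Object> extra) {
    Map<String, Object> merged = new HashMap<>(existing);
    merged.putAll(extra);
    return Collections.unmodifiableMap(merged);
  }

  public static void main(String[] args) {
    // Map.of(...) is immutable, much like MessageHeaders rejects mutation.
    Map<String, Object> headers = Map.of("target", "orders");

    try {
      headers.put("customHeader", "value");
    } catch (UnsupportedOperationException e) {
      System.out.println("put rejected: headers are read-only");
    }

    // Build a fresh header map instead of mutating the existing one.
    Map<String, Object> merged = withExtraHeaders(headers, Map.of("customHeader", "value"));
    System.out.println(merged.get("customHeader"));
  }
}
```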

@odrotbohm
Member

odrotbohm commented Oct 11, 2024

I am apparently too stupid to read Javadoc. 🤦🏼‍♂️ I've tweaked the setup and added a basic test case. Please give it another try. Make sure you're using the latest snapshot (-U for Maven).

odrotbohm self-assigned this on Oct 11, 2024
@StephanePaulus
Author

I got it to work, sadly this fix requires the user to use Spring Message which requires some extra thinking and extra beans to make it work. I haven't yet tried to send a byte array directly or Protobuf messages. I have never really worked with the Spring Message interface, and it seems to default to JSON, so the object you are sending has to be JSON-serialisable.

Also, the header function and the mapping function are now coupled. If the mapping is mapping(OrderCompleted.class, it -> "Hello World"), the headers have to be declared as .headers(String.class, it -> Map.of("customHeader", it)), because the payload passed to configuration.getHeadersFor(payload) is the already mapped value, so its type is String.
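
The coupling described here can be modelled with a small JDK-only sketch. Everything in it is a hypothetical stand-in: `HeaderLookupSketch` and its methods only mimic the shape of `mapping(…)`, `headers(…)` and `getHeadersFor(…)` and are not the Spring Modulith implementation. It assumes the header function is looked up by the runtime type of the payload after the mapping step, which is the behaviour reported in this comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a type-keyed configuration; NOT the Spring Modulith API.
final class HeaderLookupSketch {

  record OrderCompleted(String orderId) {}

  private final Map<Class<?>, Function<Object, Object>> mappings = new LinkedHashMap<>();
  private final Map<Class<?>, Function<Object, Map<String, Object>>> headers = new LinkedHashMap<>();

  <T> HeaderLookupSketch mapping(Class<T> type, Function<T, Object> mapper) {
    mappings.put(type, source -> mapper.apply(type.cast(source)));
    return this;
  }

  <T> HeaderLookupSketch headers(Class<T> type, Function<T, Map<String, Object>> extractor) {
    headers.put(type, source -> extractor.apply(type.cast(source)));
    return this;
  }

  /** Applies the mapping registered for the event's type, or returns the event unchanged. */
  Object map(Object event) {
    var mapper = mappings.get(event.getClass());
    return mapper == null ? event : mapper.apply(event);
  }

  /** Looks up headers by the type of the given payload, i.e. the value after mapping. */
  Map<String, Object> getHeadersFor(Object payload) {
    var extractor = headers.get(payload.getClass());
    return extractor == null ? Map.of() : extractor.apply(payload);
  }

  public static void main(String[] args) {
    var config = new HeaderLookupSketch()
        .mapping(OrderCompleted.class, it -> "Hello World")
        .headers(String.class, it -> Map.of("customHeader", it))
        // Never consulted below, because the mapped payload is a String.
        .headers(OrderCompleted.class, it -> Map.of("orderId", it.orderId()));

    Object payload = config.map(new OrderCompleted("42"));
    System.out.println(config.getHeadersFor(payload)); // prints {customHeader=Hello World}
  }
}
```

In this model, a header function registered for `OrderCompleted` is silently skipped once a mapping turns the event into a `String`, which is the surprise the comment points out.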

@odrotbohm
Member

I'm not sure I follow.

> I got it to work, sadly this fix requires the user to use Spring Message which requires some extra thinking and extra beans to make it work.

That's not the intention. The intention is to remove the need for a custom bean declaration solely for the purpose of adding message headers by exposing EventExternalizationConfiguration.headers(…). The flip to use Message as the means to interact with KafkaTemplate is an implementation detail. The serialization can be influenced by configuring KafkaTemplate.setMessageConverter(…).

As you're dealing with a custom event in the first place (which I wouldn't necessarily consider a good idea to start with, as this means that the event producer has to be aware of the mechanics at play here), maybe we should support Message explicitly so that you can prepare the full message in the configuration like this:

EventExternalizationConfiguration.defaults(…)
  .mapping(KafkaEvent.class, it -> MessageBuilder.withPayload(it.data())
    .copyHeaders(it.customKafkaHeaders())
    .build())
  .…

We could discover that the result of the mapping step is a Message already, apply the routing target headers if they're not set yet and send this off?
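
The detection step proposed here can be sketched with plain JDK types. This is a minimal model under stated assumptions: `Envelope` is a hypothetical stand-in for Spring's `Message`, `"target"` is a made-up header name for the routing target, and `prepare` is not a method of any library. It shows the idea of wrapping plain payloads while leaving a pre-built message intact and only adding routing information that is still missing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed behaviour; not the actual implementation.
final class MessageFallbackSketch {

  /** Stand-in for a message type carrying a payload plus headers. */
  record Envelope(Object payload, Map<String, Object> headers) {}

  /**
   * Wraps plain payloads in an envelope. For payloads that already are envelopes,
   * the routing header is only filled in if the mapping step has not set one.
   */
  static Envelope prepare(Object mapped, String routingTarget) {
    if (mapped instanceof Envelope envelope) {
      Map<String, Object> headers = new HashMap<>(envelope.headers());
      headers.putIfAbsent("target", routingTarget); // fallback only, explicit values win
      return new Envelope(envelope.payload(), Map.copyOf(headers));
    }
    return new Envelope(mapped, Map.of("target", routingTarget));
  }

  public static void main(String[] args) {
    var prebuilt = new Envelope("data", Map.of("target", "custom-topic", "customHeader", "x"));

    System.out.println(prepare(prebuilt, "orders").headers().get("target")); // prints custom-topic
    System.out.println(prepare("data", "orders").headers().get("target"));   // prints orders
  }
}
```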

@StephanePaulus
Author

I will have a play with how Messages work in Spring in relation to Kafka, as I have never used them like this. The serialisation seems to be a bit different, with the need to provide extra MessageConverters. For example, to make the code work I had to add this:

  @Bean
  ByteArrayJsonMessageConverter byteArrayJsonMessageConverter() {
    return new ByteArrayJsonMessageConverter();
  }

Normally I am able to use something like this in my configuration:

spring:
  kafka:
    ...
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

and swap in special serializers, for example the one from Confluent that can use the Kafka schema registry:
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

I guess it is all possible using send(Message<?> message) too, but I am not used to it and need to check what extra configuration is needed.

@odrotbohm
Member

odrotbohm commented Oct 11, 2024

We enable Jackson by default as it seems to be a reasonable intermediate serialization format. That said, we should probably prefer the ByteArrayJsonMessageConverter over the simple JsonMessageConverter in that case and default the producer's value serializer to a ByteArraySerializer for symmetry.

However, if you set spring.modulith.events.kafka.enable-json to false, the payload of the Message will be forwarded to the value serializer you configure. I've locally tweaked our Kafka example to that setup and this seems to work. Would you mind trying that?
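
As a rough sketch of that setup, the property named above could be combined with the producer serializers from the earlier comment as follows. The serializer choices are only an assumption carried over from that comment; any value serializer that matches the payload type produced by the mapping step should fit here.

```yaml
spring:
  modulith:
    events:
      kafka:
        enable-json: false   # hand the Message payload to the configured value serializer
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
```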

@StephanePaulus
Author

From my simple test, it works without extra beans/configuration. Mapping the event to a Spring Message makes sense.

I also tried to make something work for my own use case, and that is a bit more complicated, but I know this is an edge case.

In my use case, I need to send Protobuf messages that use a schema registry to serialise and deserialise. I was able to make this work with a custom message converter (ProtobufMessageConverter extends MessagingMessageConverter); again, this can be achieved in multiple ways.

Currently, all MessageConverter implementations require the object to be in JSON format.

To complicate things even more, I am also required to encrypt some parts of the data or the entire event: events can contain private information that cannot be persisted in the event_publication table unless encrypted. I know that this part could be an extra feature on its own.

@odrotbohm
Member

I guess we will have to untangle things a bit here. It looks like, with all the specialties of your specific scenario out of the picture, the new API allows defining headers without having to re-declare the entire bean. I'll go ahead and merge the feature then and close this ticket as resolved. Feel free to open further ones in case you think we can improve things to better cater to your special case's needs.

odrotbohm added this to the 1.3 RC1 milestone on Oct 13, 2024
odrotbohm added a commit that referenced this issue Oct 13, 2024
EventExternalizationConfiguration now exposes a ….headers(Class<T>, Function<T, Map<String, Object>>) to allow defining a function that extracts headers from the event that are to be added to the message sent out. The Kafka and AMQP implementations have been augmented to consider those configurations.

Furthermore, if the mapping step prior to the externalization creates a Spring Message<?>, we add routing information as fallback and send it out as is.
odrotbohm added a commit that referenced this issue Oct 25, 2024
In case no ObjectMapper bean instance is present, we now fall back to creating a default one to render a JSON byte array. This is useful in case Jackson is on the class path but not necessarily the Jackson2ObjectMapperBuilder, which is located in spring-web.