Skip to content

Commit

Permalink
GH-1063 Fix header propagation in composed function
Browse files Browse the repository at this point in the history
Resolves #1063
  • Loading branch information
olegz committed Sep 25, 2023
1 parent 1b0a5e3 commit afb419d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,16 @@ public boolean isRoutingFunction() {
@Override
public <V> Function<Object, V> andThen(Function<? super Object, ? extends V> after) {
Assert.isTrue(after instanceof FunctionInvocationWrapper, "Composed function must be an instanceof FunctionInvocationWrapper.");

if (FunctionTypeUtils.isMultipleArgumentType(this.inputType)
|| FunctionTypeUtils.isMultipleArgumentType(this.outputType)
|| FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).inputType)
|| FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).outputType)) {
throw new UnsupportedOperationException("Composition of functions with multiple arguments is not supported at the moment");
}

this.setSkipOutputConversion(true);
((FunctionInvocationWrapper) after).setSkipOutputConversion(true);
Function rawComposedFunction = v -> ((FunctionInvocationWrapper) after).doApply(doApply(v));

FunctionInvocationWrapper afterWrapper = (FunctionInvocationWrapper) after;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
Expand Down Expand Up @@ -566,6 +567,14 @@ public void testReactiveMonoSupplier() {
assertThat(FunctionTypeUtils.isMono(function.getOutputType()));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testHeaderPropagationInComposedFunction() {
FunctionCatalog catalog = this.configureCatalog(GH_1063_Configuration.class);
Consumer function = catalog.lookup("uppercase|reverse|print");
function.accept("hello");
}

@Test
public void testFunctionCompositionWithReactiveSupplierAndConsumer() {
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
Expand Down Expand Up @@ -836,4 +845,24 @@ public String apply(byte[] t) {
}
}

@EnableAutoConfiguration
@Configuration
public static class GH_1063_Configuration {

@Bean
Function<String, Message<String>> uppercase() {
return input -> MessageBuilder.withPayload(input).setHeader("FOO", "BAR").build();
}

@Bean
Function<String, String> reverse() {
return payload -> new StringBuilder(payload).reverse().toString();
}

@Bean
Consumer<Message<String>> print() {
return msg -> assertThat(msg.getHeaders().get("FOO")).isEqualTo("BAR");
}
}

}

0 comments on commit afb419d

Please sign in to comment.