diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 08a91bce8..1960c2212 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -639,6 +639,7 @@ public boolean isRoutingFunction() { @Override public Function andThen(Function after) { Assert.isTrue(after instanceof FunctionInvocationWrapper, "Composed function must be an instanceof FunctionInvocationWrapper."); + if (FunctionTypeUtils.isMultipleArgumentType(this.inputType) || FunctionTypeUtils.isMultipleArgumentType(this.outputType) || FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).inputType) @@ -646,6 +647,8 @@ public Function andThen(Function aft throw new UnsupportedOperationException("Composition of functions with multiple arguments is not supported at the moment"); } + this.setSkipOutputConversion(true); + ((FunctionInvocationWrapper) after).setSkipOutputConversion(true); Function rawComposedFunction = v -> ((FunctionInvocationWrapper) after).doApply(doApply(v)); FunctionInvocationWrapper afterWrapper = (FunctionInvocationWrapper) after; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index 4314ee669..1a6da9a66 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -61,6 +61,7 @@ import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.DefaultConversionService; @@ -566,6 +567,14 @@ public void testReactiveMonoSupplier() { assertThat(FunctionTypeUtils.isMono(function.getOutputType())); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testHeaderPropagationInComposedFunction() { + FunctionCatalog catalog = this.configureCatalog(GH_1063_Configuration.class); + Consumer function = catalog.lookup("uppercase|reverse|print"); + function.accept("hello"); + } + @Test public void testFunctionCompositionWithReactiveSupplierAndConsumer() { SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter, @@ -836,4 +845,24 @@ public String apply(byte[] t) { } } + @EnableAutoConfiguration + @Configuration + public static class GH_1063_Configuration { + + @Bean + Function> uppercase() { + return input -> MessageBuilder.withPayload(input).setHeader("FOO", "BAR").build(); + } + + @Bean + Function reverse() { + return payload -> new StringBuilder(payload).reverse().toString(); + } + + @Bean + Consumer> print() { + return msg -> assertThat(msg.getHeaders().get("FOO")).isEqualTo("BAR"); + } + } + }