error-handler-definition not subscribed to producer error channel #2997

Closed
Nephery opened this issue Aug 29, 2024 · 13 comments

@Nephery

Nephery commented Aug 29, 2024

Describe the issue
When the producer error channel is enabled and an error-handler-definition is defined for that binding, the error handler doesn't receive any error messages.

To Reproduce
Steps to reproduce the behavior:

  1. Create an error handler function like:
    @Bean
    public Consumer<ErrorMessage> myErrorHandlerFnc() {
        return msg -> logger.info("failed to publish message {}", msg);
    }
  2. Configure any output binding as follows:
    spring.cloud.stream.bindings.<binding-name>.producer.error-channel-enabled=true
    spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandlerFnc
  3. Cause the producer binding to throw an exception
  4. Notice that myErrorHandlerFnc doesn't receive the error message.

Version of the framework
Spring Boot: 3.3.2
Spring Cloud: 2023.0.3

Expected behavior
myErrorHandlerFnc to receive error messages from the producer binding.

Additional context
Looking at AbstractMessageChannelBinder.registerErrorInfrastructure(ProducerDestination destination, String bindingName), it seems like subscribeFunctionErrorHandler() needs to be called there against the producer binding's error channel.
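
To illustrate the suspicion above, here is a minimal sketch in plain Java. It is not Spring Integration code: ToyErrorChannel and ToyErrorChannelDemo are hypothetical names, and the channel is only a toy stand-in for a publish-subscribe error channel. It shows how a channel can keep accepting error messages while a handler that was never subscribed to it sees nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a publish-subscribe error channel (hypothetical, NOT Spring Integration).
class ToyErrorChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int sendCount = 0;

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Accepts the message even when nobody is subscribed, so the send count grows regardless.
    void send(String errorMessage) {
        sendCount++;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(errorMessage);
        }
    }

    int sendCount() {
        return sendCount;
    }
}

class ToyErrorChannelDemo {
    // Sends two errors, optionally subscribing a handler first; returns what the handler saw.
    static List<String> run(ToyErrorChannel channel, boolean subscribeHandler) {
        List<String> received = new ArrayList<>();
        if (subscribeHandler) {
            channel.subscribe(received::add);
        }
        channel.send("publish failed #1");
        channel.send("publish failed #2");
        return received;
    }
}
```

With subscribeHandler set to false, the channel still counts two sends but the returned list stays empty. That is the toy counterpart of a handler that is configured but never wired to the producer error channel.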

@olegz olegz self-assigned this Sep 3, 2024
@olegz
Contributor

olegz commented Sep 3, 2024

I just tested it with the following (complete app)

@SpringBootApplication
public class DemoStreamVegasApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamVegasApplication.class, 
				"--spring.cloud.function.definition=uppercase",
				"--spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandlerFnc");
	}

	@Bean
	public Function<String, String> uppercase() {
		return v -> {
			throw new RuntimeException("intentional");
		};
	}
	
	@Bean
	public Consumer<Message> myErrorHandlerFnc() {
		return v -> {
			System.out.println("Error: " + v);
		};
	}
}

And I see successful invocation of error handler

Error: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler. . . . .
<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.3.2</version>
		<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>oz.example</groupId>
<artifactId>demo-stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-stream</name>
<description>Demo project for Spring Boot</description>
<properties>
		<spring-cloud.version>2023.0.3</spring-cloud.version>
</properties>

@olegz
Contributor

olegz commented Sep 3, 2024

Also, you don't need spring.cloud.stream.bindings.<binding-name>.producer.error-channel-enabled=true. We should probably remove this property, as it is really a leftover from the old mechanism of dealing with errors.

@olegz
Contributor

olegz commented Sep 5, 2024

@Nephery I just tested it with the output binding and it's working as well, so you may want to provide me with a small sample to reproduce, or we can have a screen share session and see what's going on. Let me know.

@Nephery
Author

Nephery commented Sep 5, 2024

Managed to reproduce it with a modified version of this rabbit MQ sample:

https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-functions-samples/multi-functions-rabbit

  1. Update the sample to use Spring Boot 3.3.3 and Spring Cloud 2023.0.3.
  2. Add this bean to the app:
    @Bean
    public Consumer<ErrorMessage> errorSink() {
        return errorMessage -> {
            System.out.println("Error message received: " + errorMessage);
        };
    }
  3. Change the application.yml to:
    spring:
      rabbitmq:
        publisher-returns: true
        publisher-confirm-type: correlated
      cloud:
        function:
          definition: source1
        stream:
          bindings:
            source1-out-0:
              destination: test1
              error-handler-definition: errorSink
          rabbit:
            bindings:
              source1-out-0:
                producer:
                  declare-exchange: false
    management:
      endpoints:
        web:
          exposure:
            include: '*'
  4. Run the docker-compose.yml
  5. Run the app.
  6. Notice in the logs that the publish fails as expected (due to missing exchange), but errorSink isn't getting invoked.

Now to confirm that messages are being sent to the source1-out-0 binding's error channel:

  1. Navigate to localhost:8080/actuator/integrationgraph
  2. Find the node whose name ends with .source1-out-0.errors (i.e. the binding's error channel).
  3. In this node, notice that its sendTimers.successes.count is incrementing. This confirms that this channel is really receiving the error messages, but errorSink is not getting invoked.
    • Alternatively you can check localhost:8080/actuator/metrics/spring.integration.send?tag=name:<binder-id>.source1-out-0.errors (where <binder-id> is the rabbit binder's runtime-generated ID) to see the same thing.
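
The metrics check above can also be scripted. The following is a minimal sketch using only the JDK's java.net.http client. ErrorChannelMetrics, metricUri and fetch are hypothetical helper names, and the binder ID passed in must be replaced with the runtime-generated one from your own application.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for querying the send metric of a binding's error channel.
class ErrorChannelMetrics {

    // Builds the actuator URL for the channel named <binder-id>.<binding-name>.errors.
    static URI metricUri(String baseUrl, String binderId, String bindingName) {
        String channel = binderId + "." + bindingName + ".errors";
        return URI.create(baseUrl
                + "/actuator/metrics/spring.integration.send?tag=name:"
                + URLEncoder.encode(channel, StandardCharsets.UTF_8));
    }

    // Performs a GET against the given URI and returns the raw response body.
    static String fetch(URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

For example, ErrorChannelMetrics.metricUri("http://localhost:8080", "rabbit-1", "source1-out-0") builds the URL for a binder whose ID is assumed to be rabbit-1. Passing the result to fetch while the app is running returns the metric as a JSON string.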

@dima-bzz

It also doesn't work for kafka. Also, when this setting is enabled, an entry appears in the logs:
PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).

Version of the framework
Spring Boot: 3.2.9
Spring Cloud: 2023.0.3

@olegz olegz added this to the 4.2.0-M2 milestone Sep 23, 2024
@olegz
Contributor

olegz commented Sep 24, 2024

Backporting it to 4.1 as well

@olegz olegz closed this as completed in 56047dd Sep 24, 2024
@dima-bzz

@olegz This works with StreamBridge, but does not work through integration. See example https://github.com/dima-bzz/testkafka/tree/error-handler

@olegz
Contributor

olegz commented Sep 25, 2024

When you say does not work through integration do you mean Spring Integration's ServiceActivator?

@olegz
Contributor

olegz commented Sep 25, 2024

Also, your code does not even use the latest snapshot

 <properties>
        <spring-cloud.version>2023.0.3</spring-cloud.version>
    </properties>

So how is it related to the change of this issue?

@dima-bzz

@olegz I have fixed the Spring Cloud version. I probably put it wrong. I mean, when using Spring Cloud Function Supplier, error-handler-definition does not work. If I send a message via StreamBridge and an error occurs in sending, then error-handler-definition works.

Not working error-handler-definition

    @Bean
    public Supplier<String> source1() {
        return () -> {
            String message = "FromSource1";
            System.out.println("******************");
            System.out.println("From Source1");
            System.out.println("******************");
            System.out.println("Sending value: " + message);
            throw new RuntimeException("123"); // this sends the error message to 'errorChannel'
        };
    }

Working error-handler-definition

    @Scheduled(fixedDelay = 3000)
    public void send() {
        this.streamBridge.send("source1-out-0",
                MessageBuilder.withPayload("123").setHeader(KafkaHeaders.KEY, 1L).build()); // this sends the error message to 'kafka-102312345.source1-out-0.errors'
    }

@dima-bzz

I kind of got it. In the first case, the error occurs before the Kafka sending handler is called, so the error is sent to the 'errorChannel' channel.
In the second case, the error occurs inside the handler that sends to Kafka, so the error is sent to both the 'errorChannel' and 'kafka-102312345.source1-out-0.errors' channels. Did I understand correctly?

@olegz
Contributor

olegz commented Sep 26, 2024

An error thrown inside a function constitutes a consumer error (e.g., on <something>-in-0). For an error to be triggered on <something>-out-0, the function must complete successfully, and the error must then occur inside the binder while sending the result message to the output binding <something>-out-0.
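
The distinction can be modeled in a few lines of plain Java. This is only a toy sketch of the rule described in this comment, not the framework's actual implementation: ToyBindingErrorRouting, Sender and process are hypothetical names, and the Sender interface stands in for the binder's send step.

```java
import java.util.function.Function;

// Toy model of which binding "owns" an error (hypothetical, NOT Spring Cloud Stream code).
class ToyBindingErrorRouting {

    // Hypothetical stand-in for the binder sending a result to the output destination.
    interface Sender {
        void send(String payload) throws Exception;
    }

    // Returns the binding name whose error handler would see the failure, or "none".
    static String process(String functionName,
                          Function<String, String> function,
                          Sender sender,
                          String input) {
        String result;
        try {
            result = function.apply(input);
        } catch (RuntimeException e) {
            // The function itself failed: treated as a consumer (input) error.
            return functionName + "-in-0";
        }
        try {
            sender.send(result);
        } catch (Exception e) {
            // The function succeeded but sending failed: a producer (output) error.
            return functionName + "-out-0";
        }
        return "none";
    }
}
```

In this model, a function that throws yields "<name>-in-0", while a function that returns normally followed by a failing send yields "<name>-out-0". An error-handler-definition on the output binding would therefore only apply in the second case.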

@dima-bzz

Thanks, I understand now.
