Skip to content

Commit

Permalink
yaml-loader: add support for error handler #339
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed May 27, 2020
1 parent b86aefd commit 96714cb
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.k.loader.yaml.parser;

import com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.camel.builder.DeadLetterChannelBuilder;
import org.apache.camel.builder.DefaultErrorHandlerBuilder;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.k.annotation.yaml.YAMLStepParser;
import org.apache.camel.k.loader.yaml.spi.ProcessorStepParser;
import org.apache.camel.k.loader.yaml.spi.StartStepParser;
import org.apache.camel.k.loader.yaml.spi.StepParserSupport;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;

@YAMLStepParser("error-handler")
public class ErrorHandlerStepParser implements StartStepParser, ProcessorStepParser {
@Override
public ProcessorDefinition<?> toStartProcessor(Context context) {
final Definition definition = context.node(Definition.class);

StepParserSupport.notNull(definition.builder, "builder");

context.builder().errorHandler(definition.builder);

return context.processor();
}

@Override
public ProcessorDefinition<?> toProcessor(Context context) {
final Definition definition = context.node(Definition.class);

StepParserSupport.notNull(context.processor(), "processor");
StepParserSupport.notNull(definition.builder, "builder");

return context.processor(RouteDefinition.class).errorHandler(definition.builder);
}

public static final class Definition {
public ErrorHandlerBuilder builder;

@JsonAlias("default")
public void setDefault(DefaultErrorHandlerBuilder builder) {
setBuilder(builder);
}

@JsonAlias("dead-letter-channel")
public void setDeadLetterChannel(DeadLetterChannelBuilder builder) {
setBuilder(builder);
}

@JsonAlias({"no-error-handler", "none" })
public void setNoErrorHandler(NoErrorHandlerBuilder builder) {
setBuilder(builder);
}

@JsonAlias("ref")
public void setRefHandler(ErrorHandlerBuilderRef builder) {
setBuilder(builder);
}

@JsonAlias("custom")
public void setCustomHandler(ErrorHandlerBuilder builder) {
setBuilder(builder);
}

private void setBuilder(ErrorHandlerBuilder builder) {
if (this.builder != null) {
throw new IllegalArgumentException("An ErrorHandler has already been set");
}

this.builder = builder;
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.spi.HasCamelContext;
import org.apache.camel.util.ObjectHelper;

Expand All @@ -37,11 +38,13 @@ public interface StepParser {
class Context implements HasCamelContext {
private final ObjectMapper mapper;
private final RouteBuilder builder;
private final ProcessorDefinition<?> processor;
private final JsonNode node;
private final Resolver resolver;

public Context(RouteBuilder builder, ObjectMapper mapper, JsonNode node, Resolver resolver) {
public Context(RouteBuilder builder, ProcessorDefinition<?> processor, ObjectMapper mapper, JsonNode node, Resolver resolver) {
this.builder = builder;
this.processor = processor;
this.mapper = mapper;
this.node = node;
this.resolver = ObjectHelper.notNull(resolver, "resolver");
Expand All @@ -52,6 +55,14 @@ public CamelContext getCamelContext() {
return builder.getContext();
}

public ProcessorDefinition<?> processor() {
return this.processor;
}

public <T extends ProcessorDefinition<?>> T processor(Class<T> type) {
return type.cast(this.processor);
}

public RouteBuilder builder() {
return builder;
}
Expand Down Expand Up @@ -88,9 +99,30 @@ public <T extends StepParser> T lookup(Class<T> type, String stepId) {
throw new RuntimeException("No handler for step with id: " + stepId);
}

public static Context of(Context context, ProcessorDefinition<?> processor, JsonNode step) {
return new Context(
context.builder,
processor,
context.mapper,
step,
context.resolver
);
}

public static Context of(Context context, ProcessorDefinition<?> processor) {
return new Context(
context.builder,
processor,
context.mapper,
context.node,
context.resolver
);
}

public static Context of(Context context, JsonNode step) {
return new Context(
context.builder,
context.processor,
context.mapper,
step,
context.resolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ public static ProcessorDefinition<?> convertSteps(ProcessorStepParser.Context co
ProcessorDefinition<?> current = parent;

for (Step step : steps) {

ProcessorDefinition<?> child = ProcessorStepParser.invoke(
ProcessorStepParser.Context.of(context, step.node),
ProcessorStepParser.Context.of(context, current, step.node),
step.id
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void configure() throws Exception {
try (is) {
for (Step step : mapper.readValue(is, Step[].class)) {
StartStepParser.invoke(
new StepParser.Context(this, mapper, step.node, resolver),
new StepParser.Context(this, null, mapper, step.node, resolver),
step.id);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,21 @@ class RoutesTest extends TestSupport {
cleanup:
context?.stop()
}

def 'errorHandler'() {
setup:
def context = startContext([
'myFailingProcessor' : new MyFailingProcessor()
])

mockEndpoint(context, 'mock:on-error') {
expectedMessageCount = 1
}
when:
context.createProducerTemplate().requestBody('direct:start', 'Hello World');
then:
MockEndpoint.assertIsSatisfied(context)
cleanup:
context?.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.camel.k.loader.yaml.spi.ProcessorStepParser
import org.apache.camel.k.loader.yaml.spi.StartStepParser
import org.apache.camel.k.loader.yaml.spi.StepParser
import org.apache.camel.model.ProcessorDefinition
import org.apache.camel.model.RouteDefinition
import spock.lang.Specification

import java.nio.charset.StandardCharsets
Expand All @@ -43,7 +44,7 @@ class TestSupport extends Specification {
}
}

return new StepParser.Context(builder, MAPPER, node, RESOLVER)
return new StepParser.Context(builder, new RouteDefinition(), MAPPER, node, RESOLVER)
}

static StepParser.Context stepContext(JsonNode content) {
Expand All @@ -53,7 +54,7 @@ class TestSupport extends Specification {
}
}

return new StepParser.Context(builder, MAPPER, content, RESOLVER)
return new StepParser.Context(builder, new RouteDefinition(), MAPPER, content, RESOLVER)
}

static CamelContext startContext(String content) {
Expand Down Expand Up @@ -119,6 +120,10 @@ class TestSupport extends Specification {
return type.getConstructor().newInstance().toProcessor(stepContext(content))
}

static <U extends StartStepParser> ProcessorDefinition<?> toStartProcessor(Class<U> type, String content) {
return type.getConstructor().newInstance().toStartProcessor(stepContext(content))
}

static ProcessorDefinition<?> toProcessor(String id, String content) {
def ctx = stepContext(content)
def parser = RESOLVER.resolve(ctx.camelContext, id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.k.loader.yaml.parser

import org.apache.camel.builder.DeadLetterChannelBuilder
import org.apache.camel.builder.DefaultErrorHandlerBuilder
import org.apache.camel.builder.ErrorHandlerBuilderRef
import org.apache.camel.builder.NoErrorHandlerBuilder
import org.apache.camel.k.loader.yaml.TestSupport
import org.apache.camel.model.RouteDefinition

class ErrorHandlerTest extends TestSupport {
def "definition (route/no-error-handler)"() {
when:
def processor = toProcessor(ErrorHandlerStepParser, '''
no-error-handler: {}
''')
then:
with(processor, RouteDefinition) {
errorHandlerFactory instanceof NoErrorHandlerBuilder
}
}

def "definition (route/default)"() {
when:
def processor = toProcessor(ErrorHandlerStepParser, '''
default:
dead-letter-uri: "jms:queue:dead"
''')
then:
with(processor, RouteDefinition) {
with(errorHandlerFactory, DefaultErrorHandlerBuilder) {
deadLetterUri == 'jms:queue:dead'
}
}
}

def "definition (route/dead-letter-channel)"() {
when:
def processor = toProcessor(ErrorHandlerStepParser, '''
dead-letter-channel: "jms:queue:dead"
''')
then:
with(processor, RouteDefinition) {
with(errorHandlerFactory, DeadLetterChannelBuilder) {
deadLetterUri == 'jms:queue:dead'
}
}
}

def "definition (route/ref)"() {
when:
def processor = toProcessor(ErrorHandlerStepParser, '''
ref: "myErrorHandler"
''')
then:
with(processor, RouteDefinition) {
with(errorHandlerFactory, ErrorHandlerBuilderRef) {
ref == 'myErrorHandler'
}
}
}

def "definition (global/no-error-handler)"() {
given:
def stepContext = stepContext('''
no-error-handler: {}
''')
when:
new ErrorHandlerStepParser().toStartProcessor(stepContext)
then:
stepContext.builder().routeCollection.errorHandlerFactory instanceof NoErrorHandlerBuilder
}

def "definition (global/default)"() {
given:
def stepContext = stepContext('''
default:
dead-letter-uri: "jms:queue:dead"
''')
when:
new ErrorHandlerStepParser().toStartProcessor(stepContext)
then:
with(stepContext.builder().routeCollection.errorHandlerFactory, DefaultErrorHandlerBuilder) {
deadLetterUri == 'jms:queue:dead'
}
}

def "definition (global/dead-letter-channel)"() {
given:
def stepContext = stepContext('''
dead-letter-channel: "jms:queue:dead"
''')
when:
new ErrorHandlerStepParser().toStartProcessor(stepContext)
then:
with(stepContext.builder().routeCollection.errorHandlerFactory, DefaultErrorHandlerBuilder) {
deadLetterUri == 'jms:queue:dead'
}
}

def "definition (global/ref)"() {
given:
def stepContext = stepContext('''
ref: "myErrorHandler"
''')
when:
new ErrorHandlerStepParser().toStartProcessor(stepContext)
then:
with(stepContext.builder().routeCollection.errorHandlerFactory, ErrorHandlerBuilderRef) {
ref == 'myErrorHandler'
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
- error-handler:
dead-letter-channel:
dead-letter-uri: "mock:on-error"
redelivery-delay: 0
- from:
uri: "direct:start"
steps:
- process:
ref: "myFailingProcessor"
Loading

0 comments on commit 96714cb

Please sign in to comment.