-
Message Channel and Endpoint: how we connect an application to the channel. It is used in every single application.
-
Content-Based Router: routes each message to the correct recipient based on message content.
-
Dead-Letter Channel: the messaging system determines that it cannot or should not deliver a message. It is used (in our case and generally) for messages that cannot be processed successfully after a certain amount of tries.
-
Invalid-Message Channel: channel for messages that are malformed or fail validation checks and cannot be processed as expected.
-
Message Translator: used between other filters or applications to translate one data format into another.
-
Wipe Tap: inserts a simple Recipient List into the channel that publishes each incoming message to the main channel and a secondary channel.
-
Normalizer: routes each message type through a custom Message Translator so that the resulting messages match a common format.
-
Content Enricher: access an external data source in order to augment a message with missing information.
-
Multicast: allows a message to be sent to multiple recipients or destinations simultaneously (not in the book, but included in Camel: https://camel.apache.org/components/4.4.x/eips/multicast-eip.html).
- Message Channel
to("jms:queue:notification.producer.user.creation.queue")
- Point-to-Point Channel Inherited implemented by this type of queue.
- Message Channel
to("jms:queue:notification.producer.creator.creation.queue")
- Point-to-Point Channel Inherited implemented by this type of queue.
- Message Channel
to("jms:queue:notification.producer.event.creation.queue")
- Point-to-Point Channel Inherited implemented by this type of queue.
- Message Channel
from("jms:queue:notification.producer.user.creation.queue")
from("jms:queue:notification.producer.creator.creation.queue")
from("jms:queue:notification.producer.event.creation.queue")
from("jms:queue:notification.producer.ticketpayment.inprocess.queue")
from("jms:queue:notification.producer.ticketpayment.rejected.queue")
from("jms:queue:notification.producer.ticketpayment.approved.queue")
from("jms:queue:notification.producer.recommendations.queue")
- Content-Based Router
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.user.creation.queue")
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.creator.creation.queue")
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.event.creation.queue")
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.ticketpayment.inprocess.queue")
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.ticketpayment.rejected.queue")
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.ticketpayment.approved.queue")
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
.otherwise()
.to("jms:queue:notification.builder.recommendations.queue")
-
Point-to-Point Channel Inherited implemented by this type of queue.
-
Content Enricher
if (user != null) {
notificationJson.put("receivesEmails", user.getReceivesEmails());
notificationJson.put("receivesPushs", user.getReceivesPushs());
} else if (creator != null) {
notificationJson.put("receivesEmails", true);
notificationJson.put("receivesPushs", true);
}
- Dead-Letter Channel
choice()
.when(header("validMessage").isEqualTo(false))
.to("jms:queue:notification.deadletter.queue")
- Message Channel
to("jms:queue:ticketvalidation.deadletter.queue")
to("jms:queue:ticketvalidation.financial.queue")
to("jms:queue:ticketvalidation.completion.queue")
to("jms:queue:notification.producer.ticketpayment.inprocess.queue")
to("jms:queue:recommender.builder.queue")
- Content-Based Router
.when(simple("${body[validPrice]} == false"))
.to("jms:queue:ticketvalidation.deadletter.queue")
.otherwise()
.multicast().stopOnException()
.parallelProcessing()
.to("direct:processTicket", "direct:inProcessNotification", "direct:inProcessRecommender")
.end()
.end()
- Invalid-Message Channel
.choice()
.when(simple("${body[validPrice]} == false"))
.log("Sending to ticketvalidation.invalidmessage.queue")
.to("jms:queue:ticketvalidation.invalidmessage.queue")
- Wire Tap
.to("direct:processTicket", "direct:inProcessNotification", "direct:inProcessRecommender")
- Message Translator
.marshal().json(JsonLibrary.Jackson) // Convert Map to
- Multicast
.otherwise()
.multicast().stopOnException()
.parallelProcessing()
.to("direct:processTicket", "direct:inProcessNotification", "direct:inProcessRecommender")
.end()
.end()
- Message Channel
from("jms:queue:ticketvalidation.financial.queue")
- Content-Based Router
choice()
.when(simple("${body[financiallyValid]} == true"))
.to("jms:queue:ticketvalidation.completion.queue")
.otherwise()
.to("jms:queue:notification.producer.ticketpayment.rejected.queue")
- Message Channel
from("jms:queue:ticketvalidation.completion.queue")
-
Point-to-Point Channel Inherited implemented by this type of queue.
-
Content-Based Router
choice()
.when(simple("${body[message]} == 'ticket-payment-approved'"))
.to("jms:queue:notification.producer.ticketpayment.approved.queue")
.otherwise()
.to("jms:queue:ticketvalidation.deadletter.queue")
- Message Translator
.marshal().json(JsonLibrary.Jackson)
.marshal().json(JsonLibrary.Jackson)
- Normalizer
.process(exchange -> {
Object body = exchange.getIn().getBody();
if (body instanceof String) {
String jsonBody = (String) body;
@SuppressWarnings("unchecked")
Map<String, Object> messageData = objectMapper.readValue(jsonBody, Map.class);
exchange.getIn().setBody(messageData);
} else if (body instanceof Map) {
// No conversion needed, body is already a Map
} else {
throw new IllegalArgumentException("Unsupported message format: " + body.getClass());
}
})
- Dead-Letter Channel
.choice()
.when(simple("${body[message]} == 'ticket-payment-approved'"))
.to("jms:queue:notification.producer.ticketpayment.approved.queue")
.otherwise()
.to("jms:queue:ticketvalidation.deadletter.queue")
- Message Channel
from("jms:queue:recommender.builder.queue")
- Message Translator
.unmarshal().json(JsonLibrary.Jackson, Map.class) // Unmarshal
- Content Enricher
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
try {
@SuppressWarnings("unchecked")
Map<String, Object> messageData = exchange.getIn().getBody(Map.class);
int userId = (int) messageData.get("userId");
// Fetch User from the database
User user = em.find(User.class, userId);
if (user == null) {
throw new Exception("User not found for id: " + userId);
}
// Create the recommender model and set EntityManager
NlpRecommenderModel model = new NlpRecommenderModel(user);
model.setEntityManager(em);
// Or use PopularityRecommenderModel if needed
// PopularityRecommenderModel model = new PopularityRecommenderModel(user);
// model.setEntityManager(em);
EventRecommender eventRecommender = new EventRecommender(model);
eventRecommender.requestRecommendation();
List<Event> recommendations = eventRecommender.sendRecommendation();
// Extract event IDs from the recommendations
List<Integer> eventIds = recommendations.stream()
.map(Event::getId)
.collect(Collectors.toList());
// Prepare recommendation message
Map<String, Object> recommendationMessage = new HashMap<>();
recommendationMessage.put("eventIds", eventIds);
recommendationMessage.put("userId", userId);
recommendationMessage.put("message", "event-recommendation");
String jsonMessage = objectMapper.writeValueAsString(recommendationMessage);
exchange.getIn().setBody(jsonMessage);
em.getTransaction().commit();
} catch (Exception e) {
if (em.getTransaction().isActive()) {
em.getTransaction().rollback();
}
throw e;
} finally {
em.close();
}
}
})