From 51cbd9ba402112e20802c24f442f047bb531bb1c Mon Sep 17 00:00:00 2001 From: Raymond Meester Date: Wed, 2 Oct 2024 13:40:54 +0200 Subject: [PATCH 1/5] set bodytype in stepevent to simpleName() --- .../main/java/org/assimbly/dil/event/collect/StepCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java index ba805f99..2bb918fc 100644 --- a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java +++ b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java @@ -99,7 +99,7 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp){ // read body only once byte[] body = exchange.getMessage().getBody(byte[].class); int bodyLength = body != null ? body.length : 0; - String bodyType = body!=null ? body.getClass().getTypeName() : ""; + String bodyType = body!=null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; // set custom properties setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); From e75a66b366cb17aefca0edef3d81dfab5796f8cd Mon Sep 17 00:00:00 2001 From: Raymond Meester Date: Wed, 2 Oct 2024 13:42:40 +0200 Subject: [PATCH 2/5] format of tenary --- .../main/java/org/assimbly/dil/event/collect/StepCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java index 2bb918fc..b1bfc175 100644 --- a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java +++ b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java @@ -99,7 +99,7 @@ private void processEvent(Exchange exchange, String stepId, long stepTimestamp){ // read body only once byte[] body = exchange.getMessage().getBody(byte[].class); int bodyLength = body != null ? body.length : 0; - String bodyType = body!=null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; + String bodyType = body!= null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; // set custom properties setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); From bb4a42c18d9a9722cfb987e3115930c602a6a62c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Gon=C3=A7alves?= Date: Tue, 8 Oct 2024 14:31:47 +0100 Subject: [PATCH 3/5] get body as inputStream, and then into a byte[] --- .../dil/event/collect/StepCollector.java | 89 ++++++++++--------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java index b1bfc175..ab2d08a2 100644 --- a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java +++ b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java @@ -4,6 +4,7 @@ import org.apache.camel.Message; import org.apache.camel.spi.CamelEvent; import org.apache.camel.support.EventNotifierSupport; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.assimbly.dil.event.domain.Filter; import org.assimbly.dil.event.domain.Store; @@ -13,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.*; import java.text.SimpleDateFormat; @@ -96,48 +98,53 @@ public void notify(CamelEvent event) throws Exception { private void processEvent(Exchange exchange, String stepId, long stepTimestamp){ - // read body only once - byte[] body = exchange.getMessage().getBody(byte[].class); - int bodyLength = body != null ? body.length : 0; - String bodyType = body!= null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; - - // set custom properties - setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); - - //set fields - Message message = exchange.getMessage(); - String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body); - Map headers = message.getHeaders(); - Map properties = exchange.getProperties(); - String transactionId = message.getMessageId(); - - //use breadcrumbId when available, otherwise set custom - transactionId = message.getHeader(BREADCRUMB_ID_HEADER, String.class); - if(transactionId==null || transactionId.isEmpty()){ - transactionId = message.getMessageId() + "_" + stepId; - message.setHeader(BREADCRUMB_ID_HEADER, transactionId); - } + try { + // read body only once + InputStream inputStream = exchange.getMessage().getBody(InputStream.class); + byte[] body = IOUtils.toByteArray(inputStream); + int bodyLength = body != null ? body.length : 0; + String bodyType = body != null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; + + // set custom properties + setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); + + //set fields + Message message = exchange.getMessage(); + String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body); + Map headers = message.getHeaders(); + Map properties = exchange.getProperties(); + String transactionId = message.getMessageId(); + + //use breadcrumbId when available, otherwise set custom + transactionId = message.getHeader(BREADCRUMB_ID_HEADER, String.class); + if (transactionId == null || transactionId.isEmpty()) { + transactionId = message.getMessageId() + "_" + stepId; + message.setHeader(BREADCRUMB_ID_HEADER, transactionId); + } - // get previous flowId and flowVersion - String previousFlowId = exchange.getMessage().getHeader(FLOW_ID_HEADER, String.class); - String previousFlowVersion = exchange.getMessage().getHeader(FLOW_VERSION_HEADER, String.class); - // set flowId and flowVersion - exchange.getMessage().setHeader(FLOW_ID_HEADER, flowId); - exchange.getMessage().setHeader(FLOW_VERSION_HEADER, flowVersion); - - //calculate times - String timestamp = EventUtil.getCreatedTimestamp(stepTimestamp); - String expiryDate = EventUtil.getExpiryTimestamp(expiryInHours); - - //create json - MessageEvent messageEvent = new MessageEvent( - timestamp, transactionId, flowId, flowVersion, previousFlowId, previousFlowVersion, stepId, headers, - properties, bodyToStoreOnEvent, expiryDate - ); - String json = messageEvent.toJson(); - - //store the event - storeManager.storeEvent(json); + // get previous flowId and flowVersion + String previousFlowId = exchange.getMessage().getHeader(FLOW_ID_HEADER, String.class); + String previousFlowVersion = exchange.getMessage().getHeader(FLOW_VERSION_HEADER, String.class); + // set flowId and flowVersion + exchange.getMessage().setHeader(FLOW_ID_HEADER, flowId); + exchange.getMessage().setHeader(FLOW_VERSION_HEADER, flowVersion); + + //calculate times + String timestamp = EventUtil.getCreatedTimestamp(stepTimestamp); + String expiryDate = EventUtil.getExpiryTimestamp(expiryInHours); + + //create json + MessageEvent messageEvent = new MessageEvent( + timestamp, transactionId, flowId, flowVersion, previousFlowId, previousFlowVersion, stepId, headers, + properties, bodyToStoreOnEvent, expiryDate + ); + String json = messageEvent.toJson(); + + //store the event + storeManager.storeEvent(json); + } catch (Exception e) { + log.error("Error to process event", e); + } } public String getBodyToStoreOnEvent(Exchange exchange, byte[] body) { From 6a9ead61bf26e6105d6ea468dd77d014bf1ba3d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Gon=C3=A7alves?= Date: Wed, 9 Oct 2024 16:40:25 +0100 Subject: [PATCH 4/5] use try catch only when converts InputStream into a byte[], this way it will continue even when convertion is not possible, and uses an empty body --- .../dil/event/collect/StepCollector.java | 93 ++++++++++--------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java index ab2d08a2..bd99625a 100644 --- a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java +++ b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java @@ -98,53 +98,56 @@ public void notify(CamelEvent event) throws Exception { private void processEvent(Exchange exchange, String stepId, long stepTimestamp){ - try { - // read body only once - InputStream inputStream = exchange.getMessage().getBody(InputStream.class); - byte[] body = IOUtils.toByteArray(inputStream); - int bodyLength = body != null ? body.length : 0; - String bodyType = body != null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; - - // set custom properties - setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); - - //set fields - Message message = exchange.getMessage(); - String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body); - Map headers = message.getHeaders(); - Map properties = exchange.getProperties(); - String transactionId = message.getMessageId(); - - //use breadcrumbId when available, otherwise set custom - transactionId = message.getHeader(BREADCRUMB_ID_HEADER, String.class); - if (transactionId == null || transactionId.isEmpty()) { - transactionId = message.getMessageId() + "_" + stepId; - message.setHeader(BREADCRUMB_ID_HEADER, transactionId); - } + // read body only once + InputStream inputStream = exchange.getMessage().getBody(InputStream.class); + + byte[] body = null; + if (inputStream != null) { + try { + body = IOUtils.toByteArray(inputStream); + } catch (Exception e) { } + } - // get previous flowId and flowVersion - String previousFlowId = exchange.getMessage().getHeader(FLOW_ID_HEADER, String.class); - String previousFlowVersion = exchange.getMessage().getHeader(FLOW_VERSION_HEADER, String.class); - // set flowId and flowVersion - exchange.getMessage().setHeader(FLOW_ID_HEADER, flowId); - exchange.getMessage().setHeader(FLOW_VERSION_HEADER, flowVersion); - - //calculate times - String timestamp = EventUtil.getCreatedTimestamp(stepTimestamp); - String expiryDate = EventUtil.getExpiryTimestamp(expiryInHours); - - //create json - MessageEvent messageEvent = new MessageEvent( - timestamp, transactionId, flowId, flowVersion, previousFlowId, previousFlowVersion, stepId, headers, - properties, bodyToStoreOnEvent, expiryDate - ); - String json = messageEvent.toJson(); - - //store the event - storeManager.storeEvent(json); - } catch (Exception e) { - log.error("Error to process event", e); + int bodyLength = body != null ? body.length : 0; + String bodyType = body != null ? exchange.getMessage().getBody().getClass().getSimpleName() : ""; + + // set custom properties + setCustomProperties(exchange, bodyType, bodyLength, stepId, stepTimestamp); + + //set fields + Message message = exchange.getMessage(); + String bodyToStoreOnEvent = getBodyToStoreOnEvent(exchange, body); + Map headers = message.getHeaders(); + Map properties = exchange.getProperties(); + String transactionId = message.getMessageId(); + + //use breadcrumbId when available, otherwise set custom + transactionId = message.getHeader(BREADCRUMB_ID_HEADER, String.class); + if (transactionId == null || transactionId.isEmpty()) { + transactionId = message.getMessageId() + "_" + stepId; + message.setHeader(BREADCRUMB_ID_HEADER, transactionId); } + + // get previous flowId and flowVersion + String previousFlowId = exchange.getMessage().getHeader(FLOW_ID_HEADER, String.class); + String previousFlowVersion = exchange.getMessage().getHeader(FLOW_VERSION_HEADER, String.class); + // set flowId and flowVersion + exchange.getMessage().setHeader(FLOW_ID_HEADER, flowId); + exchange.getMessage().setHeader(FLOW_VERSION_HEADER, flowVersion); + + //calculate times + String timestamp = EventUtil.getCreatedTimestamp(stepTimestamp); + String expiryDate = EventUtil.getExpiryTimestamp(expiryInHours); + + //create json + MessageEvent messageEvent = new MessageEvent( + timestamp, transactionId, flowId, flowVersion, previousFlowId, previousFlowVersion, stepId, headers, + properties, bodyToStoreOnEvent, expiryDate + ); + String json = messageEvent.toJson(); + + //store the event + storeManager.storeEvent(json); } public String getBodyToStoreOnEvent(Exchange exchange, byte[] body) { From 66bd6a00943370636ae2d6c7663d6bf620abfe77 Mon Sep 17 00:00:00 2001 From: Raymond Meester Date: Fri, 11 Oct 2024 08:37:59 +0200 Subject: [PATCH 5/5] Changed FlowId and FlowVersion headers for stepcollector --- .../java/org/assimbly/dil/event/collect/StepCollector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java index bd99625a..c5177aab 100644 --- a/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java +++ b/dil/src/main/java/org/assimbly/dil/event/collect/StepCollector.java @@ -37,8 +37,8 @@ public class StepCollector extends EventNotifierSupport { private final String BREADCRUMB_ID_HEADER = "breadcrumbId"; public static final String COMPONENT_INIT_TIME_HEADER = "ComponentInitTime"; - public static final String FLOW_ID_HEADER = "flowId"; - public static final String FLOW_VERSION_HEADER = "flowVersion"; + public static final String FLOW_ID_HEADER = "DOVETAIL_FlowId"; + public static final String FLOW_VERSION_HEADER = "DOVETAIL_FlowVersion"; public static final String RESPONSE_TIME_PROPERTY = "ResponseTime"; public static final String TIMESTAMP_PROPERTY = "Timestamp";