diff --git a/broker/pom.xml b/broker/pom.xml index 9626b885..46b6bb1c 100644 --- a/broker/pom.xml +++ b/broker/pom.xml @@ -6,7 +6,7 @@ runtime org.assimbly - 5.0.0 + 5.0.1 broker diff --git a/brokerRest/pom.xml b/brokerRest/pom.xml index 464f0a99..ec6a1948 100644 --- a/brokerRest/pom.xml +++ b/brokerRest/pom.xml @@ -6,7 +6,7 @@ runtime org.assimbly - 5.0.0 + 5.0.1 broker-rest diff --git a/dil/pom.xml b/dil/pom.xml index 7fa6224d..7fbe58f3 100644 --- a/dil/pom.xml +++ b/dil/pom.xml @@ -4,7 +4,7 @@ runtime org.assimbly - 5.0.0 + 5.0.1 dil diff --git a/dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/zipfile/ZipFileEnrichStrategy.java b/dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/zipfile/ZipFileEnrichStrategy.java index b84c176d..ff127a34 100644 --- a/dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/zipfile/ZipFileEnrichStrategy.java +++ b/dil/src/main/java/org/assimbly/dil/blocks/beans/enrich/zipfile/ZipFileEnrichStrategy.java @@ -27,13 +27,21 @@ public class ZipFileEnrichStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { elementNames = new ArrayList<>(); + if (newExchange == null) { + // there’s no remote file to consume + return oldExchange; + } + Message in = oldExchange.getIn(); Message resource = newExchange.getIn(); byte[] sourceZip = in.getBody(byte[].class); - byte[] resourceData = resource.getBody(byte[].class); + byte[] resourceData = newExchange.getContext().getTypeConverter().convertTo(byte[].class, resource.getBody()); - String fileName = resource.getHeader(Exchange.FILE_NAME, String.class); + String fileName = resource.getHeader(Exchange.FILE_NAME_CONSUMED, String.class); + if(fileName == null) { + fileName = resource.getHeader(Exchange.FILE_NAME, String.class); + } ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/dil/src/main/java/org/assimbly/dil/blocks/connections/broker/ActiveMQConnection.java b/dil/src/main/java/org/assimbly/dil/blocks/connections/broker/ActiveMQConnection.java index edd38dfb..91eae2f8 100644 --- a/dil/src/main/java/org/assimbly/dil/blocks/connections/broker/ActiveMQConnection.java +++ b/dil/src/main/java/org/assimbly/dil/blocks/connections/broker/ActiveMQConnection.java @@ -73,10 +73,10 @@ private void setFields(){ } else if (conType.equals("pooled")) { if (maxConnections == null) { - maxConnections = "10"; + maxConnections = "2"; } if (concurentConsumers == null) { - concurentConsumers = "10"; + concurentConsumers = "2"; } } diff --git a/dil/src/main/java/org/assimbly/dil/blocks/processors/JsonExchangeFormatter.java b/dil/src/main/java/org/assimbly/dil/blocks/processors/JsonExchangeFormatter.java index c6fb2b0d..0eefa347 100644 --- a/dil/src/main/java/org/assimbly/dil/blocks/processors/JsonExchangeFormatter.java +++ b/dil/src/main/java/org/assimbly/dil/blocks/processors/JsonExchangeFormatter.java @@ -144,6 +144,10 @@ private void addVariables(Exchange exchange) { } private void addHeaders(Exchange exchange) { + + JSONObject headers2 = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders())); + json.put("Headers", headers2); + if (showAll || showHeaders) { JSONObject headers = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders())); json.put("Headers", headers); diff --git a/dil/src/main/java/org/assimbly/dil/blocks/processors/OpenTelemetryLogProcessor.java b/dil/src/main/java/org/assimbly/dil/blocks/processors/OpenTelemetryLogProcessor.java new file mode 100644 index 00000000..34d2425f --- /dev/null +++ b/dil/src/main/java/org/assimbly/dil/blocks/processors/OpenTelemetryLogProcessor.java @@ -0,0 +1,56 @@ +package org.assimbly.dil.blocks.processors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; + + +public class OpenTelemetryLogProcessor implements Processor { + + public void process(Exchange exchange) throws Exception { + + String json = format(exchange); + + System.out.println(json); + + } + + + private String format(Exchange exchange) throws JsonProcessingException { + + Object inBody = exchange.getIn().getBody(); + Map exchangeMap = new HashMap<>(); + exchangeMap.put("ExchangeId", exchange.getExchangeId()); + exchangeMap.put("ExchangePattern", exchange.getPattern().toString()); + exchangeMap.put("Body", inBody != null ? inBody.toString() : "null"); + exchangeMap.put("Headers", exchange.getIn().getHeaders()); + + Instant now = Instant.now(); + + // Convert Instant to string using a formatter + String formattedTime = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC) + .format(now); + + Map map = new HashMap<>(); + map.put("timestamp", formattedTime); + map.put("logLevel", "INFO"); + map.put("serviceName", exchange.getFromRouteId()); + map.put("message", exchange.getFromRouteId()); + map.put("attributes",exchangeMap); + + ObjectMapper mapper = new ObjectMapper(); + String jsonString = mapper.writeValueAsString(map); + + return jsonString; + } + +} \ No newline at end of file diff --git a/dil/src/main/java/org/assimbly/dil/blocks/processors/SetLogProcessor.java b/dil/src/main/java/org/assimbly/dil/blocks/processors/SetLogProcessor.java index db53b29b..d160dd84 100644 --- a/dil/src/main/java/org/assimbly/dil/blocks/processors/SetLogProcessor.java +++ b/dil/src/main/java/org/assimbly/dil/blocks/processors/SetLogProcessor.java @@ -1,12 +1,7 @@ package org.assimbly.dil.blocks.processors; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import java.util.HashMap; -import java.util.Map; - public class SetLogProcessor implements Processor { @@ -17,26 +12,8 @@ public void process(Exchange exchange) throws Exception { String json = jsonFormatter.format(exchange); - System.out.println("-------------MyLog --------------------->\n\n" + json); - - } - - - private String formatExchangeToString(Exchange exchange) throws JsonProcessingException { - - Object inBody = exchange.getIn().getBody(); - - Map map = new HashMap<>(); - map.put("ExchangeId", exchange.getExchangeId()); - map.put("FromRouteId", exchange.getFromRouteId()); - map.put("ExchangePattern", exchange.getPattern().toString()); - map.put("Body", inBody != null ? inBody.toString() : "null"); - map.put("Headers", exchange.getIn().getHeaders()); - - ObjectMapper mapper = new ObjectMapper(); - String jsonString = mapper.writeValueAsString(map); + System.out.println(json); - return jsonString; } } \ No newline at end of file diff --git a/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java b/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java index 2d71f6fa..c26b4047 100644 --- a/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java +++ b/dil/src/main/java/org/assimbly/dil/event/domain/MessageEvent.java @@ -90,7 +90,7 @@ public String getId() { return id; } - @JsonProperty("bundleId") + @JsonProperty("flowId") public String getFlowId() { return flowId; } @@ -100,7 +100,7 @@ public String getFlowVersion() { return flowVersion; } - @JsonProperty("previousBundleId") + @JsonProperty("previousFlowId") public String getPreviousFlowId() { return previousFlowId; } diff --git a/dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java b/dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java index da6aca2e..350ae7c0 100644 --- a/dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java +++ b/dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java @@ -4,6 +4,8 @@ import org.apache.camel.*; import org.apache.camel.builder.*; +import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.model.RouteConfigurationDefinition; import org.apache.camel.spi.Resource; import org.apache.camel.spi.RoutesBuilderLoader; import org.apache.camel.spi.RoutesLoader; @@ -127,6 +129,8 @@ private void setErrorHandlers() throws Exception{ private void setRouteConfigurations() throws Exception{ + removeRouteConfiguration(flowId); + for(String prop : props.keySet()){ if(prop.endsWith("routeconfiguration")){ @@ -134,7 +138,6 @@ private void setRouteConfigurations() throws Exception{ String id = props.get(prop + ".id"); if(routeConfiguration!=null && !routeConfiguration.isEmpty()){ - context.removeRoute(id); loadStep(routeConfiguration, "routeconfiguration", id, null); } } @@ -247,6 +250,26 @@ private void setErrorHandler(String id, String errorUri) throws Exception { } + private void removeRouteConfiguration(String flowId) { + + ModelCamelContext modelContext = (ModelCamelContext) context; + + List routeConfigurationsToRemove = modelContext.getRouteConfigurationDefinitions().stream() + .filter(Objects::nonNull) // Exclude null entries + .filter(routeConfig -> routeConfig.getId().startsWith(flowId)) + .toList(); // Collect into a new list to avoid modifying the original list during iteration + + routeConfigurationsToRemove.forEach(routeConfig -> { + try { + modelContext.removeRouteConfiguration(routeConfig); + log.info("Removed routeConfiguration: " + routeConfig.getId()); + } catch (Exception e) { + log.warn("Failed to remove route configuration: " + routeConfig.getId()); + } + }); + + } + public String getReport(){ return flowLoaderReport.getReport(); } diff --git a/dil/src/main/java/org/assimbly/dil/transpiler/marshalling/Unmarshall.java b/dil/src/main/java/org/assimbly/dil/transpiler/marshalling/Unmarshall.java index 7a1c7fc3..6e0a90b9 100644 --- a/dil/src/main/java/org/assimbly/dil/transpiler/marshalling/Unmarshall.java +++ b/dil/src/main/java/org/assimbly/dil/transpiler/marshalling/Unmarshall.java @@ -17,7 +17,6 @@ // This class unmarshalls an XML file into a Java treemap object // The XML file must be in DIL (Data Integration Language) format public class Unmarshall { - private Document doc; private TreeMap properties; private XMLConfiguration conf; @@ -97,17 +96,13 @@ private void addDependencies(Element element){ } String flowDependencies = null; - NodeList dependenciesList = dependencies.getChildNodes(); - - for (int i = 0; i < dependenciesList.getLength(); i++) { - Node dependency = dependenciesList.item(i); - if (dependency instanceof Element) { - if(i == 0){ - flowDependencies = dependency.getTextContent(); - }else{ - flowDependencies = flowDependencies + "," + dependency.getTextContent(); - } + List dependenciesList = getElementChildren(dependencies); + for(Element dependency: dependenciesList){ + if(flowDependencies==null){ + flowDependencies = dependency.getTextContent(); + }else{ + flowDependencies = flowDependencies + "," + dependency.getTextContent(); } } @@ -229,4 +224,18 @@ private String evaluateXpath(String xpath) throws TransformerException, XPathExp return xp.evaluate(doc); } + List getElementChildren(Node parent) { + List elementChildren = new ArrayList<>(); + NodeList childNodes = parent.getChildNodes(); + + for (int i = 0; i < childNodes.getLength(); i++) { + Node node = childNodes.item(i); + if (node.getNodeType() == Node.ELEMENT_NODE) { + elementChildren.add((Element) node); + } + } + + return elementChildren; + } + } diff --git a/integration/pom.xml b/integration/pom.xml index cc9b3416..a5bec61b 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -4,7 +4,7 @@ runtime org.assimbly - 5.0.0 + 5.0.1 integration diff --git a/integration/src/main/java/org/assimbly/integration/Integration.java b/integration/src/main/java/org/assimbly/integration/Integration.java index 980e9a9b..afdf77dc 100644 --- a/integration/src/main/java/org/assimbly/integration/Integration.java +++ b/integration/src/main/java/org/assimbly/integration/Integration.java @@ -262,11 +262,30 @@ public interface Integration { * Gets the stats of an integration * * @param mediaType (xml or json) - * @throws Exception if flow doesn't start - * @return returns number of messages + * @throws Exception if stats can't be retrieved + * @return returns stats of integration (system) + * */ public String getStats(String mediaType) throws Exception; + /** + * Gets the stats of all steps + * + * @param mediaType (xml or json) + * @throws Exception if stats can't be retrieved + * @return returns stats of integration (system) + */ + public String getStepsStats(String mediaType) throws Exception; + + /** + * Gets the stats of all flows + * + * @param mediaType (xml or json) + * @throws Exception if stats can't be retrieved + * @return returns stats of integration (system) + */ + public String getFlowsStats(String mediaType) throws Exception; + /** * Gets the stats of an integration * diff --git a/integration/src/main/java/org/assimbly/integration/impl/BaseIntegration.java b/integration/src/main/java/org/assimbly/integration/impl/BaseIntegration.java index 74893063..f0d2de9f 100644 --- a/integration/src/main/java/org/assimbly/integration/impl/BaseIntegration.java +++ b/integration/src/main/java/org/assimbly/integration/impl/BaseIntegration.java @@ -289,6 +289,10 @@ public String testConnection(String host, int port, int timeOut) { public abstract String getStats(String mediaType) throws Exception; + public abstract String getFlowsStats(String mediaType) throws Exception; + + public abstract String getStepsStats(String mediaType) throws Exception; + public abstract String getMessages(String mediaType) throws Exception; public abstract String getStatsByFlowIds(String flowIds, String filter, String mediaType) throws Exception; diff --git a/integration/src/main/java/org/assimbly/integration/impl/CamelIntegration.java b/integration/src/main/java/org/assimbly/integration/impl/CamelIntegration.java index 6f4e8d45..1cf878fd 100644 --- a/integration/src/main/java/org/assimbly/integration/impl/CamelIntegration.java +++ b/integration/src/main/java/org/assimbly/integration/impl/CamelIntegration.java @@ -1,5 +1,10 @@ package org.assimbly.integration.impl; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.encoder.JsonEncoder; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Resources; @@ -68,6 +73,7 @@ import org.jasypt.properties.EncryptableProperties; import org.json.JSONArray; import org.json.JSONObject; +import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.xml.sax.InputSource; import org.yaml.snakeyaml.Yaml; @@ -84,7 +90,10 @@ import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.lang.reflect.Method; +import java.math.BigDecimal; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -93,9 +102,10 @@ import java.nio.file.Paths; import java.security.KeyStoreException; import java.security.cert.Certificate; +import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -106,7 +116,7 @@ public class CamelIntegration extends BaseIntegration { private boolean started; private static String BROKER_HOST = "ASSIMBLY_BROKER_HOST"; private static String BROKER_PORT = "ASSIMBLY_BROKER_PORT"; - private final static long stopTimeout = 1000; + private final static long stopTimeout = 500; private ServiceStatus status; private String flowStatus; private final MetricRegistry metricRegistry = new MetricRegistry(); @@ -169,7 +179,7 @@ public void setDefaultSettings() throws Exception { setDefaultThreadProfile(5,50,5000); - setThreadProfile("wiretapProfile", 0,5,2000); + setThreadProfile("wiretapProfile", 0,10,2500); setCertificateStore(true); @@ -183,7 +193,7 @@ public void setDefaultSettings() throws Exception { setHistoryMetrics(true); - setTracing(true,"backlog"); + //setTracing(true,"backlog"); } @@ -289,8 +299,9 @@ public void setDefaultBlocks() throws Exception { registry.bind("FlowLogger", new FlowLogger()); registry.bind("exceptionAsJson", new ExceptionAsJsonProcessor()); - registry.bind("SetLogProcessor", new SetLogProcessor()); - registry.bind("JsonExchangeFormatter", new JsonExchangeFormatter()); + //registry.bind("SetLogProcessor", new SetLogProcessor()); + //registry.bind("JsonExchangeFormatter", new JsonExchangeFormatter()); + //registry.bind("opentelemetry", new OpenTelemetryLogProcessor()); } @@ -309,10 +320,13 @@ public void setThreadProfile(String name, int poolSize, int maxPoolSize, int max public void setGlobalOptions(){ //enable virtual threads - System.setProperty("camel.threads.virtual.enabled","true"); + //System.setProperty("camel.threads.virtual.enabled","true"); context.setUseBreadcrumb(true); + //enable performance stats + context.getManagementStrategy().getManagementAgent().setLoadStatisticsEnabled(true); + // Enable Jackson JSON type converter for more types. context.getGlobalOptions().put("CamelJacksonEnableTypeConverter", "true"); // Allow Jackson JSON to convert to pojo types also @@ -1049,12 +1063,8 @@ private void addCustomActiveMQConnection(TreeMap props, String f Component activemqComp = this.context.getComponent(activemqName); if (activemqComp == null) { - JmsComponent jmsComponent = getJmsComponent(activemqUrl); - this.context.addComponent(activemqName, jmsComponent); - - } } catch (Exception e) { @@ -1065,12 +1075,15 @@ private void addCustomActiveMQConnection(TreeMap props, String f private static JmsComponent getJmsComponent(String activemqUrl) { + int maxConnections = getEnvironmentVarAsInteger("AMQ_MAXIMUM_CONNECTIONS",500); + int idleTimeout = getEnvironmentVarAsInteger("AMQ_IDLE_TIMEOUT",5000); + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(activemqUrl); PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory); - pooledConnectionFactory.setMaxConnections(1000); // Max connections in the pool - pooledConnectionFactory.setIdleTimeout(5000); // Idle timeout in milliseconds + pooledConnectionFactory.setMaxConnections(maxConnections); // Max connections in the pool + pooledConnectionFactory.setIdleTimeout(idleTimeout); // Idle timeout in milliseconds JmsComponent jmsComponent = new JmsComponent(); jmsComponent.setConnectionFactory(pooledConnectionFactory); @@ -1308,7 +1321,7 @@ public String startFlow(String id, long timeout) { initFlowActionReport(id, "Start"); if(hasFlow(id)) { - stopFlow(id, timeout); + stopFlow(id, timeout, false); } boolean addFlow = false; @@ -1339,13 +1352,12 @@ public String startFlow(String id, long timeout) { if (!result.equals("loaded") && !result.equals("started")){ if(result.equalsIgnoreCase("error")){ String startReport = loadReport; - stopFlow(id, timeout); + stopFlow(id, timeout, false); loadReport = startReport; }else{ finishFlowActionReport(id, "error",result,"error"); } }else if(result.equals("loaded")) { - List steps = getRoutesByFlowId(id); log.info("Starting " + steps.size() + " steps"); @@ -1366,7 +1378,7 @@ public String startFlow(String id, long timeout) { }catch (Exception e) { if(context.isStarted()) { - stopFlow(id, stopTimeout); + stopFlow(id, stopTimeout, false); finishFlowActionReport(id, "error","Start flow failed | error=" + e.getMessage(),"error"); log.error("Start flow failed. | flowid=" + id,e); }else{ @@ -1419,14 +1431,7 @@ private ServiceStatus startStep(Route route){ public String restartFlow(String id, long timeout) { try { - - if(hasFlow(id)) { - stopFlow(id, timeout); - startFlow(id, timeout); - }else { - startFlow(id, timeout); - } - + startFlow(id, timeout); }catch (Exception e) { log.error("Restart flow failed. | flowid=" + id,e); finishFlowActionReport(id, "error", e.getMessage(),"error"); @@ -1437,8 +1442,14 @@ public String restartFlow(String id, long timeout) { } public String stopFlow(String id, long timeout) { + return stopFlow(id, timeout, true); + } + + public String stopFlow(String id, long timeout, boolean enableReport) { - initFlowActionReport(id, "stop"); + if(enableReport) { + initFlowActionReport(id, "stop"); + } try { @@ -1450,20 +1461,24 @@ public String stopFlow(String id, long timeout) { log.info("Stopping step id: " + routeId); - if(route.getConfigurationId()!=null) { - log.info("Remove routeConfiguration step id= " + routeId); - removeRouteConfiguration(route.getConfigurationId()); - } + //moved removal of routeConfiguration to the flowLoader + //if(route.getConfigurationId()!=null) { + // removeRouteConfiguration(route.getConfigurationId()); + //} context.getRouteController().stopRoute(routeId,timeout, TimeUnit.MILLISECONDS); context.removeRoute(routeId); } - finishFlowActionReport(id, "stop","Stopped flow successfully","info"); + if(enableReport) { + finishFlowActionReport(id, "stop", "Stopped flow successfully", "info"); + } }catch (Exception e) { - finishFlowActionReport(id, "error","Stop flow failed | error=" + e.getMessage(),"error"); + if(enableReport) { + finishFlowActionReport(id, "error", "Stop flow failed | error=" + e.getMessage(), "error"); + } log.error("Stop flow failed. | flowid=" + id,e); } @@ -1475,8 +1490,8 @@ private void removeRouteConfiguration(String routeConfigurationId) throws Except ModelCamelContext modelContext = (ModelCamelContext) context; RouteConfigurationDefinition routeConfigurationDefinition = modelContext.getRouteConfigurationDefinition(routeConfigurationId); if(routeConfigurationDefinition!=null){ - log.info("Remove routeConfiguration=" + routeConfigurationDefinition.getId()); modelContext.removeRouteConfiguration(routeConfigurationDefinition); + log.info("Removed routeConfiguration: " + routeConfigurationDefinition.getId()); } } @@ -2116,8 +2131,11 @@ public String getFlowStats(String flowId, boolean fullStats, boolean includeMeta long completedMessages = 0; long failedMessages = 0; long pendingMessages = 0; - String uptime = null; + BigDecimal cpuLoadLastMinute = new BigDecimal("0.00"); + BigDecimal cpuLoadLast5Minutes = new BigDecimal("0.00"); + BigDecimal cpuLoadLast15Minutes = new BigDecimal("0.00"); long uptimeMillis = 0; + String uptime = null; Date lastFailed = null; Date lastCompleted = null; @@ -2162,27 +2180,20 @@ public String getFlowStats(String flowId, boolean fullStats, boolean includeMeta if(lastCompleted==null){ lastCompleted = route.getLastExchangeCompletedTimestamp(); }else{ - Date completed = route.getLastExchangeFailureTimestamp(); + Date completed = route.getLastExchangeCompletedTimestamp(); if(completed!=null && completed.after(lastCompleted)){ lastCompleted = completed; } } + cpuLoadLastMinute = cpuLoadLastMinute.add(parseBigDecimal(route.getLoad01())); + cpuLoadLast5Minutes = cpuLoadLast5Minutes.add(parseBigDecimal(route.getLoad05())); + cpuLoadLast15Minutes = cpuLoadLast15Minutes.add(parseBigDecimal(route.getLoad15())); + if(includeSteps){ - JSONObject step = new JSONObject();; - if(fullStats){ - step = getStepStats(routeId, fullStats); - }else{ - String stepId = StringUtils.substringAfter(routeId,flowId + "-"); - step.put("id", stepId); - step.put("total",route.getExchangesTotal()); - step.put("completed",route.getExchangesCompleted()); - step.put("failed",route.getExchangesFailed()); - step.put("pending",route.getExchangesInflight()); - } + JSONObject step = getStepStats(routeId, fullStats); steps.put(step); } - } } @@ -2193,10 +2204,13 @@ public String getFlowStats(String flowId, boolean fullStats, boolean includeMeta flow.put("pending",pendingMessages); if(fullStats){ + flow.put("status",getFlowStatus(flowId)); flow.put("timeout",getTimeout(context)); flow.put("uptime",uptime); flow.put("uptimeMillis",uptimeMillis); - flow.put("status",getFlowStatus(flowId)); + flow.put("cpuLoadLastMinute",cpuLoadLastMinute); + flow.put("cpuLoadLast5Minutes",cpuLoadLast5Minutes); + flow.put("cpuLoadLast15Minutes",cpuLoadLast15Minutes); flow.put("lastFailed",lastFailed != null ? lastFailed : ""); flow.put("lastCompleted",lastCompleted != null ? lastCompleted : ""); } @@ -2241,36 +2255,6 @@ private long getTimeout(CamelContext context) throws MalformedObjectNameExceptio } } - /* - public String getFlowStats(String id, boolean fullStats, String mediaType) throws Exception { - - JSONObject json = new JSONObject(); - JSONObject flow = new JSONObject(); - JSONArray steps = new JSONArray(); - - List routes = getRoutesByFlowId(id); - - for(Route route: routes){ - JSONObject step = getStepStats(route.getId(), fullStats); - steps.put(step); - } - - flow.put("id",id); - flow.put("steps",steps); - json.put("flow",flow); - - String flowStats = json.toString(4); - if(mediaType.contains("xml")) { - flowStats = DocConverter.convertJsonToXml(flowStats); - } - - return flowStats; - - } - */ - - - public String getFlowStepStats(String flowId, String stepid, boolean fullStats, String mediaType) throws Exception { String routeid = flowId + "-" + stepid; @@ -2307,15 +2291,15 @@ private JSONObject getStepStats(String routeid, boolean fullStats) throws Except JSONObject load = new JSONObject(); - String throughput = "0"; //route.getThroughput(); + //String throughput = route.getThroughput(); String stepLoad01 = route.getLoad01(); String stepLoad05 = route.getLoad05(); String stepLoad15 = route.getLoad15(); - load.put("throughput", throughput); - load.put("load01", stepLoad01); - load.put("load05", stepLoad05); - load.put("load15", stepLoad15); + //load.put("throughput", throughput); + load.put("cpuLoadLastMinute", stepLoad01); + load.put("cpuLoadLast5Minutes", stepLoad05); + load.put("cpuLoadLast15Minutes", stepLoad15); step.put("load", load); } @@ -2329,33 +2313,74 @@ private JSONObject getStepStats(String routeid, boolean fullStats) throws Except json.put("step", step); return json; + } public String getStats(String mediaType) throws Exception { - Set flowIds = new HashSet(); - - List routes = context.getRoutes(); + JSONObject json = new JSONObject(); - for(Route route: routes){ - String routeId = route.getId(); - String flowId = StringUtils.substringBefore(routeId,"-"); - if(flowId!=null && !flowId.isEmpty()) { - flowIds.add(flowId); - } + ManagedCamelContextMBean managedCamelContext = managed.getManagedCamelContext(); + + json.put("camelId",managedCamelContext.getCamelId()); + json.put("camelVersion",managedCamelContext.getCamelVersion()); + json.put("status",managedCamelContext.getState()); + json.put("uptime",managedCamelContext.getUptime()); + json.put("uptimeMillis",managedCamelContext.getUptimeMillis()); + json.put("startedFlows",countFlows("started", "text/plain")); + json.put("startedSteps",managedCamelContext.getStartedRoutes()); + json.put("exchangesTotal",managedCamelContext.getExchangesTotal()); + json.put("exchangesCompleted",managedCamelContext.getExchangesCompleted()); + json.put("exchangesInflight",managedCamelContext.getExchangesInflight()); + json.put("exchangesFailed",managedCamelContext.getExchangesFailed()); + json.put("cpuLoadLastMinute",managedCamelContext.getLoad01()); + json.put("cpuLoadLast5Minutes",managedCamelContext.getLoad05()); + json.put("cpuLoadLast15Minutes",managedCamelContext.getLoad15()); + json.put("memoryUsage",getMemoryUsage()); + json.put("totalThreads",ManagementFactory.getThreadMXBean().getThreadCount()); + + String stats = json.toString(4); + if(mediaType.contains("xml")) { + stats = DocConverter.convertJsonToXml(stats); } - String result = getStatsFromList(flowIds, true, false, false); + return stats; - if(mediaType.contains("xml")) { - result = DocConverter.convertJsonToXml(result); + } + + private double getMemoryUsage(){ + + MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage heapMemoryUsage = memoryBean.getHeapMemoryUsage(); + + long maxMemory = heapMemoryUsage.getMax(); // Maximum available memory + long usedMemory = heapMemoryUsage.getUsed(); // Currently used memory + + double memoryUsagePercentage = ((double) usedMemory / maxMemory) * 100; + + DecimalFormat df = new DecimalFormat("#.##"); + memoryUsagePercentage = Double.valueOf(df.format(memoryUsagePercentage)); + + return memoryUsagePercentage; + + } + + public String getStepsStats(String mediaType) throws Exception { + + ManagedCamelContextMBean managedCamelContext = managed.getManagedCamelContext(); + + String result = managedCamelContext.dumpRoutesStatsAsXml(true,false); + + if(mediaType.contains("json")) { + result = DocConverter.convertXmlToJson(result); } return result; } - public String getMessages(String mediaType) throws Exception { + + public String getFlowsStats(String mediaType) throws Exception { Set flowIds = new HashSet(); @@ -2369,7 +2394,7 @@ public String getMessages(String mediaType) throws Exception { } } - String result = getStatsFromList(flowIds, false, false, false); + String result = getStatsFromList(flowIds, true, false, false); if(mediaType.contains("xml")) { result = DocConverter.convertJsonToXml(result); @@ -2414,7 +2439,29 @@ private String getStatsFromList(Set flowIds, String filter, boolean full return result; } + public String getMessages(String mediaType) throws Exception { + Set flowIds = new HashSet(); + + List routes = context.getRoutes(); + + for(Route route: routes){ + String routeId = route.getId(); + String flowId = StringUtils.substringBefore(routeId,"-"); + if(flowId!=null && !flowId.isEmpty()) { + flowIds.add(flowId); + } + } + + String result = getStatsFromList(flowIds, false, false, false); + + if(mediaType.contains("xml")) { + result = DocConverter.convertJsonToXml(result); + } + + return result; + + } public String getMetrics(String mediaType) throws Exception { @@ -2727,7 +2774,7 @@ public String resolveDependency(String scheme) throws Exception { String groupId = component.getString("groupId"); String artifactId = component.getString("artifactId"); - String version = catalog.getCatalogVersion(); //versionManager.getLoadedVersion(); //component.getString("version"); + String version = catalog.getCatalogVersion(); String dependency = groupId + ":" + artifactId + ":" + version; String result; @@ -3102,4 +3149,23 @@ private String getKeystorePassword() { return keystorePwd; } -} + public static BigDecimal parseBigDecimal(String value) { + if (value == null || value.isEmpty()) { + return BigDecimal.ZERO; // or handle as needed + } + return new BigDecimal(value); + } + + private static int getEnvironmentVarAsInteger(String envName, int defaultValue){ + int value = defaultValue; + if (envName != null && !envName.isEmpty()) { + try { + value = Integer.parseInt(envName); + } catch (NumberFormatException e) { + System.err.println("Invalid value for " + envName + ": " + value); + } + } + return value; + } + +} \ No newline at end of file diff --git a/integrationRest/pom.xml b/integrationRest/pom.xml index 7e51ca78..e3a58740 100644 --- a/integrationRest/pom.xml +++ b/integrationRest/pom.xml @@ -6,7 +6,7 @@ runtime org.assimbly - 5.0.0 + 5.0.1 integration-rest diff --git a/integrationRest/src/main/java/org/assimbly/integrationrest/StatisticsRuntime.java b/integrationRest/src/main/java/org/assimbly/integrationrest/StatisticsRuntime.java index 6f87adae..ce84f0c8 100644 --- a/integrationRest/src/main/java/org/assimbly/integrationrest/StatisticsRuntime.java +++ b/integrationRest/src/main/java/org/assimbly/integrationrest/StatisticsRuntime.java @@ -53,6 +53,45 @@ public ResponseEntity getStats(@Parameter(hidden = true) @RequestHeader( } } + @GetMapping( + path = "/integration/stats/steps", + produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE, MediaType.TEXT_PLAIN_VALUE} + ) + public ResponseEntity getRoutesStats(@Parameter(hidden = true) @RequestHeader(value = "Accept") String mediaType) throws Exception { + + plainResponse = true; + integration = integrationRuntime.getIntegration(); + + try { + String stats = integration.getStepsStats(mediaType); + if(stats.startsWith("Error")||stats.startsWith("Warning")) {plainResponse = false;} + return ResponseUtil.createSuccessResponse(1L, mediaType,"/integration/stats/steps",stats,plainResponse); + } catch (Exception e) { + log.error("Get stats failed",e); + return ResponseUtil.createFailureResponse(1L, mediaType,"/integration/stats/steps",e.getMessage()); + } + } + + @GetMapping( + path = "/integration/stats/flows", + produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE, MediaType.TEXT_PLAIN_VALUE} + ) + public ResponseEntity getFlowsStats(@Parameter(hidden = true) @RequestHeader(value = "Accept") String mediaType) throws Exception { + + plainResponse = true; + integration = integrationRuntime.getIntegration(); + + try { + String stats = integration.getFlowsStats(mediaType); + if(stats.startsWith("Error")||stats.startsWith("Warning")) {plainResponse = false;} + return ResponseUtil.createSuccessResponse(1L, mediaType,"/integration/stats/flows",stats,plainResponse); + } catch (Exception e) { + log.error("Get stats failed",e); + return ResponseUtil.createFailureResponse(1L, mediaType,"/integration/stats/flows",e.getMessage()); + } + } + + @GetMapping( path = "/integration/messages", produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE, MediaType.TEXT_PLAIN_VALUE} diff --git a/pom.xml b/pom.xml index 78207e4c..560e123f 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.assimbly runtime - 5.0.0 + 5.0.1 broker @@ -25,11 +25,11 @@ UTF-8 ${project.basedir} 6.1.3 - 5.0.0 + 5.0.1 3.0 - 3.3.4 + 3.3.5 3.9.9 - 4.8.0 + 4.8.1 2.3.232 2.7.3 2.11.5