Skip to content

Commit

Permalink
Merge pull request #322 from assimbly/develop
Browse files Browse the repository at this point in the history
Release 5.0.1
  • Loading branch information
skin27 authored Nov 29, 2024
2 parents 1875f4c + 84bddc5 commit baf17ea
Show file tree
Hide file tree
Showing 18 changed files with 357 additions and 152 deletions.
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>broker</name>
Expand Down
2 changes: 1 addition & 1 deletion brokerRest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>broker-rest</name>
Expand Down
2 changes: 1 addition & 1 deletion dil/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>dil</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@ public class ZipFileEnrichStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
elementNames = new ArrayList<>();

if (newExchange == null) {
// there’s no remote file to consume
return oldExchange;
}

Message in = oldExchange.getIn();
Message resource = newExchange.getIn();

byte[] sourceZip = in.getBody(byte[].class);
byte[] resourceData = resource.getBody(byte[].class);
byte[] resourceData = newExchange.getContext().getTypeConverter().convertTo(byte[].class, resource.getBody());

String fileName = resource.getHeader(Exchange.FILE_NAME, String.class);
String fileName = resource.getHeader(Exchange.FILE_NAME_CONSUMED, String.class);
if(fileName == null) {
fileName = resource.getHeader(Exchange.FILE_NAME, String.class);
}

ByteArrayOutputStream baos = new ByteArrayOutputStream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ private void setFields(){
} else if (conType.equals("pooled")) {

if (maxConnections == null) {
maxConnections = "10";
maxConnections = "2";
}
if (concurentConsumers == null) {
concurentConsumers = "10";
concurentConsumers = "2";
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ private void addVariables(Exchange exchange) {
}

private void addHeaders(Exchange exchange) {

JSONObject headers2 = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders()));
json.put("Headers", headers2);

if (showAll || showHeaders) {
JSONObject headers = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders()));
json.put("Headers", headers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.assimbly.dil.blocks.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;


public class OpenTelemetryLogProcessor implements Processor {

public void process(Exchange exchange) throws Exception {

String json = format(exchange);

System.out.println(json);

}


private String format(Exchange exchange) throws JsonProcessingException {

Object inBody = exchange.getIn().getBody();
Map<String, Object> exchangeMap = new HashMap<>();
exchangeMap.put("ExchangeId", exchange.getExchangeId());
exchangeMap.put("ExchangePattern", exchange.getPattern().toString());
exchangeMap.put("Body", inBody != null ? inBody.toString() : "null");
exchangeMap.put("Headers", exchange.getIn().getHeaders());

Instant now = Instant.now();

// Convert Instant to string using a formatter
String formattedTime = DateTimeFormatter
.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC)
.format(now);

Map<String, Object> map = new HashMap<>();
map.put("timestamp", formattedTime);
map.put("logLevel", "INFO");
map.put("serviceName", exchange.getFromRouteId());
map.put("message", exchange.getFromRouteId());
map.put("attributes",exchangeMap);

ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(map);

return jsonString;
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package org.assimbly.dil.blocks.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import java.util.HashMap;
import java.util.Map;


public class SetLogProcessor implements Processor {

Expand All @@ -17,26 +12,8 @@ public void process(Exchange exchange) throws Exception {

String json = jsonFormatter.format(exchange);

System.out.println("-------------MyLog --------------------->\n\n" + json);

}


private String formatExchangeToString(Exchange exchange) throws JsonProcessingException {

Object inBody = exchange.getIn().getBody();

Map<String, Object> map = new HashMap<>();
map.put("ExchangeId", exchange.getExchangeId());
map.put("FromRouteId", exchange.getFromRouteId());
map.put("ExchangePattern", exchange.getPattern().toString());
map.put("Body", inBody != null ? inBody.toString() : "null");
map.put("Headers", exchange.getIn().getHeaders());

ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(map);
System.out.println(json);

return jsonString;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public String getId() {
return id;
}

@JsonProperty("bundleId")
@JsonProperty("flowId")
public String getFlowId() {
return flowId;
}
Expand All @@ -100,7 +100,7 @@ public String getFlowVersion() {
return flowVersion;
}

@JsonProperty("previousBundleId")
@JsonProperty("previousFlowId")
public String getPreviousFlowId() {
return previousFlowId;
}
Expand Down
25 changes: 24 additions & 1 deletion dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.RouteConfigurationDefinition;
import org.apache.camel.spi.Resource;
import org.apache.camel.spi.RoutesBuilderLoader;
import org.apache.camel.spi.RoutesLoader;
Expand Down Expand Up @@ -127,14 +129,15 @@ private void setErrorHandlers() throws Exception{

private void setRouteConfigurations() throws Exception{

removeRouteConfiguration(flowId);

for(String prop : props.keySet()){
if(prop.endsWith("routeconfiguration")){

String routeConfiguration = props.get(prop);
String id = props.get(prop + ".id");

if(routeConfiguration!=null && !routeConfiguration.isEmpty()){
context.removeRoute(id);
loadStep(routeConfiguration, "routeconfiguration", id, null);
}
}
Expand Down Expand Up @@ -247,6 +250,26 @@ private void setErrorHandler(String id, String errorUri) throws Exception {

}

private void removeRouteConfiguration(String flowId) {

ModelCamelContext modelContext = (ModelCamelContext) context;

List<RouteConfigurationDefinition> routeConfigurationsToRemove = modelContext.getRouteConfigurationDefinitions().stream()
.filter(Objects::nonNull) // Exclude null entries
.filter(routeConfig -> routeConfig.getId().startsWith(flowId))
.toList(); // Collect into a new list to avoid modifying the original list during iteration

routeConfigurationsToRemove.forEach(routeConfig -> {
try {
modelContext.removeRouteConfiguration(routeConfig);
log.info("Removed routeConfiguration: " + routeConfig.getId());
} catch (Exception e) {
log.warn("Failed to remove route configuration: " + routeConfig.getId());
}
});

}

public String getReport(){
return flowLoaderReport.getReport();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
// This class unmarshalls an XML file into a Java treemap object
// The XML file must be in DIL (Data Integration Language) format
public class Unmarshall {

private Document doc;
private TreeMap<String, String> properties;
private XMLConfiguration conf;
Expand Down Expand Up @@ -97,17 +96,13 @@ private void addDependencies(Element element){
}

String flowDependencies = null;
NodeList dependenciesList = dependencies.getChildNodes();

for (int i = 0; i < dependenciesList.getLength(); i++) {
Node dependency = dependenciesList.item(i);
if (dependency instanceof Element) {
if(i == 0){
flowDependencies = dependency.getTextContent();
}else{
flowDependencies = flowDependencies + "," + dependency.getTextContent();
}
List<Element> dependenciesList = getElementChildren(dependencies);

for(Element dependency: dependenciesList){
if(flowDependencies==null){
flowDependencies = dependency.getTextContent();
}else{
flowDependencies = flowDependencies + "," + dependency.getTextContent();
}
}

Expand Down Expand Up @@ -229,4 +224,18 @@ private String evaluateXpath(String xpath) throws TransformerException, XPathExp
return xp.evaluate(doc);
}

List<Element> getElementChildren(Node parent) {
List<Element> elementChildren = new ArrayList<>();
NodeList childNodes = parent.getChildNodes();

for (int i = 0; i < childNodes.getLength(); i++) {
Node node = childNodes.item(i);
if (node.getNodeType() == Node.ELEMENT_NODE) {
elementChildren.add((Element) node);
}
}

return elementChildren;
}

}
2 changes: 1 addition & 1 deletion integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>integration</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,30 @@ public interface Integration {
* Gets the stats of an integration
*
* @param mediaType (xml or json)
* @throws Exception if flow doesn't start
* @return returns number of messages
* @throws Exception if stats can't be retrieved
* @return returns stats of integration (system)
*
*/
public String getStats(String mediaType) throws Exception;

/**
* Gets the stats of all steps
*
* @param mediaType (xml or json)
* @throws Exception if stats can't be retrieved
* @return returns stats of integration (system)
*/
public String getStepsStats(String mediaType) throws Exception;

/**
* Gets the stats of all flows
*
* @param mediaType (xml or json)
* @throws Exception if stats can't be retrieved
* @return returns stats of integration (system)
*/
public String getFlowsStats(String mediaType) throws Exception;

/**
* Gets the stats of an integration
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ public String testConnection(String host, int port, int timeOut) {

public abstract String getStats(String mediaType) throws Exception;

public abstract String getFlowsStats(String mediaType) throws Exception;

public abstract String getStepsStats(String mediaType) throws Exception;

public abstract String getMessages(String mediaType) throws Exception;

public abstract String getStatsByFlowIds(String flowIds, String filter, String mediaType) throws Exception;
Expand Down
Loading

0 comments on commit baf17ea

Please sign in to comment.