diff --git a/islandora-connector-broadcast/src/main/java/ca/islandora/alpaca/connector/broadcast/BroadcastRouter.java b/islandora-connector-broadcast/src/main/java/ca/islandora/alpaca/connector/broadcast/BroadcastRouter.java index 7239ebbb..cda1c345 100644 --- a/islandora-connector-broadcast/src/main/java/ca/islandora/alpaca/connector/broadcast/BroadcastRouter.java +++ b/islandora-connector-broadcast/src/main/java/ca/islandora/alpaca/connector/broadcast/BroadcastRouter.java @@ -19,6 +19,7 @@ import static org.apache.camel.LoggingLevel.INFO; +import org.apache.camel.ExchangePattern; import org.apache.camel.builder.RouteBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,14 +38,23 @@ public class BroadcastRouter extends RouteBuilder { */ public void configure() throws Exception { - // Distribute message based on configured header. + // Distribute message based on headers. from("{{input.stream}}") .routeId("MessageBroadcaster") - .description("Broadcast messages from one queue/topic to other specified queues/topics.") + .description("Broadcast messages from one queue/topic to other queues/topics") .log(INFO, LOGGER, "Distributing message: ${headers[JMSMessageID]} with timestamp ${headers[JMSTimestamp]}") - .recipientList(simple("${headers[IslandoraBroadcastRecipients]}")) - .ignoreInvalidEndpoints(); + .filter(header("IslandoraExchangePattern")) + .process(exchange -> { + final String patternName = exchange.getIn().getHeader("IslandoraExchangePattern", String.class); + try { + exchange.setPattern(ExchangePattern.asEnum(patternName)); + } catch (IllegalArgumentException e) { + LOGGER.warn("Ignoring malformed exchange pattern: " + patternName); + } + }) + .end() + .routingSlip(header("IslandoraBroadcastRecipients")).ignoreInvalidEndpoints(); } } diff --git a/islandora-connector-pipeline/README.md b/islandora-connector-pipeline/README.md deleted file mode 100644 index 38839fb6..00000000 --- a/islandora-connector-pipeline/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Islandora Connector Pipeline - -## Description - -This service takes an incoming message and routes through the queues specified in the 'IslandoraPipelineRecipients' header as a comma separated list sequentially. - -## Configuration - -Once deployed, the incoming message stream can be configured by editing the `ca.islandora.alpaca.connector.pipeline.cfg` file in your Karaf installation's `etc` directory. - - input.stream=broker:queue:islandora-connector-pipeline - -For example, by changing `broker` to `activemq`, this service will attempt read messages from the ActiveMQ queue `islandora-connector-pipeline`. In theory, any broker technology supported by Camel should be supported, though ActiveMQ is the only one tested. - -## More Information - -For more information, please visit the [Apache Camel](http://camel.apache.org) documentation. diff --git a/islandora-connector-pipeline/build.gradle b/islandora-connector-pipeline/build.gradle deleted file mode 100644 index 85a54ae9..00000000 --- a/islandora-connector-pipeline/build.gradle +++ /dev/null @@ -1,32 +0,0 @@ -apply plugin: 'osgi' - -description = 'Islandora CLAW Pipeline' - -dependencies { - compile group: 'org.apache.camel', name: 'camel-core', version: camelVersion - compile group: 'org.apache.camel', name: 'camel-blueprint', version: camelVersion - compile group: 'org.apache.activemq', name: 'activemq-camel', version: activemqVersion - compile group: 'org.slf4j', name: 'slf4j-api', version: slf4jVersion - testCompile group: 'org.apache.camel', name: 'camel-test-blueprint', version: camelVersion -} - -jar { - manifest { - description project.description - docURL project.docURL - vendor project.vendor - license project.license - - instruction 'Import-Package', 'org.apache.activemq.camel.component,' + - "org.apache.camel;version=\"${camelVersionRange}\"," + - defaultOsgiImports - instruction 'Export-Package', 'ca.islandora.alpaca.connector.pipeline' - } -} - -artifacts { - archives (file('build/cfg/main/ca.islandora.alpaca.connector.pipeline.cfg')) { - classifier 'configuration' - type 'cfg' - } -} diff --git a/islandora-connector-pipeline/src/main/cfg/ca.islandora.alpaca.connector.pipeline.cfg b/islandora-connector-pipeline/src/main/cfg/ca.islandora.alpaca.connector.pipeline.cfg deleted file mode 100644 index 8148b3c8..00000000 --- a/islandora-connector-pipeline/src/main/cfg/ca.islandora.alpaca.connector.pipeline.cfg +++ /dev/null @@ -1,4 +0,0 @@ -# Incoming queue/topic on the broker of your choice. -# Replace the 'broker' prefix with the broker for your system. -# E.g. activemq, zeromq, etc... -input.stream=broker:queue:islandora-connector-pipeline diff --git a/islandora-connector-pipeline/src/main/java/ca/islandora/alpaca/connector/pipeline/PipelineRouter.java b/islandora-connector-pipeline/src/main/java/ca/islandora/alpaca/connector/pipeline/PipelineRouter.java deleted file mode 100644 index 390030c8..00000000 --- a/islandora-connector-pipeline/src/main/java/ca/islandora/alpaca/connector/pipeline/PipelineRouter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to Islandora Foundation under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional - * information regarding copyright ownership. - * - * The Islandora Foundation licenses this file to you under the MIT License. - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/MIT - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ca.islandora.alpaca.connector.pipeline; - -import static org.apache.camel.LoggingLevel.INFO; - -import org.apache.camel.ExchangePattern; -import org.apache.camel.builder.RouteBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A content router distributing messages to multiple endpoints. - * - * @author Daniel Lamb / Nat - */ -public class PipelineRouter extends RouteBuilder { - - private static final Logger LOGGER = LoggerFactory.getLogger(PipelineRouter.class); - - /** - * Configure the message route workflow. - */ - public void configure() throws Exception { - from("{{input.stream}}") - .routeId("Pipleline") - .description("Route incoming messages to queues based on header slip.") - .log(INFO, LOGGER, - "Distributing message: ${headers[JMSMessageID]} with timestamp ${headers[JMSTimestamp]}") - .routingSlip(header("IslandoraPipelineRecipients")) - .ignoreInvalidEndpoints(); - } -} - diff --git a/islandora-connector-pipeline/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/islandora-connector-pipeline/src/main/resources/OSGI-INF/blueprint/blueprint.xml deleted file mode 100644 index 21c4732a..00000000 --- a/islandora-connector-pipeline/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - - - - - - - ca.islandora.alpaca.connector.pipeline - - diff --git a/karaf/src/main/resources/features.xml b/karaf/src/main/resources/features.xml index ad08bc12..e4c572cf 100644 --- a/karaf/src/main/resources/features.xml +++ b/karaf/src/main/resources/features.xml @@ -31,15 +31,4 @@ - -
Distributes a message to a list of queues/topics sequentially.
- - fcrepo-service-activemq - - mvn:ca.islandora.alpaca/islandora-connector-pipeline/${project.version} - - mvn:ca.islandora.alpaca/islandora-connector-pipeline/${project.version}/cfg/configuration - -
- diff --git a/settings.gradle b/settings.gradle index 6e2823ea..e05f19d0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,9 +1,7 @@ include ':islandora-karaf' include ':islandora-indexing-triplestore' include ':islandora-connector-broadcast' -include ':islandora-connector-pipeline' project(':islandora-karaf').projectDir = "$rootDir/karaf" as File project(':islandora-indexing-triplestore').projectDir = "$rootDir/islandora-indexing-triplestore" as File project(':islandora-connector-broadcast').projectDir = "$rootDir/islandora-connector-broadcast" as File -project(':islandora-connector-pipeline').projectDir = "$rootDir/islandora-connector-pipeline" as File