From a57a2dc296ddf008f30da56d6f2377a072052d77 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 8 Dec 2021 10:40:05 -0400 Subject: [PATCH] Async consumers for multiprocessing (#2) * Fix up warning about the unclosed app context. * Slap together async-consumer stuff. * Add separator for asyncConsumer parameter. * Update example as suggested. --- README.md | 52 +++++++++++++++++++ example.properties | 6 +++ .../islandora/alpaca/driver/AlpacaDriver.java | 20 ++++--- .../derivative/DerivativeOptions.java | 15 +++++- .../indexing/fcrepo/FcrepoIndexerOptions.java | 6 ++- .../TriplestoreIndexerOptions.java | 6 ++- .../alpaca/support/config/PropertyConfig.java | 13 ++++- 7 files changed, 106 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f20f0139..d1e2e1cc 100644 --- a/README.md +++ b/README.md @@ -37,20 +37,25 @@ error.maxRedeliveries=4 This defines how many times to retry a message before failing completely. There are also common ActiveMQ properties to setup the connection. + ``` # ActiveMQ options jms.brokerUrl=tcp://localhost:61616 ``` + This defines the url to the ActiveMQ broker. + ``` jms.username= jms.password= ``` This defines the login credentials (if required) + ``` jms.connections=10 ``` This defines the pool of connections to the ActiveMQ instance. + ``` jms.concurrent-consumers=1 ``` @@ -66,19 +71,24 @@ It's properties are: # Fcrepo indexer options fcrepo.indexer.enabled=true ``` + This defines whether the Fedora indexer is enabled or not. + ``` fcrepo.indexer.node=queue:islandora-indexing-fcrepo-content fcrepo.indexer.delete=queue:islandora-indexing-fcrepo-delete fcrepo.indexer.media=queue:islandora-indexing-fcrepo-media fcrepo.indexer.external=queue:islandora-indexing-fcrepo-file-external ``` + These define the various queues to listen on for the indexing/deletion messages. The part after `queue:` should match your Islandora instance "Actions". + ``` fcrepo.indexer.milliner.baseUrl=http://localhost:8000/milliner ``` This defines the location of your Milliner microservice. + ``` fcrepo.indexer.concurrent-consumers=1 fcrepo.indexer.max-concurrent-consumers=1 @@ -87,6 +97,12 @@ These define the default number of concurrent consumers and maximum number of co consumers working off your ActiveMQ instance. A value of `-1` means no setting is applied. +``` +fcrepo.indexer.async-consumer=true +``` + +This property allows the concurrent consumers to process concurrently; otherwise, the consumers will wait to the previous message has been processed before executing. + ### islandora-indexing-triplestore This service indexes the Drupal node into the configured triplestore @@ -97,25 +113,39 @@ It's properties are: # Triplestore indexer options triplestore.indexer.enabled=false ``` + This defines whether the Triplestore indexer is enabled or not. + ``` triplestore.index.stream=queue:islandora-indexing-triplestore-index triplestore.delete.stream=queue:islandora-indexing-triplestore-delete ``` + These define the various queues to listen on for the indexing/deletion messages. The part after `queue:` should match your Islandora instance "Actions". + ``` triplestore.baseUrl=http://localhost:8080/bigdata/namespace/kb/sparql ``` + This defines the location of your triplestore's SPARQL update endpoint. + ``` triplestore.indexer.concurrent-consumers=1 triplestore.indexer.max-concurrent-consumers=1 ``` + These define the default number of concurrent consumers and maximum number of concurrent consumers working off your ActiveMQ instance. A value of `-1` means no setting is applied. + +``` +triplestore.indexer.async-consumer=true +``` + +This property allows the concurrent consumers to process concurrently; otherwise, the consumers will wait to the previous message has been processed before executing. + ### islandora-connector-derivative This service is used to configure an external microservice. This service will deploy multiple copies of its routes @@ -127,25 +157,40 @@ a comma separated list. Each item in the list defines a new route and must also ``` derivative..enabled=true ``` + This defines if the `item` service is enabled. + ``` derivative..in.stream=queue:islandora-item-connector.index ``` + This is the input queue for the derivative microservice. The part after `queue:` should match your Islandora instance "Actions". + ``` derivative..service.url=http://example.org/derivative/convert ``` + This is the microservice URL to process the request. + ``` derivative..concurrent-consumers=1 derivative..max-concurrent-consumers=1 ``` + These define the default number of concurrent consumers and maximum number of concurrent consumers working off your ActiveMQ instance. A value of `-1` means no setting is applied. + +``` +derivative..async-consumer=true +``` + +This property allows the concurrent consumers to process concurrently; otherwise, the consumers will wait to the previous message has been processed before executing. + For example, with two services defined (houdini and crayfits) my configuration would have + ``` derivative.systems.installed=houdini,fits @@ -154,23 +199,27 @@ derivative.houdini.in.stream=queue:islandora-connector-houdini derivative.houdini.service.url=http://127.0.0.1:8000/houdini/convert derivative.houdini.concurrent-consumers=1 derivative.houdini.max-concurrent-consumers=4 +derivative.houdini.async-consumer=true derivative.fits.enabled=true derivative.fits.in.stream=queue:islandora-connector-fits derivative.fits.service.url=http://127.0.0.1:8000/crayfits derivative.fits.concurrent-consumers=2 derivative.fits.max-concurrent-consumers=2 +derivative.fits.async-consumer=false ``` ### Customizing HTTP client timeouts You can alter the HTTP client from the defaults for its request, connection and socket timeouts. To do this you want to enable the request configurer. + ```shell request.configurer.enabled=true ``` Then set the next 3 timeouts (measured in milliseconds) to the desired timeout. + ```shell request.timeout=-1 connection.timeout=-1 @@ -182,6 +231,7 @@ The default for all three is `-1` which indicates no timeout. ## Deploying/Running You can see the options by passing the `-h|--help` flag + ```shell > java -jar islandora-alpaca-app/build/libs/islandora-alpaca-app-2.0.0-all.jar -h Usage: alpaca [-hV] [-c=] @@ -192,6 +242,7 @@ Usage: alpaca [-hV] [-c=] ``` Using the `-V|--version` flag will just return the current version of the application. + ```shell > java -jar islandora-alpaca-app/build/libs/islandora-alpaca-app-2.0.0-all.jar -v 2.0.0 @@ -212,6 +263,7 @@ Logging is done to the console, and defaults to the INFO level. To get more verb can use the Java property `islandora.alpaca.log` i.e. + ```shell java -Dislandora.alpaca.log=DEBUG -jar islandora-alpaca-app-2.0.0-all.jar -c /opt/my.properties ``` diff --git a/example.properties b/example.properties index 7a4b1d1b..2c477156 100644 --- a/example.properties +++ b/example.properties @@ -21,6 +21,7 @@ fcrepo.indexer.external=queue:islandora-indexing-fcrepo-file-external fcrepo.indexer.milliner.baseUrl=http://127.0.0.1:8000/milliner/ fcrepo.indexer.concurrent-consumers=-1 fcrepo.indexer.max-concurrent-consumers=-1 +fcrepo.indexer.async-consumer=false # Triplestore indexer options triplestore.indexer.enabled=true @@ -29,6 +30,7 @@ triplestore.index.stream=queue:islandora-indexing-triplestore-index triplestore.delete.stream=queue:islandora-indexing-triplestore-delete triplestore.indexer.concurrent-consumers=-1 triplestore.indexer.max-concurrent-consumers=-1 +triplestore.indexer.async-consumer=false # Derivative services derivative.systems.installed=fits,homarus,houdini,ocr @@ -38,21 +40,25 @@ derivative.fits.in.stream=queue:islandora-connector-fits derivative.fits.service.url=http://localhost:8000/crayfits derivative.fits.concurrent-consumers=-1 derivative.fits.max-concurrent-consumers=-1 +derivative.fits.async-consumer=false derivative.homarus.enabled=true derivative.homarus.in.stream=queue:islandora-connector-homarus derivative.homarus.service.url=http://127.0.0.1:8000/homarus/convert derivative.homarus.concurrent-consumers=-1 derivative.homarus.max-concurrent-consumers=-1 +derivative.homarus.async-consumer=false derivative.houdini.enabled=true derivative.houdini.in.stream=queue:islandora-connector-houdini derivative.houdini.service.url=http://127.0.0.1:8000/houdini/convert derivative.houdini.concurrent-consumers=-1 derivative.houdini.max-concurrent-consumers=-1 +derivative.houdini.async-consumer=false derivative.ocr.enabled=true derivative.ocr.in.stream=queue:islandora-connector-ocr derivative.ocr.service.url=http://localhost:8000/hypercube derivative.ocr.concurrent-consumers=-1 derivative.ocr.max-concurrent-consumers=-1 +derivative.ocr.async-consumer=false diff --git a/islandora-alpaca-app/src/main/java/ca/islandora/alpaca/driver/AlpacaDriver.java b/islandora-alpaca-app/src/main/java/ca/islandora/alpaca/driver/AlpacaDriver.java index d74575c3..034f4948 100644 --- a/islandora-alpaca-app/src/main/java/ca/islandora/alpaca/driver/AlpacaDriver.java +++ b/islandora-alpaca-app/src/main/java/ca/islandora/alpaca/driver/AlpacaDriver.java @@ -54,17 +54,21 @@ public Integer call() throws Exception { System.setProperty(ALPACA_CONFIG_PROPERTY, configurationFilePath.toFile().getAbsolutePath()); } final var appContext = new AnnotationConfigApplicationContext("ca.islandora.alpaca"); - appContext.start(); - LOGGER.info("Alpaca started."); + try { + appContext.start(); + LOGGER.info("Alpaca started."); - while (appContext.isRunning()) { - try { - Thread.sleep(1000); - } catch (final InterruptedException e) { - throw new RuntimeException("This should never happen"); + while (appContext.isRunning()) { + try { + Thread.sleep(1000); + } catch (final InterruptedException e) { + throw new RuntimeException("This should never happen"); + } } + return 0; + } finally { + appContext.close(); } - return 0; } /** diff --git a/islandora-connector-derivative/src/main/java/ca/islandora/alpaca/connector/derivative/DerivativeOptions.java b/islandora-connector-derivative/src/main/java/ca/islandora/alpaca/connector/derivative/DerivativeOptions.java index d1f3c988..2802ce4a 100644 --- a/islandora-connector-derivative/src/main/java/ca/islandora/alpaca/connector/derivative/DerivativeOptions.java +++ b/islandora-connector-derivative/src/main/java/ca/islandora/alpaca/connector/derivative/DerivativeOptions.java @@ -50,6 +50,7 @@ public class DerivativeOptions extends PropertyConfig { private static final String DERIVATIVE_OUTPUT_PROPERTY = "service.url"; private static final String DERIVATIVE_CONCURRENT_PROPERTY = "concurrent-consumers"; private static final String DERIVATIVE_MAX_CONCURRENT_PROPERTY = "max-concurrent-consumers"; + private static final String DERIVATIVE_ASYNC_CONSUMER = "async-consumer"; @Autowired private Environment environment; @@ -104,8 +105,11 @@ private void startDerivativeService(final String serviceName) throws Exception { Integer.class, -1); final int maxConcurrentConsumers = environment.getProperty(maxConcurrentConsumerProperty(serviceName), Integer.class, -1); + final boolean asyncConsumer = environment.getProperty(asyncConsumerProperty(serviceName), + Boolean.class, false); // Add concurrent/max-concurrent - final String finalInput = addJmsOptions(addBrokerName(input), concurrentConsumers, maxConcurrentConsumers); + final String finalInput = addJmsOptions(addBrokerName(input), concurrentConsumers, maxConcurrentConsumers, + asyncConsumer); // Add connectionClose and other http options. final String finalOutput = addHttpOptions(output); camelContext.addRoutes(new DerivativeConnector(serviceName, finalInput, finalOutput, this)); @@ -180,4 +184,13 @@ private String maxConcurrentConsumerProperty(final String systemName) { return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_MAX_CONCURRENT_PROPERTY; } + /** + * Return the expected async-consumer property. + * @param systemName the derivative system name + * @return the property + */ + private String asyncConsumerProperty(final String systemName) { + return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_ASYNC_CONSUMER; + } + } diff --git a/islandora-indexing-fcrepo/src/main/java/ca/islandora/alpaca/indexing/fcrepo/FcrepoIndexerOptions.java b/islandora-indexing-fcrepo/src/main/java/ca/islandora/alpaca/indexing/fcrepo/FcrepoIndexerOptions.java index 319d63c9..d8c254ef 100644 --- a/islandora-indexing-fcrepo/src/main/java/ca/islandora/alpaca/indexing/fcrepo/FcrepoIndexerOptions.java +++ b/islandora-indexing-fcrepo/src/main/java/ca/islandora/alpaca/indexing/fcrepo/FcrepoIndexerOptions.java @@ -43,6 +43,7 @@ public class FcrepoIndexerOptions extends PropertyConfig { private static final String FCREPO_BASE_URI_HEADER_PROPERTY = "fcrepo.indexer.fedoraHeader"; private static final String FCREPO_INDEXER_CONCURRENT = "fcrepo.indexer.concurrent-consumers"; private static final String FCREPO_INDEXER_MAX_CONCURRENT = "fcrepo.indexer.max-concurrent-consumers"; + private static final String FCREPO_INDEXER_ASYNC_CONSUMER = "fcrepo.indexer.async-consumer"; @Value("${" + FCREPO_INDEXER_NODE_INDEX + ":}") private String fcrepoNodeIndex; @@ -68,6 +69,9 @@ public class FcrepoIndexerOptions extends PropertyConfig { @Value("${" + FCREPO_INDEXER_MAX_CONCURRENT + ":-1}") private int fcrepoMaxConcurrentConsumers; + @Value("${" + FCREPO_INDEXER_ASYNC_CONSUMER + ":false}") + private boolean fcrepoAsyncConsumers; + /** * Defines that Fedora indexer is only enabled if the appropriate property is set to "true". */ @@ -113,7 +117,7 @@ public String getExternalIndex() { * The altered topic/queue string. */ private String addConcurrent(final String queueString) { - return addJmsOptions(queueString, fcrepoConcurrentConsumers, fcrepoMaxConcurrentConsumers); + return addJmsOptions(queueString, fcrepoConcurrentConsumers, fcrepoMaxConcurrentConsumers, fcrepoAsyncConsumers); } /** diff --git a/islandora-indexing-triplestore/src/main/java/ca/islandora/alpaca/indexing/triplestore/TriplestoreIndexerOptions.java b/islandora-indexing-triplestore/src/main/java/ca/islandora/alpaca/indexing/triplestore/TriplestoreIndexerOptions.java index 3c62e847..4d3bcf39 100644 --- a/islandora-indexing-triplestore/src/main/java/ca/islandora/alpaca/indexing/triplestore/TriplestoreIndexerOptions.java +++ b/islandora-indexing-triplestore/src/main/java/ca/islandora/alpaca/indexing/triplestore/TriplestoreIndexerOptions.java @@ -43,6 +43,7 @@ public class TriplestoreIndexerOptions extends PropertyConfig { private static final String TRIPLESTORE_DELETE_QUEUE = "triplestore.delete.stream"; private static final String TRIPLESTORE_CONCURRENT = "triplestore.indexer.concurrent-consumers"; private static final String TRIPLESTORE_MAX_CONCURRENT = "triplestore.indexer.max-concurrent-consumers"; + private static final String TRIPLESTORE_ASYNC_CONSUMER = "triplestore.indexer.async-consumer"; @Value("${" + TRIPLESTORE_INDEX_QUEUE + ":}") private String jmsIndexStream; @@ -59,6 +60,9 @@ public class TriplestoreIndexerOptions extends PropertyConfig { @Value("${" + TRIPLESTORE_MAX_CONCURRENT + ":-1}") private int triplestoreMaxConcurrent; + @Value("${" + TRIPLESTORE_ASYNC_CONSUMER + ":false}") + private boolean triplestoreAsyncConsumer; + /** * Defines that triplestore indexer is only enabled if the appropriate property is set to "true". */ @@ -99,7 +103,7 @@ public String getTriplestoreBaseUrl() { * The altered topic/queue string. */ private String addConcurrent(final String queueString) { - return addJmsOptions(queueString, triplestoreConcurrent, triplestoreMaxConcurrent); + return addJmsOptions(queueString, triplestoreConcurrent, triplestoreMaxConcurrent, triplestoreAsyncConsumer); } /** diff --git a/islandora-support/src/main/java/ca/islandora/alpaca/support/config/PropertyConfig.java b/islandora-support/src/main/java/ca/islandora/alpaca/support/config/PropertyConfig.java index a0465366..44fdec1e 100644 --- a/islandora-support/src/main/java/ca/islandora/alpaca/support/config/PropertyConfig.java +++ b/islandora-support/src/main/java/ca/islandora/alpaca/support/config/PropertyConfig.java @@ -60,11 +60,15 @@ public int getMaxRedeliveries() { * The number of concurrent consumers. -1 means no setting. * @param maxConcurrentConsumers * The max number of concurrent consumers. -1 means no setting. + * @param asyncConsumers + * Indicate if the queue should be processed strictly queue-wise (false; + * more for dealing with overhead?); otherwise, allow multiple items to be + * processed at the same time. * @return * The modified topic/queue string. */ public static String addJmsOptions(final String queueString, final int concurrentConsumers, - final int maxConcurrentConsumers) { + final int maxConcurrentConsumers, final boolean asyncConsumers) { final StringBuilder builder = new StringBuilder(); if (concurrentConsumers > 0) { builder.append("concurrentConsumers="); @@ -77,6 +81,13 @@ public static String addJmsOptions(final String queueString, final int concurren builder.append("maxConcurrentConsumers="); builder.append(maxConcurrentConsumers); } + if (asyncConsumers) { + if (builder.length() > 0) { + builder.append('&'); + } + builder.append("asyncConsumer=") + .append(asyncConsumers); + } if (builder.length() > 0) { return queueString + (queueString.contains("?") ? '&' : '?') + builder; }