Async consumers for multiprocessing (#2)
* Fix up warning about the unclosed app context.

* Slap together async-consumer stuff.

* Add separator for asyncConsumer parameter.

* Update example as suggested.
adam-vessey authored Dec 8, 2021
1 parent 309089b commit a57a2dc
Showing 7 changed files with 106 additions and 12 deletions.
52 changes: 52 additions & 0 deletions README.md
@@ -37,20 +37,25 @@ error.maxRedeliveries=4
This defines how many times to retry a message before failing completely.

There are also common ActiveMQ properties to set up the connection.

```
# ActiveMQ options
jms.brokerUrl=tcp://localhost:61616
```

This defines the url to the ActiveMQ broker.

```
jms.username=
jms.password=
```
This defines the login credentials (if required).

```
jms.connections=10
```
This defines the size of the pool of connections to the ActiveMQ instance.

```
jms.concurrent-consumers=1
```
@@ -66,19 +71,24 @@ It's properties are:
# Fcrepo indexer options
fcrepo.indexer.enabled=true
```

This defines whether the Fedora indexer is enabled or not.

```
fcrepo.indexer.node=queue:islandora-indexing-fcrepo-content
fcrepo.indexer.delete=queue:islandora-indexing-fcrepo-delete
fcrepo.indexer.media=queue:islandora-indexing-fcrepo-media
fcrepo.indexer.external=queue:islandora-indexing-fcrepo-file-external
```

These define the various queues to listen on for the indexing/deletion
messages. The part after `queue:` should match your Islandora instance "Actions".

```
fcrepo.indexer.milliner.baseUrl=http://localhost:8000/milliner
```
This defines the location of your Milliner microservice.

```
fcrepo.indexer.concurrent-consumers=1
fcrepo.indexer.max-concurrent-consumers=1
@@ -87,6 +97,12 @@ These define the default number of concurrent consumers and maximum number of co
consumers working off your ActiveMQ instance.
A value of `-1` means no setting is applied.

```
fcrepo.indexer.async-consumer=true
```

This property allows the concurrent consumers to process messages asynchronously; otherwise, each consumer waits until the previous message has been processed before taking the next one.

### islandora-indexing-triplestore

This service indexes the Drupal node into the configured triplestore
@@ -97,25 +113,39 @@ It's properties are:
# Triplestore indexer options
triplestore.indexer.enabled=false
```

This defines whether the Triplestore indexer is enabled or not.

```
triplestore.index.stream=queue:islandora-indexing-triplestore-index
triplestore.delete.stream=queue:islandora-indexing-triplestore-delete
```

These define the various queues to listen on for the indexing/deletion
messages. The part after `queue:` should match your Islandora instance "Actions".

```
triplestore.baseUrl=http://localhost:8080/bigdata/namespace/kb/sparql
```

This defines the location of your triplestore's SPARQL update endpoint.

```
triplestore.indexer.concurrent-consumers=1
triplestore.indexer.max-concurrent-consumers=1
```

These define the default number of concurrent consumers and maximum number of concurrent
consumers working off your ActiveMQ instance.
A value of `-1` means no setting is applied.


```
triplestore.indexer.async-consumer=true
```

This property allows the concurrent consumers to process messages asynchronously; otherwise, each consumer waits until the previous message has been processed before taking the next one.

### islandora-connector-derivative

This service is used to configure an external microservice. This service will deploy multiple copies of its routes
@@ -127,25 +157,40 @@ a comma separated list. Each item in the list defines a new route and must also
```
derivative.<item>.enabled=true
```

This defines if the `item` service is enabled.

```
derivative.<item>.in.stream=queue:islandora-item-connector.index
```

This is the input queue for the derivative microservice.
The part after `queue:` should match your Islandora instance "Actions".

```
derivative.<item>.service.url=http://example.org/derivative/convert
```

This is the microservice URL to process the request.

```
derivative.<item>.concurrent-consumers=1
derivative.<item>.max-concurrent-consumers=1
```

These define the default number of concurrent consumers and maximum number of concurrent
consumers working off your ActiveMQ instance.
A value of `-1` means no setting is applied.


```
derivative.<item>.async-consumer=true
```

This property allows the concurrent consumers to process messages asynchronously; otherwise, each consumer waits until the previous message has been processed before taking the next one.

For example, with two services defined (houdini and crayfits), my configuration would have:

```
derivative.systems.installed=houdini,fits
@@ -154,23 +199,27 @@ derivative.houdini.in.stream=queue:islandora-connector-houdini
derivative.houdini.service.url=http://127.0.0.1:8000/houdini/convert
derivative.houdini.concurrent-consumers=1
derivative.houdini.max-concurrent-consumers=4
derivative.houdini.async-consumer=true
derivative.fits.enabled=true
derivative.fits.in.stream=queue:islandora-connector-fits
derivative.fits.service.url=http://127.0.0.1:8000/crayfits
derivative.fits.concurrent-consumers=2
derivative.fits.max-concurrent-consumers=2
derivative.fits.async-consumer=false
```

### Customizing HTTP client timeouts

You can change the HTTP client's request, connection, and socket timeouts from their defaults.
To do this, enable the request configurer.

```shell
request.configurer.enabled=true
```

Then set the following three timeouts (in milliseconds) to the desired values.

```shell
request.timeout=-1
connection.timeout=-1
@@ -182,6 +231,7 @@ The default for all three is `-1` which indicates no timeout.
## Deploying/Running

You can see the options by passing the `-h|--help` flag.

```shell
> java -jar islandora-alpaca-app/build/libs/islandora-alpaca-app-2.0.0-all.jar -h
Usage: alpaca [-hV] [-c=<configurationFilePath>]
@@ -192,6 +242,7 @@ Usage: alpaca [-hV] [-c=<configurationFilePath>]
```

Using the `-V|--version` flag will just return the current version of the application.

```shell
> java -jar islandora-alpaca-app/build/libs/islandora-alpaca-app-2.0.0-all.jar -V
2.0.0
@@ -212,6 +263,7 @@ Logging is done to the console, and defaults to the INFO level. To get more verb
can use the Java property `islandora.alpaca.log`

For example:

```shell
java -Dislandora.alpaca.log=DEBUG -jar islandora-alpaca-app-2.0.0-all.jar -c /opt/my.properties
```
6 changes: 6 additions & 0 deletions example.properties
@@ -21,6 +21,7 @@ fcrepo.indexer.external=queue:islandora-indexing-fcrepo-file-external
fcrepo.indexer.milliner.baseUrl=http://127.0.0.1:8000/milliner/
fcrepo.indexer.concurrent-consumers=-1
fcrepo.indexer.max-concurrent-consumers=-1
fcrepo.indexer.async-consumer=false

# Triplestore indexer options
triplestore.indexer.enabled=true
@@ -29,6 +30,7 @@ triplestore.index.stream=queue:islandora-indexing-triplestore-index
triplestore.delete.stream=queue:islandora-indexing-triplestore-delete
triplestore.indexer.concurrent-consumers=-1
triplestore.indexer.max-concurrent-consumers=-1
triplestore.indexer.async-consumer=false

# Derivative services
derivative.systems.installed=fits,homarus,houdini,ocr
@@ -38,21 +40,25 @@ derivative.fits.in.stream=queue:islandora-connector-fits
derivative.fits.service.url=http://localhost:8000/crayfits
derivative.fits.concurrent-consumers=-1
derivative.fits.max-concurrent-consumers=-1
derivative.fits.async-consumer=false

derivative.homarus.enabled=true
derivative.homarus.in.stream=queue:islandora-connector-homarus
derivative.homarus.service.url=http://127.0.0.1:8000/homarus/convert
derivative.homarus.concurrent-consumers=-1
derivative.homarus.max-concurrent-consumers=-1
derivative.homarus.async-consumer=false

derivative.houdini.enabled=true
derivative.houdini.in.stream=queue:islandora-connector-houdini
derivative.houdini.service.url=http://127.0.0.1:8000/houdini/convert
derivative.houdini.concurrent-consumers=-1
derivative.houdini.max-concurrent-consumers=-1
derivative.houdini.async-consumer=false

derivative.ocr.enabled=true
derivative.ocr.in.stream=queue:islandora-connector-ocr
derivative.ocr.service.url=http://localhost:8000/hypercube
derivative.ocr.concurrent-consumers=-1
derivative.ocr.max-concurrent-consumers=-1
derivative.ocr.async-consumer=false
@@ -54,17 +54,21 @@ public Integer call() throws Exception {
System.setProperty(ALPACA_CONFIG_PROPERTY, configurationFilePath.toFile().getAbsolutePath());
}
final var appContext = new AnnotationConfigApplicationContext("ca.islandora.alpaca");
appContext.start();
LOGGER.info("Alpaca started.");
try {
appContext.start();
LOGGER.info("Alpaca started.");

while (appContext.isRunning()) {
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException("This should never happen");
while (appContext.isRunning()) {
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException("This should never happen");
}
}
return 0;
} finally {
appContext.close();
}
return 0;
}
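
The change above wraps the run loop in `try`/`finally` so the application context is always closed, which is what silences the unclosed-context warning. A minimal sketch of the same idea using try-with-resources follows. `FakeContext` is a hypothetical stand-in for the Spring context, and restoring the interrupt flag with a non-zero return is an alternative to the `RuntimeException` used in the commit, not the author's code.

```java
/** Sketch only: closing a long-running context reliably, without Spring on the classpath. */
final class LifecycleSketch {

    /** Hypothetical stand-in for the application context; stops "running" after a few polls. */
    static final class FakeContext implements AutoCloseable {
        private int remainingPolls;
        private boolean closed;

        FakeContext(final int pollsBeforeStop) {
            this.remainingPolls = pollsBeforeStop;
        }

        void start() {
            // Nothing to start in this sketch.
        }

        boolean isRunning() {
            return !closed && remainingPolls-- > 0;
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Polls until the context stops; the context is closed on every exit path. */
    static int runUntilStopped(final FakeContext context, final long pollMillis) {
        try (FakeContext ctx = context) {
            ctx.start();
            while (ctx.isRunning()) {
                try {
                    Thread.sleep(pollMillis);
                } catch (final InterruptedException e) {
                    // Assumption: treat interruption as a shutdown request.
                    Thread.currentThread().interrupt();
                    return 1;
                }
            }
            return 0;
        }
    }

    public static void main(final String[] args) {
        final FakeContext context = new FakeContext(3);
        final int exitCode = runUntilStopped(context, 10L);
        System.out.println("exit=" + exitCode + " closed=" + context.isClosed());
    }
}
```

Try-with-resources and an explicit `finally` block behave the same way here; which one reads better is a matter of taste.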

/**
@@ -50,6 +50,7 @@ public class DerivativeOptions extends PropertyConfig {
private static final String DERIVATIVE_OUTPUT_PROPERTY = "service.url";
private static final String DERIVATIVE_CONCURRENT_PROPERTY = "concurrent-consumers";
private static final String DERIVATIVE_MAX_CONCURRENT_PROPERTY = "max-concurrent-consumers";
private static final String DERIVATIVE_ASYNC_CONSUMER = "async-consumer";

@Autowired
private Environment environment;
@@ -104,8 +105,11 @@ private void startDerivativeService(final String serviceName) throws Exception {
Integer.class, -1);
final int maxConcurrentConsumers = environment.getProperty(maxConcurrentConsumerProperty(serviceName),
Integer.class, -1);
final boolean asyncConsumer = environment.getProperty(asyncConsumerProperty(serviceName),
Boolean.class, false);
// Add concurrent/max-concurrent
final String finalInput = addJmsOptions(addBrokerName(input), concurrentConsumers, maxConcurrentConsumers);
final String finalInput = addJmsOptions(addBrokerName(input), concurrentConsumers, maxConcurrentConsumers,
asyncConsumer);
// Add connectionClose and other http options.
final String finalOutput = addHttpOptions(output);
camelContext.addRoutes(new DerivativeConnector(serviceName, finalInput, finalOutput, this));
@@ -180,4 +184,13 @@ private String maxConcurrentConsumerProperty(final String systemName) {
return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_MAX_CONCURRENT_PROPERTY;
}

/**
* Return the expected async-consumer property.
* @param systemName the derivative system name
* @return the property
*/
private String asyncConsumerProperty(final String systemName) {
return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_ASYNC_CONSUMER;
}

}
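
The per-service property key is assembled from a prefix, the service name, and the `async-consumer` suffix, and a missing key falls back to `false`. A minimal sketch of that lookup follows, using `java.util.Properties` instead of Spring's `Environment`. The `"derivative"` prefix is inferred from the README keys, and `Boolean.parseBoolean` is a simplification: Spring's own conversion may accept other spellings.

```java
import java.util.Properties;

/** Sketch only: building and resolving derivative.<name>.async-consumer without Spring. */
final class DerivativePropertySketch {

    // Assumption: the prefix matches the keys shown in the README (derivative.<item>....).
    private static final String DERIVATIVE_PREFIX = "derivative";
    private static final String DERIVATIVE_ASYNC_CONSUMER = "async-consumer";

    private DerivativePropertySketch() {
    }

    /** Returns the expected async-consumer property key for a derivative system. */
    static String asyncConsumerProperty(final String systemName) {
        return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_ASYNC_CONSUMER;
    }

    /** Resolves the flag, treating a missing key as false. */
    static boolean isAsyncConsumer(final Properties properties, final String systemName) {
        final String raw = properties.getProperty(asyncConsumerProperty(systemName), "false");
        return Boolean.parseBoolean(raw.trim());
    }

    public static void main(final String[] args) {
        final Properties properties = new Properties();
        properties.setProperty("derivative.houdini.async-consumer", "true");

        for (final String name : new String[] {"houdini", "fits"}) {
            System.out.println(asyncConsumerProperty(name) + " -> " + isAsyncConsumer(properties, name));
        }
    }
}
```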
@@ -43,6 +43,7 @@ public class FcrepoIndexerOptions extends PropertyConfig {
private static final String FCREPO_BASE_URI_HEADER_PROPERTY = "fcrepo.indexer.fedoraHeader";
private static final String FCREPO_INDEXER_CONCURRENT = "fcrepo.indexer.concurrent-consumers";
private static final String FCREPO_INDEXER_MAX_CONCURRENT = "fcrepo.indexer.max-concurrent-consumers";
private static final String FCREPO_INDEXER_ASYNC_CONSUMER = "fcrepo.indexer.async-consumer";

@Value("${" + FCREPO_INDEXER_NODE_INDEX + ":}")
private String fcrepoNodeIndex;
@@ -68,6 +69,9 @@ public class FcrepoIndexerOptions extends PropertyConfig {
@Value("${" + FCREPO_INDEXER_MAX_CONCURRENT + ":-1}")
private int fcrepoMaxConcurrentConsumers;

@Value("${" + FCREPO_INDEXER_ASYNC_CONSUMER + ":false}")
private boolean fcrepoAsyncConsumers;

/**
* Defines that Fedora indexer is only enabled if the appropriate property is set to "true".
*/
@@ -113,7 +117,7 @@ public String getExternalIndex() {
* The altered topic/queue string.
*/
private String addConcurrent(final String queueString) {
return addJmsOptions(queueString, fcrepoConcurrentConsumers, fcrepoMaxConcurrentConsumers);
return addJmsOptions(queueString, fcrepoConcurrentConsumers, fcrepoMaxConcurrentConsumers, fcrepoAsyncConsumers);
}

/**
@@ -43,6 +43,7 @@ public class TriplestoreIndexerOptions extends PropertyConfig {
private static final String TRIPLESTORE_DELETE_QUEUE = "triplestore.delete.stream";
private static final String TRIPLESTORE_CONCURRENT = "triplestore.indexer.concurrent-consumers";
private static final String TRIPLESTORE_MAX_CONCURRENT = "triplestore.indexer.max-concurrent-consumers";
private static final String TRIPLESTORE_ASYNC_CONSUMER = "triplestore.indexer.async-consumer";

@Value("${" + TRIPLESTORE_INDEX_QUEUE + ":}")
private String jmsIndexStream;
@@ -59,6 +60,9 @@ public class TriplestoreIndexerOptions extends PropertyConfig {
@Value("${" + TRIPLESTORE_MAX_CONCURRENT + ":-1}")
private int triplestoreMaxConcurrent;

@Value("${" + TRIPLESTORE_ASYNC_CONSUMER + ":false}")
private boolean triplestoreAsyncConsumer;

/**
* Defines that triplestore indexer is only enabled if the appropriate property is set to "true".
*/
@@ -99,7 +103,7 @@ public String getTriplestoreBaseUrl() {
* The altered topic/queue string.
*/
private String addConcurrent(final String queueString) {
return addJmsOptions(queueString, triplestoreConcurrent, triplestoreMaxConcurrent);
return addJmsOptions(queueString, triplestoreConcurrent, triplestoreMaxConcurrent, triplestoreAsyncConsumer);
}

/**
@@ -60,11 +60,15 @@ public int getMaxRedeliveries() {
* The number of concurrent consumers. -1 means no setting.
* @param maxConcurrentConsumers
* The max number of concurrent consumers. -1 means no setting.
* @param asyncConsumers
* If false, each message is fully processed before the next one is taken
* from the queue; if true, the next message may be picked up while the
* previous one is still being processed.
* @return
* The modified topic/queue string.
*/
public static String addJmsOptions(final String queueString, final int concurrentConsumers,
final int maxConcurrentConsumers) {
final int maxConcurrentConsumers, final boolean asyncConsumers) {
final StringBuilder builder = new StringBuilder();
if (concurrentConsumers > 0) {
builder.append("concurrentConsumers=");
@@ -77,6 +81,13 @@ public static String addJmsOptions(final String queueString, final int concurren
builder.append("maxConcurrentConsumers=");
builder.append(maxConcurrentConsumers);
}
if (asyncConsumers) {
if (builder.length() > 0) {
builder.append('&');
}
builder.append("asyncConsumer=")
.append(asyncConsumers);
}
if (builder.length() > 0) {
return queueString + (queueString.contains("?") ? '&' : '?') + builder;
}
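
Because the diff elides part of `addJmsOptions`, the standalone sketch below reconstructs the whole method to show how the new flag ends up on the endpoint string. The `maxConcurrentConsumers` branch and the final fall-through `return` are assumptions filled in from the visible pattern, and the class name `JmsOptionsSketch` is hypothetical.

```java
/** Sketch only: appending consumer options to a queue/topic endpoint string. */
final class JmsOptionsSketch {

    private JmsOptionsSketch() {
    }

    static String addJmsOptions(final String queueString, final int concurrentConsumers,
                                final int maxConcurrentConsumers, final boolean asyncConsumer) {
        final StringBuilder builder = new StringBuilder();
        if (concurrentConsumers > 0) {
            builder.append("concurrentConsumers=").append(concurrentConsumers);
        }
        // Assumption: this branch mirrors the one above; the diff does not show it in full.
        if (maxConcurrentConsumers > 0) {
            if (builder.length() > 0) {
                builder.append('&');
            }
            builder.append("maxConcurrentConsumers=").append(maxConcurrentConsumers);
        }
        // The option is only emitted when true, so a false value leaves the string untouched.
        if (asyncConsumer) {
            if (builder.length() > 0) {
                builder.append('&');
            }
            builder.append("asyncConsumer=").append(asyncConsumer);
        }
        if (builder.length() > 0) {
            // Use '&' when the endpoint already carries a query string, '?' otherwise.
            return queueString + (queueString.contains("?") ? '&' : '?') + builder;
        }
        // Assumption: with nothing to add, the input is returned unchanged.
        return queueString;
    }

    public static void main(final String[] args) {
        System.out.println(addJmsOptions("queue:islandora-connector-houdini", 1, 4, true));
        System.out.println(addJmsOptions("queue:islandora-connector-fits", 2, 2, false));
        System.out.println(addJmsOptions("queue:islandora-indexing-fcrepo-content", -1, -1, false));
    }
}
```

With the houdini values from the README example (`1`, `4`, `true`), the sketch yields `queue:islandora-connector-houdini?concurrentConsumers=1&maxConcurrentConsumers=4&asyncConsumer=true`.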