From d8e066a981f75e07b56913b21c4bcda789147e4c Mon Sep 17 00:00:00 2001
From: Levente Prehoda
Date: Thu, 12 Dec 2024 14:54:54 +0100
Subject: [PATCH 1/3] #691 RedisStreamPublisher pipelined publish
---
.../publisher/RedisStreamPublisher.java | 226 +++++++++++++++++-
.../module/coffee-module-redisstream.adoc | 11 +
docs/en/migration/migration290to2100.adoc | 10 +
.../module/coffee-module-redisstream.adoc | 11 +
docs/hu/migration/migration290to2100.adoc | 10 +
5 files changed, 257 insertions(+), 11 deletions(-)
diff --git a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
index d9a9a15ef..1b58d3bec 100644
--- a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
+++ b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
@@ -35,6 +35,7 @@
import hu.icellmobilsoft.coffee.dto.common.LogConstants;
import hu.icellmobilsoft.coffee.dto.exception.InvalidParameterException;
import hu.icellmobilsoft.coffee.dto.exception.TechnicalException;
+import hu.icellmobilsoft.coffee.dto.exception.enums.CoffeeFaultType;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManager;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManagerConnection;
import hu.icellmobilsoft.coffee.module.redisstream.common.RedisStreamUtil;
@@ -46,6 +47,8 @@
import hu.icellmobilsoft.coffee.se.logging.Logger;
import hu.icellmobilsoft.coffee.se.logging.mdc.MDC;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
@@ -198,7 +201,8 @@ public List> publishPublications(List> publishPublications(List> publishPublicationsPipelined(List publications) throws BaseException {
+ if (publications == null) {
+ throw new InvalidParameterException("publications is null!");
+ }
+ checkRedisManager();
+
+ try (RedisManagerConnection ignored = redisManager.initConnection()) {
+ List> responses = new ArrayList<>(publications.size());
+ Pipeline pipeline = initPipeline();
+ for (RedisStreamPublication publication : publications) {
+ Response response;
+ if (StringUtils.isBlank(publication.getStreamGroup())) {
+ validateGroup(streamGroup);
+ response = publishThroughPipeline(
+ pipeline,
+ streamGroup,
+ createJedisMessage(publication.getStreamMessage(), publication.getParameters()));
+ } else {
+ validateGroup(publication.getStreamGroup());
+ response = publishThroughPipeline(
+ pipeline,
+ publication.getStreamGroup(),
+ createJedisMessage(publication.getStreamMessage(), publication.getParameters()));
+ }
+ responses.add(response);
+ }
+ pipeline.sync();
+ return getStreamEntryIds(responses);
+ }
+ }
+
/**
* Publish (send) multiple messages to stream calculated by input streamGroup name.
*
@@ -232,9 +276,7 @@ public List> publish(List streamMessages) throws
* exception on sending
*/
public List> publish(List streamMessages, Map parameters) throws BaseException {
- if (streamMessages == null) {
- throw new InvalidParameterException("streamMessages is null!");
- }
+ validateStreamMessages(streamMessages);
checkInitialization();
try (RedisManagerConnection ignored = redisManager.initConnection()) {
@@ -272,9 +314,7 @@ public List> publish(String streamGroup, List st
*/
public List> publish(String streamGroup, List streamMessages, Map parameters)
throws BaseException {
- if (streamMessages == null) {
- throw new InvalidParameterException("streamMessages is null!");
- }
+ validateStreamMessages(streamMessages);
validateGroup(streamGroup);
checkRedisManager();
@@ -306,6 +346,105 @@ protected List> publishInActiveConnection(String streamG
return ids;
}
+ /**
+ * Publish (send) multiple messages through pipeline to stream calculated by input streamGroup name.
+ *
+ * @param streamMessages
+ * Messages in stream. Can be String or JSON List
+ * @return Created Redis Stream messages identifiers from Redis server
+ * @throws BaseException
+ * exception on sending
+ */
+ public List> publishPipelined(List streamMessages) throws BaseException {
+ return publishPipelined(streamMessages, null);
+ }
+
+ /**
+ * Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name.
+ *
+ * @param streamMessages
+ * Messages in stream. Can be String or JSON List
+ * @param parameters
+ * Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value
+ * @return Created Redis Stream messages identifiers from Redis server
+ * @throws BaseException
+ * exception on sending
+ */
+ public List> publishPipelined(List streamMessages, Map parameters) throws BaseException {
+ validateStreamMessages(streamMessages);
+ checkInitialization();
+
+ try (RedisManagerConnection ignored = redisManager.initConnection()) {
+ return publishPipelinedInActiveConnection(streamGroup, streamMessages, parameters);
+ }
+ }
+
+ /**
+ * Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name.
+ *
+ * @param streamGroup
+ * stream group to send (another than initialized)
+ * @param streamMessages
+ * Messages in stream. Can be String or JSON List
+ * @return Created Redis Stream message identifier from Redis server
+ * @throws BaseException
+ * exception on sending
+ */
+ public List> publishPipelined(String streamGroup, List streamMessages) throws BaseException {
+ return publishPipelined(streamGroup, streamMessages, null);
+ }
+
+ /**
+ * Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name.
+ *
+ * @param streamGroup
+ * stream group to send (another than initialized)
+ * @param streamMessages
+ * Messages in stream. Can be String or JSON List
+ * @param parameters
+ * Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value
+ * @return Created Redis Stream messages identifiers from Redis server
+ * @throws BaseException
+ * exception on sending
+ */
+ public List> publishPipelined(String streamGroup, List streamMessages, Map parameters)
+ throws BaseException {
+ validateStreamMessages(streamMessages);
+ validateGroup(streamGroup);
+ checkRedisManager();
+
+ try (RedisManagerConnection ignored = redisManager.initConnection()) {
+ return publishPipelinedInActiveConnection(streamGroup, streamMessages, parameters);
+ }
+ }
+
+ /**
+ * Publish (send) multiple messages through pipeline to stream
+ *
+ * @param streamGroup
+ * Stream group to send (another than initialized)
+ * @param streamMessages
+ * Messages in stream. Can be String or JSON List
+ * @param parameters
+ * Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value
+ * @return Created Redis Stream messages identifiers from Redis server
+ * @throws BaseException
+ * exception on sending
+ */
+ protected List> publishPipelinedInActiveConnection(String streamGroup, List streamMessages,
+ Map parameters) throws BaseException {
+
+ Pipeline pipeline = initPipeline();
+
+ List> responses = new ArrayList<>(streamMessages.size());
+ for (String streamMessage : streamMessages) {
+ Response response = publishThroughPipeline(pipeline, streamGroup, createJedisMessage(streamMessage, parameters));
+ responses.add(response);
+ }
+ pipeline.sync();
+ return getStreamEntryIds(responses);
+ }
+
/**
* Publish (send) message to stream with class initialized {@code #jedisInstance}
*
@@ -352,7 +491,8 @@ private String getFlowIdMessage(Map parameters) {
return flowIdMessage;
}
return Optional.ofNullable(parameters.get(StreamMessageParameter.FLOW_ID_EXTENSION.getMessageKey()))
- .map(extension -> flowIdMessage + "_" + extension).orElse(flowIdMessage);
+ .map(extension -> flowIdMessage + "_" + extension)
+ .orElse(flowIdMessage);
}
/**
@@ -371,9 +511,7 @@ private String getFlowIdMessage(Map parameters) {
* Exception
*/
protected Optional publishInActiveConnection(Map values, String streamGroup) throws BaseException {
- XAddParams params = XAddParams.xAddParams();
- config.getProducerMaxLen().ifPresent(params::maxLen);
- config.getProducerTTL().ifPresent(ttl -> params.minId(new StreamEntryID(Instant.now().minusMillis(ttl).toEpochMilli(), 0).toString()));
+ XAddParams params = getXAddParams();
Optional streamEntryID = redisManager.run(Jedis::xadd, "xadd", RedisStreamUtil.streamKey(streamGroup), values, params);
if (log.isTraceEnabled()) {
log.trace("Published streamEntryID: [{0}] into [{1}]", streamEntryID, RedisStreamUtil.streamKey(streamGroup));
@@ -437,6 +575,72 @@ protected void checkRedisManager() throws TechnicalException {
}
}
+ /**
+ * Validates streamMessages
+ *
+ * @param streamMessages
+ * streamMessages
+ * @throws BaseException
+ * if validation fails
+ */
+ protected void validateStreamMessages(List streamMessages) throws BaseException {
+ if (streamMessages == null) {
+ throw new InvalidParameterException("streamMessages is null!");
+ }
+ }
+
+ /**
+ * Returns the default XAddParams
+ *
+ * @return the default XAdd params
+ */
+ protected XAddParams getXAddParams() {
+ XAddParams params = XAddParams.xAddParams();
+ config.getProducerMaxLen().ifPresent(params::maxLen);
+ config.getProducerTTL().ifPresent(ttl -> params.minId(new StreamEntryID(Instant.now().minusMillis(ttl).toEpochMilli(), 0).toString()));
+ return params;
+ }
+
+ /**
+ * Initializes a new pipeline
+ * Requires active connection
+ *
+ * @return pipeline instance
+ * @throws BaseException
+ * if any error occurred while creating pipeline
+ */
+ protected Pipeline initPipeline() throws BaseException {
+ return redisManager.run(Jedis::pipelined, "pipelined")
+ .orElseThrow(() -> new BaseException(CoffeeFaultType.REDIS_OPERATION_FAILED, "Error occurred while creating pipeline"));
+ }
+
+ /**
+ * Publish (send) message to stream through pipeline
+ *
+ * @param pipeline
+ * The pipeline instance
+ * @param streamGroup
+ * Stream group to send
+ * @param jedisMessage
+ * Redis Stream message structure, ready to publish
+ *
+ * @return {@link Pipeline#xadd(String, Map, XAddParams)} response
+ */
+ protected Response publishThroughPipeline(Pipeline pipeline, String streamGroup, Map jedisMessage) {
+ return pipeline.xadd(RedisStreamUtil.streamKey(streamGroup), jedisMessage, getXAddParams());
+ }
+
+ protected List> getStreamEntryIds(List> responses) {
+ List> streamEntryIds = responses.stream().map(Response::get).map(Optional::ofNullable).toList();
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Published [{0}] streamEntries into [{1}]",
+ streamEntryIds.stream().filter(Optional::isPresent).count(),
+ RedisStreamUtil.streamKey(streamGroup));
+ }
+ return streamEntryIds;
+ }
+
private TechnicalException notInitializedException() {
return new TechnicalException("RedisStreamHandler is not initialized!");
}
diff --git a/docs/en/common/module/coffee-module-redisstream.adoc b/docs/en/common/module/coffee-module-redisstream.adoc
index f18388909..085eadf13 100644
--- a/docs/en/common/module/coffee-module-redisstream.adoc
+++ b/docs/en/common/module/coffee-module-redisstream.adoc
@@ -183,6 +183,17 @@ redisStreamPublisher.publish("message", parameters); //<3>
// or
RedisStreamPublication publication = RedisStreamPublication.of(id).withTTL(defaultTTL).withParameter(StreamMessageParameter.FLOW_ID_EXTENSION, id))
redisStreamPublisher.publishPublication(publication); //<4>
+
+// In case of many records, it might be a better option to publish messages through pipeline
+List redisStreamPublicationsPipelined = new ArrayList<>();
+for (int i = 0; i < 1000; i++) {
+ redisStreamPublicationsPipelined.add(RedisStreamPublication.of("alternativeGroup", "pipelined - " + i, parameters));
+}
+publisher.publishPublicationsPipelined(redisStreamPublicationsPipelined);
+// or
+List ids = IntStream.range(0, 1000).mapToObj(i -> RandomUtil.generateId()).toList();
+publisher.publishPipelined(ids); //<5>
+
----
<1> "group" is not mandatory in all cases
<2> The "message" content itself will be stored in a kind of coffee stream message structure,
diff --git a/docs/en/migration/migration290to2100.adoc b/docs/en/migration/migration290to2100.adoc
index 2bc8119db..d59ae7093 100644
--- a/docs/en/migration/migration290to2100.adoc
+++ b/docs/en/migration/migration290to2100.adoc
@@ -3,9 +3,19 @@
coff:ee v2.9.0 -> v2.10.0 migration description, news, changes
=== coffee-configuration
+
A misleading exception message has been clarified in ApplicationConfiguration class.
==== Migration
+
+Changes are backwards compatible doesnt need any migration.
+
+=== coffee-redisstream
+
+A new function has been added to RedisStreamPublisher, which makes it possible to send multiple messages at once to Redis stream through pipeline.
+
+==== Migration
+
Changes are backwards compatible doesnt need any migration.
=== Java11 support remove
diff --git a/docs/hu/common/module/coffee-module-redisstream.adoc b/docs/hu/common/module/coffee-module-redisstream.adoc
index df27284a2..9a6603174 100644
--- a/docs/hu/common/module/coffee-module-redisstream.adoc
+++ b/docs/hu/common/module/coffee-module-redisstream.adoc
@@ -183,6 +183,17 @@ redisStreamPublisher.publish("message", parameters); //<3>
// vagy
RedisStreamPublication publication = RedisStreamPublication.of(id).withTTL(defaultTTL).withParameter(StreamMessageParameter.FLOW_ID_EXTENSION, id))
redisStreamPublisher.publishPublication(publication); //<4>
+
+// Nagy számú üzenet esetén érdemes pipeline-on keresztül küldeni az üzeneteket:
+List redisStreamPublicationsPipelined = new ArrayList<>();
+for (int i = 0; i < 1000; i++) {
+ redisStreamPublicationsPipelined.add(RedisStreamPublication.of("alternativeGroup", "pipelined - " + i, parameters));
+}
+publisher.publishPublicationsPipelined(redisStreamPublicationsPipelined);
+// vagy
+List ids = IntStream.range(0, 1000).mapToObj(i -> RandomUtil.generateId()).toList();
+publisher.publishPipelined(ids);
+
----
<1> "group" nem kötelező minden esetben
<2> Maga a "message" tartalom egyfajta coffee stream message struktúrába fog kerülni,
diff --git a/docs/hu/migration/migration290to2100.adoc b/docs/hu/migration/migration290to2100.adoc
index 5ff601b87..633d15d87 100644
--- a/docs/hu/migration/migration290to2100.adoc
+++ b/docs/hu/migration/migration290to2100.adoc
@@ -3,9 +3,19 @@
coff:ee v2.9.0 -> v2.10.0 migrációs leírás, újdonságok, változások leírása
=== coffee-configuration
+
Pontosításra került egy félrevezető hibaüzenet az ApplicationConfiguration osztályban.
==== Migration
+
+A változtatások nem eredményeznek átállási munkálatokat, visszafelé kompatibilis.
+
+=== coffee-redisstream
+
+A RedisStreamPublisher ki lett egészítve egy új funkcióval, ami lehetővé teszi egyszerre több üzenet Redis streamre küldését pipeline-on keresztül.
+
+==== Migration
+
A változtatások nem eredményeznek átállási munkálatokat, visszafelé kompatibilis.
=== Java11 támogatás eltávolítása
From 72cbfd14a2b91fdd949bc73ffe7c7cfe30662e37 Mon Sep 17 00:00:00 2001
From: Levente Prehoda
Date: Thu, 12 Dec 2024 15:11:32 +0100
Subject: [PATCH 2/3] #691 add missing javadoc
---
.../module/redisstream/publisher/RedisStreamPublisher.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
index 1b58d3bec..047d210a5 100644
--- a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
+++ b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
@@ -630,6 +630,13 @@ protected Response publishThroughPipeline(Pipeline pipeline, Stri
return pipeline.xadd(RedisStreamUtil.streamKey(streamGroup), jedisMessage, getXAddParams());
}
+ /**
+ * Gets StreamEntryIds-s from pipeline responses
+ *
+ * @param responses
+ * {@link Pipeline#xadd(String, Map, XAddParams)} response list
+ * @return list of optional StreamEntryIds
+ */
protected List> getStreamEntryIds(List> responses) {
List> streamEntryIds = responses.stream().map(Response::get).map(Optional::ofNullable).toList();
if (log.isTraceEnabled()) {
From aba64b610525f852b69fbc9719b4d46e5c42535e Mon Sep 17 00:00:00 2001
From: Levente Prehoda
Date: Fri, 13 Dec 2024 19:37:29 +0100
Subject: [PATCH 3/3] #691 call Pipeline#sync() after every 1000 items
---
.../publisher/RedisStreamPublisher.java | 43 ++++++++++++++++++-
1 file changed, 41 insertions(+), 2 deletions(-)
diff --git a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
index 047d210a5..2d4fe820d 100644
--- a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
+++ b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java
@@ -62,6 +62,8 @@
@Dependent
public class RedisStreamPublisher {
+ private static final int PIPELINE_SIZE = 1000;
+
@Inject
private Logger log;
@@ -229,6 +231,7 @@ public List> publishPublicationsPipelined(List> responses = new ArrayList<>(publications.size());
Pipeline pipeline = initPipeline();
+ int i = 1;
for (RedisStreamPublication publication : publications) {
Response response;
if (StringUtils.isBlank(publication.getStreamGroup())) {
@@ -245,8 +248,9 @@ public List> publishPublicationsPipelined(List> publishPipelinedInActiveConnection(Strin
Pipeline pipeline = initPipeline();
+ int i = 1;
List> responses = new ArrayList<>(streamMessages.size());
for (String streamMessage : streamMessages) {
Response response = publishThroughPipeline(pipeline, streamGroup, createJedisMessage(streamMessage, parameters));
responses.add(response);
+ i = syncPipelineIfNeeded(pipeline, streamMessages.size(), i);
}
- pipeline.sync();
+ syncPipelineIfNeeded(pipeline, streamMessages.size(), i);
return getStreamEntryIds(responses);
}
@@ -648,6 +654,39 @@ protected List> getStreamEntryIds(List
+ * If pipelined responses reach {@link RedisStreamPublisher#PIPELINE_SIZE},
+ * or if all messages have been sent, and there are pipelined responses,
+ * then {@link Pipeline#sync()} is called
+ *
+ * @param pipeline
+ * the Pipeline instance
+ * @param messagesToSend
+ * The number of messages to be sent
+ * @param messagesSent
+ * The number of sent messages
+ * @return The number of sent messages increased
+ *
+ */
+ protected int syncPipelineIfNeeded(Pipeline pipeline, int messagesToSend, int messagesSent) {
+ if ((messagesSent < messagesToSend && messagesSent % getPipelineSize() == 0)
+ || (messagesSent == messagesToSend && pipeline.hasPipelinedResponse())) {
+ pipeline.sync();
+ }
+ messagesSent++;
+ return messagesSent;
+ }
+
+ /**
+ * Returns the size of the pipeline
+ *
+ * @return the size of the pipeline
+ */
+ protected int getPipelineSize() {
+ return PIPELINE_SIZE;
+ }
+
private TechnicalException notInitializedException() {
return new TechnicalException("RedisStreamHandler is not initialized!");
}