diff --git a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java index d9a9a15ef..2d4fe820d 100644 --- a/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java +++ b/coffee-module/coffee-module-redisstream/src/main/java/hu/icellmobilsoft/coffee/module/redisstream/publisher/RedisStreamPublisher.java @@ -35,6 +35,7 @@ import hu.icellmobilsoft.coffee.dto.common.LogConstants; import hu.icellmobilsoft.coffee.dto.exception.InvalidParameterException; import hu.icellmobilsoft.coffee.dto.exception.TechnicalException; +import hu.icellmobilsoft.coffee.dto.exception.enums.CoffeeFaultType; import hu.icellmobilsoft.coffee.module.redis.manager.RedisManager; import hu.icellmobilsoft.coffee.module.redis.manager.RedisManagerConnection; import hu.icellmobilsoft.coffee.module.redisstream.common.RedisStreamUtil; @@ -46,6 +47,8 @@ import hu.icellmobilsoft.coffee.se.logging.Logger; import hu.icellmobilsoft.coffee.se.logging.mdc.MDC; import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XAddParams; @@ -59,6 +62,8 @@ @Dependent public class RedisStreamPublisher { + private static final int PIPELINE_SIZE = 1000; + @Inject private Logger log; @@ -198,7 +203,8 @@ public List> publishPublications(List> publishPublications(List> publishPublicationsPipelined(List publications) throws BaseException { + if (publications == null) { + throw new InvalidParameterException("publications is null!"); + } + checkRedisManager(); + + try (RedisManagerConnection ignored = redisManager.initConnection()) { + List> responses = new ArrayList<>(publications.size()); + Pipeline pipeline = initPipeline(); + int i = 1; + for (RedisStreamPublication publication : publications) { + Response response; + if (StringUtils.isBlank(publication.getStreamGroup())) { + validateGroup(streamGroup); + response = publishThroughPipeline( + pipeline, + streamGroup, + createJedisMessage(publication.getStreamMessage(), publication.getParameters())); + } else { + validateGroup(publication.getStreamGroup()); + response = publishThroughPipeline( + pipeline, + publication.getStreamGroup(), + createJedisMessage(publication.getStreamMessage(), publication.getParameters())); + } + responses.add(response); + i = syncPipelineIfNeeded(pipeline, publications.size(), i); + } + syncPipelineIfNeeded(pipeline, publications.size(), i); + return getStreamEntryIds(responses); + } + } + /** * Publish (send) multiple messages to stream calculated by input streamGroup name. * @@ -232,9 +280,7 @@ public List> publish(List streamMessages) throws * exception on sending */ public List> publish(List streamMessages, Map parameters) throws BaseException { - if (streamMessages == null) { - throw new InvalidParameterException("streamMessages is null!"); - } + validateStreamMessages(streamMessages); checkInitialization(); try (RedisManagerConnection ignored = redisManager.initConnection()) { @@ -272,9 +318,7 @@ public List> publish(String streamGroup, List st */ public List> publish(String streamGroup, List streamMessages, Map parameters) throws BaseException { - if (streamMessages == null) { - throw new InvalidParameterException("streamMessages is null!"); - } + validateStreamMessages(streamMessages); validateGroup(streamGroup); checkRedisManager(); @@ -306,6 +350,107 @@ protected List> publishInActiveConnection(String streamG return ids; } + /** + * Publish (send) multiple messages through pipeline to stream calculated by input streamGroup name. + * + * @param streamMessages + * Messages in stream. Can be String or JSON List + * @return Created Redis Stream messages identifiers from Redis server + * @throws BaseException + * exception on sending + */ + public List> publishPipelined(List streamMessages) throws BaseException { + return publishPipelined(streamMessages, null); + } + + /** + * Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name. + * + * @param streamMessages + * Messages in stream. Can be String or JSON List + * @param parameters + * Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value + * @return Created Redis Stream messages identifiers from Redis server + * @throws BaseException + * exception on sending + */ + public List> publishPipelined(List streamMessages, Map parameters) throws BaseException { + validateStreamMessages(streamMessages); + checkInitialization(); + + try (RedisManagerConnection ignored = redisManager.initConnection()) { + return publishPipelinedInActiveConnection(streamGroup, streamMessages, parameters); + } + } + + /** + * Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name. + * + * @param streamGroup + * stream group to send (another than initialized) + * @param streamMessages + * Messages in stream. Can be String or JSON List + * @return Created Redis Stream message identifier from Redis server + * @throws BaseException + * exception on sending + */ + public List> publishPipelined(String streamGroup, List streamMessages) throws BaseException { + return publishPipelined(streamGroup, streamMessages, null); + } + + /** + * Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name. + * + * @param streamGroup + * stream group to send (another than initialized) + * @param streamMessages + * Messages in stream. Can be String or JSON List + * @param parameters + * Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value + * @return Created Redis Stream messages identifiers from Redis server + * @throws BaseException + * exception on sending + */ + public List> publishPipelined(String streamGroup, List streamMessages, Map parameters) + throws BaseException { + validateStreamMessages(streamMessages); + validateGroup(streamGroup); + checkRedisManager(); + + try (RedisManagerConnection ignored = redisManager.initConnection()) { + return publishPipelinedInActiveConnection(streamGroup, streamMessages, parameters); + } + } + + /** + * Publish (send) multiple messages through pipeline to stream + * + * @param streamGroup + * Stream group to send (another than initialized) + * @param streamMessages + * Messages in stream. Can be String or JSON List + * @param parameters + * Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value + * @return Created Redis Stream messages identifiers from Redis server + * @throws BaseException + * exception on sending + */ + protected List> publishPipelinedInActiveConnection(String streamGroup, List streamMessages, + Map parameters) throws BaseException { + + Pipeline pipeline = initPipeline(); + + int i = 1; + List> responses = new ArrayList<>(streamMessages.size()); + for (String streamMessage : streamMessages) { + Response response = publishThroughPipeline(pipeline, streamGroup, createJedisMessage(streamMessage, parameters)); + responses.add(response); + i = syncPipelineIfNeeded(pipeline, streamMessages.size(), i); + } + syncPipelineIfNeeded(pipeline, streamMessages.size(), i); + return getStreamEntryIds(responses); + } + /** * Publish (send) message to stream with class initialized {@code #jedisInstance} * @@ -352,7 +497,8 @@ private String getFlowIdMessage(Map parameters) { return flowIdMessage; } return Optional.ofNullable(parameters.get(StreamMessageParameter.FLOW_ID_EXTENSION.getMessageKey())) - .map(extension -> flowIdMessage + "_" + extension).orElse(flowIdMessage); + .map(extension -> flowIdMessage + "_" + extension) + .orElse(flowIdMessage); } /** @@ -371,9 +517,7 @@ private String getFlowIdMessage(Map parameters) { * Exception */ protected Optional publishInActiveConnection(Map values, String streamGroup) throws BaseException { - XAddParams params = XAddParams.xAddParams(); - config.getProducerMaxLen().ifPresent(params::maxLen); - config.getProducerTTL().ifPresent(ttl -> params.minId(new StreamEntryID(Instant.now().minusMillis(ttl).toEpochMilli(), 0).toString())); + XAddParams params = getXAddParams(); Optional streamEntryID = redisManager.run(Jedis::xadd, "xadd", RedisStreamUtil.streamKey(streamGroup), values, params); if (log.isTraceEnabled()) { log.trace("Published streamEntryID: [{0}] into [{1}]", streamEntryID, RedisStreamUtil.streamKey(streamGroup)); @@ -437,6 +581,112 @@ protected void checkRedisManager() throws TechnicalException { } } + /** + * Validates streamMessages + * + * @param streamMessages + * streamMessages + * @throws BaseException + * if validation fails + */ + protected void validateStreamMessages(List streamMessages) throws BaseException { + if (streamMessages == null) { + throw new InvalidParameterException("streamMessages is null!"); + } + } + + /** + * Returns the default XAddParams + * + * @return the default XAdd params + */ + protected XAddParams getXAddParams() { + XAddParams params = XAddParams.xAddParams(); + config.getProducerMaxLen().ifPresent(params::maxLen); + config.getProducerTTL().ifPresent(ttl -> params.minId(new StreamEntryID(Instant.now().minusMillis(ttl).toEpochMilli(), 0).toString())); + return params; + } + + /** + * Initializes a new pipeline
+ * Requires active connection + * + * @return pipeline instance + * @throws BaseException + * if any error occurred while creating pipeline + */ + protected Pipeline initPipeline() throws BaseException { + return redisManager.run(Jedis::pipelined, "pipelined") + .orElseThrow(() -> new BaseException(CoffeeFaultType.REDIS_OPERATION_FAILED, "Error occurred while creating pipeline")); + } + + /** + * Publish (send) message to stream through pipeline + * + * @param pipeline + * The pipeline instance + * @param streamGroup + * Stream group to send + * @param jedisMessage + * Redis Stream message structure, ready to publish + * + * @return {@link Pipeline#xadd(String, Map, XAddParams)} response + */ + protected Response publishThroughPipeline(Pipeline pipeline, String streamGroup, Map jedisMessage) { + return pipeline.xadd(RedisStreamUtil.streamKey(streamGroup), jedisMessage, getXAddParams()); + } + + /** + * Gets StreamEntryIds-s from pipeline responses + * + * @param responses + * {@link Pipeline#xadd(String, Map, XAddParams)} response list + * @return list of optional StreamEntryIds + */ + protected List> getStreamEntryIds(List> responses) { + List> streamEntryIds = responses.stream().map(Response::get).map(Optional::ofNullable).toList(); + if (log.isTraceEnabled()) { + log.trace( + "Published [{0}] streamEntries into [{1}]", + streamEntryIds.stream().filter(Optional::isPresent).count(), + RedisStreamUtil.streamKey(streamGroup)); + } + return streamEntryIds; + } + + /** + * Syncs pipeline if needed and increases the counter
+ * If pipelined responses reach {@link RedisStreamPublisher#PIPELINE_SIZE},
+ * or if all messages have been sent, and there are pipelined responses,
+ * then {@link Pipeline#sync()} is called
+ * + * @param pipeline + * the Pipeline instance + * @param messagesToSend + * The number of messages to be sent + * @param messagesSent + * The number of sent messages + * @return The number of sent messages increased + * + */ + protected int syncPipelineIfNeeded(Pipeline pipeline, int messagesToSend, int messagesSent) { + if ((messagesSent < messagesToSend && messagesSent % getPipelineSize() == 0) + || (messagesSent == messagesToSend && pipeline.hasPipelinedResponse())) { + pipeline.sync(); + } + messagesSent++; + return messagesSent; + } + + /** + * Returns the size of the pipeline + * + * @return the size of the pipeline + */ + protected int getPipelineSize() { + return PIPELINE_SIZE; + } + private TechnicalException notInitializedException() { return new TechnicalException("RedisStreamHandler is not initialized!"); } diff --git a/docs/en/common/module/coffee-module-redisstream.adoc b/docs/en/common/module/coffee-module-redisstream.adoc index f18388909..085eadf13 100644 --- a/docs/en/common/module/coffee-module-redisstream.adoc +++ b/docs/en/common/module/coffee-module-redisstream.adoc @@ -183,6 +183,17 @@ redisStreamPublisher.publish("message", parameters); //<3> // or RedisStreamPublication publication = RedisStreamPublication.of(id).withTTL(defaultTTL).withParameter(StreamMessageParameter.FLOW_ID_EXTENSION, id)) redisStreamPublisher.publishPublication(publication); //<4> + +// In case of many records, it might be a better option to publish messages through pipeline +List redisStreamPublicationsPipelined = new ArrayList<>(); +for (int i = 0; i < 1000; i++) { + redisStreamPublicationsPipelined.add(RedisStreamPublication.of("alternativeGroup", "pipelined - " + i, parameters)); +} +publisher.publishPublicationsPipelined(redisStreamPublicationsPipelined); +// or +List ids = IntStream.range(0, 1000).mapToObj(i -> RandomUtil.generateId()).toList(); +publisher.publishPipelined(ids); //<5> + ---- <1> "group" is not mandatory in all cases <2> The "message" content itself will be stored in a kind of coffee stream message structure, diff --git a/docs/en/migration/migration290to2100.adoc b/docs/en/migration/migration290to2100.adoc index 2bc8119db..d59ae7093 100644 --- a/docs/en/migration/migration290to2100.adoc +++ b/docs/en/migration/migration290to2100.adoc @@ -3,9 +3,19 @@ coff:ee v2.9.0 -> v2.10.0 migration description, news, changes === coffee-configuration + A misleading exception message has been clarified in ApplicationConfiguration class. ==== Migration + +Changes are backwards compatible doesnt need any migration. + +=== coffee-redisstream + +A new function has been added to RedisStreamPublisher, which makes it possible to send multiple messages at once to Redis stream through pipeline. + +==== Migration + Changes are backwards compatible doesnt need any migration. === Java11 support remove diff --git a/docs/hu/common/module/coffee-module-redisstream.adoc b/docs/hu/common/module/coffee-module-redisstream.adoc index df27284a2..9a6603174 100644 --- a/docs/hu/common/module/coffee-module-redisstream.adoc +++ b/docs/hu/common/module/coffee-module-redisstream.adoc @@ -183,6 +183,17 @@ redisStreamPublisher.publish("message", parameters); //<3> // vagy RedisStreamPublication publication = RedisStreamPublication.of(id).withTTL(defaultTTL).withParameter(StreamMessageParameter.FLOW_ID_EXTENSION, id)) redisStreamPublisher.publishPublication(publication); //<4> + +// Nagy számú üzenet esetén érdemes pipeline-on keresztül küldeni az üzeneteket: +List redisStreamPublicationsPipelined = new ArrayList<>(); +for (int i = 0; i < 1000; i++) { + redisStreamPublicationsPipelined.add(RedisStreamPublication.of("alternativeGroup", "pipelined - " + i, parameters)); +} +publisher.publishPublicationsPipelined(redisStreamPublicationsPipelined); +// vagy +List ids = IntStream.range(0, 1000).mapToObj(i -> RandomUtil.generateId()).toList(); +publisher.publishPipelined(ids); + ---- <1> "group" nem kötelező minden esetben <2> Maga a "message" tartalom egyfajta coffee stream message struktúrába fog kerülni, diff --git a/docs/hu/migration/migration290to2100.adoc b/docs/hu/migration/migration290to2100.adoc index 5ff601b87..633d15d87 100644 --- a/docs/hu/migration/migration290to2100.adoc +++ b/docs/hu/migration/migration290to2100.adoc @@ -3,9 +3,19 @@ coff:ee v2.9.0 -> v2.10.0 migrációs leírás, újdonságok, változások leírása === coffee-configuration + Pontosításra került egy félrevezető hibaüzenet az ApplicationConfiguration osztályban. ==== Migration + +A változtatások nem eredményeznek átállási munkálatokat, visszafelé kompatibilis. + +=== coffee-redisstream + +A RedisStreamPublisher ki lett egészítve egy új funkcióval, ami lehetővé teszi egyszerre több üzenet Redis streamre küldését pipeline-on keresztül. + +==== Migration + A változtatások nem eredményeznek átállási munkálatokat, visszafelé kompatibilis. === Java11 támogatás eltávolítása