Skip to content

Commit

Permalink
Merge pull request #729 from i-Cell-Mobilsoft-Open-Source/feature/691…
Browse files Browse the repository at this point in the history
…-RedisStreamPublisher-list-publish-alternative

#691 RedisStreamPublisher pipelined publish
  • Loading branch information
plevente011230 authored Dec 16, 2024
2 parents 51db812 + aba64b6 commit 8fc9c37
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import hu.icellmobilsoft.coffee.dto.common.LogConstants;
import hu.icellmobilsoft.coffee.dto.exception.InvalidParameterException;
import hu.icellmobilsoft.coffee.dto.exception.TechnicalException;
import hu.icellmobilsoft.coffee.dto.exception.enums.CoffeeFaultType;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManager;
import hu.icellmobilsoft.coffee.module.redis.manager.RedisManagerConnection;
import hu.icellmobilsoft.coffee.module.redisstream.common.RedisStreamUtil;
Expand All @@ -46,6 +47,8 @@
import hu.icellmobilsoft.coffee.se.logging.Logger;
import hu.icellmobilsoft.coffee.se.logging.mdc.MDC;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;

Expand All @@ -59,6 +62,8 @@
@Dependent
public class RedisStreamPublisher {

private static final int PIPELINE_SIZE = 1000;

@Inject
private Logger log;

Expand Down Expand Up @@ -198,7 +203,8 @@ public List<Optional<StreamEntryID>> publishPublications(List<RedisStreamPublica
validateGroup(streamGroup);
id = publishInActiveConnection(createJedisMessage(publication.getStreamMessage(), publication.getParameters()), streamGroup);
} else {
id = publishInActiveConnection(createJedisMessage(publication.getStreamMessage(), publication.getParameters()),
id = publishInActiveConnection(
createJedisMessage(publication.getStreamMessage(), publication.getParameters()),
publication.getStreamGroup());
}
ids.add(id);
Expand All @@ -207,6 +213,48 @@ public List<Optional<StreamEntryID>> publishPublications(List<RedisStreamPublica
}
}

/**
* Publish (send) multiple messages through pipeline to stream calculated by input publication streamGroup name.
*
* @param publications
* stream publication data list
* @return Created Redis Stream messages identifiers from Redis server
* @throws BaseException
* exception on sending
*/
public List<Optional<StreamEntryID>> publishPublicationsPipelined(List<RedisStreamPublication> publications) throws BaseException {
if (publications == null) {
throw new InvalidParameterException("publications is null!");
}
checkRedisManager();

try (RedisManagerConnection ignored = redisManager.initConnection()) {
List<Response<StreamEntryID>> responses = new ArrayList<>(publications.size());
Pipeline pipeline = initPipeline();
int i = 1;
for (RedisStreamPublication publication : publications) {
Response<StreamEntryID> response;
if (StringUtils.isBlank(publication.getStreamGroup())) {
validateGroup(streamGroup);
response = publishThroughPipeline(
pipeline,
streamGroup,
createJedisMessage(publication.getStreamMessage(), publication.getParameters()));
} else {
validateGroup(publication.getStreamGroup());
response = publishThroughPipeline(
pipeline,
publication.getStreamGroup(),
createJedisMessage(publication.getStreamMessage(), publication.getParameters()));
}
responses.add(response);
i = syncPipelineIfNeeded(pipeline, publications.size(), i);
}
syncPipelineIfNeeded(pipeline, publications.size(), i);
return getStreamEntryIds(responses);
}
}

/**
* Publish (send) multiple messages to stream calculated by input streamGroup name.
*
Expand All @@ -232,9 +280,7 @@ public List<Optional<StreamEntryID>> publish(List<String> streamMessages) throws
* exception on sending
*/
public List<Optional<StreamEntryID>> publish(List<String> streamMessages, Map<String, String> parameters) throws BaseException {
if (streamMessages == null) {
throw new InvalidParameterException("streamMessages is null!");
}
validateStreamMessages(streamMessages);
checkInitialization();

try (RedisManagerConnection ignored = redisManager.initConnection()) {
Expand Down Expand Up @@ -272,9 +318,7 @@ public List<Optional<StreamEntryID>> publish(String streamGroup, List<String> st
*/
public List<Optional<StreamEntryID>> publish(String streamGroup, List<String> streamMessages, Map<String, String> parameters)
throws BaseException {
if (streamMessages == null) {
throw new InvalidParameterException("streamMessages is null!");
}
validateStreamMessages(streamMessages);
validateGroup(streamGroup);
checkRedisManager();

Expand Down Expand Up @@ -306,6 +350,107 @@ protected List<Optional<StreamEntryID>> publishInActiveConnection(String streamG
return ids;
}

/**
* Publish (send) multiple messages through pipeline to stream calculated by input streamGroup name.
*
* @param streamMessages
* Messages in stream. Can be String or JSON List
* @return Created Redis Stream messages identifiers from Redis server
* @throws BaseException
* exception on sending
*/
public List<Optional<StreamEntryID>> publishPipelined(List<String> streamMessages) throws BaseException {
return publishPipelined(streamMessages, null);
}

/**
* Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name.
*
* @param streamMessages
* Messages in stream. Can be String or JSON List
* @param parameters
* Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value
* @return Created Redis Stream messages identifiers from Redis server
* @throws BaseException
* exception on sending
*/
public List<Optional<StreamEntryID>> publishPipelined(List<String> streamMessages, Map<String, String> parameters) throws BaseException {
validateStreamMessages(streamMessages);
checkInitialization();

try (RedisManagerConnection ignored = redisManager.initConnection()) {
return publishPipelinedInActiveConnection(streamGroup, streamMessages, parameters);
}
}

/**
* Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name.
*
* @param streamGroup
* stream group to send (another than initialized)
* @param streamMessages
* Messages in stream. Can be String or JSON List
* @return Created Redis Stream message identifier from Redis server
* @throws BaseException
* exception on sending
*/
public List<Optional<StreamEntryID>> publishPipelined(String streamGroup, List<String> streamMessages) throws BaseException {
return publishPipelined(streamGroup, streamMessages, null);
}

/**
* Publish (send) multiple messages through pipeline to stream with pipeline calculated by input streamGroup name.
*
* @param streamGroup
* stream group to send (another than initialized)
* @param streamMessages
* Messages in stream. Can be String or JSON List
* @param parameters
* Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value
* @return Created Redis Stream messages identifiers from Redis server
* @throws BaseException
* exception on sending
*/
public List<Optional<StreamEntryID>> publishPipelined(String streamGroup, List<String> streamMessages, Map<String, String> parameters)
throws BaseException {
validateStreamMessages(streamMessages);
validateGroup(streamGroup);
checkRedisManager();

try (RedisManagerConnection ignored = redisManager.initConnection()) {
return publishPipelinedInActiveConnection(streamGroup, streamMessages, parameters);
}
}

/**
* Publish (send) multiple messages through pipeline to stream
*
* @param streamGroup
* Stream group to send (another than initialized)
* @param streamMessages
* Messages in stream. Can be String or JSON List
* @param parameters
* Messages parameters, nullable. Map key value is standardized in {@link StreamMessageParameter} enum value
* @return Created Redis Stream messages identifiers from Redis server
* @throws BaseException
* exception on sending
*/
protected List<Optional<StreamEntryID>> publishPipelinedInActiveConnection(String streamGroup, List<String> streamMessages,
Map<String, String> parameters) throws BaseException {

Pipeline pipeline = initPipeline();

int i = 1;
List<Response<StreamEntryID>> responses = new ArrayList<>(streamMessages.size());
for (String streamMessage : streamMessages) {
Response<StreamEntryID> response = publishThroughPipeline(pipeline, streamGroup, createJedisMessage(streamMessage, parameters));
responses.add(response);
i = syncPipelineIfNeeded(pipeline, streamMessages.size(), i);
}
syncPipelineIfNeeded(pipeline, streamMessages.size(), i);
return getStreamEntryIds(responses);
}

/**
* Publish (send) message to stream with class initialized {@code #jedisInstance}
*
Expand Down Expand Up @@ -352,7 +497,8 @@ private String getFlowIdMessage(Map<String, String> parameters) {
return flowIdMessage;
}
return Optional.ofNullable(parameters.get(StreamMessageParameter.FLOW_ID_EXTENSION.getMessageKey()))
.map(extension -> flowIdMessage + "_" + extension).orElse(flowIdMessage);
.map(extension -> flowIdMessage + "_" + extension)
.orElse(flowIdMessage);
}

/**
Expand All @@ -371,9 +517,7 @@ private String getFlowIdMessage(Map<String, String> parameters) {
* Exception
*/
protected Optional<StreamEntryID> publishInActiveConnection(Map<String, String> values, String streamGroup) throws BaseException {
XAddParams params = XAddParams.xAddParams();
config.getProducerMaxLen().ifPresent(params::maxLen);
config.getProducerTTL().ifPresent(ttl -> params.minId(new StreamEntryID(Instant.now().minusMillis(ttl).toEpochMilli(), 0).toString()));
XAddParams params = getXAddParams();
Optional<StreamEntryID> streamEntryID = redisManager.run(Jedis::xadd, "xadd", RedisStreamUtil.streamKey(streamGroup), values, params);
if (log.isTraceEnabled()) {
log.trace("Published streamEntryID: [{0}] into [{1}]", streamEntryID, RedisStreamUtil.streamKey(streamGroup));
Expand Down Expand Up @@ -437,6 +581,112 @@ protected void checkRedisManager() throws TechnicalException {
}
}

/**
* Validates streamMessages
*
* @param streamMessages
* streamMessages
* @throws BaseException
* if validation fails
*/
protected void validateStreamMessages(List<String> streamMessages) throws BaseException {
if (streamMessages == null) {
throw new InvalidParameterException("streamMessages is null!");
}
}

/**
* Returns the default XAddParams
*
* @return the default XAdd params
*/
protected XAddParams getXAddParams() {
XAddParams params = XAddParams.xAddParams();
config.getProducerMaxLen().ifPresent(params::maxLen);
config.getProducerTTL().ifPresent(ttl -> params.minId(new StreamEntryID(Instant.now().minusMillis(ttl).toEpochMilli(), 0).toString()));
return params;
}

/**
* Initializes a new pipeline<br/>
* Requires active connection
*
* @return pipeline instance
* @throws BaseException
* if any error occurred while creating pipeline
*/
protected Pipeline initPipeline() throws BaseException {
return redisManager.run(Jedis::pipelined, "pipelined")
.orElseThrow(() -> new BaseException(CoffeeFaultType.REDIS_OPERATION_FAILED, "Error occurred while creating pipeline"));
}

/**
* Publish (send) message to stream through pipeline
*
* @param pipeline
* The pipeline instance
* @param streamGroup
* Stream group to send
* @param jedisMessage
* Redis Stream message structure, ready to publish
*
* @return {@link Pipeline#xadd(String, Map, XAddParams)} response
*/
protected Response<StreamEntryID> publishThroughPipeline(Pipeline pipeline, String streamGroup, Map<String, String> jedisMessage) {
return pipeline.xadd(RedisStreamUtil.streamKey(streamGroup), jedisMessage, getXAddParams());
}

/**
* Gets StreamEntryIds-s from pipeline responses
*
* @param responses
* {@link Pipeline#xadd(String, Map, XAddParams)} response list
* @return list of optional StreamEntryIds
*/
protected List<Optional<StreamEntryID>> getStreamEntryIds(List<Response<StreamEntryID>> responses) {
List<Optional<StreamEntryID>> streamEntryIds = responses.stream().map(Response::get).map(Optional::ofNullable).toList();
if (log.isTraceEnabled()) {
log.trace(
"Published [{0}] streamEntries into [{1}]",
streamEntryIds.stream().filter(Optional::isPresent).count(),
RedisStreamUtil.streamKey(streamGroup));
}
return streamEntryIds;
}

/**
* Syncs pipeline if needed and increases the counter<br/>
* If pipelined responses reach {@link RedisStreamPublisher#PIPELINE_SIZE},<br/>
* or if all messages have been sent, and there are pipelined responses,<br/>
* then {@link Pipeline#sync()} is called<br/>
*
* @param pipeline
* the Pipeline instance
* @param messagesToSend
* The number of messages to be sent
* @param messagesSent
* The number of sent messages
* @return The number of sent messages increased
*
*/
protected int syncPipelineIfNeeded(Pipeline pipeline, int messagesToSend, int messagesSent) {
if ((messagesSent < messagesToSend && messagesSent % getPipelineSize() == 0)
|| (messagesSent == messagesToSend && pipeline.hasPipelinedResponse())) {
pipeline.sync();
}
messagesSent++;
return messagesSent;
}

/**
* Returns the size of the pipeline
*
* @return the size of the pipeline
*/
protected int getPipelineSize() {
return PIPELINE_SIZE;
}

private TechnicalException notInitializedException() {
return new TechnicalException("RedisStreamHandler is not initialized!");
}
Expand Down
11 changes: 11 additions & 0 deletions docs/en/common/module/coffee-module-redisstream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ redisStreamPublisher.publish("message", parameters); //<3>
// or
RedisStreamPublication publication = RedisStreamPublication.of(id).withTTL(defaultTTL).withParameter(StreamMessageParameter.FLOW_ID_EXTENSION, id))
redisStreamPublisher.publishPublication(publication); //<4>
// In case of many records, it might be a better option to publish messages through pipeline
List<RedisStreamPublication> redisStreamPublicationsPipelined = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
redisStreamPublicationsPipelined.add(RedisStreamPublication.of("alternativeGroup", "pipelined - " + i, parameters));
}
publisher.publishPublicationsPipelined(redisStreamPublicationsPipelined);
// or
List<String> ids = IntStream.range(0, 1000).mapToObj(i -> RandomUtil.generateId()).toList();
publisher.publishPipelined(ids); //<5>
----
<1> "group" is not mandatory in all cases
<2> The "message" content itself will be stored in a kind of coffee stream message structure,
Expand Down
10 changes: 10 additions & 0 deletions docs/en/migration/migration290to2100.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@
coff:ee v2.9.0 -> v2.10.0 migration description, news, changes

=== coffee-configuration

A misleading exception message has been clarified in ApplicationConfiguration class.

==== Migration

Changes are backwards compatible doesnt need any migration.

=== coffee-redisstream

A new function has been added to RedisStreamPublisher, which makes it possible to send multiple messages at once to Redis stream through pipeline.

==== Migration

Changes are backwards compatible doesnt need any migration.

=== Java11 support remove
Expand Down
Loading

0 comments on commit 8fc9c37

Please sign in to comment.