Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Add new method to BoundStreamOperations and remove add accepting a Publisher.

Original Pull Request: #2936
  • Loading branch information
christophstrobl committed Sep 3, 2024
1 parent 486dc97 commit fb0f0bc
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return {@link Mono} the {@link RecordId id}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
Expand Down Expand Up @@ -58,6 +59,18 @@ public interface BoundStreamOperations<K, HK, HV> {
@Nullable
RecordId add(Map<HK, HV> body);

/**
* Append a record to the stream {@code key} with the specified options.
*
* @param content record content as Map.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.4
*/
@Nullable
RecordId add(Map<HK, HV> content, XAddOptions xAddOptions);

/**
* Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the
* number of IDs passed in case certain IDs do not exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,6 @@ default Mono<Long> acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getRequiredStream(), group, record.getId());
}

/**
* Append one or more records to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param bodyPublisher record body {@link Publisher}.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the record Ids.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher,
XAddOptions xAddOptions) {
return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
}

/**
* Append a record to the stream {@code key} with the specified options.
*
Expand All @@ -119,7 +104,7 @@ default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? exten
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
Expand All @@ -132,7 +117,7 @@ default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddO
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
@SuppressWarnings("unchecked")
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
Expand All @@ -149,7 +134,7 @@ default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAdd
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ default Long acknowledge(String group, Record<K, ?> record) {
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
@SuppressWarnings("unchecked")
@Nullable
Expand All @@ -120,7 +120,7 @@ default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
@SuppressWarnings("unchecked")
@Nullable
Expand All @@ -138,7 +138,7 @@ default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOption
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
* @since 3.4
*/
@SuppressWarnings("unchecked")
@Nullable
Expand Down

0 comments on commit fb0f0bc

Please sign in to comment.