Skip to content

Commit

Permalink
fix: rewrite of the concatenation operators
Browse files Browse the repository at this point in the history
This fixes race conditions in concatMap and stream concatenation operators.

Refs: #1388
  • Loading branch information
jponge committed Nov 30, 2023
1 parent f6be4a8 commit b552fc4
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 354 deletions.
9 changes: 8 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.class.removed",
"old": "class io.smallrye.mutiny.operators.multi.MultiConcatMapOp.ConcatMapMainSubscriber<I, O>",
"justification": "Internal API refactoring"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.mutiny.CompositeException;
Expand Down Expand Up @@ -103,6 +104,19 @@ public static long add(AtomicLong requested, long requests) {
}
}

public static <T> long add(AtomicLongFieldUpdater<T> updater, T receiver, long requests) {
for (;;) {
long r = updater.get(receiver);
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;

Check warning on line 111 in implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java

View check run for this annotation

Codecov / codecov/patch

implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java#L111

Added line #L111 was not covered by tests
}
long u = add(r, requests);
if (updater.compareAndSet(receiver, r, u)) {
return r;
}
}

Check warning on line 117 in implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java

View check run for this annotation

Codecov / codecov/patch

implementation/src/main/java/io/smallrye/mutiny/helpers/Subscriptions.java#L117

Added line #L117 was not covered by tests
}

/**
* Atomically subtract the given number (positive, not validated) from the target field unless it contains Long.MAX_VALUE.
*
Expand Down
Loading

0 comments on commit b552fc4

Please sign in to comment.