Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concat array enhancement #2508

Merged
merged 3 commits into from
Jan 25, 2021
Merged

Conversation

danielkec
Copy link
Contributor

@danielkec danielkec commented Nov 8, 2020

  • More effective cancellation handling
  • Better handling of negative requests

JMH test results:

Benchmark                                     Mode  Cnt        Score        Error  Units
ConcatArrayComparison.reqMaxNewCA            thrpt   10   742789.861 ±  25826.167  ops/s
ConcatArrayComparison.reqMaxOldCA            thrpt   10   716121.440 ±  28539.035  ops/s
ConcatArrayComparison.variableRequestsNewCA  thrpt   10  6538445.756 ± 629438.611  ops/s
ConcatArrayComparison.variableRequestsOldCA  thrpt   10  5267535.840 ± 650941.922  ops/s

@danielkec danielkec self-assigned this Nov 8, 2020
@danielkec danielkec force-pushed the concat-array-refactor branch from 97e2fc2 to edff274 Compare November 24, 2020 08:17
Signed-off-by: Daniel Kec <[email protected]>
@danielkec danielkec added the reactive Reactive streams and related components label Nov 24, 2020
@danielkec danielkec changed the title WIP: Concat array enhancement Concat array enhancement Nov 24, 2020
@akarnokd
Copy link
Collaborator

The code looks quite unconventional to me and the state transitions are somewhat hard to follow. In general, concat can break when there is a switchover from source to source at the same time the downstream requests thus the outstanding request amount is not properly forwarded to them.

Here is a test that passes on master with the SubscriptionArbiter but fails on this implementation because some request(1)s are not handed over properly:

    static void race(Runnable r1, Runnable r2, ExecutorService exec) {

        AtomicInteger sync = new AtomicInteger(2);
        CountDownLatch cdl = new CountDownLatch(1);

        exec.submit(() -> {
            if (sync.decrementAndGet() != 0) {
                while (sync.get() != 0) ;
            }

            try {
                r1.run();
            } finally {
                cdl.countDown();
            }
        });

        if (sync.decrementAndGet() != 0) {
            while (sync.get() != 0) ;
        }
        r2.run();

        try {
            Assert.assertTrue(cdl.await(5, TimeUnit.SECONDS));
        } catch (InterruptedException ex) {
            Assert.fail("", ex);
        }
    }


    @Test
    public void switchOverRequestRace() {
        var exec = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 10_000; i++) {
                System.out.printf("Round %d%n", i + 1);
                TestSubscriber<Object> ts = new TestSubscriber<>(0L);
                AtomicReference<Flow.Subscriber<? super Integer>> sref = new AtomicReference<>();

                Multi.concatArray(new Flow.Publisher<Integer>() {

                    @Override
                    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
                        subscriber.onSubscribe(EmptySubscription.INSTANCE);
                        sref.set(subscriber);
                    }
                }, Multi.range(1, 100)).subscribe(ts);

                race(() -> {
                    sref.get().onComplete();
                }, () -> {
                    var s = ts.getSubcription();
                    for (int j = 0; j < 100; j++) {
                        s.request(1);
                    }
                }, exec);

                ts.awaitDone(5, TimeUnit.SECONDS);
                ts.assertItemCount(100);
            }
        } finally {
            exec.shutdown();
        }
    }

@olotenko
Copy link

Thanks.

The problem with the original code is the lack of atomicity in state transitions around the same place, which can also be shown to lead to problems (not conformant to the specification). Consequently, an attempt to capture all state transitions in a single atomic counter.

produced++; // assert: matching request(1) has been done by nextSource()
this.subscription = subscription;
// assert: requested == SEE_OTHER
REQUESTED.setOpaque(this, p0); // assert: p0 is guaranteed to be a value of requested never seen before
Copy link

@olotenko olotenko Nov 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things need doing:
long oldProduced = produced; on the line before change of REQUESTED, then use oldProduced to the end of this method.

This addresses the case raised by @akarnok - concurrent request results in concurrent onNext, which updates produced.

and

REQUESTED.setVolatile(this, p0) to replace setOpaque - opaque doesn't guarantee ordering with other stores.

@olotenko
Copy link

olotenko commented Nov 25, 2020

The code looks quite unconventional to me and the state transitions are somewhat hard to follow. In general, concat can break when there is a switchover from source to source at the same time the downstream requests thus the outstanding request amount is not properly forwarded to them.

Here is a test that passes on master with the SubscriptionArbiter but fails on this implementation because some request(1)s are not handed over properly:

I even remember considering the race condition this test shows.

It really only shows Multi.range does not conform to the spec - it is able to deliver onNext during onSubscribe. The spec requires these signals to be delivered serially, and there is a TCK capturing this condition (which fails for several Publishers).

https://github.com/reactive-streams/reactive-streams-jvm/blob/006202eb877c54cf64b258a8a78328e4fb38cab6/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java#L244 - this is the test that Multi.range will fail. It looks specifically for the occurrences of onNext executed concurrently with onSubscribe.

@akarnokd
Copy link
Collaborator

103 tests for concurrent access violation which range does not do. Also §2.1 and §3.2 cover reentrance and §3.3 covers recursion which range conforms as well.

Of course, you can ensure no request is delivered until the subscriber.onSubscribe invocation has returned via the deferred subscription approach. However, that incurs extra allocation and full-barrier atomics thus I'd expect no clear performance benefit over the current implementation.

@olotenko
Copy link

olotenko commented Dec 12, 2020

That is not the proof of the absence of the race in range. The proof of the existence of the race is the existence of this failing condition, which can be shown to occur because onNext is delivered concurrently with onSubscribe by range.

Even without such a test it is easy to see from the code of the range that it does not deliver signals in a total order for all the signals.

We have a prototype that works around this issue with spec conformance of the range, @danielkec is preparing a merge.

…/onComplete concurrently with onSubscribe in response to concurrent request().

Signed-off-by: Daniel Kec <[email protected]>
Signed-off-by: Daniel Kec <[email protected]>
}

LASTTHREADCOMPLETING.setOpaque(this, current);
VarHandle.storeStoreFence();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this again, strictly speaking this fence is not needed, because we have stronger fences between the stores targeted by this fence. But no need to change the code as the cost of this fence on the target platforms is zero.

Copy link

@olotenko olotenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appears to have only cosmetic changes compared to the internal proposal.

@spericas spericas self-requested a review January 25, 2021 17:16
@danielkec danielkec merged commit a35caa9 into helidon-io:master Jan 25, 2021
@danielkec danielkec deleted the concat-array-refactor branch January 25, 2021 17:58
spericas added a commit that referenced this pull request Feb 11, 2021
* Upgrade Netty to 4.1.58 (#2678)

Signed-off-by: Tomas Langer <[email protected]>

* Added overall timeout to evictable cache (#2659)

Signed-off-by: Tomas Langer <[email protected]>

* Fix copyright year for commits broken by squashing. (#2687)

Signed-off-by: Tomas Langer <[email protected]>

* Concat array enhancement (#2508)

* Concat array enhancement

Signed-off-by: Daniel Kec <[email protected]>

* Update Jackson to 2.12.1 (#2690)

* Update Jackson to 2.12.1
* Upgrade to latest Junit5 to get fix for junit-team/junit5#2198
* Manage junit4 version

* PokemonService template fixed in SE Database Archetype. (#2701)

Signed-off-by: Tomas Kraus <[email protected]>

* Fixed different output in DbClient SE archetype (#2703)

Signed-off-by: Tomas Kraus <[email protected]>

* Fix TODO application: (#2708)

- WebSecurity needs to be passed config.get("security") to take the "security.web-server" configuration
 - Added outbound configuration for the google login
 - Upgraded cassandra driver to fix issues with old guava dependencies
 - Removed metrics to avoid issues with cassandra driver.

Fixes #2707

* Update k8s descriptors to avoid using deprecated APIs. (#2719)

* Separate execution of DataChunkReleaseTest in its own VM to prevent leak messages in other test's logs. (#2716)

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Changes in this commit: (#2727)

1. Upgrade to Jersey 2.33
2. Configuration via system properties for the Jersey Client API. Any response in an exception will be mapped to an empty one to prevent data leaks. See eclipse-ee4j/jersey#4641.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Properly release underlying buffer before passing it to WebSocket handler (#2715)

* Properly release underlying buffer before passing it to handler.

* Releases data chunks after passing them to Tyrus without any copying. Reports an error and closes connection if Tyrus is unable to handle the data. Finally, fixed a problem related to subscription requests.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Removed unused logger.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fix issue with null value in JSON. (#2723)

Signed-off-by: Tomas Langer <[email protected]>

* Upgrade grpc to v1.35.0 (#2713)

* Upgrade grpc to v1.35.0

* Update copyright

* Upgrades OCI SDK to version 1.31.0 (#2699)

* Updated OCI to 1.31.0

Signed-off-by: Laird Nelson <[email protected]>

* Fix null array values in HOCON/JSON config parser. (#2731)

Resolves #2720 (follow-up)

* Performance improvements to queue(s) management in Webserver (#2704)

* Initial patch.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed some type params and improved comments.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* More cleanup and make sure to fail publisher on an error condition.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Suppress warnings.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Call clearQueues on every new request for proper cleanup of keep-alive connections. Some copyright fixes.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle issues.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Force logging of LEAK error even if finalize does not get called on a DataChunk.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Upgrade Weld (#2668)

Signed-off-by: Tomas Langer <[email protected]>

* Rest client async header propagation with usage of Helidon Context (#2735)

Rest client header propagation with usage of Helidon Context

Signed-off-by: David Kral <[email protected]>

* Allow override of Jersey property via config (#2737)

* Allow the default value of property jersey.config.client.ignoreExceptionResponse to be overridden via config. New test.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed copyright year.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* New implementation of LazyValue (#2738)

* New implementation of LazyValue that lazily initializes a Semaphore instead of eagerly creating a ReentrantLock. Makes use of volatile guarantees and atomicity of VarHandle updates.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* New test for LazyValueImpl.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Reduced sleep time in test.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Update CHANGELOG for 2.2.1 release (#2743)

* 2.2.1 THIRD_PARTY_LICENSES update (#2746)

* Update THIRD_PARTY_LICENSES

* Support async invocations using optional synthetic SimplyTimed behavior (#2745)

* Add support for async invocations for optional inferred SimplyTimed behavior on JAX-RS endpoints

Signed-off-by: [email protected] <[email protected]>

* Do not attempt to access the request context in Fallback callback. If used together with Retry, it is possible for the fallback to be called in a fresh thread for which there is no current request scope. Instead just use the original value obtained in this class' constructor. Updated functional test (with some class renaming) to cover this use case. (#2748)

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fix for native image. (#2753)

Signed-off-by: Tomas Langer <[email protected]>

* Fixed checkstyle issues.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

Co-authored-by: Tomas Langer <[email protected]>
Co-authored-by: Daniel Kec <[email protected]>
Co-authored-by: Joe DiPol <[email protected]>
Co-authored-by: Tomáš Kraus <[email protected]>
Co-authored-by: Romain Grecourt <[email protected]>
Co-authored-by: Jonathan Knight <[email protected]>
Co-authored-by: Laird Nelson <[email protected]>
Co-authored-by: David Král <[email protected]>
Co-authored-by: Tim Quinn <[email protected]>
spericas added a commit that referenced this pull request Nov 22, 2021
* Fault Tolerance 3.0 Support (#2680)

* Initial changes to implement new metrics layer. Moving from complex names to simpler names and tags.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* More metric updates.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Migration of most unit tests to new metrics.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Completed migration of metrics test.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* New exception to discern timeouts during retries.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Implementation of retry metrics.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Cleanup metrics between tests.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Several changes related to execution of FT 3.0 TCKs. Adjusted initial size of executors and fixed a few other problems.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Copyright and checkstyle updates.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed copyright year.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed typos and some cleanup.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Created exclude file as a workaround for a sportbugs' bug.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Updated copyright year.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* MicroProfile Opentracing 2.0 (#2676)

* Microprofile Opentracing uprgated to 2.0
* Unused dependences removed
* Obsolete excludes removed

* Sync up of microprofile-4.0 with master branch (#2757)

* Upgrade Netty to 4.1.58 (#2678)

Signed-off-by: Tomas Langer <[email protected]>

* Added overall timeout to evictable cache (#2659)

Signed-off-by: Tomas Langer <[email protected]>

* Fix copyright year for commits broken by squashing. (#2687)

Signed-off-by: Tomas Langer <[email protected]>

* Concat array enhancement (#2508)

* Concat array enhancement

Signed-off-by: Daniel Kec <[email protected]>

* Update Jackson to 2.12.1 (#2690)

* Update Jackson to 2.12.1
* Upgrade to latest Junit5 to get fix for junit-team/junit5#2198
* Manage junit4 version

* PokemonService template fixed in SE Database Archetype. (#2701)

Signed-off-by: Tomas Kraus <[email protected]>

* Fixed different output in DbClient SE archetype (#2703)

Signed-off-by: Tomas Kraus <[email protected]>

* Fix TODO application: (#2708)

- WebSecurity needs to be passed config.get("security") to take the "security.web-server" configuration
 - Added outbound configuration for the google login
 - Upgraded cassandra driver to fix issues with old guava dependencies
 - Removed metrics to avoid issues with cassandra driver.

Fixes #2707

* Update k8s descriptors to avoid using deprecated APIs. (#2719)

* Separate execution of DataChunkReleaseTest in its own VM to prevent leak messages in other test's logs. (#2716)

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Changes in this commit: (#2727)

1. Upgrade to Jersey 2.33
2. Configuration via system properties for the Jersey Client API. Any response in an exception will be mapped to an empty one to prevent data leaks. See eclipse-ee4j/jersey#4641.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Properly release underlying buffer before passing it to WebSocket handler (#2715)

* Properly release underlying buffer before passing it to handler.

* Releases data chunks after passing them to Tyrus without any copying. Reports an error and closes connection if Tyrus is unable to handle the data. Finally, fixed a problem related to subscription requests.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Removed unused logger.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fix issue with null value in JSON. (#2723)

Signed-off-by: Tomas Langer <[email protected]>

* Upgrade grpc to v1.35.0 (#2713)

* Upgrade grpc to v1.35.0

* Update copyright

* Upgrades OCI SDK to version 1.31.0 (#2699)

* Updated OCI to 1.31.0

Signed-off-by: Laird Nelson <[email protected]>

* Fix null array values in HOCON/JSON config parser. (#2731)

Resolves #2720 (follow-up)

* Performance improvements to queue(s) management in Webserver (#2704)

* Initial patch.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed some type params and improved comments.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* More cleanup and make sure to fail publisher on an error condition.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Suppress warnings.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Call clearQueues on every new request for proper cleanup of keep-alive connections. Some copyright fixes.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle issues.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Force logging of LEAK error even if finalize does not get called on a DataChunk.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Upgrade Weld (#2668)

Signed-off-by: Tomas Langer <[email protected]>

* Rest client async header propagation with usage of Helidon Context (#2735)

Rest client header propagation with usage of Helidon Context

Signed-off-by: David Kral <[email protected]>

* Allow override of Jersey property via config (#2737)

* Allow the default value of property jersey.config.client.ignoreExceptionResponse to be overridden via config. New test.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed copyright year.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* New implementation of LazyValue (#2738)

* New implementation of LazyValue that lazily initializes a Semaphore instead of eagerly creating a ReentrantLock. Makes use of volatile guarantees and atomicity of VarHandle updates.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* New test for LazyValueImpl.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Reduced sleep time in test.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Update CHANGELOG for 2.2.1 release (#2743)

* 2.2.1 THIRD_PARTY_LICENSES update (#2746)

* Update THIRD_PARTY_LICENSES

* Support async invocations using optional synthetic SimplyTimed behavior (#2745)

* Add support for async invocations for optional inferred SimplyTimed behavior on JAX-RS endpoints

Signed-off-by: [email protected] <[email protected]>

* Do not attempt to access the request context in Fallback callback. If used together with Retry, it is possible for the fallback to be called in a fresh thread for which there is no current request scope. Instead just use the original value obtained in this class' constructor. Updated functional test (with some class renaming) to cover this use case. (#2748)

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fix for native image. (#2753)

Signed-off-by: Tomas Langer <[email protected]>

* Fixed checkstyle issues.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

Co-authored-by: Tomas Langer <[email protected]>
Co-authored-by: Daniel Kec <[email protected]>
Co-authored-by: Joe DiPol <[email protected]>
Co-authored-by: Tomáš Kraus <[email protected]>
Co-authored-by: Romain Grecourt <[email protected]>
Co-authored-by: Jonathan Knight <[email protected]>
Co-authored-by: Laird Nelson <[email protected]>
Co-authored-by: David Král <[email protected]>
Co-authored-by: Tim Quinn <[email protected]>

* Fixed problems in RetryImpl after merge.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed problems with metrics after merge.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Updated version in suite file.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed problem retrieving registry for metrics.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed more problems after merge. All tests are passing now.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle errors.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed TODO.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Enabled TCK's by default and removed generated file.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* One more checkstyle violation.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Removed duplicate test after merge.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

Co-authored-by: Dmitry Aleksandrov <[email protected]>
Co-authored-by: Tomas Langer <[email protected]>
Co-authored-by: Daniel Kec <[email protected]>
Co-authored-by: Joe DiPol <[email protected]>
Co-authored-by: Tomáš Kraus <[email protected]>
Co-authored-by: Romain Grecourt <[email protected]>
Co-authored-by: Jonathan Knight <[email protected]>
Co-authored-by: Laird Nelson <[email protected]>
Co-authored-by: David Král <[email protected]>
Co-authored-by: Tim Quinn <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
reactive Reactive streams and related components
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants