From 1c23501611baf9013fc6caabaeee3f7b3398b709 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 22 Jun 2023 12:20:37 +0200 Subject: [PATCH 1/4] Multi split operator Fixes #1278 --- documentation/Pipfile.lock | 232 +++++++++++----- documentation/docs/guides/multi-split.md | 52 ++++ documentation/mkdocs.yml | 1 + .../test/java/guides/operators/SplitTest.java | 45 +++ implementation/pom.xml | 1 + .../main/java/io/smallrye/mutiny/Multi.java | 31 +++ .../mutiny/operators/AbstractMulti.java | 1 - .../operators/multi/split/MultiSplitter.java | 261 ++++++++++++++++++ implementation/src/main/java/module-info.java | 1 + .../multi/split/MultiSplitterTest.java | 237 ++++++++++++++++ .../mutiny/tcktests/MultiSplitTckTest.java | 24 ++ 11 files changed, 820 insertions(+), 66 deletions(-) create mode 100644 documentation/docs/guides/multi-split.md create mode 100644 documentation/src/test/java/guides/operators/SplitTest.java create mode 100644 implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java create mode 100644 implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java create mode 100644 reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java diff --git a/documentation/Pipfile.lock b/documentation/Pipfile.lock index 72335edfd..2e4d9771a 100644 --- a/documentation/Pipfile.lock +++ b/documentation/Pipfile.lock @@ -102,7 +102,7 @@ "sha256:f8303414c7b03f794347ad062c0516cee0e15f7a612abd0ce1e25caf6ceb47df", "sha256:fca62a8301b605b954ad2e9c3666f9d97f63872aa4efcae5492baca2056b74ab" ], - "markers": "python_version >= '3.7'", + "markers": "python_full_version >= '3.7.0'", "version": "==3.1.0" }, "click": { @@ -113,6 +113,14 @@ "markers": "python_version >= '3.7'", "version": "==8.1.3" }, + "colorama": { + "hashes": [ + "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", + "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'", + "version": "==0.4.6" + }, "ghp-import": { "hashes": [ "sha256:8337dd7b50877f163d4c0289bc1f1c7f127550241988d568c1db512c4324a619", @@ -146,59 +154,59 @@ }, "markupsafe": { "hashes": [ - "sha256:0576fe974b40a400449768941d5d0858cc624e3249dfd1e0c33674e5c7ca7aed", - "sha256:085fd3201e7b12809f9e6e9bc1e5c96a368c8523fad5afb02afe3c051ae4afcc", - "sha256:090376d812fb6ac5f171e5938e82e7f2d7adc2b629101cec0db8b267815c85e2", - "sha256:0b462104ba25f1ac006fdab8b6a01ebbfbce9ed37fd37fd4acd70c67c973e460", - "sha256:137678c63c977754abe9086a3ec011e8fd985ab90631145dfb9294ad09c102a7", - "sha256:1bea30e9bf331f3fef67e0a3877b2288593c98a21ccb2cf29b74c581a4eb3af0", - "sha256:22152d00bf4a9c7c83960521fc558f55a1adbc0631fbb00a9471e097b19d72e1", - "sha256:22731d79ed2eb25059ae3df1dfc9cb1546691cc41f4e3130fe6bfbc3ecbbecfa", - "sha256:2298c859cfc5463f1b64bd55cb3e602528db6fa0f3cfd568d3605c50678f8f03", - "sha256:28057e985dace2f478e042eaa15606c7efccb700797660629da387eb289b9323", - "sha256:2e7821bffe00aa6bd07a23913b7f4e01328c3d5cc0b40b36c0bd81d362faeb65", - "sha256:2ec4f2d48ae59bbb9d1f9d7efb9236ab81429a764dedca114f5fdabbc3788013", - "sha256:340bea174e9761308703ae988e982005aedf427de816d1afe98147668cc03036", - "sha256:40627dcf047dadb22cd25ea7ecfe9cbf3bbbad0482ee5920b582f3809c97654f", - "sha256:40dfd3fefbef579ee058f139733ac336312663c6706d1163b82b3003fb1925c4", - "sha256:4cf06cdc1dda95223e9d2d3c58d3b178aa5dacb35ee7e3bbac10e4e1faacb419", - "sha256:50c42830a633fa0cf9e7d27664637532791bfc31c731a87b202d2d8ac40c3ea2", - "sha256:55f44b440d491028addb3b88f72207d71eeebfb7b5dbf0643f7c023ae1fba619", - "sha256:608e7073dfa9e38a85d38474c082d4281f4ce276ac0010224eaba11e929dd53a", - "sha256:63ba06c9941e46fa389d389644e2d8225e0e3e5ebcc4ff1ea8506dce646f8c8a", - "sha256:65608c35bfb8a76763f37036547f7adfd09270fbdbf96608be2bead319728fcd", - "sha256:665a36ae6f8f20a4676b53224e33d456a6f5a72657d9c83c2aa00765072f31f7", - "sha256:6d6607f98fcf17e534162f0709aaad3ab7a96032723d8ac8750ffe17ae5a0666", - "sha256:7313ce6a199651c4ed9d7e4cfb4aa56fe923b1adf9af3b420ee14e6d9a73df65", - "sha256:7668b52e102d0ed87cb082380a7e2e1e78737ddecdde129acadb0eccc5423859", - "sha256:7df70907e00c970c60b9ef2938d894a9381f38e6b9db73c5be35e59d92e06625", - "sha256:7e007132af78ea9df29495dbf7b5824cb71648d7133cf7848a2a5dd00d36f9ff", - "sha256:835fb5e38fd89328e9c81067fd642b3593c33e1e17e2fdbf77f5676abb14a156", - "sha256:8bca7e26c1dd751236cfb0c6c72d4ad61d986e9a41bbf76cb445f69488b2a2bd", - "sha256:8db032bf0ce9022a8e41a22598eefc802314e81b879ae093f36ce9ddf39ab1ba", - "sha256:99625a92da8229df6d44335e6fcc558a5037dd0a760e11d84be2260e6f37002f", - "sha256:9cad97ab29dfc3f0249b483412c85c8ef4766d96cdf9dcf5a1e3caa3f3661cf1", - "sha256:a4abaec6ca3ad8660690236d11bfe28dfd707778e2442b45addd2f086d6ef094", - "sha256:a6e40afa7f45939ca356f348c8e23048e02cb109ced1eb8420961b2f40fb373a", - "sha256:a6f2fcca746e8d5910e18782f976489939d54a91f9411c32051b4aab2bd7c513", - "sha256:a806db027852538d2ad7555b203300173dd1b77ba116de92da9afbc3a3be3eed", - "sha256:abcabc8c2b26036d62d4c746381a6f7cf60aafcc653198ad678306986b09450d", - "sha256:b8526c6d437855442cdd3d87eede9c425c4445ea011ca38d937db299382e6fa3", - "sha256:bb06feb762bade6bf3c8b844462274db0c76acc95c52abe8dbed28ae3d44a147", - "sha256:c0a33bc9f02c2b17c3ea382f91b4db0e6cde90b63b296422a939886a7a80de1c", - "sha256:c4a549890a45f57f1ebf99c067a4ad0cb423a05544accaf2b065246827ed9603", - "sha256:ca244fa73f50a800cf8c3ebf7fd93149ec37f5cb9596aa8873ae2c1d23498601", - "sha256:cf877ab4ed6e302ec1d04952ca358b381a882fbd9d1b07cccbfd61783561f98a", - "sha256:d9d971ec1e79906046aa3ca266de79eac42f1dbf3612a05dc9368125952bd1a1", - "sha256:da25303d91526aac3672ee6d49a2f3db2d9502a4a60b55519feb1a4c7714e07d", - "sha256:e55e40ff0cc8cc5c07996915ad367fa47da6b3fc091fdadca7f5403239c5fec3", - "sha256:f03a532d7dee1bed20bc4884194a16160a2de9ffc6354b3878ec9682bb623c54", - "sha256:f1cd098434e83e656abf198f103a8207a8187c0fc110306691a2e94a78d0abb2", - "sha256:f2bfb563d0211ce16b63c7cb9395d2c682a23187f54c3d79bfec33e6705473c6", - "sha256:f8ffb705ffcf5ddd0e80b65ddf7bed7ee4f5a441ea7d3419e861a12eaf41af58" + "sha256:05fb21170423db021895e1ea1e1f3ab3adb85d1c2333cbc2310f2a26bc77272e", + "sha256:0a4e4a1aff6c7ac4cd55792abf96c915634c2b97e3cc1c7129578aa68ebd754e", + "sha256:10bbfe99883db80bdbaff2dcf681dfc6533a614f700da1287707e8a5d78a8431", + "sha256:134da1eca9ec0ae528110ccc9e48041e0828d79f24121a1a146161103c76e686", + "sha256:1577735524cdad32f9f694208aa75e422adba74f1baee7551620e43a3141f559", + "sha256:1b40069d487e7edb2676d3fbdb2b0829ffa2cd63a2ec26c4938b2d34391b4ecc", + "sha256:282c2cb35b5b673bbcadb33a585408104df04f14b2d9b01d4c345a3b92861c2c", + "sha256:2c1b19b3aaacc6e57b7e25710ff571c24d6c3613a45e905b1fde04d691b98ee0", + "sha256:2ef12179d3a291be237280175b542c07a36e7f60718296278d8593d21ca937d4", + "sha256:338ae27d6b8745585f87218a3f23f1512dbf52c26c28e322dbe54bcede54ccb9", + "sha256:3c0fae6c3be832a0a0473ac912810b2877c8cb9d76ca48de1ed31e1c68386575", + "sha256:3fd4abcb888d15a94f32b75d8fd18ee162ca0c064f35b11134be77050296d6ba", + "sha256:42de32b22b6b804f42c5d98be4f7e5e977ecdd9ee9b660fda1a3edf03b11792d", + "sha256:504b320cd4b7eff6f968eddf81127112db685e81f7e36e75f9f84f0df46041c3", + "sha256:525808b8019e36eb524b8c68acdd63a37e75714eac50e988180b169d64480a00", + "sha256:56d9f2ecac662ca1611d183feb03a3fa4406469dafe241673d521dd5ae92a155", + "sha256:5bbe06f8eeafd38e5d0a4894ffec89378b6c6a625ff57e3028921f8ff59318ac", + "sha256:65c1a9bcdadc6c28eecee2c119465aebff8f7a584dd719facdd9e825ec61ab52", + "sha256:68e78619a61ecf91e76aa3e6e8e33fc4894a2bebe93410754bd28fce0a8a4f9f", + "sha256:69c0f17e9f5a7afdf2cc9fb2d1ce6aabdb3bafb7f38017c0b77862bcec2bbad8", + "sha256:6b2b56950d93e41f33b4223ead100ea0fe11f8e6ee5f641eb753ce4b77a7042b", + "sha256:787003c0ddb00500e49a10f2844fac87aa6ce977b90b0feaaf9de23c22508b24", + "sha256:7ef3cb2ebbf91e330e3bb937efada0edd9003683db6b57bb108c4001f37a02ea", + "sha256:8023faf4e01efadfa183e863fefde0046de576c6f14659e8782065bcece22198", + "sha256:8758846a7e80910096950b67071243da3e5a20ed2546e6392603c096778d48e0", + "sha256:8afafd99945ead6e075b973fefa56379c5b5c53fd8937dad92c662da5d8fd5ee", + "sha256:8c41976a29d078bb235fea9b2ecd3da465df42a562910f9022f1a03107bd02be", + "sha256:8e254ae696c88d98da6555f5ace2279cf7cd5b3f52be2b5cf97feafe883b58d2", + "sha256:9402b03f1a1b4dc4c19845e5c749e3ab82d5078d16a2a4c2cd2df62d57bb0707", + "sha256:962f82a3086483f5e5f64dbad880d31038b698494799b097bc59c2edf392fce6", + "sha256:9dcdfd0eaf283af041973bff14a2e143b8bd64e069f4c383416ecd79a81aab58", + "sha256:aa7bd130efab1c280bed0f45501b7c8795f9fdbeb02e965371bbef3523627779", + "sha256:ab4a0df41e7c16a1392727727e7998a467472d0ad65f3ad5e6e765015df08636", + "sha256:ad9e82fb8f09ade1c3e1b996a6337afac2b8b9e365f926f5a61aacc71adc5b3c", + "sha256:af598ed32d6ae86f1b747b82783958b1a4ab8f617b06fe68795c7f026abbdcad", + "sha256:b076b6226fb84157e3f7c971a47ff3a679d837cf338547532ab866c57930dbee", + "sha256:b7ff0f54cb4ff66dd38bebd335a38e2c22c41a8ee45aa608efc890ac3e3931bc", + "sha256:bfce63a9e7834b12b87c64d6b155fdd9b3b96191b6bd334bf37db7ff1fe457f2", + "sha256:c011a4149cfbcf9f03994ec2edffcb8b1dc2d2aede7ca243746df97a5d41ce48", + "sha256:c9c804664ebe8f83a211cace637506669e7890fec1b4195b505c214e50dd4eb7", + "sha256:ca379055a47383d02a5400cb0d110cef0a776fc644cda797db0c5696cfd7e18e", + "sha256:cb0932dc158471523c9637e807d9bfb93e06a95cbf010f1a38b98623b929ef2b", + "sha256:cd0f502fe016460680cd20aaa5a76d241d6f35a1c3350c474bac1273803893fa", + "sha256:ceb01949af7121f9fc39f7d27f91be8546f3fb112c608bc4029aef0bab86a2a5", + "sha256:d080e0a5eb2529460b30190fcfcc4199bd7f827663f858a226a81bc27beaa97e", + "sha256:dd15ff04ffd7e05ffcb7fe79f1b98041b8ea30ae9234aed2a9168b5797c3effb", + "sha256:df0be2b576a7abbf737b1575f048c23fb1d769f267ec4358296f31c2479db8f9", + "sha256:e09031c87a1e51556fdcb46e5bd4f59dfb743061cf93c4d6831bf894f125eb57", + "sha256:e4dd52d80b8c83fdce44e12478ad2e85c64ea965e75d66dbeafb0a3e77308fcc", + "sha256:fec21693218efe39aa7f8599346e90c705afa52c5b31ae019b2e57e8f6542bb2" ], "markers": "python_version >= '3.7'", - "version": "==2.1.2" + "version": "==2.1.3" }, "mergedeep": { "hashes": [ @@ -218,27 +226,27 @@ }, "mkdocs": { "hashes": [ - "sha256:8947af423a6d0facf41ea1195b8e1e8c85ad94ac95ae307fe11232e0424b11c5", - "sha256:c8856a832c1e56702577023cd64cc5f84948280c1c0fcc6af4cd39006ea6aa8c" + "sha256:5955093bbd4dd2e9403c5afaf57324ad8b04f16886512a3ee6ef828956481c57", + "sha256:6ee46d309bda331aac915cd24aab882c179a933bd9e77b80ce7d2eaaa3f689dd" ], "index": "pypi", - "version": "==1.4.2" + "version": "==1.4.3" }, "mkdocs-macros-plugin": { "hashes": [ - "sha256:96bdabeb98b96139544f0048ea2f5cb80c7befde6b21e94c6d4596c22774cbcf", - "sha256:9e64e1cabcf6925359de29fe54f62d5847fb455c2528c440b87f8f1240650608" + "sha256:b10cbc1cd4d4a127eea9f8ee3cc57f5aa91b2a1d44e0f02249d7910a43b975a1", + "sha256:d6c545571ce98420232260ceb1533ffabcc39a19f26285ec45ef8fb060f10bda" ], "index": "pypi", - "version": "==0.7.0" + "version": "==1.0.1" }, "mkdocs-material": { "hashes": [ - "sha256:51760fa4c9ee3ca0b3a661ec9f9817ec312961bb84ff19e5b523fdc5256e1d6c", - "sha256:7623608f746c6d9ff68a8ef01f13eddf32fa2cae5e15badb251f26d1196bc8f1" + "sha256:1021bfea20f00a9423530c8c2ae9be3c78b80f5a527b3f822e6de3d872e5ab79", + "sha256:f9e62558a6b01ffac314423cbc223d970c25fbc78999860226245b64e64d6751" ], "index": "pypi", - "version": "==8.5.10" + "version": "==9.1.16" }, "mkdocs-material-extensions": { "hashes": [ @@ -334,12 +342,106 @@ "markers": "python_version >= '3.6'", "version": "==0.1" }, + "regex": { + "hashes": [ + "sha256:0385e73da22363778ef2324950e08b689abdf0b108a7d8decb403ad7f5191938", + "sha256:051da80e6eeb6e239e394ae60704d2b566aa6a7aed6f2890a7967307267a5dc6", + "sha256:05ed27acdf4465c95826962528f9e8d41dbf9b1aa8531a387dee6ed215a3e9ef", + "sha256:0654bca0cdf28a5956c83839162692725159f4cda8d63e0911a2c0dc76166525", + "sha256:09e4a1a6acc39294a36b7338819b10baceb227f7f7dbbea0506d419b5a1dd8af", + "sha256:0b49c764f88a79160fa64f9a7b425620e87c9f46095ef9c9920542ab2495c8bc", + "sha256:0b71e63226e393b534105fcbdd8740410dc6b0854c2bfa39bbda6b0d40e59a54", + "sha256:0c29ca1bd61b16b67be247be87390ef1d1ef702800f91fbd1991f5c4421ebae8", + "sha256:10590510780b7541969287512d1b43f19f965c2ece6c9b1c00fc367b29d8dce7", + "sha256:10cb847aeb1728412c666ab2e2000ba6f174f25b2bdc7292e7dd71b16db07568", + "sha256:12b74fbbf6cbbf9dbce20eb9b5879469e97aeeaa874145517563cca4029db65c", + "sha256:20326216cc2afe69b6e98528160b225d72f85ab080cbdf0b11528cbbaba2248f", + "sha256:2239d95d8e243658b8dbb36b12bd10c33ad6e6933a54d36ff053713f129aa536", + "sha256:25be746a8ec7bc7b082783216de8e9473803706723b3f6bef34b3d0ed03d57e2", + "sha256:271f0bdba3c70b58e6f500b205d10a36fb4b58bd06ac61381b68de66442efddb", + "sha256:29cdd471ebf9e0f2fb3cac165efedc3c58db841d83a518b082077e612d3ee5df", + "sha256:2d44dc13229905ae96dd2ae2dd7cebf824ee92bc52e8cf03dcead37d926da019", + "sha256:3676f1dd082be28b1266c93f618ee07741b704ab7b68501a173ce7d8d0d0ca18", + "sha256:36efeba71c6539d23c4643be88295ce8c82c88bbd7c65e8a24081d2ca123da3f", + "sha256:3e5219bf9e75993d73ab3d25985c857c77e614525fac9ae02b1bebd92f7cecac", + "sha256:43e1dd9d12df9004246bacb79a0e5886b3b6071b32e41f83b0acbf293f820ee8", + "sha256:457b6cce21bee41ac292d6753d5e94dcbc5c9e3e3a834da285b0bde7aa4a11e9", + "sha256:463b6a3ceb5ca952e66550a4532cef94c9a0c80dc156c4cc343041951aec1697", + "sha256:4959e8bcbfda5146477d21c3a8ad81b185cd252f3d0d6e4724a5ef11c012fb06", + "sha256:4d3850beab9f527f06ccc94b446c864059c57651b3f911fddb8d9d3ec1d1b25d", + "sha256:5708089ed5b40a7b2dc561e0c8baa9535b77771b64a8330b684823cfd5116036", + "sha256:5c6b48d0fa50d8f4df3daf451be7f9689c2bde1a52b1225c5926e3f54b6a9ed1", + "sha256:61474f0b41fe1a80e8dfa70f70ea1e047387b7cd01c85ec88fa44f5d7561d787", + "sha256:6343c6928282c1f6a9db41f5fd551662310e8774c0e5ebccb767002fcf663ca9", + "sha256:65ba8603753cec91c71de423a943ba506363b0e5c3fdb913ef8f9caa14b2c7e0", + "sha256:687ea9d78a4b1cf82f8479cab23678aff723108df3edeac098e5b2498879f4a7", + "sha256:6b2675068c8b56f6bfd5a2bda55b8accbb96c02fd563704732fd1c95e2083461", + "sha256:7117d10690c38a622e54c432dfbbd3cbd92f09401d622902c32f6d377e2300ee", + "sha256:7178bbc1b2ec40eaca599d13c092079bf529679bf0371c602edaa555e10b41c3", + "sha256:72d1a25bf36d2050ceb35b517afe13864865268dfb45910e2e17a84be6cbfeb0", + "sha256:742e19a90d9bb2f4a6cf2862b8b06dea5e09b96c9f2df1779e53432d7275331f", + "sha256:74390d18c75054947e4194019077e243c06fbb62e541d8817a0fa822ea310c14", + "sha256:74419d2b50ecb98360cfaa2974da8689cb3b45b9deff0dcf489c0d333bcc1477", + "sha256:824bf3ac11001849aec3fa1d69abcb67aac3e150a933963fb12bda5151fe1bfd", + "sha256:83320a09188e0e6c39088355d423aa9d056ad57a0b6c6381b300ec1a04ec3d16", + "sha256:837328d14cde912af625d5f303ec29f7e28cdab588674897baafaf505341f2fc", + "sha256:841d6e0e5663d4c7b4c8099c9997be748677d46cbf43f9f471150e560791f7ff", + "sha256:87b2a5bb5e78ee0ad1de71c664d6eb536dc3947a46a69182a90f4410f5e3f7dd", + "sha256:890e5a11c97cf0d0c550eb661b937a1e45431ffa79803b942a057c4fb12a2da2", + "sha256:8abbc5d54ea0ee80e37fef009e3cec5dafd722ed3c829126253d3e22f3846f1e", + "sha256:8e3f1316c2293e5469f8f09dc2d76efb6c3982d3da91ba95061a7e69489a14ef", + "sha256:8f56fcb7ff7bf7404becdfc60b1e81a6d0561807051fd2f1860b0d0348156a07", + "sha256:9427a399501818a7564f8c90eced1e9e20709ece36be701f394ada99890ea4b3", + "sha256:976d7a304b59ede34ca2921305b57356694f9e6879db323fd90a80f865d355a3", + "sha256:9a5bfb3004f2144a084a16ce19ca56b8ac46e6fd0651f54269fc9e230edb5e4a", + "sha256:9beb322958aaca059f34975b0df135181f2e5d7a13b84d3e0e45434749cb20f7", + "sha256:9edcbad1f8a407e450fbac88d89e04e0b99a08473f666a3f3de0fd292badb6aa", + "sha256:9edce5281f965cf135e19840f4d93d55b3835122aa76ccacfd389e880ba4cf82", + "sha256:a4c3b7fa4cdaa69268748665a1a6ff70c014d39bb69c50fda64b396c9116cf77", + "sha256:a8105e9af3b029f243ab11ad47c19b566482c150c754e4c717900a798806b222", + "sha256:a99b50300df5add73d307cf66abea093304a07eb017bce94f01e795090dea87c", + "sha256:aad51907d74fc183033ad796dd4c2e080d1adcc4fd3c0fd4fd499f30c03011cd", + "sha256:af4dd387354dc83a3bff67127a124c21116feb0d2ef536805c454721c5d7993d", + "sha256:b28f5024a3a041009eb4c333863d7894d191215b39576535c6734cd88b0fcb68", + "sha256:b4598b1897837067a57b08147a68ac026c1e73b31ef6e36deeeb1fa60b2933c9", + "sha256:b6192d5af2ccd2a38877bfef086d35e6659566a335b1492786ff254c168b1693", + "sha256:b862c2b9d5ae38a68b92e215b93f98d4c5e9454fa36aae4450f61dd33ff48487", + "sha256:b956231ebdc45f5b7a2e1f90f66a12be9610ce775fe1b1d50414aac1e9206c06", + "sha256:bb60b503ec8a6e4e3e03a681072fa3a5adcbfa5479fa2d898ae2b4a8e24c4591", + "sha256:bbb02fd4462f37060122e5acacec78e49c0fbb303c30dd49c7f493cf21fc5b27", + "sha256:bdff5eab10e59cf26bc479f565e25ed71a7d041d1ded04ccf9aee1d9f208487a", + "sha256:c123f662be8ec5ab4ea72ea300359023a5d1df095b7ead76fedcd8babbedf969", + "sha256:c2b867c17a7a7ae44c43ebbeb1b5ff406b3e8d5b3e14662683e5e66e6cc868d3", + "sha256:c5f8037000eb21e4823aa485149f2299eb589f8d1fe4b448036d230c3f4e68e0", + "sha256:c6a57b742133830eec44d9b2290daf5cbe0a2f1d6acee1b3c7b1c7b2f3606df7", + "sha256:ccf91346b7bd20c790310c4147eee6ed495a54ddb6737162a36ce9dbef3e4751", + "sha256:cf67ca618b4fd34aee78740bea954d7c69fdda419eb208c2c0c7060bb822d747", + "sha256:d2da3abc88711bce7557412310dfa50327d5769a31d1c894b58eb256459dc289", + "sha256:d4f03bb71d482f979bda92e1427f3ec9b220e62a7dd337af0aa6b47bf4498f72", + "sha256:d54af539295392611e7efbe94e827311eb8b29668e2b3f4cadcfe6f46df9c777", + "sha256:d77f09bc4b55d4bf7cc5eba785d87001d6757b7c9eec237fe2af57aba1a071d9", + "sha256:d831c2f8ff278179705ca59f7e8524069c1a989e716a1874d6d1aab6119d91d1", + "sha256:dbbbfce33cd98f97f6bffb17801b0576e653f4fdb1d399b2ea89638bc8d08ae1", + "sha256:dcba6dae7de533c876255317c11f3abe4907ba7d9aa15d13e3d9710d4315ec0e", + "sha256:e0bb18053dfcfed432cc3ac632b5e5e5c5b7e55fb3f8090e867bfd9b054dbcbf", + "sha256:e2fbd6236aae3b7f9d514312cdb58e6494ee1c76a9948adde6eba33eb1c4264f", + "sha256:e5087a3c59eef624a4591ef9eaa6e9a8d8a94c779dade95d27c0bc24650261cd", + "sha256:e8915cc96abeb8983cea1df3c939e3c6e1ac778340c17732eb63bb96247b91d2", + "sha256:ea353ecb6ab5f7e7d2f4372b1e779796ebd7b37352d290096978fea83c4dba0c", + "sha256:ee2d1a9a253b1729bb2de27d41f696ae893507c7db224436abe83ee25356f5c1", + "sha256:f415f802fbcafed5dcc694c13b1292f07fe0befdb94aa8a52905bd115ff41e88", + "sha256:fb5ec16523dc573a4b277663a2b5a364e2099902d3944c9419a40ebd56a118f9", + "sha256:fea75c3710d4f31389eed3c02f62d0b66a9da282521075061ce875eb5300cf23" + ], + "markers": "python_version >= '3.6'", + "version": "==2023.6.3" + }, "requests": { "hashes": [ "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f", "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1" ], - "index": "pypi", + "markers": "python_version >= '3.7'", "version": "==2.31.0" }, "six": { @@ -360,11 +462,11 @@ }, "urllib3": { "hashes": [ - "sha256:61717a1095d7e155cdb737ac7bb2f4324a858a1e2e6466f6d03ff630ca68d3cc", - "sha256:d055c2f9d38dc53c808f6fdc8eab7360b6fdbbde02340ed25cfbcd817c62469e" + "sha256:48e7fafa40319d358848e1bc6809b208340fafe2096f1725d05d67443d0483d1", + "sha256:bee28b5e56addb8226c96f7f13ac28cb4c301dd5ea8a6ca179c0b9835e032825" ], "markers": "python_version >= '3.7'", - "version": "==2.0.2" + "version": "==2.0.3" }, "verspec": { "hashes": [ diff --git a/documentation/docs/guides/multi-split.md b/documentation/docs/guides/multi-split.md new file mode 100644 index 000000000..f17278008 --- /dev/null +++ b/documentation/docs/guides/multi-split.md @@ -0,0 +1,52 @@ +--- +tags: +- guide +- intermediate +--- + +# Splitting a Multi into several Multi + +It is possible to split a `Multi` into several `Multi` streams. + +## Using the split operator + +Suppose that we have a stream of strings that represent _signals_, and that we want a `Multi` for each kind of signal: + +- `?foo`, `?bar` are _input_ signals, +- `!foo`, `!bar` are _output_ signals, +- `foo`, `bar` are _other_ signals. + +To do that, we need a function that maps each item of the stream to its target stream. +The splitter API needs a Java enumeration to define keys, as in: + +```java linenums="1" +{{ insert('java/guides/operators/SplitTest.java', 'enum') }} +``` + +Now we can use the `split` operator that provides a splitter object, and fetch individual `Multi` for each split stream using the `get` method: + +```java linenums="1" +{{ insert('java/guides/operators/SplitTest.java', 'splits') }} +``` + +This prints the following console output: + +``` +output - a +input - b +output - c +output - d +other - 123 +input - e +``` + +## Notes on using splits + +- Items flow when all splits have a subscriber. +- The flow stops when either of the subscribers cancels, or when any subscriber has a no outstanding demand. +- The flow resumes when all splits have a subscriber again, and when all subscribers have outstanding demand. +- Only one subscriber can be active for a given split. Other subscription attempts will receive an error. +- When a subscriber cancels, then a new subscription attempt on its corresponding split can succeed. +- Subscribing to an already completed or errored split results in receiving the terminal signal (`onComplete()` or `onFailure(err)`). +- The upstream `Multi` gets subscribed to when the first split subscription happens, no matter which split it is. +- The first split subscription passes its context, if any, to the upstream `Multi`. It is expected that all split subscribers share the same context object, or the behavior of your code will most likely be incorrect. diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index 121b6aa91..957d104c9 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -49,6 +49,7 @@ nav: - 'guides/context-passing.md' - 'guides/replaying-multis.md' - 'guides/controlling-demand.md' + - 'guides/multi-split.md' - 'Reference': - 'reference/migrating-to-mutiny-2.md' - 'reference/why-is-asynchronous-important.md' diff --git a/documentation/src/test/java/guides/operators/SplitTest.java b/documentation/src/test/java/guides/operators/SplitTest.java new file mode 100644 index 000000000..f810c08b5 --- /dev/null +++ b/documentation/src/test/java/guides/operators/SplitTest.java @@ -0,0 +1,45 @@ +package guides.operators; + +import io.smallrye.mutiny.Multi; +import org.junit.jupiter.api.Test; + +public class SplitTest { + + // + enum Signals { + INPUT, + OUTPUT, + OTHER + } + // + + @Test + public void splitDemo() { + // + Multi multi = Multi.createFrom().items( + "!a", "?b", "!c", "!d", "123", "?e" + ); + + var splitter = multi.split(Signals.class, s -> { + if (s.startsWith("?")) { + return Signals.INPUT; + } else if (s.startsWith("!")) { + return Signals.OUTPUT; + } else { + return Signals.OTHER; + } + }); + + splitter.get(Signals.INPUT) + .onItem().transform(s -> s.substring(1)) + .subscribe().with(signal -> System.out.println("input - " + signal)); + + splitter.get(Signals.OUTPUT) + .onItem().transform(s -> s.substring(1)) + .subscribe().with(signal -> System.out.println("output - " + signal)); + + splitter.get(Signals.OTHER) + .subscribe().with(signal -> System.out.println("other - " + signal)); + // + } +} diff --git a/implementation/pom.xml b/implementation/pom.xml index 75ae8fb11..f6f959297 100644 --- a/implementation/pom.xml +++ b/implementation/pom.xml @@ -108,6 +108,7 @@ io/smallrye/mutiny/infrastructure/*.java io/smallrye/mutiny/operators/*.java io/smallrye/mutiny/operators/multi/processors/*.java + io/smallrye/mutiny/operators/multi/split/*.java io/smallrye/mutiny/subscription/*.java io/smallrye/mutiny/tuples/*.java io/smallrye/mutiny/unchecked/*.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/Multi.java b/implementation/src/main/java/io/smallrye/mutiny/Multi.java index 113f9faf9..b8dabc884 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Multi.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Multi.java @@ -10,8 +10,10 @@ import java.util.function.*; import io.smallrye.common.annotation.CheckReturnValue; +import io.smallrye.common.annotation.Experimental; import io.smallrye.mutiny.groups.*; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.multi.split.MultiSplitter; public interface Multi extends Publisher { @@ -637,4 +639,33 @@ default Multi capDemandsTo(long max) { */ @CheckReturnValue Multi capDemandsUsing(LongFunction function); + + /** + * Splits this {@link Multi} into several co-operating {@link Multi} based on an enumeration and a mapping function. + *

+ * Here is a sample where a stream of integers is split into streams for odd and even numbers: + * + *

+     * {@code
+     * // Split someMulti into 2 streams
+     * var splitter = someMulti.split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD);
+     *
+     * // Stream for odd numbers
+     * vor odd = splitter.get(OddEven.ODD).subscribe().with(...);
+     *
+     * // Stream for even numbers
+     * vor even = splitter.get(OddEven.EVEN).subscribe().with(...);
+     * }
+     * 
+ * + * @param keyType the key type + * @param splitter the splitter function + * @return a splitter + * @param the key type + */ + @CheckReturnValue + @Experimental("Multi splitting is an experimental API in Mutiny 2.3.0") + default > MultiSplitter split(Class keyType, Function splitter) { + return new MultiSplitter<>(this, keyType, splitter); + } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java index 7021c1736..6afbf2214 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java @@ -179,5 +179,4 @@ public MultiDemandPacing paceDemand() { public Multi capDemandsUsing(LongFunction function) { return Infrastructure.onMultiCreation(new MultiDemandCapping<>(this, nonNull(function, "function"))); } - } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java new file mode 100644 index 000000000..8b2b47bfd --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java @@ -0,0 +1,261 @@ +package io.smallrye.mutiny.operators.multi.split; + +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import io.smallrye.common.annotation.CheckReturnValue; +import io.smallrye.mutiny.Context; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.ContextSupport; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * Splits a {@link Multi} into several co-operating {@link Multi}. + *

+ * Each split {@link Multi} receives items based on a function that maps each item to a key from an enumeration. + *

+ * The demand of each split {@link Multi} is independent. + * Items flow when all keys from the enumeration have a split subscriber, and until either one of the split has a {@code 0} + * demand, + * or when one of the split subscriber cancels. + * The flow resumes when all keys have a subscriber again, and when the demand for each split is strictly positive. + *

+ * Calls to {@link #get(Enum)} result in new {@link Multi} objects, but given a key {@code K} then there can be only one + * active subscription. If there is already a subscriber for {@code K} then any subscription request to a {@link Multi} for key + * {@code K} results in a terminal failure. + * Note that when a subscriber for {@code K} has cancelled then a request to subscribe for a {@link Multi} for {@code K} can + * succeed. + *

+ * If the upstream {@link Multi} has already completed or failed, then any new subscriber will receive the terminal signal + * (see {@link MultiSubscriber#onCompletion()} and {@link MultiSubscriber#onFailure(Throwable)}). + *

+ * Note on {@link Context} support: it is assumed that all split subscribers share the same {@link Context} instance, if any. + * The {@link Context} is passed to the upstream {@link Multi} when the first split subscription happens. + * When disjoint {@link Context} are in use by the different split subscribers then the behavior of your code will be most + * likely incorrect. + * + * @param the items type + * @param the enumeration type + */ +public class MultiSplitter> { + + private final Multi upstream; + private final Function splitter; + private final ConcurrentHashMap splits; + private final int requiredNumberOfSubscribers; + + public MultiSplitter(Multi upstream, Class keyType, Function splitter) { + this.upstream = nonNull(upstream, "upstream"); + if (!nonNull(keyType, "keyType").isEnum()) { + // Note: the Java compiler enforces a type check on keyType being some enum, so this branch is only here for added peace of mind + throw new IllegalArgumentException("The key type must be that of an enumeration"); + } + this.splitter = nonNull(splitter, "splitter"); + this.splits = new ConcurrentHashMap<>(); + this.requiredNumberOfSubscribers = keyType.getEnumConstants().length; + } + + /** + * Get a {@link Multi} for a given key. + * + * @param key the key + * @return a new {@link Multi} + */ + @CheckReturnValue + public Multi get(K key) { + return Infrastructure.onMultiCreation(new SplitMulti(key)); + } + + private enum State { + INIT, + AWAITING_SUBSCRIPTION, + SUBSCRIBED, + COMPLETED, + FAILED + } + + private final AtomicReference state = new AtomicReference<>(State.INIT); + + private volatile Throwable terminalFailure; + + private Flow.Subscription upstreamSubscription; + + private void onSplitRequest() { + if (state.get() != State.SUBSCRIBED || splits.size() < requiredNumberOfSubscribers) { + return; + } + for (SplitMulti.Split split : splits.values()) { + if (split.demand.get() == 0L) { + return; + } + } + upstreamSubscription.request(1L); + } + + private void onUpstreamFailure() { + for (SplitMulti.Split split : splits.values()) { + split.downstream.onFailure(terminalFailure); + } + splits.clear(); + } + + private void onUpstreamCompletion() { + for (SplitMulti.Split split : splits.values()) { + split.downstream.onCompletion(); + } + splits.clear(); + } + + private void onUpstreamItem(T item) { + try { + K key = splitter.apply(item); + if (key == null) { + throw new NullPointerException("The splitter function returned null"); + } + // Note: if the target subscriber was removed between the last upstream demand and now, it is simply discarded + SplitMulti.Split target = splits.get(key); + if (target != null) { + target.downstream.onItem(item); + if (splits.size() == requiredNumberOfSubscribers + && (target.demand.get() == Long.MAX_VALUE || target.demand.decrementAndGet() > 0L)) { + upstreamSubscription.request(1L); + } + } + } catch (Throwable err) { + terminalFailure = err; + state.set(State.FAILED); + onUpstreamFailure(); + } + } + + // Note: we need a subscriber class because another onCompletion definition exists in Multi + private class Forwarder implements MultiSubscriber, ContextSupport { + + private final Context context; + + private Forwarder(MultiSubscriber firstSubscriber) { + if (firstSubscriber instanceof ContextSupport) { + context = ((ContextSupport) firstSubscriber).context(); + } else { + context = Context.empty(); + } + } + + @Override + public void onItem(T item) { + if (state.get() != State.SUBSCRIBED) { + return; + } + onUpstreamItem(item); + } + + @Override + public void onFailure(Throwable failure) { + if (state.compareAndSet(State.SUBSCRIBED, State.FAILED)) { + terminalFailure = failure; + onUpstreamFailure(); + } + } + + @Override + public void onCompletion() { + if (state.compareAndSet(State.SUBSCRIBED, State.COMPLETED)) { + onUpstreamCompletion(); + } + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + if (state.get() != State.AWAITING_SUBSCRIPTION) { + subscription.cancel(); + } else { + upstreamSubscription = subscription; + state.set(State.SUBSCRIBED); + // In case all splits would be subscribed... + onSplitRequest(); + } + } + + @Override + public Context context() { + return context; + } + } + + private class SplitMulti extends AbstractMulti { + + private final K key; + + private SplitMulti(K key) { + this.key = key; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + nonNull(subscriber, "subscriber"); + + // First subscription triggers upstream subscription + if (state.compareAndSet(State.INIT, State.AWAITING_SUBSCRIPTION)) { + // Assumption: all split subscribers share the same context, if any + upstream.subscribe().withSubscriber(new Forwarder(subscriber)); + } + + // Early exits + State stateWhenSubscribing = state.get(); + if (stateWhenSubscribing == State.FAILED) { + subscriber.onSubscribe(Subscriptions.CANCELLED); + subscriber.onFailure(terminalFailure); + return; + } + if (stateWhenSubscribing == State.COMPLETED) { + subscriber.onSubscribe(Subscriptions.CANCELLED); + subscriber.onCompletion(); + return; + } + + // Regular subscription path + Split split = new Split(subscriber); + Split previous = splits.putIfAbsent(key, split); + if (previous == null) { + subscriber.onSubscribe(split); + } else { + subscriber.onSubscribe(Subscriptions.CANCELLED); + subscriber.onError(new IllegalStateException("There is already a subscriber for key " + key)); + } + } + + private class Split implements Flow.Subscription { + + MultiSubscriber downstream; + AtomicLong demand = new AtomicLong(); + + private Split(MultiSubscriber subscriber) { + this.downstream = subscriber; + } + + @Override + public void request(long n) { + if (n <= 0) { + cancel(); + downstream.onError(Subscriptions.getInvalidRequestException()); + return; + } + Subscriptions.add(demand, n); + onSplitRequest(); + } + + @Override + public void cancel() { + splits.remove(key); + } + } + } +} diff --git a/implementation/src/main/java/module-info.java b/implementation/src/main/java/module-info.java index 8df86c907..0494bd048 100644 --- a/implementation/src/main/java/module-info.java +++ b/implementation/src/main/java/module-info.java @@ -13,6 +13,7 @@ exports io.smallrye.mutiny.infrastructure; exports io.smallrye.mutiny.operators; exports io.smallrye.mutiny.operators.multi.processors; + exports io.smallrye.mutiny.operators.multi.split; exports io.smallrye.mutiny.subscription; exports io.smallrye.mutiny.tuples; exports io.smallrye.mutiny.unchecked; diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java new file mode 100644 index 000000000..2aab64b3d --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java @@ -0,0 +1,237 @@ +package io.smallrye.mutiny.operators.multi.split; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Context; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; + +class MultiSplitterTest { + + enum OddEven { + ODD, + EVEN + } + + MultiSplitter evenOddSplitter() { + return Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD); + } + + @Test + void rejectNullKeyType() { + var err = assertThrows(IllegalArgumentException.class, + () -> Multi.createFrom().nothing().split(null, null)); + assertThat(err.getMessage()).contains("keyType"); + } + + @Test + void rejectNullSplitter() { + var err = assertThrows(IllegalArgumentException.class, + () -> Multi.createFrom().nothing().split(OddEven.class, null)); + assertThat(err.getMessage()).contains("splitter"); + } + + @Test + void rejectNegativeDemand() { + var sub = evenOddSplitter().get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(-10); + sub.assertFailedWith(IllegalArgumentException.class, "must be greater than 0"); + } + + @Test + void checkBasicBehavior() { + var splitter = evenOddSplitter(); + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create()); + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create()); + + odd.assertHasNotReceivedAnyItem(); + even.assertHasNotReceivedAnyItem(); + + odd.request(2L); + + odd.assertHasNotReceivedAnyItem(); + even.assertHasNotReceivedAnyItem(); + + even.request(1L); + + odd.assertItems(1); + even.assertItems(2); + + even.request(1L); + + odd.assertItems(1, 3); + even.assertItems(2); + + odd.request(1L); + + odd.assertItems(1, 3); + even.assertItems(2, 4); + + even.request(1L); + + odd.assertItems(1, 3, 5); + even.assertItems(2, 4); + + odd.request(Long.MAX_VALUE); + + odd.assertItems(1, 3, 5); + even.assertItems(2, 4, 6); + + even.request(Long.MAX_VALUE); + + odd.assertItems(1, 3, 5, 7, 9); + even.assertItems(2, 4, 6, 8, 10); + + odd.assertCompleted(); + even.assertCompleted(); + + var afterWork = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + afterWork.assertHasNotReceivedAnyItem().hasCompleted(); + } + + @Test + void checkPauseOnCancellation() { + var splitter = evenOddSplitter(); + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create()); + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create()); + + odd.request(2L); + even.request(2L); + + odd.assertItems(1, 3); + even.assertItems(2); + + even.cancel(); + odd.request(2L); + + odd.assertItems(1, 3); + + even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create()); + even.request(2L); + + odd.assertItems(1, 3, 5); + even.assertItems(4, 6); + } + + @Test + void boundedDemandPrevailsOverUnboundedDemand() { + var splitter = evenOddSplitter(); + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create()); + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create()); + + odd.request(Long.MAX_VALUE); + even.request(2L); + + odd.assertItems(1, 3); + even.assertItems(2, 4); + } + + @Test + void rejectSubscriptionWhenAlreadyActive() { + var splitter = evenOddSplitter(); + var ok = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + var failing = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + ok.assertHasNotReceivedAnyItem().assertNotTerminated(); + failing.assertFailedWith(IllegalStateException.class, "There is already a subscriber for key ODD"); + } + + @Test + void nullReturningSplitter() { + var splitter = Multi.createFrom().items(1, 2, 3) + .split(OddEven.class, n -> null); + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create()); + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create()); + + odd.request(Long.MAX_VALUE); + even.request(Long.MAX_VALUE); + + odd.assertFailedWith(NullPointerException.class, "The splitter function returned null"); + even.assertFailedWith(NullPointerException.class, "The splitter function returned null"); + + var afterWork = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + afterWork.assertFailedWith(NullPointerException.class, "The splitter function returned null"); + } + + @Test + void throwingSplitter() { + var splitter = Multi.createFrom().items(1, 2, 3) + .split(OddEven.class, n -> { + throw new RuntimeException("boom"); + }); + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create()); + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create()); + + odd.request(Long.MAX_VALUE); + even.request(Long.MAX_VALUE); + + odd.assertFailedWith(RuntimeException.class, "boom"); + even.assertFailedWith(RuntimeException.class, "boom"); + + var afterWork = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + afterWork.assertFailedWith(RuntimeException.class, "boom"); + } + + @Test + void failedMultiSubscriptions() { + var splitter = Multi.createFrom().items(1, 2, 3) + .onCompletion().switchTo(Multi.createFrom().failure(new RuntimeException("boom"))) + .split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD); + + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + odd.assertFailedWith(RuntimeException.class, "boom") + .assertItems(1, 3); + + even.assertFailedWith(RuntimeException.class, "boom") + .assertItems(2); + + var afterWork = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + afterWork.assertHasNotReceivedAnyItem() + .assertFailedWith(RuntimeException.class, "boom"); + } + + @Test + void contextPassing() { + var splitter = Multi.createFrom().context(ctx -> Multi.createFrom().iterable(ctx.> get("items"))) + .split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD); + + var ctx = Context.of("items", List.of(1, 2, 3, 4, 5, 6)); + + var odd = splitter.get(OddEven.ODD) + .subscribe().withSubscriber(AssertSubscriber.create(ctx, Long.MAX_VALUE)); + + var even = splitter.get(OddEven.EVEN) + .subscribe().withSubscriber(AssertSubscriber.create(ctx, Long.MAX_VALUE)); + + odd.assertCompleted().assertItems(1, 3, 5); + even.assertCompleted().assertItems(2, 4, 6); + } +} diff --git a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java new file mode 100644 index 000000000..06cf9e236 --- /dev/null +++ b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java @@ -0,0 +1,24 @@ +package io.smallrye.mutiny.tcktests; + +import java.util.concurrent.Flow; + +public class MultiSplitTckTest extends AbstractPublisherTck { + + enum Anything { + AnyValue + } + + @Override + public Flow.Publisher createFlowPublisher(long elements) { + return upstream(elements) + .split(Anything.class, n -> Anything.AnyValue) + .get(Anything.AnyValue); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + return failedUpstream() + .split(Anything.class, n -> Anything.AnyValue) + .get(Anything.AnyValue); + } +} From f307084391cba0aab4d9804294f0849ee2ee009c Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 3 Jul 2023 10:39:36 +0200 Subject: [PATCH 2/4] Expose the split key --- .../operators/multi/split/MultiSplitter.java | 24 +++++++++++-------- .../operators/multi/split/SplitMulti.java | 19 +++++++++++++++ .../multi/split/MultiSplitterTest.java | 11 +++++++++ 3 files changed, 44 insertions(+), 10 deletions(-) create mode 100644 implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java index 8b2b47bfd..b405464b4 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java @@ -12,7 +12,6 @@ import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.Subscriptions; -import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.ContextSupport; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -49,7 +48,7 @@ public class MultiSplitter> { private final Multi upstream; private final Function splitter; - private final ConcurrentHashMap splits; + private final ConcurrentHashMap splits; private final int requiredNumberOfSubscribers; public MultiSplitter(Multi upstream, Class keyType, Function splitter) { @@ -70,8 +69,8 @@ public MultiSplitter(Multi upstream, Class keyType, Function get(K key) { - return Infrastructure.onMultiCreation(new SplitMulti(key)); + public SplitMulti get(K key) { + return new SplitMultiImpl(key); } private enum State { @@ -92,7 +91,7 @@ private void onSplitRequest() { if (state.get() != State.SUBSCRIBED || splits.size() < requiredNumberOfSubscribers) { return; } - for (SplitMulti.Split split : splits.values()) { + for (SplitMultiImpl.Split split : splits.values()) { if (split.demand.get() == 0L) { return; } @@ -101,14 +100,14 @@ private void onSplitRequest() { } private void onUpstreamFailure() { - for (SplitMulti.Split split : splits.values()) { + for (SplitMultiImpl.Split split : splits.values()) { split.downstream.onFailure(terminalFailure); } splits.clear(); } private void onUpstreamCompletion() { - for (SplitMulti.Split split : splits.values()) { + for (SplitMultiImpl.Split split : splits.values()) { split.downstream.onCompletion(); } splits.clear(); @@ -121,7 +120,7 @@ private void onUpstreamItem(T item) { throw new NullPointerException("The splitter function returned null"); } // Note: if the target subscriber was removed between the last upstream demand and now, it is simply discarded - SplitMulti.Split target = splits.get(key); + SplitMultiImpl.Split target = splits.get(key); if (target != null) { target.downstream.onItem(item); if (splits.size() == requiredNumberOfSubscribers @@ -190,14 +189,19 @@ public Context context() { } } - private class SplitMulti extends AbstractMulti { + private class SplitMultiImpl extends AbstractMulti implements SplitMulti { private final K key; - private SplitMulti(K key) { + private SplitMultiImpl(K key) { this.key = key; } + @Override + public K splitKey() { + return key; + } + @Override public void subscribe(MultiSubscriber subscriber) { nonNull(subscriber, "subscriber"); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java new file mode 100644 index 000000000..6f86f2760 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java @@ -0,0 +1,19 @@ +package io.smallrye.mutiny.operators.multi.split; + +import io.smallrye.mutiny.Multi; + +/** + * A {@link Multi} that corresponds to {@link MultiSplitter} key. + * + * @param the element types + * @param the key + */ +public interface SplitMulti> extends Multi { + + /** + * Get the corresponding split key. + * + * @return the key + */ + K splitKey(); +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java index 2aab64b3d..44c6ce0e5 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java @@ -99,6 +99,17 @@ void checkBasicBehavior() { afterWork.assertHasNotReceivedAnyItem().hasCompleted(); } + @Test + void checkSplitKey() { + var splitter = evenOddSplitter(); + + var odd = splitter.get(OddEven.ODD); + assertThat(odd.splitKey()).isEqualTo(OddEven.ODD); + + var even = splitter.get(OddEven.EVEN); + assertThat(even.splitKey()).isEqualTo(OddEven.EVEN); + } + @Test void checkPauseOnCancellation() { var splitter = evenOddSplitter(); From 8c653fedcc50c5b5164839912f4387bb3daa5b89 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 3 Jul 2023 17:07:47 +0200 Subject: [PATCH 3/4] Revert "Expose the split key" This reverts commit f307084391cba0aab4d9804294f0849ee2ee009c. --- .../operators/multi/split/MultiSplitter.java | 24 ++++++++----------- .../operators/multi/split/SplitMulti.java | 19 --------------- .../multi/split/MultiSplitterTest.java | 11 --------- 3 files changed, 10 insertions(+), 44 deletions(-) delete mode 100644 implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java index b405464b4..8b2b47bfd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java @@ -12,6 +12,7 @@ import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.ContextSupport; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -48,7 +49,7 @@ public class MultiSplitter> { private final Multi upstream; private final Function splitter; - private final ConcurrentHashMap splits; + private final ConcurrentHashMap splits; private final int requiredNumberOfSubscribers; public MultiSplitter(Multi upstream, Class keyType, Function splitter) { @@ -69,8 +70,8 @@ public MultiSplitter(Multi upstream, Class keyType, Function get(K key) { - return new SplitMultiImpl(key); + public Multi get(K key) { + return Infrastructure.onMultiCreation(new SplitMulti(key)); } private enum State { @@ -91,7 +92,7 @@ private void onSplitRequest() { if (state.get() != State.SUBSCRIBED || splits.size() < requiredNumberOfSubscribers) { return; } - for (SplitMultiImpl.Split split : splits.values()) { + for (SplitMulti.Split split : splits.values()) { if (split.demand.get() == 0L) { return; } @@ -100,14 +101,14 @@ private void onSplitRequest() { } private void onUpstreamFailure() { - for (SplitMultiImpl.Split split : splits.values()) { + for (SplitMulti.Split split : splits.values()) { split.downstream.onFailure(terminalFailure); } splits.clear(); } private void onUpstreamCompletion() { - for (SplitMultiImpl.Split split : splits.values()) { + for (SplitMulti.Split split : splits.values()) { split.downstream.onCompletion(); } splits.clear(); @@ -120,7 +121,7 @@ private void onUpstreamItem(T item) { throw new NullPointerException("The splitter function returned null"); } // Note: if the target subscriber was removed between the last upstream demand and now, it is simply discarded - SplitMultiImpl.Split target = splits.get(key); + SplitMulti.Split target = splits.get(key); if (target != null) { target.downstream.onItem(item); if (splits.size() == requiredNumberOfSubscribers @@ -189,19 +190,14 @@ public Context context() { } } - private class SplitMultiImpl extends AbstractMulti implements SplitMulti { + private class SplitMulti extends AbstractMulti { private final K key; - private SplitMultiImpl(K key) { + private SplitMulti(K key) { this.key = key; } - @Override - public K splitKey() { - return key; - } - @Override public void subscribe(MultiSubscriber subscriber) { nonNull(subscriber, "subscriber"); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java deleted file mode 100644 index 6f86f2760..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/SplitMulti.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.smallrye.mutiny.operators.multi.split; - -import io.smallrye.mutiny.Multi; - -/** - * A {@link Multi} that corresponds to {@link MultiSplitter} key. - * - * @param the element types - * @param the key - */ -public interface SplitMulti> extends Multi { - - /** - * Get the corresponding split key. - * - * @return the key - */ - K splitKey(); -} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java index 44c6ce0e5..2aab64b3d 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java @@ -99,17 +99,6 @@ void checkBasicBehavior() { afterWork.assertHasNotReceivedAnyItem().hasCompleted(); } - @Test - void checkSplitKey() { - var splitter = evenOddSplitter(); - - var odd = splitter.get(OddEven.ODD); - assertThat(odd.splitKey()).isEqualTo(OddEven.ODD); - - var even = splitter.get(OddEven.EVEN); - assertThat(even.splitKey()).isEqualTo(OddEven.EVEN); - } - @Test void checkPauseOnCancellation() { var splitter = evenOddSplitter(); From 7f4f3981cbfac7e534fd9f7fe263ef4b72cb4488 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 3 Jul 2023 17:11:13 +0200 Subject: [PATCH 4/4] Expose the splitter key type --- .../mutiny/operators/multi/split/MultiSplitter.java | 11 +++++++++++ .../operators/multi/split/MultiSplitterTest.java | 2 ++ 2 files changed, 13 insertions(+) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java index 8b2b47bfd..ac4783245 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java @@ -51,6 +51,7 @@ public class MultiSplitter> { private final Function splitter; private final ConcurrentHashMap splits; private final int requiredNumberOfSubscribers; + private final Class keyType; public MultiSplitter(Multi upstream, Class keyType, Function splitter) { this.upstream = nonNull(upstream, "upstream"); @@ -58,6 +59,7 @@ public MultiSplitter(Multi upstream, Class keyType, Function(); this.requiredNumberOfSubscribers = keyType.getEnumConstants().length; @@ -74,6 +76,15 @@ public Multi get(K key) { return Infrastructure.onMultiCreation(new SplitMulti(key)); } + /** + * Get the (enum) key type. + * + * @return the key type + */ + public Class keyType() { + return keyType; + } + private enum State { INIT, AWAITING_SUBSCRIPTION, diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java index 2aab64b3d..124e75cdf 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java @@ -48,6 +48,8 @@ void rejectNegativeDemand() { @Test void checkBasicBehavior() { var splitter = evenOddSplitter(); + assertThat(splitter.keyType()).isEqualTo(OddEven.class); + var odd = splitter.get(OddEven.ODD) .subscribe().withSubscriber(AssertSubscriber.create()); var even = splitter.get(OddEven.EVEN)