diff --git a/buildSrc/src/main/kotlin/io.github.nstdio.http.ext.test-conventions.gradle.kts b/buildSrc/src/main/kotlin/io.github.nstdio.http.ext.test-conventions.gradle.kts index 3a92922..5ae7ccd 100644 --- a/buildSrc/src/main/kotlin/io.github.nstdio.http.ext.test-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/io.github.nstdio.http.ext.test-conventions.gradle.kts @@ -81,6 +81,7 @@ val brotli4JVersion = "1.8.0" val brotliOrgVersion = "0.1.2" val gsonVersion = "2.9.1" val equalsverifierVersion = "3.10.1" +val coroutinesVersion = "1.6.4" val jsonLibs = mapOf( "jackson" to "com.fasterxml.jackson.core", @@ -97,6 +98,8 @@ val spiDeps = listOf( dependencies { spiDeps.forEach { compileOnly(it) } + testImplementation(platform("org.jetbrains.kotlinx:kotlinx-coroutines-bom:$coroutinesVersion")) + /** AssertJ & Friends */ testImplementation("org.assertj:assertj-core:$assertJVersion") testImplementation("io.kotest:kotest-assertions-core:$kotestAssertionsVersion") @@ -116,6 +119,9 @@ dependencies { testImplementation("nl.jqno.equalsverifier:equalsverifier:$equalsverifierVersion") testImplementation("com.tngtech.archunit:archunit-junit5:1.0.0-rc1") + /** Kotlin Coroutines */ + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core") + spiDeps.forEach { spiTestImplementation(it) } spiTestImplementation("com.aayushatharva.brotli4j:native-${getArch()}:$brotli4JVersion") } diff --git a/src/main/java/io/github/nstdio/http/ext/BodyPublishers.java b/src/main/java/io/github/nstdio/http/ext/BodyPublishers.java new file mode 100644 index 0000000..09c66f6 --- /dev/null +++ b/src/main/java/io/github/nstdio/http/ext/BodyPublishers.java @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2022 Edgar Asatryan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.nstdio.http.ext; + +import io.github.nstdio.http.ext.spi.JsonMappingProvider; + +import java.io.ByteArrayOutputStream; +import java.net.http.HttpRequest.BodyPublisher; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; + +/** + * Implementations of various useful {@link BodyPublisher}s. + */ +public final class BodyPublishers { + private BodyPublishers() { + } + + /** + * Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done + * using {@code JsonMappingProvider} default provider retrieved using {@link JsonMappingProvider#provider()}. + * + * @param body The body. + * + * @return a BodyPublisher + */ + public static BodyPublisher ofJson(Object body) { + return ofJson(body, JsonMappingProvider.provider()); + } + + /** + * Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done + * using {@code jsonProvider}. + * + * @param body The body. + * @param jsonProvider The JSON mapping provider to use when creating JSON presentation of {@code body}. + * + * @return a BodyPublisher + */ + public static BodyPublisher ofJson(Object body, JsonMappingProvider jsonProvider) { + return ofJson(body, jsonProvider, null); + } + + /** + * Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done + * using {@code JsonMappingProvider} default provider retrieved using {@link JsonMappingProvider#provider()}. + * + * @param body The body. + * @param executor The scheduler to use to publish body to subscriber. If {@code null} the * + * {@link ForkJoinPool#commonPool()} will be used. + * + * @return a BodyPublisher + */ + public static BodyPublisher ofJson(Object body, Executor executor) { + return ofJson(body, JsonMappingProvider.provider(), executor); + } + + /** + * Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done * + * using {@code jsonProvider}. + * + * @param body The body. + * @param jsonProvider The JSON mapping provider to use when creating JSON presentation of {@code body}. + * @param executor The scheduler to use to publish body to subscriber. If {@code null} the + * {@link ForkJoinPool#commonPool()} will be used. + * + * @return a BodyPublisher + */ + public static BodyPublisher ofJson(Object body, JsonMappingProvider jsonProvider, Executor executor) { + return new JsonPublisher(body, jsonProvider, Optional.ofNullable(executor).orElseGet(ForkJoinPool::commonPool)); + } + + /** + * The {@code BodyPublisher} that converts objects to JSON. + */ + static final class JsonPublisher implements BodyPublisher { + private final Object body; + private final JsonMappingProvider provider; + private final Executor executor; + + JsonPublisher(Object body, JsonMappingProvider provider, Executor executor) { + this.body = body; + this.provider = provider; + this.executor = executor; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + var subscription = ByteArraySubscription.ofByteBuffer(subscriber, bytesSupplier(), executor); + + subscriber.onSubscribe(subscription); + } + + private Supplier bytesSupplier() { + return () -> { + var os = new ByteArrayOutputStream(); + try { + provider.get().write(body, os); + } catch (Throwable e) { + Throwables.sneakyThrow(e); + } + + return os.toByteArray(); + }; + } + + @Override + public long contentLength() { + return -1; + } + } + +} diff --git a/src/main/java/io/github/nstdio/http/ext/ByteArraySubscription.java b/src/main/java/io/github/nstdio/http/ext/ByteArraySubscription.java index eae10ea..fc61351 100644 --- a/src/main/java/io/github/nstdio/http/ext/ByteArraySubscription.java +++ b/src/main/java/io/github/nstdio/http/ext/ByteArraySubscription.java @@ -18,42 +18,84 @@ import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +class ByteArraySubscription implements Subscription { + private final Subscriber subscriber; + private final Executor executor; + + private final Function mapper; + private final Supplier bytes; -class ByteArraySubscription implements Subscription { - private final Subscriber> subscriber; private final AtomicBoolean completed = new AtomicBoolean(false); - private final byte[] bytes; + private Future result; - ByteArraySubscription(Subscriber> subscriber, byte[] bytes) { + ByteArraySubscription(Subscriber subscriber, Executor executor, Supplier bytes, Function mapper) { this.subscriber = subscriber; + this.executor = executor; this.bytes = bytes; + this.mapper = mapper; + } + + static ByteArraySubscription> ofByteBufferList(Subscriber> subscriber, byte[] bytes) { + return new ByteArraySubscription<>(subscriber, DirectExecutor.INSTANCE, () -> bytes, o -> List.of(ByteBuffer.wrap(o).asReadOnlyBuffer())); + } + + static ByteArraySubscription ofByteBuffer(Subscriber subscriber, Supplier bytes, Executor executor) { + return new ByteArraySubscription<>(subscriber, executor, bytes, ByteBuffer::wrap); } @Override public void request(long n) { - if (completed.get()) { - return; - } + if (!completed.getAndSet(true)) { + if (n > 0) { + submit(() -> { + try { + T item = mapper.apply(bytes.get()); - if (n <= 0) { - subscriber.onError(new IllegalArgumentException("n <= 0")); - return; + subscriber.onNext(item); + subscriber.onComplete(); + } catch (Throwable th) { + subscriber.onError(th); + } + }); + } else { + var e = new IllegalArgumentException("n <= 0"); + submit(() -> subscriber.onError(e)); + } } + } + @Override + public void cancel() { completed.set(true); - ByteBuffer buffer = ByteBuffer.wrap(bytes).asReadOnlyBuffer(); - List item = List.of(buffer); + if (result != null) { + result.cancel(false); + } + } - subscriber.onNext(item); - subscriber.onComplete(); + private void submit(Runnable r) { + if (executor instanceof ExecutorService) { + result = ((ExecutorService) executor).submit(r); + } else { + executor.execute(r); + } } - @Override - public void cancel() { - completed.set(true); + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } } } diff --git a/src/main/java/io/github/nstdio/http/ext/ContentTypeInterceptor.java b/src/main/java/io/github/nstdio/http/ext/ContentTypeInterceptor.java new file mode 100644 index 0000000..75009f2 --- /dev/null +++ b/src/main/java/io/github/nstdio/http/ext/ContentTypeInterceptor.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2022 Edgar Asatryan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.nstdio.http.ext; + +import io.github.nstdio.http.ext.BodyPublishers.JsonPublisher; + +import java.net.http.HttpRequest.BodyPublisher; +import java.util.Map; +import java.util.Optional; + +import static io.github.nstdio.http.ext.Headers.HEADER_CONTENT_TYPE; +import static java.util.function.Predicate.not; + +class ContentTypeInterceptor implements Interceptor { + private final Interceptor headersAdding; + + ContentTypeInterceptor(String contentType) { + headersAdding = new HeadersAddingInterceptor(Map.of(HEADER_CONTENT_TYPE, contentType)); + } + + @Override + public Chain intercept(Chain in) { + if (!isJsonPublisher(in.request().bodyPublisher())) { + return in; + } + + return headersAdding.intercept(in); + } + + private static boolean isJsonPublisher(Optional bodyPublisher) { + return bodyPublisher.filter(not(JsonPublisher.class::isInstance)).isEmpty(); + } +} diff --git a/src/main/java/io/github/nstdio/http/ext/ExtendedHttpClient.java b/src/main/java/io/github/nstdio/http/ext/ExtendedHttpClient.java index 5954ae9..5191f89 100644 --- a/src/main/java/io/github/nstdio/http/ext/ExtendedHttpClient.java +++ b/src/main/java/io/github/nstdio/http/ext/ExtendedHttpClient.java @@ -47,6 +47,7 @@ public class ExtendedHttpClient extends HttpClient { private final CompressionInterceptor compressionInterceptor; private final CachingInterceptor cachingInterceptor; private final HeadersAddingInterceptor headersAddingInterceptor; + private final ContentTypeInterceptor contentTypeInterceptor; private final HttpClient delegate; private final boolean allowInsecure; @@ -68,6 +69,7 @@ private ExtendedHttpClient(CompressionInterceptor compressionInterceptor, this.compressionInterceptor = compressionInterceptor; this.cachingInterceptor = cachingInterceptor; this.headersAddingInterceptor = headersAddingInterceptor; + this.contentTypeInterceptor = new ContentTypeInterceptor("application/json"); this.delegate = delegate; this.allowInsecure = allowInsecure; } @@ -188,6 +190,7 @@ private Chain buildAndExecute(RequestContext ctx) { chain = possiblyApply(compressionInterceptor, chain); chain = possiblyApply(cachingInterceptor, chain); chain = possiblyApply(headersAddingInterceptor, chain); + chain = possiblyApply(contentTypeInterceptor, chain); return chain; } diff --git a/src/main/java/io/github/nstdio/http/ext/Headers.java b/src/main/java/io/github/nstdio/http/ext/Headers.java index 957fa17..ad76047 100644 --- a/src/main/java/io/github/nstdio/http/ext/Headers.java +++ b/src/main/java/io/github/nstdio/http/ext/Headers.java @@ -41,6 +41,7 @@ class Headers { static final String HEADER_VARY = "Vary"; static final String HEADER_CONTENT_ENCODING = "Content-Encoding"; static final String HEADER_CONTENT_LENGTH = "Content-Length"; + static final String HEADER_CONTENT_TYPE = "Content-Type"; static final String HEADER_IF_MODIFIED_SINCE = "If-Modified-Since"; static final String HEADER_IF_NONE_MATCH = "If-None-Match"; static final String HEADER_CACHE_CONTROL = "Cache-Control"; diff --git a/src/main/java/io/github/nstdio/http/ext/HeadersAddingInterceptor.java b/src/main/java/io/github/nstdio/http/ext/HeadersAddingInterceptor.java index c3d90bc..2457d57 100644 --- a/src/main/java/io/github/nstdio/http/ext/HeadersAddingInterceptor.java +++ b/src/main/java/io/github/nstdio/http/ext/HeadersAddingInterceptor.java @@ -30,6 +30,10 @@ class HeadersAddingInterceptor implements Interceptor { this.headers = headers; this.resolvableHeaders = resolvableHeaders; } + + HeadersAddingInterceptor(Map headers) { + this(headers, Map.of()); + } @Override public Chain intercept(Chain in) { diff --git a/src/main/java/io/github/nstdio/http/ext/InMemoryCache.java b/src/main/java/io/github/nstdio/http/ext/InMemoryCache.java index 9f427f7..fdc9e9d 100644 --- a/src/main/java/io/github/nstdio/http/ext/InMemoryCache.java +++ b/src/main/java/io/github/nstdio/http/ext/InMemoryCache.java @@ -89,7 +89,7 @@ public long bodySize() { @Override public void subscribeTo(Flow.Subscriber> sub) { - Flow.Subscription subscription = new ByteArraySubscription(sub, body); + Flow.Subscription subscription = ByteArraySubscription.ofByteBufferList(sub, body); sub.onSubscribe(subscription); } diff --git a/src/main/java/io/github/nstdio/http/ext/spi/GsonJsonMapping.java b/src/main/java/io/github/nstdio/http/ext/spi/GsonJsonMapping.java index 1c20dc0..b57e893 100644 --- a/src/main/java/io/github/nstdio/http/ext/spi/GsonJsonMapping.java +++ b/src/main/java/io/github/nstdio/http/ext/spi/GsonJsonMapping.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.lang.reflect.Type; import java.util.Objects; @@ -67,4 +69,11 @@ public T read(byte[] bytes, Class targetType) throws IOException { public T read(byte[] bytes, Type targetType) throws IOException { return read(new ByteArrayInputStream(bytes), targetType); } + + @Override + public void write(Object o, OutputStream os) throws IOException { + try (var writer = new OutputStreamWriter(os, UTF_8)) { + gson.toJson(o, writer); + } + } } diff --git a/src/main/java/io/github/nstdio/http/ext/spi/JacksonJsonMapping.java b/src/main/java/io/github/nstdio/http/ext/spi/JacksonJsonMapping.java index 27bda6a..0e5e98b 100644 --- a/src/main/java/io/github/nstdio/http/ext/spi/JacksonJsonMapping.java +++ b/src/main/java/io/github/nstdio/http/ext/spi/JacksonJsonMapping.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Type; public class JacksonJsonMapping implements JsonMapping { @@ -59,6 +60,10 @@ public T read(byte[] bytes, Type targetType) throws IOException { return mapper.readValue(bytes, constructType(targetType)); } + @Override + public void write(Object o, OutputStream os) throws IOException { + mapper.writeValue(os, o); + } private JavaType constructType(Type targetType) { return mapper.constructType(targetType); diff --git a/src/main/java/io/github/nstdio/http/ext/spi/JsonMapping.java b/src/main/java/io/github/nstdio/http/ext/spi/JsonMapping.java index d8a3320..9d94804 100644 --- a/src/main/java/io/github/nstdio/http/ext/spi/JsonMapping.java +++ b/src/main/java/io/github/nstdio/http/ext/spi/JsonMapping.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Type; /** @@ -77,4 +78,16 @@ public interface JsonMapping { * @throws IOException When there is a JSON parsing or binding error or I/O error occurred. */ T read(byte[] bytes, Type targetType) throws IOException; + + /** + * Writes JSON representation of {@code o} object to {@code os} stream. + * + * @param o The object to write. + * @param os The output stream. + * + * @throws IOException When I/O error occurred. + */ + default void write(Object o, OutputStream os) throws IOException { + throw new RuntimeException("not supported!"); + } } diff --git a/src/spiTest/kotlin/io/github/nstdio/http/ext/BodyPublishersTest.kt b/src/spiTest/kotlin/io/github/nstdio/http/ext/BodyPublishersTest.kt new file mode 100644 index 0000000..d4de2af --- /dev/null +++ b/src/spiTest/kotlin/io/github/nstdio/http/ext/BodyPublishersTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2022 Edgar Asatryan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.nstdio.http.ext + +import io.github.nstdio.http.ext.spi.Classpath +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import mockwebserver3.MockResponse +import mockwebserver3.MockWebServer +import mockwebserver3.junit5.internal.MockWebServerExtension +import org.junit.jupiter.api.Assumptions.assumeTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import java.net.http.HttpRequest +import java.net.http.HttpResponse.BodyHandlers.discarding +import java.nio.charset.StandardCharsets.UTF_8 + +@ExtendWith(MockWebServerExtension::class) +class BodyPublishersTest(private val mockWebServer: MockWebServer) { + @Nested + internal inner class OfJsonTest { + @BeforeEach + fun setUp() { + assumeTrue { ALL_JSON.any { Classpath.isPresent(it) } } + } + + @Test + fun `Should publish body as JSON`() { + //given + val client = ExtendedHttpClient.newHttpClient() + val body = mapOf("a" to 1, "b" to 2) + val request = HttpRequest.newBuilder(mockWebServer.url("/test").toUri()) + .POST(BodyPublishers.ofJson(body)) + .build() + + mockWebServer.enqueue(MockResponse().setResponseCode(200)) + + //when + client.send(request, discarding()) + + //then + val actual = mockWebServer.takeRequest() + + actual.body.readString(UTF_8) + .shouldBe("{\"a\":1,\"b\":2}") + + actual.headers["Content-Type"] + .shouldNotBeNull() + .shouldBe("application/json") + } + } + +} diff --git a/src/spiTest/kotlin/io/github/nstdio/http/ext/spi/JsonMappingContract.kt b/src/spiTest/kotlin/io/github/nstdio/http/ext/spi/JsonMappingContract.kt index 8ba2094..2cc6c5f 100644 --- a/src/spiTest/kotlin/io/github/nstdio/http/ext/spi/JsonMappingContract.kt +++ b/src/spiTest/kotlin/io/github/nstdio/http/ext/spi/JsonMappingContract.kt @@ -16,6 +16,7 @@ package io.github.nstdio.http.ext.spi import com.tngtech.archunit.thirdparty.com.google.common.reflect.TypeToken +import io.kotest.assertions.json.shouldBeJsonObject import io.kotest.assertions.throwables.shouldThrow import io.kotest.matchers.nulls.shouldNotBeNull import org.assertj.core.api.Assertions.assertThat @@ -24,8 +25,10 @@ import org.mockito.Mockito.atLeastOnce import org.mockito.Mockito.spy import org.mockito.Mockito.verify import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream import java.io.IOException import java.io.InputStream +import java.nio.charset.StandardCharsets.UTF_8 internal interface JsonMappingContract { fun get(): JsonMapping @@ -112,6 +115,20 @@ internal interface JsonMappingContract { .containsEntry("b", 2) } + @Test + fun `Should write object as JSON`() { + //given + val obj = mapOf("a" to 1, "b" to 2) + val mapping = get() + val out = ByteArrayOutputStream() + + //when + mapping.write(obj, out) + + //then + out.toString(UTF_8).shouldBeJsonObject() + } + open class TestInputStream(private val inputStream: InputStream) : InputStream() { override fun read(): Int = inputStream.read() override fun close() = inputStream.close() diff --git a/src/test/kotlin/io/github/nstdio/http/ext/ByteArraySubscriptionTest.kt b/src/test/kotlin/io/github/nstdio/http/ext/ByteArraySubscriptionTest.kt index 3254d63..032cf0c 100644 --- a/src/test/kotlin/io/github/nstdio/http/ext/ByteArraySubscriptionTest.kt +++ b/src/test/kotlin/io/github/nstdio/http/ext/ByteArraySubscriptionTest.kt @@ -15,36 +15,49 @@ */ package io.github.nstdio.http.ext +import io.kotest.assertions.timing.eventually import io.kotest.property.Arb import io.kotest.property.arbitrary.next import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.mockito.ArgumentMatchers +import org.mockito.ArgumentMatchers.any +import org.mockito.BDDMockito.given import org.mockito.Mock -import org.mockito.Mockito.times +import org.mockito.Mockito.inOrder +import org.mockito.Mockito.mock import org.mockito.Mockito.verify import org.mockito.Mockito.verifyNoInteractions import org.mockito.Mockito.verifyNoMoreInteractions import org.mockito.junit.jupiter.MockitoExtension +import java.io.IOException import java.net.http.HttpResponse.BodySubscriber +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets +import java.util.concurrent.ExecutorService +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.Future +import kotlin.time.Duration.Companion.seconds @ExtendWith(MockitoExtension::class) internal class ByteArraySubscriptionTest { @Mock - lateinit var mockSubscriber: BodySubscriber + lateinit var mockSubscriber: BodySubscriber> @Test fun shouldInvokeOnCompleteAfterFirstRequest() { //given val bytes = Arb.string(10, 20).next().toByteArray(StandardCharsets.UTF_8) - val subscription = ByteArraySubscription(mockSubscriber, bytes) + val subscription = ByteArraySubscription.ofByteBufferList(mockSubscriber, bytes) //when - for (i in 0..63) { - subscription.request(1) - } + runAsyncAwait { subscription.request(1) } //then verify(mockSubscriber).onNext(ArgumentMatchers.anyList()) @@ -56,32 +69,103 @@ internal class ByteArraySubscriptionTest { fun shouldReportErrorWhenRequestedIsNegative() { //given val bytes = Arb.string(10, 20).next().toByteArray(StandardCharsets.UTF_8) - val subscription = ByteArraySubscription(mockSubscriber, bytes) + val subscription = ByteArraySubscription.ofByteBufferList(mockSubscriber, bytes) //when subscription.request(0) subscription.request(-1) //then - verify(mockSubscriber, times(2)).onError( - ArgumentMatchers.any( + verify(mockSubscriber).onError( + any( IllegalArgumentException::class.java ) ) + verifyNoMoreInteractions(mockSubscriber) } @Test fun shouldNotInvokeSubscriberWhenCanceled() { //given - val subscription = ByteArraySubscription(mockSubscriber, ByteArray(0)) + val subscription = ByteArraySubscription.ofByteBufferList(mockSubscriber, ByteArray(0)) //when subscription.cancel() - for (i in 0..63) { + + runAsyncAwait { subscription.request(1) } //then verifyNoInteractions(mockSubscriber) } -} \ No newline at end of file + + @Test + fun `Should invoke subscribers onError`() { + //given + val exc = IOException("hey!") + val subscription = ByteArraySubscription(mockSubscriber, { it.run() }, { throw exc }, { listOf() }) + + //when + subscription.request(1) + + //then + verify(mockSubscriber).onError(exc) + verifyNoMoreInteractions(mockSubscriber) + } + + @Test + fun `Should eventually submit data to subscriber`() { + //given + val executor = ForkJoinPool.commonPool() + val bytes = Arb.byteArray(8).next() + val item = listOf(bytes.toBuffer()) + val subscription = ByteArraySubscription(mockSubscriber, executor, { bytes }, { item }) + + //when + subscription.request(1) + + //then + runBlocking { + eventually(10.seconds) { + val inOrder = inOrder(mockSubscriber) + inOrder.verify(mockSubscriber).onNext(item) + inOrder.verify(mockSubscriber).onComplete() + inOrder.verifyNoMoreInteractions() + + true + } + } + } + + @Test + fun `Should cancel result`() { + //given + val mockExecutor = mock(ExecutorService::class.java) + val mockFuture = mock(Future::class.java) + + given(mockExecutor.submit(any(Runnable::class.java))) + .willReturn(mockFuture) + + val bytes = Arb.byteArray(8).next() + val item = listOf(bytes.toBuffer()) + val subscription = ByteArraySubscription(mockSubscriber, mockExecutor, { bytes }, { item }) + + //when + subscription.request(1) + subscription.cancel() + + //then + verify(mockFuture).cancel(false) + } + + private fun runAsyncAwait(block: suspend CoroutineScope.() -> Unit) { + runBlocking { + coroutineScope { + (0..64).map { + async(block = block) + }.awaitAll() + } + } + } +} diff --git a/src/test/kotlin/io/github/nstdio/http/ext/archunit/VisibilityTest.kt b/src/test/kotlin/io/github/nstdio/http/ext/archunit/VisibilityTest.kt index cbc30f6..8875b01 100644 --- a/src/test/kotlin/io/github/nstdio/http/ext/archunit/VisibilityTest.kt +++ b/src/test/kotlin/io/github/nstdio/http/ext/archunit/VisibilityTest.kt @@ -27,6 +27,7 @@ import com.tngtech.archunit.lang.conditions.ArchPredicates.are import com.tngtech.archunit.lang.syntax.ArchRuleDefinition import io.github.nstdio.http.ext.BodyHandlers import io.github.nstdio.http.ext.BodyHandlers.DecompressingBodyHandlerBuilder +import io.github.nstdio.http.ext.BodyPublishers import io.github.nstdio.http.ext.BodySubscribers import io.github.nstdio.http.ext.Cache import io.github.nstdio.http.ext.Cache.DiskCacheBuilder @@ -60,6 +61,7 @@ internal object VisibilityTest { .and(not(DecompressingBodyHandlerBuilder::class.java)) .and(not(BodyHandlers::class.java)) .and(not(BodySubscribers::class.java)) + .and(not(BodyPublishers::class.java)) .and(not(Cache.CacheBuilder::class.java)) .and(not(Cache.CacheEntry::class.java)) .and(not(InMemoryCacheBuilder::class.java))