diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java index ddbec8b2c8b..f960c8fc6bf 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java @@ -71,7 +71,7 @@ public CompletableFuture buildWebSocketDirect(StandardWebSock request.headers().entrySet().stream() .forEach(e -> e.getValue().stream().forEach(v -> options.addHeader(e.getKey(), v))); - options.setAbsoluteURI(request.uri().toString()); + options.setAbsoluteURI(WebSocket.toWebSocketUri(request.uri()).toString()); CompletableFuture response = new CompletableFuture<>(); diff --git a/junit/mockwebserver/pom.xml b/junit/mockwebserver/pom.xml index 74ee6e0afa2..1b598652e1d 100644 --- a/junit/mockwebserver/pom.xml +++ b/junit/mockwebserver/pom.xml @@ -43,6 +43,17 @@ io.fabric8 zjsonpatch + + io.vertx + vertx-web-client + test + + + io.vertx + vertx-uri-template + ${vertx.version} + test + org.spockframework spock-core diff --git a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerCrudTest.groovy b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerCrudTest.groovy index 6e0840dec3b..2cc28a99290 100644 --- a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerCrudTest.groovy +++ b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerCrudTest.groovy @@ -15,25 +15,23 @@ */ package io.fabric8.mockwebserver -import com.fasterxml.jackson.databind.ObjectMapper import io.fabric8.mockwebserver.crud.CrudDispatcher -import okhttp3.MediaType -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.RequestBody +import io.vertx.core.Future +import io.vertx.core.Vertx +import io.vertx.ext.web.client.WebClient import okhttp3.mockwebserver.MockWebServer import spock.lang.Shared import spock.lang.Specification +import spock.util.concurrent.AsyncConditions class DefaultMockServerCrudTest extends Specification { - DefaultMockServer server - @Shared - def client = new OkHttpClient() - + static def vertx = Vertx.vertx() @Shared - def mapper = new ObjectMapper() + static def client = WebClient.create(vertx) + + DefaultMockServer server def setup() { server = new DefaultMockServer(new Context(), new MockWebServer(), new HashMap<>(), @@ -45,98 +43,143 @@ class DefaultMockServerCrudTest extends Specification { server.shutdown() } - def "get /, with empty store, should return 404"() { - when: - def result = client.newCall(new Request.Builder().url(server.url("/")).build()).execute() - - then: - assert result.code() == 404 - assert result.body().string() == "" + def cleanupSpec() { + client.close() + vertx.close() } - def "get /, with one item, should return item"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).post( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(1L, "user", true)))).build()). - execute() - - when: - def result = client.newCall(new Request.Builder().url(server.url("/")).build()).execute() + def "GET /, with empty store, should return 404"() { + given: "An HTTP request to /" + def request = client.get(server.port, server.getHostName(), "/") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + request.send().onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 404 + assert res.result().body() == null + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } - then: - assert result.code() == 200 - assert result.body().string() == "{\"id\":1,\"username\":\"user\",\"enabled\":true}" + def "POST /, with one item, should return item"() { + given: "An HTTP request to /" + def request = client.post(server.port, server.getHostName(), "/") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent with one JSON item and completed" + request.sendJson(new User(1L, "user", true)).onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 202 + assert res.result().body().toString() == "{\"id\":1,\"username\":\"user\",\"enabled\":true}" + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } - def "get /, with multiple items, should return array"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).post( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(1L, "user", true)))).build()). - execute() - client.newCall(new Request.Builder().url(server.url("/")).post( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(2L, "user-2", true)))).build()). - execute() - - when: - def result = client.newCall(new Request.Builder().url(server.url("/")).build()).execute() - - then: - assert result.code() == 200 - assert result.body().string() == - "[{\"id\":1,\"username\":\"user\",\"enabled\":true},{\"id\":2,\"username\":\"user-2\",\"enabled\":true}]" + def "GET /, with multiple items, should return array"() { + given: "An HTTP request to /" + def request = client.get(server.port, server.getHostName(), "/") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + and: "Items in the server" + def itemsInServer = client.post(server.port, server.getHostName(), "/") + .sendJson(new User(1L, "user", true)) + .compose { _ -> + client.post(server.port, server.getHostName(), "/") + .sendJson(new User(2L, "user-2", true)) + } + + when: "The request is sent and completed" + itemsInServer.onComplete {isr -> + request.send().onComplete { res -> + async.evaluate { assert res.result().statusCode() == 200 + assert res.result().body().toString() == "[{\"id\":1,\"username\":\"user\",\"enabled\":true},{\"id\":2,\"username\":\"user-2\",\"enabled\":true}]" + } + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } - def "get /1, with existent item, should return item"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).post( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(1L, "user", true)))).build()). - execute() - client.newCall(new Request.Builder().url(server.url("/")).post( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(2L, "user-2", true)))).build()). - execute() - - when: - def result = client.newCall(new Request.Builder().url(server.url("/1")).build()).execute() - - then: - assert result.code() == 200 - assert result.body().string() == "{\"id\":1,\"username\":\"user\",\"enabled\":true}" + def "GET /1, with existent item, should return item"() { + given: "An HTTP request to /1" + def request = client.get(server.port, server.getHostName(), "/1") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + and: "Items in the server" + def itemsInServer = Future.all( + client.post(server.port, server.getHostName(), "/") + .sendJson(new User(1L, "user", true)), + client.post(server.port, server.getHostName(), "/") + .sendJson(new User(2L, "user-2", true)) + ) + + when: "The request is sent and completed" + itemsInServer.onComplete {isr -> + request.send().onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 200 + assert res.result().body().toString() == "{\"id\":1,\"username\":\"user\",\"enabled\":true}" + } + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } - def "put /1, with missing item, should create item"() { - when: - def result = client.newCall(new Request.Builder().url(server.url("/1")).put( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(1L, "user-replaced", true)))).build()). - execute() + def "PUT /1, with missing item, should create item"() { + given: "An HTTP request to /1" + def request = client.put(server.port, server.getHostName(), "/1") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + + when: "The request is sent with one JSON item and completed" + request.sendJson(new User(1L, "user-replaced", true)).onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 201 + assert res.result().body().toString() == "{\"id\":1,\"username\":\"user-replaced\",\"enabled\":true}" + } + } - then: - assert result.code() == 201 - assert result.body().string() == "{\"id\":1,\"username\":\"user-replaced\",\"enabled\":true}" + then: "Expect the result to be completed in the specified time" + async.await(10) } - def "put /1, with existent item, should replace item"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).post( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(1L, "user", true)))).build()). - execute() - - when: - def result = client.newCall(new Request.Builder().url(server.url("/1")).put( - RequestBody.create(MediaType.parse("application/json"), - mapper.writeValueAsString(new User(1L, "user-replaced", true)))).build()). - execute() - - then: - assert result.code() == 202 - assert result.body().string() == "{\"id\":1,\"username\":\"user-replaced\",\"enabled\":true}" - def item = client.newCall(new Request.Builder().url(server.url("/1")).build()).execute() - assert item.body().string() == "{\"id\":1,\"username\":\"user-replaced\",\"enabled\":true}" + def "PUT /1, with existent item, should replace item"() { + given: "An HTTP request to /1" + def request = client.put(server.port, server.getHostName(), "/1") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + and: "Items in the server" + def itemsInServer = Future.all( + client.post(server.port, server.getHostName(), "/") + .sendJson(new User(1L, "user", true)), + client.post(server.port, server.getHostName(), "/") + .sendJson(new User(2L, "user-2", true)) + ) + + when: "The request is sent with one JSON item and completed" + itemsInServer.onComplete { isr -> + request.sendJson(new User(1L, "user-replaced", true)).onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 202 + assert res.result().body().toString() == "{\"id\":1,\"username\":\"user-replaced\",\"enabled\":true}" + } + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } } diff --git a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerTest.groovy b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerTest.groovy index 93adec35972..bc064504b17 100644 --- a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerTest.groovy +++ b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerTest.groovy @@ -15,654 +15,647 @@ */ package io.fabric8.mockwebserver +import io.fabric8.mockwebserver.internal.WebSocketMessage import io.fabric8.mockwebserver.utils.ResponseProvider -import okhttp3.* +import io.vertx.core.Future +import io.vertx.core.Vertx +import io.vertx.core.http.WebSocketClient +import io.vertx.ext.web.client.WebClient +import okhttp3.Headers import okhttp3.mockwebserver.RecordedRequest -import okio.ByteString import spock.lang.Shared import spock.lang.Specification +import spock.util.concurrent.AsyncConditions import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.CountDownLatch +import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.AtomicInteger class DefaultMockServerTest extends Specification { - DefaultMockServer server - - @Shared - OkHttpClient client = new OkHttpClient() - - def setup() { - server = new DefaultMockServer() - server.start() - } - - def cleanup() { - server.shutdown() - } - - def "getPort, should return a valid port"() { - when: - def result = server.getPort() - - then: - assert result > 0 - assert result <= 65535 - } - - def "getHostName, should return a valid host name"() { - when: - def result = server.getHostName() - - then: - assert !result.isBlank() - } - - def "toProxy, should return Proxy with the current HostName and Port"() { - when: - def result = server.toProxyAddress() - - then: - assert result.address() instanceof InetSocketAddress - assert ((InetSocketAddress)result.address()).getPort() == server.getPort() - assert ((InetSocketAddress)result.address()).getHostName() == server.getHostName() - } - - def "getRequestCount, with no requests, should return 0"() { - when: - def result = server.getRequestCount() - - then: - assert result == 0 - } - - def "getRequestCount, with multiple, should return valid request count"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).get().build()).execute() - client.newCall(new Request.Builder().url(server.url("/one")).get().build()).execute() - client.newCall(new Request.Builder().url(server.url("/two")).get().build()).execute() - - when: - def result = server.getRequestCount() - - then: - assert result == 3 - } - - def "getLastRequest, with no requests, should return null"() { - when: - def result = server.getLastRequest() - - then: - assert result == null - } - - def "getLastRequest, with one request, should return the request"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).get().build()).execute() - - when: - def result = server.getLastRequest() - - then: - assert result.getPath() == "/" - } - - def "getLastRequest, with one request, can be invoked multiple times"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).get().build()).execute() + @Shared + static def vertx = Vertx.vertx() + + DefaultMockServer server + + WebClient client + + WebSocketClient wsClient + + def setup() { + server = new DefaultMockServer() + server.start() + client = WebClient.create(vertx) + wsClient = vertx.createWebSocketClient() + } + + def cleanup() { + server.shutdown() + client.close() + wsClient.close() + } + + def cleanupSpec() { + vertx.close() + } + + + def "getPort, should return a valid port"() { + when: + def result = server.getPort() + + then: + assert result > 0 + assert result <= 65535 + } + + def "getHostName, should return a valid host name"() { + when: + def result = server.getHostName() + + then: + assert !result.isBlank() + } + + def "toProxy, should return Proxy with the current HostName and Port"() { + when: + def result = server.toProxyAddress() + + then: + assert result.address() instanceof InetSocketAddress + assert ((InetSocketAddress) result.address()).getPort() == server.getPort() + assert ((InetSocketAddress) result.address()).getHostName() == server.getHostName() + } + + def "getRequestCount, with no requests, should return 0"() { + when: + def result = server.getRequestCount() + + then: + assert result == 0 + } + + def "getRequestCount, with multiple, should return valid request count"() { + given: + def all = Future.all( + client.get(server.port, server.getHostName(), "/").send(), + client.get(server.port, server.getHostName(), "/one").send(), + client.get(server.port, server.getHostName(), "/two").send() + ) + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + all.onComplete {isr -> + async.evaluate { assert server.getRequestCount() == 3 } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "getLastRequest, with no requests, should return null"() { + when: + def result = server.getLastRequest() + + then: + assert result == null + } + + def "getLastRequest, with one request, should return the request"() { + given: + def request = client.get(server.port, server.getHostName(), "/").send() + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + request.onComplete { isr -> + async.evaluate { assert server.getLastRequest().getPath() == "/" } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "getLastRequest, with one request, can be invoked multiple times"() { + given: + def request = client.get(server.port, server.getHostName(), "/").send() + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + request.onComplete { isr -> server.getLastRequest() - - when: - def result = server.getLastRequest() - - then: - assert result.getPath() == "/" - } - - def "getLastRequest, with multiple requests, should return the latest request"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).get().build()).execute() - client.newCall(new Request.Builder().url(server.url("/one")).get().build()).execute() - client.newCall(new Request.Builder().url(server.url("/two")).get().build()).execute() - - when: - def result = server.getLastRequest() - - then: - assert result.getPath() == "/two" - } - - def "getLastRequest, with multiple requests, can be invoked multiple times"() { - given: - client.newCall(new Request.Builder().url(server.url("/")).get().build()).execute() - client.newCall(new Request.Builder().url(server.url("/one")).get().build()).execute() + async.evaluate { assert server.getLastRequest().getPath() == "/" } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "getLastRequest, with multiple requests, should return the latest request"() { + given: + def all = client.get(server.port, server.getHostName(), "/").send() + .compose { _ -> client.get(server.port, server.getHostName(), "/one").send() } + .compose { _ -> client.get(server.port, server.getHostName(), "/two").send() } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + + when: "The request is sent and completed" + all.onComplete {isr -> + async.evaluate { assert server.getLastRequest().getPath() == "/two" } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "getLastRequest, with multiple requests, can be invoked multiple times"() { + given: + def all = client.get(server.port, server.getHostName(), "/").send() + .compose { _ -> client.get(server.port, server.getHostName(), "/one").send()} + .compose { _ -> client.get(server.port, server.getHostName(), "/two").send()} + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + all.onComplete {isr -> server.getLastRequest() - client.newCall(new Request.Builder().url(server.url("/two")).get().build()).execute() - server.getLastRequest() - - when: - def result = server.getLastRequest() - - then: - assert result.getPath() == "/two" - } - - def "takeRequest, with timeout and no requests, should return null and don't block (after timeout)"() { - when: - def result = server.takeRequest(1, TimeUnit.MICROSECONDS) - - then: - assert result == null - } - - def "when setting an expectation with once it should be met only the first time"() { - given: - server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").once() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response1 = client.newCall(request).execute() - Response response2 = client.newCall(request).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "admin" - assert response2.code() == 404 - - cleanup: - response1.close() - response2.close() - } - - def "when setting an expectation with n-th times it should be met only the for the first n-th times"() { - given: - server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").times(3) - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response1 = client.newCall(request).execute() - Response response2 = client.newCall(request).execute() - Response response3 = client.newCall(request).execute() - Response response4 = client.newCall(request).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "admin" - assert response2.code() == 200 - assert response2.body().string() == "admin" - assert response3.code() == 200 - assert response3.body().string() == "admin" - assert response4.code() == 404 - - cleanup: - response1.close() - response2.close() - response3.close() - response4.close() - } - - def "when setting an expectation with always it should be met only always"() { - given: - server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").always() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response1 = client.newCall(request).execute() - Response response2 = client.newCall(request).execute() - Response response3 = client.newCall(request).execute() - Response response4 = client.newCall(request).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "admin" - assert response2.code() == 200 - assert response2.body().string() == "admin" - assert response3.code() == 200 - assert response3.body().string() == "admin" - assert response4.code() == 200 - assert response4.body().string() == "admin" - - cleanup: - response1.close() - response2.close() - response3.close() - response4.close() - } - - def "when setting an expectation as an object it should be serialized to json"() { - given: - User root = new User(0, "root", true) - - server.expect().get().withPath("/api/v1/users").andReturn(200, root).always() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response1 = client.newCall(request).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "{\"id\":0,\"username\":\"root\",\"enabled\":true}" - - cleanup: - response1.close() - } - - def "when setting a timed websocket message it should be fire at the specified time"() { - given: - CountDownLatch closed = new CountDownLatch(1) - Queue messages = new ArrayBlockingQueue(1) - AtomicReference webSocketRef = new AtomicReference<>() - WebSocketListener listener = new WebSocketListener() { - @Override - void onMessage(WebSocket webSocket, String text) { - messages.add(text) - } - - @Override - void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()) - } - - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - webSocket.close(code, reason) - } - - @Override - void onClosed(WebSocket webSocket, int code, String reason) { - closed.countDown() - } + async.evaluate { assert server.getLastRequest().getPath() == "/two" } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "takeRequest, with timeout and no requests, should return null and don't block (after timeout)"() { + when: + def result = server.takeRequest(1, TimeUnit.MICROSECONDS) + + then: + assert result == null + } + + def "when setting an expectation with once it should be met only the first time"() { + given: "An expectation with once" + server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").once() + and: "A first request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A second request" + def req2 = req1.compose { _ -> + client.get(server.port, server.getHostName(), "/api/v1/users").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + Future.all(req1, req2).onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin" + assert req2.result().statusCode() == 404 + assert req2.result().body() == null } - - server.expect().get().withPath("/api/v1/users/watch") - .andUpgradeToWebSocket() - .open() - .waitFor(1000).andEmit("DELETED") - .done() - .once() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users/watch")).get().build() - webSocketRef.set(client.newWebSocket(request, listener)) - - then: - messages.poll(10, TimeUnit.SECONDS) == "DELETED" - - when: - webSocketRef.get().close(1000, "just close") - - then: - closed.await(10, TimeUnit.SECONDS) - } - - def "when setting a request/response websocket message it should be fired when the event is triggered"() { - given: - CountDownLatch opened = new CountDownLatch(1) - CountDownLatch closed = new CountDownLatch(1) - CountDownLatch queued = new CountDownLatch(2) - Queue messages = new ArrayBlockingQueue(2) - AtomicReference webSocketRef = new AtomicReference<>() - - WebSocketListener listener = new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - webSocketRef.set(webSocket) - opened.countDown() - } - - @Override - void onMessage(WebSocket webSocket, String text) { - messages.add(text) - queued.countDown() - } - - @Override - void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()) - } - - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - webSocket.close(code, reason) - } - - @Override - void onClosed(WebSocket webSocket, int code, String reason) { - closed.countDown() - } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting an expectation with n-th times it should be met only the for the first n-th times"() { + given: "An expectation with times (3)" + server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").times(3) + and: "A first request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A second request" + def req2 = req1.compose { _ -> + client.get(server.port, server.getHostName(), "/api/v1/users").send() + } + and: "A third request" + def req3 = req2.compose { _ -> + client.get(server.port, server.getHostName(), "/api/v1/users").send() + } + and: "A fourth request" + def req4 = req3.compose { _ -> + client.get(server.port, server.getHostName(), "/api/v1/users").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + Future.all(req1, req2, req3, req4).onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin" + assert req2.result().statusCode() == 200 + assert req2.result().body().toString() == "admin" + assert req3.result().statusCode() == 200 + assert req3.result().body().toString() == "admin" + assert req4.result().statusCode() == 404 + assert req4.result().body() == null } - - server.expect().get().withPath("/api/v1/users/watch") - .andUpgradeToWebSocket() - .open() - .expect("create root").andEmit("CREATED").once() - .expect("delete root").andEmit("DELETED").once() - .done() - .once() - - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users/watch")).get().build() - webSocketRef.set(client.newWebSocket(request, listener)) - - then: - opened.await(10, TimeUnit.SECONDS) - WebSocket ws = webSocketRef.get() - ws.send("create root") - ws.send("delete root") - queued.await(10, TimeUnit.SECONDS) - messages.poll(10, TimeUnit.SECONDS) == "CREATED" - messages.poll(10, TimeUnit.SECONDS) == "DELETED" - - when: - ws.close(1000, "just close") - - then: - closed.await(10, TimeUnit.SECONDS) - } - - def "when receiving an unexpected websocket message it should close the connection with status code 1002"() { - given: - CountDownLatch opened = new CountDownLatch(1) - CountDownLatch closed = new CountDownLatch(1) - int closeCode = -1 - String closeReason = null - AtomicReference webSocketRef = new AtomicReference<>() - - WebSocketListener listener = new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - webSocketRef.set(webSocket) - opened.countDown() - } - - @Override - void onMessage(WebSocket webSocket, String text) { - System.out.println(text) - } - - @Override - void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()) - } - - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - System.out.println("Closing: " + code + " : " + reason) - webSocket.close(code, reason) - } - - @Override - void onClosed(WebSocket webSocket, int code, String reason) { - closeCode = code - closeReason = reason - closed.countDown() - } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting an expectation with always it should always be met"() { + given: "An expectation with always" + server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").always() + and: "A first request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A second request" + def req2 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A third request" + def req3 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A fourth request" + def req4 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + Future.all(req1, req2, req3, req4).onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin" + assert req2.result().statusCode() == 200 + assert req2.result().body().toString() == "admin" + assert req3.result().statusCode() == 200 + assert req3.result().body().toString() == "admin" + assert req4.result().statusCode() == 200 + assert req4.result().body().toString() == "admin" + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting an expectation as an object it should be serialized to json"() { + given: "An expectation with always" + def root = new User(0, "root", true) + server.expect().get().withPath("/api/v1/users").andReturn(200, root).always() + and: "A request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + req1.onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "{\"id\":0,\"username\":\"root\",\"enabled\":true}" + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting a timed websocket String message it should be fired at the specified time"() { + given: "A WebSocket expectation" + server.expect().get().withPath("/api/v1/users/watch") + .andUpgradeToWebSocket() + .open() + .waitFor(1000).andEmit("DELETED") + .done() + .once() + and: + Queue messages = new ArrayBlockingQueue<>(1) + and: "A WebSocket request" + def wsReq =wsClient.webSocket().connect(server.port, server.getHostName(), "/api/v1/users/watch") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().textMessageHandler { text -> + messages.add(text) } + ws.result().closeHandler { _ -> + ws.result().close() + } + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == "DELETED" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting a timed websocket binary message it should be fire at the specified time"() { + given: "A WebSocket expectation" + server.expect().get().withPath("/api/v1/users/watch") + .andUpgradeToWebSocket() + .open() + .waitFor(1000).andEmit(new WebSocketMessage(new byte[]{1, 2, 3})) + .done() + .once() + and: + Queue messages = new ArrayBlockingQueue<>(1) + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/api/v1/users/watch") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().binaryMessageHandler { buffer -> + messages.add(buffer.getBytes(0, buffer.length())) + } + ws.result().closeHandler { _ -> + ws.result().close() + } + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == new byte[]{1, 2, 3} + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting a request/response websocket message it should be fired when the event is triggered"() { + given: "A WebSocket expectation" + server.expect().get().withPath("/api/v1/users/watch") + .andUpgradeToWebSocket() + .open() + .expect("create root").andEmit("CREATED").once() + .expect("delete root").andEmit("DELETED").once() + .done() + .once() + and: + Queue messages = new ArrayBlockingQueue<>(2) + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/api/v1/users/watch") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().textMessageHandler { text -> + messages.add(text) + } + ws.result().writeTextMessage("create root") + ws.result().writeTextMessage("delete root") + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == "CREATED" + assert messages.poll(10, TimeUnit.SECONDS) == "DELETED" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when receiving an unexpected websocket message it should close the connection with status code 1002"() { + given: "A WebSocket expectation" + server.expect().get().withPath("/api/v1/users/watch") + .andUpgradeToWebSocket() + .open() + .expect("expected message").andEmit("MESSAGE OK").once() + .done() + .once() + and: + def closeCode = new CompletableFuture() + def closeReason = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/api/v1/users/watch") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().closeHandler { v -> + closeCode.complete(ws.result().closeStatusCode()) + closeReason.complete(ws.result().closeReason()) + } + ws.result().writeTextMessage("unexpected message") + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert closeCode.get(10, TimeUnit.SECONDS) == 1002 + assert closeReason.get(10, TimeUnit.SECONDS) == "Unexpected message:unexpected message" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting a delayed response it should be delayed for the specified duration"() { + given: "An expectation with delay" + server.expect().get().withPath("/api/v1/users") + .delay(100, TimeUnit.MILLISECONDS) + .andReturn(200, "admin") + .once() + and: "A start time" + def startTime = System.currentTimeMillis() + and: "A request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + req1.onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin" + assert System.currentTimeMillis() - startTime >= 100 + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when using a body provider it should work as for static responses"() { + given: "A counter" + def counter = new AtomicInteger(0); + and: "An expectation with body provider" + server.expect().get().withPath("/api/v1/users") + .andReply(200, {req -> "admin-" + counter.getAndIncrement()}) + .always() + and: "A request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A second request" + def req2 = req1.compose { _ -> + client.get(server.port, server.getHostName(), "/api/v1/users").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + Future.all(req1, req2).onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin-0" + assert req2.result().statusCode() == 200 + assert req2.result().body().toString() == "admin-1" + } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } - server.expect().get().withPath("/api/v1/users/watch") - .andUpgradeToWebSocket() - .open() - .expect("expected message").andEmit("MESSAGE OK").once() - .done() - .once() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users/watch")).get().build() - webSocketRef.set(client.newWebSocket(request, listener)) - - then: - opened.await(10, TimeUnit.SECONDS) - WebSocket ws = webSocketRef.get() - ws.send("unexpected message") - closed.await(10, TimeUnit.SECONDS) - assert closeCode == 1002 - assert closeReason == "Unexpected message:unexpected message" - - } - - def "when setting a delayed response it should be delayed for the specified duration"() { - given: - server.expect().get().withPath("/api/v1/users").delay(100, TimeUnit.MILLISECONDS).andReturn(200, "admin").once() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - long startTime = System.currentTimeMillis() - Response response1 = client.newCall(request).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "admin" - assert System.currentTimeMillis() - startTime >= 100 - - cleanup: - response1.close() - } - - def "when using a body provider it should work as for static responses"() { - given: - int[] counter = [0] - server.expect().get().withPath("/api/v1/users").andReply(200, {req -> "admin" + (counter[0]++)}).always() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response1 = client.newCall(request).execute() - Response response2 = client.newCall(request).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "admin0" - assert response2.code() == 200 - assert response2.body().string() == "admin1" - - cleanup: - response1.close() - response2.close() - } - - def "when using a response provider it should work as for static responses"() { - given: - int[] counter = [0, 0] - server.expect().get().withPath("/api/v1/users").andReply(new ResponseProvider() { - private Headers headers = new Headers.Builder().build() + def "when using a response provider it should work as for static responses"() { + given: "An expectation with response provider" + server.expect().get().withPath("/api/v1/users") + .andReply(new ResponseProvider() { + def counter = new AtomicInteger(0); + def headers = new Headers.Builder().build() int getStatusCode(RecordedRequest request) { - return 200 + (counter[0]++) + return 200 } Object getBody(RecordedRequest request) { - return "admin" + (counter[1]++) + return "admin-" + counter.get() } @Override Headers getHeaders() { - return headers + return headers.newBuilder().add("Count", "" + counter.incrementAndGet()).build() } @Override void setHeaders(Headers headers) { this.headers = headers } - }).always() - - when: - Request req = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response1 = client.newCall(req).execute() - Response response2 = client.newCall(req).execute() - - then: - assert response1.code() == 200 - assert response1.body().string() == "admin0" - assert response2.code() == 201 - assert response2.body().string() == "admin1" - - cleanup: - response1.close() - response2.close() - } - - def "should be able to set headers on responses"() { - given: - server.expect().get().withPath("/api/v1/users").andReturn(200, "admin").withHeader("test: header").withHeader("test2", "header2").once() - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users")).get().build() - Response response = client.newCall(request).execute() - - then: - assert response.code() == 200 - assert response.body().string() == "admin" - assert response.header("test") == "header" - assert response.header("test2") == "header2" - - cleanup: - response.close() - } - - def "when setting an httprequest/response websocket message it should be fired when the event is triggered"() { - given: - CountDownLatch opened = new CountDownLatch(1) - CountDownLatch closed = new CountDownLatch(1) - CountDownLatch queued = new CountDownLatch(2) - Queue messages = new ArrayBlockingQueue(2) - AtomicReference webSocketRef = new AtomicReference<>() - - WebSocketListener listener = new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - webSocketRef.set(webSocket) - opened.countDown() - } - - @Override - void onMessage(WebSocket webSocket, String text) { - messages.add(text) - queued.countDown() - } - - @Override - void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()) - } - - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - webSocket.close(code, reason) - } - - @Override - void onClosed(WebSocket webSocket, int code, String reason) { - closed.countDown() - } + }) + .always() + and: "A request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "A second request" + def req2 = req1.compose { _ -> + client.get(server.port, server.getHostName(), "/api/v1/users").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + Future.all(req1, req2).onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin-1" + assert req1.result().headers().get("Count") == "1" + assert req2.result().statusCode() == 200 + assert req2.result().body().toString() == "admin-2" + assert req2.result().headers().get("Count") == "2" } - - server.expect().get().withPath("/api/v1/users/watch") - .andUpgradeToWebSocket() - .open() - .expectHttpRequest("/api/v1/create").andEmit("CREATED").once() - .expectHttpRequest("/api/v1/delete").andEmit("DELETED").once() - .done() - .once() - - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users/watch")).get().build() - webSocketRef.set(client.newWebSocket(request, listener)) - - then: - opened.await(10, TimeUnit.SECONDS) - WebSocket ws = webSocketRef.get() - - when: - request = new Request.Builder().url(server.url("/api/v1/create")).get().build() - client.newCall(request).execute() - - then: - messages.poll(10, TimeUnit.SECONDS) == "CREATED" - - when: - request = new Request.Builder().url(server.url("/api/v1/delete")).get().build() - client.newCall(request).execute() - - then: - messages.poll(10, TimeUnit.SECONDS) == "DELETED" - - when: - ws.close(1000, "just close") - - then: - closed.await(10, TimeUnit.SECONDS) - } - - def "when setting an sentWebSocketMessage/response websocket message it should be fired when the event is triggered"() { - given: - CountDownLatch opened = new CountDownLatch(1) - CountDownLatch closed = new CountDownLatch(1) - CountDownLatch queued = new CountDownLatch(2) - Queue messages = new ArrayBlockingQueue(2) - AtomicReference webSocketRef = new AtomicReference<>() - - WebSocketListener listener = new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - webSocketRef.set(webSocket) - opened.countDown() - } - - @Override - void onMessage(WebSocket webSocket, String text) { - messages.add(text) - queued.countDown() - } - - @Override - void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()) - } - - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - webSocket.close(code, reason) - } - - @Override - void onClosed(WebSocket webSocket, int code, String reason) { - closed.countDown() - } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "should be able to set headers on responses"() { + given: "An expectation with header" + server.expect().get().withPath("/api/v1/users") + .andReturn(200, "admin") + .withHeader("test: header") + .withHeader("test2", "header2") + .once() + and: "A request" + def req1 = client.get(server.port, server.getHostName(), "/api/v1/users").send() + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + req1.onComplete {isr -> + async.evaluate { + assert req1.result().statusCode() == 200 + assert req1.result().body().toString() == "admin" + assert req1.result().headers().get("test") == "header" + assert req1.result().headers().get("test2") == "header2" } - - server.expect().get().withPath("/api/v1/users/watch") - .andUpgradeToWebSocket() - .open() - .expectHttpRequest("/api/v1/create").andEmit("CREATED").once() - .expectSentWebSocketMessage("CREATED").andEmit("DELETED").once() - .done() - .once() - - - when: - Request request = new Request.Builder().url(server.url("/api/v1/users/watch")).get().build() - webSocketRef.set(client.newWebSocket(request, listener)) - - then: - opened.await(10, TimeUnit.SECONDS) - WebSocket ws = webSocketRef.get() - - when: - request = new Request.Builder().url(server.url("/api/v1/create")).get().build() - client.newCall(request).execute() - - then: - messages.poll(10, TimeUnit.SECONDS) == "CREATED" - messages.poll(10, TimeUnit.SECONDS) == "DELETED" - - when: - ws.close(1000, "just close") - - then: - closed.await(10, TimeUnit.SECONDS) - } + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting an httprequest/response websocket message it should be fired when the event is triggered"() { + given: "A WebSocket + HTTP expectation" + server.expect().get().withPath("/api/v1/users/watch") + .andUpgradeToWebSocket() + .open() + .expectHttpRequest("/api/v1/create").andEmit("CREATED").once() + .expectHttpRequest("/api/v1/delete").andEmit("DELETED").once() + .done() + .once() + and: + Queue messages = new ArrayBlockingQueue<>(2) + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/api/v1/users/watch") + and: "A WebSocket listener" + wsReq.andThen { ws -> + ws.result().textMessageHandler { text -> + messages.add(text) + } + } + and: "HTTP requests after WS connection initiated" + wsReq.onComplete { + client.get(server.port, server.getHostName(), "/api/v1/create").send() + .compose { _ -> client.get(server.port, server.getHostName(), "/api/v1/delete").send() } + + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == "CREATED" + assert messages.poll(10, TimeUnit.SECONDS) == "DELETED" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "when setting an sentWebSocketMessage/response websocket message it should be fired when the event is triggered"() { + given: "A WebSocket + HTTP expectation" + server.expect().get().withPath("/api/v1/users/watch") + .andUpgradeToWebSocket() + .open() + .expectHttpRequest("/api/v1/create").andEmit("CREATED").once() + .expectSentWebSocketMessage("CREATED").andEmit("WS-CREATED").once() + .done() + .once() + and: + Queue messages = new ArrayBlockingQueue<>(2) + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/api/v1/users/watch") + and: "A WebSocket listener" + wsReq.andThen { ws -> + ws.result().textMessageHandler { text -> + messages.add(text) + } + } + and: "HTTP request after WS connection initiated and WS request after HTTP request" + wsReq.andThen { ws -> + client.get(server.port, server.getHostName(), "/api/v1/create").send() + .compose {_ ->{ + ws.result().writeTextMessage("CREATED") + }} + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == "CREATED" + assert messages.poll(10, TimeUnit.SECONDS) == "WS-CREATED" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) + } } diff --git a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy index 6433be20d2b..ac596b9cefe 100644 --- a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy +++ b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy @@ -15,153 +15,192 @@ */ package io.fabric8.mockwebserver -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.Response -import okhttp3.WebSocket -import okhttp3.WebSocketListener +import io.vertx.core.AsyncResult +import io.vertx.core.Handler +import io.vertx.core.Vertx +import io.vertx.core.http.WebSocket +import io.vertx.core.http.WebSocketClient +import io.vertx.core.http.WebSocketConnectOptions import spock.lang.Shared import spock.lang.Specification +import spock.util.concurrent.AsyncConditions +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.CompletableFuture import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.stream.Collectors import java.util.stream.IntStream class DefaultMockServerWebSocketTest extends Specification { + @Shared + static def vertx = Vertx.vertx() + DefaultMockServer server - @Shared - OkHttpClient client = new OkHttpClient() + WebSocketClient wsClient def setup() { server = new DefaultMockServer() server.start() + wsClient = vertx.createWebSocketClient() } def cleanup() { server.shutdown() + wsClient.close() } def "andUpgradeToWebSocket, with configured events, should emit events"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() { - @Override - void onMessage(WebSocket webSocket, String text) { - future.complete(text) + given: "A WebSocket expectation" + server.expect().withPath("/websocket") + .andUpgradeToWebSocket() + .open() + .waitFor(10L).andEmit("A text message") + .done() + .always() + and: + Queue messages = new ArrayBlockingQueue<>(1) + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().textMessageHandler { text -> + messages.add(text) + } + ws.result().closeHandler { _ -> + ws.result().close() + } } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "A text message" - cleanup: - ws.close(1000, "Test finished") + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert messages.poll(10, TimeUnit.SECONDS) == "A text message" + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } def "andUpgradeToWebSocket, with configured events, should emit onClose when done"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().immediately().andEmit("event").done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() { - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - future.complete(reason) + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().immediately().andEmit("event").done().always() + and: + def future = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().closeHandler { _ -> + ws.result().close() + future.complete(ws.result().closeReason()) + } + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert future.get(10, TimeUnit.SECONDS) == "Closing..." } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..." + + then: "Expect the result to be completed in the specified time" + async.await(10) } def "andUpgradeToWebSocket, with no events, should emit onClose"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() { - @Override - void onClosing(WebSocket webSocket, int code, String reason) { - future.complete(reason) + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().done().always() + and: + def future = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + and: "A WebSocket listener" + wsReq.onComplete { ws -> + ws.result().closeHandler { _ -> + ws.result().close() + future.complete(ws.result().closeReason()) + } } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "Closing..." + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The request is sent and completed" + async.evaluate { + assert future.get(10, TimeUnit.SECONDS) == "Closing..." + } + + then: "Expect the result to be completed in the specified time" + async.await(10) } // https://github.com/fabric8io/mockwebserver/pull/66#issuecomment-944289335 def "andUpgradeToWebSocket, with multiple upgrades, should emit events for all websocket listeners"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always() - def latch = new CountDownLatch(15) - def wsListener = new WebSocketListener() { - @Override - void onMessage(WebSocket webSocket, String text) { - latch.countDown() + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").done().always() + and: "A CountDown latch to verify the event count" + def latch = new CountDownLatch(15) + and: "A Vert.x WebSocket completion handler" + Handler> completionHandler = { ws -> + ws.result().textMessageHandler { text -> + latch.countDown() + } + ws.result().closeHandler { _ -> + ws.result().close() + } + } + and: "WebSocket requests" + IntStream.range(0, 15).forEach {i -> + def wsReq = wsClient.webSocket().connect(server.port, server.getHostName(), "/websocket") + wsReq.onComplete(completionHandler) + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + async.evaluate { + assert latch.await(10, TimeUnit.SECONDS) } - } - when: - def wss = IntStream.range(0, 15).mapToObj(i -> - client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), wsListener) - ).collect(Collectors.toList()) - then: - assert latch.await(10000L, TimeUnit.MILLISECONDS) - cleanup: - wss.forEach(ws -> ws.close(1000, "Test finished")) + + then: "Expect the result to be completed in the specified time" + async.await(10) } // https://github.com/fabric8io/mockwebserver/issues/77 def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should create response with matching header"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket().open().done().always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - future.complete(response.header("sec-websocket-protocol")) + given: "A WebSocket expectation" + server.expect() + .withPath("/websocket") + .andUpgradeToWebSocket().open().done().always() + and: + def future = new CompletableFuture() + and: "A WebSocket request" + def wsReq = wsClient.webSocket().connect(new WebSocketConnectOptions() + .setPort(server.port) + .setHost(server.getHostName()) + .setURI("/websocket") + .addSubProtocol("v4.channel.k8s.io")) + and: "A WebSocket listener" + wsReq.onComplete { ws -> + future.complete(ws.result().headers().get("sec-websocket-protocol")) } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "v4.channel.k8s.io" - cleanup: - ws.close(1000, "Test finished") - } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) - // https://github.com/fabric8io/mockwebserver/issues/77 - def "andUpgradeToWebSocket, with request header 'sec-websocket-protocol', should not change existing response header"() { - given: - server.expect() - .withPath("/websocket") - .andUpgradeToWebSocket() - .open() - .done() - .withHeader("sec-websocket-protocol", "v3.channel.k8s.io,v4.channel.k8s.io") - .always() - def future = new CompletableFuture() - when: - def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).header("sec-websocket-protocol", "v4.channel.k8s.io").build(), new WebSocketListener() { - @Override - void onOpen(WebSocket webSocket, Response response) { - future.complete(response.header("sec-websocket-protocol")) + when: "The request is sent and completed" + async.evaluate { + assert future.get(10, TimeUnit.SECONDS) == "v4.channel.k8s.io" } - }) - then: - assert future.get(100L, TimeUnit.MILLISECONDS) == "v3.channel.k8s.io,v4.channel.k8s.io" - cleanup: - ws.close(1000, "Test finished") + + then: "Expect the result to be completed in the specified time" + async.await(10) } } diff --git a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/crud/CrudDispatcherTest.groovy b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/crud/CrudDispatcherTest.groovy index a7f920f1e0b..5557b094cf2 100644 --- a/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/crud/CrudDispatcherTest.groovy +++ b/junit/mockwebserver/src/test/groovy/io/fabric8/mockwebserver/crud/CrudDispatcherTest.groovy @@ -19,106 +19,146 @@ import io.fabric8.mockwebserver.Context import io.fabric8.mockwebserver.DefaultMockServer import io.fabric8.mockwebserver.ServerRequest import io.fabric8.mockwebserver.ServerResponse -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.Response -import okhttp3.RequestBody -import okhttp3.MediaType +import io.vertx.core.Vertx +import io.vertx.core.buffer.Buffer +import io.vertx.ext.web.client.WebClient import okhttp3.mockwebserver.MockWebServer +import spock.lang.Shared import spock.lang.Specification -import com.fasterxml.jackson.databind.JsonNode +import spock.util.concurrent.AsyncConditions class CrudDispatcherTest extends Specification { - AttributeExtractor extractor = new AttributeExtractor() { + @Shared + static def vertx = Vertx.vertx() - @Override - AttributeSet fromPath(String path) { - AttributeSet set = new AttributeSet() + DefaultMockServer server - String[] parts = path.split("/") - if (parts.length > 2) { - set = set.add(new Attribute("namespace", parts[2])) - } + WebClient client - if (parts.length > 4) { - set = set.add(new Attribute("name", parts[4])) - } - return set + def setup() { + final var context = new Context(); + server = new DefaultMockServer(context, new MockWebServer(), new HashMap>(), + new CrudDispatcher(context, new CrudDispatcherTestAttributeExtractor(), new CrudDispatcherTestComposer()), false) + server.start() + client = WebClient.create(vertx) + } + + def cleanup() { + server.shutdown() + client.close() + } + + def "should be able to get after a patch"() { + given: "A POST request with a starting object" + final var postNew = client.post(server.port, server.hostName, "/namespace/test/name/one") + .putHeader("Content-Type", "application/json") + .sendBuffer(Buffer.buffer("""{"foo":{"bar":"startingValue","baz":"keepThis"} }""")) + and: "A PATCH request to update the object" + final var patch = postNew.compose { _ -> + client + .patch(server.port, server.hostName, "/namespace/test/name/one") + .putHeader("Content-Type", "application/strategic-merge-patch+json") + .sendBuffer(Buffer.buffer("""[{"op":"replace","path":"/foo/bar","value":"canary"}]""")) + } + and: "A GET request to retrieve and verify the object" + final var get = patch.compose { _ -> + client.get(server.port, server.hostName, "/namespace/test/name/one").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + get.onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 200 + assert res.result().body().toString() == """{"foo":{"bar":"canary","baz":"keepThis"}}""" } + } - @Override - AttributeSet fromResource(String resource) { - return null + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "should be able to get after a post"() { + given: "A POST request with a starting object" + final var postNew = client.post(server.port, server.hostName, "/namespace/test/name/one") + .putHeader("Content-Type", "text/html") + .sendBuffer(Buffer.buffer("one")) + and: "A GET request to retrieve and verify the object" + final var get = postNew.compose { _ -> + client.get(server.port, server.hostName, "/namespace/test/name/one").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + get.onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 200 + assert res.result().body().toString() == "one" } - } + } - ResponseComposer composer = new ResponseComposer() { - @Override - String compose(Collection items) { - StringBuilder sb = new StringBuilder(); - for (String item : items) { - sb.append(item).append(" ") - } - return sb.toString().trim() + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + def "should be able to delete after a post"() { + given: "A POST request with a starting object" + final var postNew = client.post(server.port, server.hostName, "/namespace/test/name/one") + .putHeader("Content-Type", "text/html") + .sendBuffer(Buffer.buffer("one")) + and: "A DELETE request to delete the object" + final var delete = postNew.compose { _ -> + client.delete(server.port, server.hostName, "/namespace/test/name/one").send() + } + and: "A GET request to retrieve and verify the object" + final var get = delete.compose { _ -> + client.get(server.port, server.hostName, "/namespace/test/name/one").send() + } + and: "An instance of AsyncConditions" + def async = new AsyncConditions(1) + + when: "The requests are sent and completed" + get.onComplete { res -> + async.evaluate { + assert res.result().statusCode() == 404 } - } + } - def "should be able to get after a patch"() { - given: - Context context = new Context() - DefaultMockServer server = new DefaultMockServer(context, new MockWebServer(), new HashMap>(), new CrudDispatcher(context, extractor, composer), false) - String startingJson = """{"foo":{"bar":"startingValue","baz":"keepThis"} }""" - String patch = """[{"op":"replace","path":"/foo/bar","value":"canary"}]""" - when: - server.start() - then: - OkHttpClient client = new OkHttpClient() - Request post = new Request.Builder().post(RequestBody.create(MediaType.parse("application/json"), startingJson)).url(server.url("/namespace/test/name/one")).build() - client.newCall(post).execute() - - Request patchRequest = new Request.Builder().patch(RequestBody.create(MediaType.parse("application/strategic-merge-patch+json"), patch)).url(server.url("/namespace/test/name/one")).build() - client.newCall(patchRequest).execute() - - Request get = new Request.Builder().get().url(server.url("/namespace/test/name/one")).build() - Response response = client.newCall(get).execute() - JsonNode responseJson = context.getMapper().readValue(response.body().string(), JsonNode.class); - JsonNode expected = context.mapper.readValue("""{"foo": {"bar": "canary", "baz": "keepThis"}}""", JsonNode.class) - expected == responseJson - } + then: "Expect the result to be completed in the specified time" + async.await(10) + } + + private static final class CrudDispatcherTestAttributeExtractor implements AttributeExtractor { - def "should be able to get after a post"() { - given: - Context context = new Context() - DefaultMockServer server = new DefaultMockServer(context, new MockWebServer(), new HashMap>(), new CrudDispatcher(context, extractor, composer), false) - when: - server.start() - then: - OkHttpClient client = new OkHttpClient() - Request post = new Request.Builder().post(RequestBody.create(MediaType.parse("text/html"), "one")).url(server.url("/namespace/test/name/one")).build() - client.newCall(post).execute() - Request get = new Request.Builder().get().url(server.url("/namespace/test/name/one")).build() - Response response = client.newCall(get).execute() - assert response.body().string().equals("one") + @Override + AttributeSet fromPath(String path) { + final var set = new AttributeSet() + final var parts = path.split("/") + if (parts.length > 2) { + set.add(new Attribute("namespace", parts[2])) + } else if (parts.length > 4) { + set.add(new Attribute("name", parts[4])) + } + return set } - def "should be able to delete after a post"() { - given: - Context context = new Context() - DefaultMockServer server = new DefaultMockServer(context, new MockWebServer(), new HashMap>(), new CrudDispatcher(context, extractor, composer), false) - when: - server.start() - then: - OkHttpClient client = new OkHttpClient() - Request post = new Request.Builder().post(RequestBody.create(MediaType.parse("text/html"), "one")).url(server.url("/namespace/test/name/one")).build() - client.newCall(post).execute() - Request get = new Request.Builder().delete().url(server.url("/namespace/test/name/one")).build() - Response response = client.newCall(get).execute() - assert response.successful - - Request getMissing = new Request.Builder().delete().url(server.url("/namespace/test/name/two")).build() - Response responseMissing = client.newCall(getMissing).execute() - assert !responseMissing.successful + @Override + AttributeSet fromResource(String resource) { + return null } + } + private static final class CrudDispatcherTestComposer implements ResponseComposer { + @Override + String compose(Collection items) { + StringBuilder sb = new StringBuilder(); + for (String item : items) { + sb.append(item).append(" ") + } + return sb.toString().trim() + } + } } diff --git a/pom.xml b/pom.xml index be847df187f..929306b1011 100644 --- a/pom.xml +++ b/pom.xml @@ -96,10 +96,10 @@ 1.17.6 1.15.0_1 2.15.3 - 11.0.18 + 11.0.19 3.9.6 3.10.2 - 4.4.6 + 4.5.1 0.3.0 @@ -129,7 +129,7 @@ 1.77 1.25.0 2.15.1 - 32.1.3-jre + 33.0.0-jre 1.12.0 1.2.1 4.7.5 @@ -817,6 +817,11 @@ + + io.vertx + vertx-web + ${vertx.version} + info.picocli picocli