Skip to content

Commit

Permalink
Enable Spring WebFlux concurrency tests (#3077)
Browse files Browse the repository at this point in the history
* Enable Spring WebFlux concurrency tests

* Fix Spring WebFlux test application scan scope

* codenarc fix

* Renamed package server.http to server.base

* Addressed PR comments
  • Loading branch information
agoallikmaa authored May 26, 2021
1 parent 38830ea commit 0a34b2b
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package client

import io.netty.channel.ChannelOption
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import org.springframework.http.HttpMethod
import org.springframework.http.client.reactive.ReactorClientHttpConnector
import org.springframework.web.reactive.function.client.WebClient
import reactor.ipc.netty.http.client.HttpClientOptions
import reactor.ipc.netty.resources.PoolResources

import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

class SpringWebFluxSingleConnection implements SingleConnection {
private final ReactorClientHttpConnector connector
private final String host
private final int port

SpringWebFluxSingleConnection(boolean isOldVersion, String host, int port) {
if (isOldVersion) {
connector = new ReactorClientHttpConnector({ HttpClientOptions.Builder clientOptions ->
clientOptions.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, HttpClientTest.CONNECT_TIMEOUT_MS)
clientOptions.poolResources(PoolResources.fixed("pool", 1, HttpClientTest.CONNECT_TIMEOUT_MS))
})
} else {
def httpClient = reactor.netty.http.client.HttpClient.create(reactor.netty.resources.create("pool", 1)).tcpConfiguration({ tcpClient ->
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, HttpClientTest.CONNECT_TIMEOUT_MS)
})
connector = new ReactorClientHttpConnector(httpClient)
}

this.host = host
this.port = port
}

@Override
int doRequest(String path, Map<String, String> headers) throws ExecutionException, InterruptedException, TimeoutException {
String requestId = Objects.requireNonNull(headers.get(REQUEST_ID_HEADER))

URI uri
try {
uri = new URL("http", host, port, path).toURI()
} catch (MalformedURLException e) {
throw new ExecutionException(e)
}

def request = WebClient.builder().clientConnector(connector).build().method(HttpMethod.GET)
.uri(uri)
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }

def response = request.exchange().block()

String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER)
if (requestId != responseId) {
throw new IllegalStateException(
String.format("Received response with id %s, expected %s", responseId, requestId))
}

return response.statusCode().value()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.netty.channel.ChannelOption
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.asserts.SpanAssert
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import org.springframework.http.HttpMethod
import org.springframework.http.client.reactive.ReactorClientHttpConnector
import org.springframework.web.reactive.function.client.WebClient
Expand Down Expand Up @@ -77,4 +78,9 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
boolean testRedirects() {
false
}

@Override
SingleConnection createSingleConnection(String host, int port) {
return new SpringWebFluxSingleConnection(isOldVersion(), host, port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import io.opentelemetry.api.trace.Tracer
import java.time.Duration
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.FilterType
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.BodyInserters
Expand All @@ -24,6 +26,7 @@ import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

@SpringBootApplication
@ComponentScan(basePackages = ["server"], excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX, pattern = "server.base.*"))
class SpringWebFluxTestApplication {

private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package server.base

import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.NOT_FOUND

import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.sdk.trace.data.SpanData
import org.springframework.web.server.ResponseStatusException

abstract class ControllerSpringWebFluxServerTest extends SpringWebFluxServerTest {
@Override
void handlerSpan(TraceAssert trace, int index, Object parent, String method, HttpServerTest.ServerEndpoint endpoint) {
def handlerSpanName = "${ServerTestController.simpleName}.${endpoint.name().toLowerCase()}"
if (endpoint == NOT_FOUND) {
handlerSpanName = "ResourceWebHandler.handle"
}
trace.span(index) {
name handlerSpanName
kind INTERNAL
if (endpoint == EXCEPTION) {
status StatusCode.ERROR
errorEvent(RuntimeException, EXCEPTION.body)
} else if (endpoint == NOT_FOUND) {
status StatusCode.ERROR
errorEvent(ResponseStatusException, "Response status 404")
}
childOf((SpanData) parent)
}
}

@Override
boolean hasHandlerAsControllerParentSpan(HttpServerTest.ServerEndpoint endpoint) {
return false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package server.base

import io.opentelemetry.instrumentation.test.base.HttpServerTest
import java.time.Duration
import java.util.concurrent.Callable
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono

/**
* Tests the case which uses annotated controller methods, and where "controller" span is created
* within a Mono map step, which follows a delay step. For exception endpoint, the exception
* is thrown within the last map step.
*/
class DelayedControllerSpringWebFluxServerTest extends ControllerSpringWebFluxServerTest {
@Override
protected Class<?> getApplicationClass() {
return Application
}

@Configuration
@EnableAutoConfiguration
static class Application {
@Bean
Controller controller() {
return new Controller()
}

@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}

@RestController
static class Controller extends ServerTestController {
@Override
protected <T> Mono<T> wrapControllerMethod(
HttpServerTest.ServerEndpoint endpoint, Callable<T> handler) {

return Mono.just("")
.delayElement(Duration.ofMillis(10))
.map({ controller(endpoint, handler) })
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package server.base

import io.opentelemetry.instrumentation.test.base.HttpServerTest
import java.time.Duration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

/**
* Tests the case which uses route handlers, and where "controller" span is created within a Mono
* map step, which follows a delay step. For exception endpoint, the exception is thrown within the
* last map step.
*/
class DelayedHandlerSpringWebFluxServerTest extends HandlerSpringWebFluxServerTest {
@Override
protected Class<?> getApplicationClass() {
return Application
}

@Configuration
@EnableAutoConfiguration
static class Application {
@Bean
RouterFunction<ServerResponse> router() {
return new RouteFactory().createRoutes()
}

@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}

static class RouteFactory extends ServerTestRouteFactory {

@Override
protected Mono<ServerResponse> wrapResponse(HttpServerTest.ServerEndpoint endpoint, Mono<ServerResponse> response, Runnable spanAction) {
return response.delayElement(Duration.ofMillis(10)).map({ original ->
return controller(endpoint, {
spanAction.run()
return original
})
})
}
}

@Override
boolean hasHandlerAsControllerParentSpan(HttpServerTest.ServerEndpoint endpoint) {
return false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package server.base

import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.NOT_FOUND

import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.sdk.trace.data.SpanData
import org.springframework.web.server.ResponseStatusException

abstract class HandlerSpringWebFluxServerTest extends SpringWebFluxServerTest {
@Override
void handlerSpan(TraceAssert trace, int index, Object parent, String method, HttpServerTest.ServerEndpoint endpoint) {
def handlerSpanName = "${ServerTestRouteFactory.simpleName}.lambda"
if (endpoint == NOT_FOUND) {
handlerSpanName = "ResourceWebHandler.handle"
}
trace.span(index) {
name handlerSpanName
kind INTERNAL
if (endpoint == EXCEPTION) {
status StatusCode.ERROR
errorEvent(RuntimeException, EXCEPTION.body)
} else if (endpoint == NOT_FOUND) {
status StatusCode.ERROR
errorEvent(ResponseStatusException, "Response status 404")
}
childOf((SpanData) parent)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package server.base

import io.opentelemetry.instrumentation.test.base.HttpServerTest
import java.util.concurrent.Callable
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono

/**
* Tests the case where "controller" span is created within the controller method scope, and the
* Mono<String> from a handler is already a fully constructed response with no deferred actions.
* For exception endpoint, the exception is thrown within controller method scope.
*/
class ImmediateControllerSpringWebFluxServerTest extends ControllerSpringWebFluxServerTest {
@Override
protected Class<?> getApplicationClass() {
return Application
}

@Configuration
@EnableAutoConfiguration
static class Application {
@Bean
Controller controller() {
return new Controller()
}

@Bean
NettyReactiveWebServerFactory nettyFactory() {
return new NettyReactiveWebServerFactory()
}
}

@RestController
static class Controller extends ServerTestController {
@Override
protected <T> Mono<T> wrapControllerMethod(HttpServerTest.ServerEndpoint endpoint, Callable<T> controllerMethod) {
return Mono.just(controller(endpoint, controllerMethod))
}
}
}
Loading

0 comments on commit 0a34b2b

Please sign in to comment.