Skip to content

Commit

Permalink
Rewrite the Vert.x MDC tests to be more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Sep 20, 2023
1 parent 3bf8a5a commit 21d78e5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
public class InMemoryLogHandler extends Handler {
private static final PatternFormatter FORMATTER = new PatternFormatter("%X{requestId} ### %s");

private final List<String> recordList = new CopyOnWriteArrayList<>();
private static final List<String> recordList = new CopyOnWriteArrayList<>();

public List<String> logRecords() {
return Collections.unmodifiableList(recordList);
}

public static void reset() {
recordList.clear();
}

@Override
public void publish(LogRecord record) {
String loggerName = record.getLoggerName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
@ApplicationScoped
public class InMemoryLogHandlerProducer {

public volatile boolean initialized = false;

@Produces
@Singleton
public InMemoryLogHandler inMemoryLogHandler() {
return new InMemoryLogHandler();
}

public boolean isInitialized() {
return initialized;
}

void onStart(@Observes StartupEvent ev, InMemoryLogHandler inMemoryLogHandler) {
InitialConfigurator.DELAYED_HANDLER.clearHandlers();
InitialConfigurator.DELAYED_HANDLER.addHandler(inMemoryLogHandler);
initialized = true;
}

void onStop(@Observes ShutdownEvent ev, InMemoryLogHandler inMemoryLogHandler) {
initialized = false;
InitialConfigurator.DELAYED_HANDLER.removeHandler(inMemoryLogHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.mutiny.core.Vertx;

@ApplicationScoped
Expand All @@ -42,22 +39,24 @@ void onStop(@Observes ShutdownEvent ev) {
private static class TestVerticle extends AbstractVerticle {
private static final Logger LOGGER = Logger.getLogger(TestVerticle.class);

private HttpRequest<JsonObject> request;

@Override
public void start(Promise<Void> startPromise) {
WebClient webClient = WebClient.create(vertx);
request = webClient.getAbs("http://worldclockapi.com/api/json/utc/now").as(BodyCodec.jsonObject());
var request = webClient.getAbs("http://localhost:" + VERTICLE_PORT + "/now");

Promise<HttpServer> httpServerPromise = Promise.promise();
httpServerPromise.future().<Void> mapEmpty().onComplete(startPromise);
vertx.createHttpServer()
.requestHandler(req -> {
String requestId = req.getHeader(REQUEST_ID_HEADER);

if (req.path().equals("/now")) {
req.response().end(Long.toString(System.currentTimeMillis()));
return;
}

String requestId = req.getHeader(REQUEST_ID_HEADER);
MDC.put(MDC_KEY, requestId);
LOGGER.info("Received HTTP request ### " + MDC.get(MDC_KEY));

vertx.setTimer(50, l -> {
LOGGER.info("Timer fired ### " + MDC.get(MDC_KEY));
vertx.executeBlocking(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -14,8 +15,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Expand All @@ -25,14 +25,12 @@
import org.jboss.logging.MDC;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
Expand Down Expand Up @@ -61,102 +59,92 @@ public class VertxMDCTest {

@Inject
InMemoryLogHandler inMemoryLogHandler;

@Inject
InMemoryLogHandlerProducer producer;
@Inject
VerticleDeployer verticleDeployer;

static final CountDownLatch countDownLatch = new CountDownLatch(1);
static final AtomicReference<Throwable> errorDuringExecution = new AtomicReference<>();

@Test
@RepeatedTest(10)
void mdc() throws Throwable {
List<String> requestIds = IntStream.range(0, 10)

InMemoryLogHandler.reset();
await().until(() -> producer.isInitialized());

List<String> requestIds = IntStream.range(0, 1)
.mapToObj(i -> UUID.randomUUID().toString())
.collect(toList());

sendRequests(requestIds, onSuccess(v -> {
try {
Map<String, List<String>> allMessagesById = inMemoryLogHandler.logRecords()
.stream()
.map(line -> line.split(" ### "))
.peek(split -> assertEquals(split[0], split[2]))
.collect(groupingBy(split -> split[0],
mapping(split -> split[1], toList())));

assertEquals(requestIds.size(), allMessagesById.size());
assertTrue(requestIds.containsAll(allMessagesById.keySet()));

List<String> expected = Stream.<String> builder()
.add("Received HTTP request")
.add("Timer fired")
.add("Blocking task executed")
.add("Received Web Client response")
.build()
.collect(toList());

for (List<String> messages : allMessagesById.values()) {
assertEquals(expected, messages);
}
} catch (Throwable t) {
errorDuringExecution.set(t);
} finally {
countDownLatch.countDown();
}
}));
CountDownLatch done = new CountDownLatch(1);
sendRequests(requestIds, done);

Assertions.assertTrue(done.await(20, TimeUnit.SECONDS));

await().untilAsserted(() -> {
Map<String, List<String>> allMessagesById = inMemoryLogHandler.logRecords()
.stream()
.map(line -> line.split(" ### "))
.peek(split -> assertEquals(split[0], split[2]))
.collect(groupingBy(split -> split[0],
mapping(split -> split[1], toList())));

countDownLatch.await();
assertEquals(requestIds.size(), allMessagesById.size());
assertTrue(requestIds.containsAll(allMessagesById.keySet()));

Throwable throwable = errorDuringExecution.get();
if (throwable != null) {
throw throwable;
}
List<String> expected = Stream.<String> builder()
.add("Received HTTP request")
.add("Timer fired")
.add("Blocking task executed")
.add("Received Web Client response")
.build()
.collect(toList());

for (List<String> messages : allMessagesById.values()) {
assertEquals(expected, messages);
}
});
}

@Test
@RepeatedTest(10)
public void mdcNonVertxThreadTest() {
InMemoryLogHandler.reset();
await().until(() -> producer.isInitialized());

String mdcValue = "Test MDC value";
MDC.put("requestId", mdcValue);
LOGGER.info("Test 1");

assertThat(inMemoryLogHandler.logRecords(),
hasItem(mdcValue + " ### Test 1"));
await().untilAsserted(() -> {
LOGGER.info("Test 1");
assertThat(inMemoryLogHandler.logRecords(), hasItem(mdcValue + " ### Test 1"));
});

MDC.remove("requestId");
LOGGER.info("Test 2");

assertThat(inMemoryLogHandler.logRecords(),
hasItem(" ### Test 2"));
await().untilAsserted(() -> {
LOGGER.info("Test 2");
assertThat(inMemoryLogHandler.logRecords(), hasItem(" ### Test 2"));
});

mdcValue = "New test MDC value";
MDC.put("requestId", mdcValue);
LOGGER.info("Test 3");
String mdcValue2 = "New test MDC value";
MDC.put("requestId", mdcValue2);

assertThat(inMemoryLogHandler.logRecords(),
hasItem(mdcValue + " ### Test 3"));
}

protected <T> Handler<AsyncResult<T>> onSuccess(Consumer<T> consumer) {
return result -> {
if (result.failed()) {
errorDuringExecution.set(result.cause());
countDownLatch.countDown();
} else {
consumer.accept(result.result());
}
};
await().untilAsserted(() -> {
LOGGER.info("Test 3");
assertThat(inMemoryLogHandler.logRecords(), hasItem(mdcValue2 + " ### Test 3"));
});
}

@SuppressWarnings({ "rawtypes" })
private void sendRequests(List<String> ids, Handler<AsyncResult<Void>> handler) {
private void sendRequests(List<String> ids, CountDownLatch done) {
WebClient webClient = WebClient.create(vertx, new WebClientOptions().setDefaultPort(VERTICLE_PORT));

HttpRequest<Buffer> request = webClient.get("/")
.expect(ResponsePredicate.SC_OK);

List<Future> futures = ids.stream()
List<? extends Future<?>> futures = ids.stream()
.map(id -> request.putHeader(REQUEST_ID_HEADER, id).send())
.collect(toList());

CompositeFuture.all(futures).<Void> mapEmpty().onComplete(handler);
Future.all(futures).mapEmpty().onComplete(x -> {
done.countDown();
});
}
}

0 comments on commit 21d78e5

Please sign in to comment.