From e773a7082bc5bfe0b6c8812da4b2d9c23e623196 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 12 Apr 2021 15:49:19 +0300 Subject: [PATCH] Add TestNG listeners for "fail fast" and resource cleanup - add listeners for cleaning up - Mockito mocking state in all threads - FastThreadLocal state with org.apache.pulsar.* class instances in all threads - add listener for detecting leaked threads - rewrite RetryAnalyzer based on TestNG's RetryAnalyzerCount class - add tests for RetryAnalyzer - Fix test dependencies in pulsar-io/flume and pulsar-io/netty - add custom fail fast solution - Maven Surefire built-in solution is broken with TestNG 7.3.0 --- buildtools/pom.xml | 89 ++++++++++- .../pulsar/tests/AnnotationListener.java | 5 +- .../BetweenTestClassesListenerAdapter.java | 60 ++++++++ .../apache/pulsar/tests/FailFastNotifier.java | 85 +++++++++++ .../tests/FastThreadLocalCleanupListener.java | 61 ++++++++ .../tests/FastThreadLocalStateCleaner.java | 141 ++++++++++++++++++ .../pulsar/tests/MockitoCleanupListener.java | 45 ++++++ .../tests/MockitoThreadLocalStateCleaner.java | 88 +++++++++++ .../pulsar/tests/PulsarTestListener.java | 10 +- .../apache/pulsar/tests/RetryAnalyzer.java | 17 ++- .../tests/ThreadLeakDetectorListener.java | 117 +++++++++++++++ .../pulsar/tests/ThreadLocalStateCleaner.java | 123 +++++++++++++++ .../FastThreadLocalStateCleanerTest.java | 128 ++++++++++++++++ .../pulsar/tests/RetryAnalyzerTest.java | 58 +++++++ .../tests/ThreadLocalStateCleanerTest.java | 104 +++++++++++++ pom.xml | 2 +- pulsar-io/flume/pom.xml | 12 -- pulsar-io/netty/pom.xml | 1 - 18 files changed, 1115 insertions(+), 31 deletions(-) create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/BetweenTestClassesListenerAdapter.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/FailFastNotifier.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalCleanupListener.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalStateCleaner.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/MockitoThreadLocalStateCleaner.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java create mode 100644 buildtools/src/main/java/org/apache/pulsar/tests/ThreadLocalStateCleaner.java create mode 100644 buildtools/src/test/java/org/apache/pulsar/tests/FastThreadLocalStateCleanerTest.java create mode 100644 buildtools/src/test/java/org/apache/pulsar/tests/RetryAnalyzerTest.java create mode 100644 buildtools/src/test/java/org/apache/pulsar/tests/ThreadLocalStateCleanerTest.java diff --git a/buildtools/pom.xml b/buildtools/pom.xml index 183e2f946e4d2..3922aaefc22cb 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -37,28 +37,70 @@ 1.8 1.8 + 3.0.0-M3 + 2.14.0 + 1.7.25 + 7.3.0 + 3.11 + 3.2.4 + + + + org.apache.logging.log4j + log4j-bom + ${log4j2.version} + pom + import + + + + org.testng testng - 7.3.0 + ${testng.version} org.apache.logging.log4j log4j-api - 2.14.0 org.apache.logging.log4j log4j-core - 2.14.0 org.apache.logging.log4j log4j-slf4j-impl - 2.14.0 + + + org.apache.logging.log4j + log4j-1.2-api + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + commons-logging + * + + + + + + io.netty + netty-common + 4.1.60.Final + test @@ -79,6 +121,45 @@ + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + true + true + false + + + org.apache.commons:commons-lang3 + + + + + org.apache.commons.lang3 + org.apache.pulsar.buildtools.shaded.org.apache.commons.lang3 + + + + + + package + + shade + + + + + + org.apache.rat + apache-rat-plugin + + + + dependency-reduced-pom.xml + + + diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/AnnotationListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/AnnotationListener.java index 7a62fa914354d..b2719b239a69f 100644 --- a/buildtools/src/main/java/org/apache/pulsar/tests/AnnotationListener.java +++ b/buildtools/src/main/java/org/apache/pulsar/tests/AnnotationListener.java @@ -24,6 +24,7 @@ import org.testng.IAnnotationTransformer; import org.testng.annotations.ITestAnnotation; +import org.testng.internal.annotations.DisabledRetryAnalyzer; public class AnnotationListener implements IAnnotationTransformer { @@ -38,7 +39,9 @@ public void transform(ITestAnnotation annotation, Class testClass, Constructor testConstructor, Method testMethod) { - annotation.setRetryAnalyzer(RetryAnalyzer.class); + if (annotation.getRetryAnalyzerClass() == DisabledRetryAnalyzer.class) { + annotation.setRetryAnalyzer(RetryAnalyzer.class); + } // Enforce default test timeout if (annotation.getTimeOut() == 0) { diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/BetweenTestClassesListenerAdapter.java b/buildtools/src/main/java/org/apache/pulsar/tests/BetweenTestClassesListenerAdapter.java new file mode 100644 index 0000000000000..7e803f29ef16a --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/BetweenTestClassesListenerAdapter.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.testng.IClassListener; +import org.testng.ITestClass; +import org.testng.ITestContext; +import org.testng.ITestListener; + +/** + * TestNG listener adapter for detecting when execution finishes in previous + * test class and starts in a new class. + */ +abstract class BetweenTestClassesListenerAdapter implements IClassListener, ITestListener { + Class lastTestClass; + + @Override + public void onBeforeClass(ITestClass testClass) { + checkIfTestClassChanged(testClass.getRealClass()); + } + + private void checkIfTestClassChanged(Class testClazz) { + if (lastTestClass != testClazz) { + onBetweenTestClasses(lastTestClass, testClazz); + lastTestClass = testClazz; + } + } + + @Override + public void onFinish(ITestContext context) { + if (lastTestClass != null) { + onBetweenTestClasses(lastTestClass, null); + lastTestClass = null; + } + } + + /** + * Call back hook for adding logic when test execution moves from test class to another. + * + * @param endedTestClass the test class which has finished execution. null if the started test class is the first + * @param startedTestClass the test class which has started execution. null if the ended test class is the last + */ + protected abstract void onBetweenTestClasses(Class endedTestClass, Class startedTestClass); +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/FailFastNotifier.java b/buildtools/src/main/java/org/apache/pulsar/tests/FailFastNotifier.java new file mode 100644 index 0000000000000..12246773c1411 --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/FailFastNotifier.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.testng.IInvokedMethod; +import org.testng.IInvokedMethodListener; +import org.testng.ITestResult; +import org.testng.SkipException; + +/** + * Notifies TestNG core skipping remaining tests after first failure has appeared. + * + * Enabled when -DtestFailFast=true + * + * This is a workaround for https://issues.apache.org/jira/browse/SUREFIRE-1762 since + * the bug makes the built-in fast-fast feature `-Dsurefire.skipAfterFailureCount=1` unusable. + * Maven Surefire version 3.0.0-M5 contains the fix, but that version is unusable because of problems + * with test output, https://issues.apache.org/jira/browse/SUREFIRE-1827. + * It makes the Pulsar integration tests slow and to fail. + * + * This implementation is based on org.apache.maven.surefire.testng.utils.FailFastNotifier + * implementation that is part of the Maven Surefire plugin. + * + */ +public class FailFastNotifier + implements IInvokedMethodListener { + private static final boolean FAIL_FAST_ENABLED = Boolean.valueOf( + System.getProperty("testFailFast", "true")); + + static class FailFastEventsSingleton { + private static final FailFastEventsSingleton INSTANCE = new FailFastEventsSingleton(); + + private volatile boolean skipAfterFailure; + + private FailFastEventsSingleton() { + } + + public static FailFastEventsSingleton getInstance() { + return INSTANCE; + } + + public boolean isSkipAfterFailure() { + return skipAfterFailure; + } + + public void setSkipOnNextTest() { + this.skipAfterFailure = true; + } + } + + static class FailFastSkipException extends SkipException { + FailFastSkipException(String skipMessage) { + super(skipMessage); + reduceStackTrace(); + } + } + + @Override + public void beforeInvocation(IInvokedMethod iInvokedMethod, ITestResult iTestResult) { + if (FAIL_FAST_ENABLED && FailFastEventsSingleton.getInstance().isSkipAfterFailure()) { + throw new FailFastSkipException("Skipped after failure since testFailFast system property is set."); + } + } + + @Override + public void afterInvocation(IInvokedMethod iInvokedMethod, ITestResult iTestResult) { + + } +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalCleanupListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalCleanupListener.java new file mode 100644 index 0000000000000..cd75bfbd4318e --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalCleanupListener.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cleanup Thread Local state attach to Netty's FastThreadLocal. + */ +public class FastThreadLocalCleanupListener extends BetweenTestClassesListenerAdapter { + private static final Logger LOG = LoggerFactory.getLogger(FastThreadLocalCleanupListener.class); + private static final boolean FAST_THREAD_LOCAL_CLEANUP_ENABLED = + Boolean.valueOf(System.getProperty("testFastThreadLocalCleanup", "true")); + private static final String FAST_THREAD_LOCAL_CLEANUP_PACKAGE = + System.getProperty("testFastThreadLocalCleanupPackage", "org.apache.pulsar"); + private static final FastThreadLocalStateCleaner CLEANER = new FastThreadLocalStateCleaner(object -> { + if ("*".equals(FAST_THREAD_LOCAL_CLEANUP_PACKAGE)) { + return true; + } + Class clazz = object.getClass(); + if (clazz.isArray()) { + clazz = clazz.getComponentType(); + } + Package pkg = clazz.getPackage(); + if (pkg != null && pkg.getName() != null) { + return pkg.getName() + .startsWith(FAST_THREAD_LOCAL_CLEANUP_PACKAGE); + } else { + return false; + } + }); + + @Override + protected void onBetweenTestClasses(Class endedTestClass, Class startedTestClass) { + if (FAST_THREAD_LOCAL_CLEANUP_ENABLED && FastThreadLocalStateCleaner.isEnabled()) { + LOG.info("Cleaning up FastThreadLocal thread local state."); + CLEANER.cleanupAllFastThreadLocals((thread, value) -> { + LOG.info("Cleaning FastThreadLocal state for thread {}, instance of class {}, value is {}", thread, + value.getClass().getName(), value); + }); + } + } + +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalStateCleaner.java b/buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalStateCleaner.java new file mode 100644 index 0000000000000..b16c7b85792f8 --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/FastThreadLocalStateCleaner.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.ThreadUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.reflect.MethodUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cleanup Thread Local state attach to Netty's FastThreadLocal. + * + * This is not thread-safe, but that aspect is ignored. + */ +public final class FastThreadLocalStateCleaner { + private static final Logger LOG = LoggerFactory.getLogger(FastThreadLocalStateCleaner.class); + private static final ThreadLocal SLOW_THREAD_LOCAL_MAP = lookupSlowThreadLocalMap(); + private static final Class FAST_THREAD_LOCAL_CLASS; + private static final Method GET_THREAD_LOCAL_MAP; + private static final Field INDEXED_VARIABLES_FIELD; + private static final Object UNSET_OBJECT; + + static { + Class clazz = null; + Method getThreadLocalMapMethod = null; + Field indexedVariablesField = null; + Object unsetObject = null; + if (SLOW_THREAD_LOCAL_MAP != null) { + try { + clazz = ClassUtils.getClass("io.netty.util.concurrent.FastThreadLocalThread"); + Class internalThreadLocalMapClass = + ClassUtils.getClass("io.netty.util.internal.InternalThreadLocalMap"); + getThreadLocalMapMethod = MethodUtils + .getMatchingAccessibleMethod(clazz, "threadLocalMap"); + indexedVariablesField = FieldUtils.getDeclaredField(internalThreadLocalMapClass, + "indexedVariables", true); + Field unsetField = FieldUtils.getField(internalThreadLocalMapClass, "UNSET"); + unsetObject = unsetField.get(null); + } catch (ClassNotFoundException | IllegalAccessException e) { + // ignore + LOG.debug("Ignoring exception", e); + clazz = null; + getThreadLocalMapMethod = null; + indexedVariablesField = null; + unsetObject = null; + } + } + FAST_THREAD_LOCAL_CLASS = clazz; + GET_THREAD_LOCAL_MAP = getThreadLocalMapMethod; + INDEXED_VARIABLES_FIELD = indexedVariablesField; + UNSET_OBJECT = unsetObject; + } + + private final Predicate valueFilter; + + private static ThreadLocal lookupSlowThreadLocalMap() { + try { + Field slowThreadLocalMapField = FieldUtils.getDeclaredField( + ClassUtils.getClass("io.netty.util.internal.InternalThreadLocalMap"), + "slowThreadLocalMap", true); + if (slowThreadLocalMapField != null) { + return (ThreadLocal) slowThreadLocalMapField.get(null); + } else { + LOG.warn("Cannot find InternalThreadLocalMap.slowThreadLocalMap field." + + " This might be due to using an unsupported netty-common version."); + return null; + } + } catch (IllegalAccessException | ClassNotFoundException e) { + LOG.warn("Cannot find InternalThreadLocalMap.slowThreadLocalMap thread local", e); + return null; + } + } + + public FastThreadLocalStateCleaner(Predicate valueFilter) { + this.valueFilter = valueFilter; + } + + public void cleanupAllFastThreadLocals(Thread thread, BiConsumer cleanedValueListener) { + Objects.nonNull(thread); + try { + Object internalThreadLocalMap; + if (FAST_THREAD_LOCAL_CLASS.isInstance(thread)) { + internalThreadLocalMap = GET_THREAD_LOCAL_MAP.invoke(thread); + } else { + internalThreadLocalMap = ThreadLocalStateCleaner.INSTANCE + .getThreadLocalValue(SLOW_THREAD_LOCAL_MAP, thread); + } + if (internalThreadLocalMap != null) { + Object[] indexedVariables = (Object[]) INDEXED_VARIABLES_FIELD.get(internalThreadLocalMap); + for (int i = 0; i < indexedVariables.length; i++) { + Object threadLocalValue = indexedVariables[i]; + if (threadLocalValue != UNSET_OBJECT && threadLocalValue != null) { + if (valueFilter.test(threadLocalValue)) { + indexedVariables[i] = UNSET_OBJECT; + if (cleanedValueListener != null) { + cleanedValueListener.accept(thread, threadLocalValue); + } + } + } + } + } + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.warn("Cannot reset state for FastLocalThread {}", thread, e); + } + } + + // cleanup all fast thread local state on all active threads + public void cleanupAllFastThreadLocals(BiConsumer cleanedValueListener) { + for (Thread thread : ThreadUtils.getAllThreads()) { + cleanupAllFastThreadLocals(thread, cleanedValueListener); + } + } + + public static boolean isEnabled() { + return SLOW_THREAD_LOCAL_MAP != null && FAST_THREAD_LOCAL_CLASS != null; + } +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java new file mode 100644 index 0000000000000..b66d0ee690b70 --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cleanup Mockito's Thread Local state that leaks memory + * Mockito.reset method should be called at the end of a test in the same thread where the methods were + * mocked/stubbed. There are some tests which mock methods in the ForkJoinPool thread and these leak memory. + * This listener doesn't support parallel execution at TestNG level. This is not thread safe. + * Separate forks (testForkCount > 1) controlled with Maven Surefire is the recommended solution + * for parallel test execution and that is fine. + */ +public class MockitoCleanupListener extends BetweenTestClassesListenerAdapter { + private static final Logger LOG = LoggerFactory.getLogger(MockitoCleanupListener.class); + private static final boolean + MOCKITO_CLEANUP_ENABLED = Boolean.valueOf(System.getProperty("testMockitoCleanup", "true")); + + @Override + protected void onBetweenTestClasses(Class endedTestClass, Class startedTestClass) { + if (MOCKITO_CLEANUP_ENABLED && MockitoThreadLocalStateCleaner.INSTANCE.isEnabled()) { + LOG.info("Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state."); + MockitoThreadLocalStateCleaner.INSTANCE.cleanup(); + } + } + +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoThreadLocalStateCleaner.java b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoThreadLocalStateCleaner.java new file mode 100644 index 0000000000000..3c383fb1d92fd --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoThreadLocalStateCleaner.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.reflect.MethodUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cleanup Mockito's Thread Local state. This is needed when Mockito has been used in an invalid way. + * Mockito.reset method should be called at the end of a test in the same thread where the methods were + * mocked/stubbed. + */ +public final class MockitoThreadLocalStateCleaner { + public static final MockitoThreadLocalStateCleaner INSTANCE = new MockitoThreadLocalStateCleaner(); + private static final Logger LOG = LoggerFactory.getLogger(MockitoThreadLocalStateCleaner.class); + private static final ThreadLocal MOCKING_PROGRESS_PROVIDER = lookupMockingProgressThreadLocal(); + + private static ThreadLocal lookupMockingProgressThreadLocal() { + try { + Field profilerField = FieldUtils.getDeclaredField( + ClassUtils.getClass("org.mockito.internal.progress.ThreadSafeMockingProgress"), + "MOCKING_PROGRESS_PROVIDER", true); + if (profilerField != null) { + return (ThreadLocal) profilerField.get(null); + } else { + LOG.warn("Cannot find Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER field." + + " This might be due to using an unsupported Mockito version."); + return null; + } + } catch (IllegalAccessException | ClassNotFoundException e) { + LOG.warn("Cannot find Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local", e); + return null; + } + } + + // force singleton + private MockitoThreadLocalStateCleaner() { + + } + + public void cleanup() { + ThreadLocalStateCleaner.INSTANCE.cleanupThreadLocal(MOCKING_PROGRESS_PROVIDER, (thread, mockingProgress) -> { + try { + LOG.info("Removing {} instance from thread {}", mockingProgress.getClass().getName(), thread); + LOG.info("Calling MockingProgress.validateState() method on instance (toString={})", mockingProgress); + MethodUtils.invokeMethod(mockingProgress, "validateState"); + Object ongoingStubbing = MethodUtils.invokeMethod(mockingProgress, "pullOngoingStubbing"); + if (ongoingStubbing != null) { + Object mock = MethodUtils.invokeMethod(ongoingStubbing, "getMock"); + if (mock != null) { + LOG.warn("Invalid usage of Mockito detected on thread {}." + + " There is ongoing stubbing on mock of class={} instance={}", + thread, mock.getClass().getName(), mock); + } + } + } catch (NoSuchMethodException | IllegalAccessException e) { + LOG.debug("Cannot call validateState on existing Mockito ProgressProvider"); + } catch (InvocationTargetException e) { + LOG.warn("Invalid usage of Mockito detected on thread {}", thread, e.getCause()); + } + }); + } + + public boolean isEnabled() { + return MOCKING_PROGRESS_PROVIDER != null; + } +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java index 4f0987819a176..18ab56c1fdae4 100644 --- a/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java +++ b/buildtools/src/main/java/org/apache/pulsar/tests/PulsarTestListener.java @@ -19,10 +19,10 @@ package org.apache.pulsar.tests; import java.util.Arrays; - import org.testng.ITestContext; import org.testng.ITestListener; import org.testng.ITestResult; +import org.testng.SkipException; import org.testng.internal.thread.ThreadTimeoutException; public class PulsarTestListener implements ITestListener { @@ -41,9 +41,11 @@ public void onTestSuccess(ITestResult result) { @Override public void onTestFailure(ITestResult result) { - System.out.format("!!!!!!!!! FAILURE-- %s.%s(%s)-------\n", result.getTestClass(), - result.getMethod().getMethodName(), Arrays.toString(result.getParameters())); - + FailFastNotifier.FailFastEventsSingleton.getInstance().setSkipOnNextTest(); + if (!(result.getThrowable() instanceof SkipException)) { + System.out.format("!!!!!!!!! FAILURE-- %s.%s(%s)-------\n", result.getTestClass(), + result.getMethod().getMethodName(), Arrays.toString(result.getParameters())); + } if (result.getThrowable() instanceof ThreadTimeoutException) { System.out.println("====== THREAD DUMPS ======"); System.out.println(ThreadDumpUtil.buildThreadDiagnosticString()); diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/RetryAnalyzer.java b/buildtools/src/main/java/org/apache/pulsar/tests/RetryAnalyzer.java index c5a145f000ab3..c790d0922e1ad 100644 --- a/buildtools/src/main/java/org/apache/pulsar/tests/RetryAnalyzer.java +++ b/buildtools/src/main/java/org/apache/pulsar/tests/RetryAnalyzer.java @@ -18,19 +18,20 @@ */ package org.apache.pulsar.tests; -import org.testng.IRetryAnalyzer; import org.testng.ITestResult; +import org.testng.SkipException; +import org.testng.util.RetryAnalyzerCount; -public class RetryAnalyzer implements IRetryAnalyzer { - - private int count = 0; - +public class RetryAnalyzer extends RetryAnalyzerCount { // Only try again once private static final int MAX_RETRIES = Integer.parseInt(System.getProperty("testRetryCount", "1")); - @Override - public boolean retry(ITestResult result) { - return count++ < MAX_RETRIES; + public RetryAnalyzer() { + setCount(MAX_RETRIES); } + @Override + public boolean retryMethod(ITestResult result) { + return !(result.getThrowable() instanceof SkipException); + } } diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java new file mode 100644 index 0000000000000..ddfebc9a44d9e --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Detects new threads that have been created during the test execution. + */ +public class ThreadLeakDetectorListener extends BetweenTestClassesListenerAdapter { + private static final Logger LOG = LoggerFactory.getLogger(ThreadLeakDetectorListener.class); + private static final boolean + THREAD_LEAK_DETECTOR_ENABLED = Boolean.valueOf(System.getProperty("testThreadLeakDetector", + "true")); + private Set capturedThreadKeys; + + @Override + protected void onBetweenTestClasses(Class endedTestClass, Class startedTestClass) { + LOG.info("Capturing identifiers of running threads."); + capturedThreadKeys = compareThreads(capturedThreadKeys, endedTestClass); + } + + private static Set compareThreads(Set previousThreadKeys, Class endedTestClass) { + Set threadKeys = Collections.unmodifiableSet(ThreadUtils.getAllThreads().stream() + .map(ThreadKey::of) + .collect(Collectors.>toCollection(LinkedHashSet::new))); + + if (endedTestClass != null && previousThreadKeys != null) { + int newThreadsCounter = 0; + LOG.info("Checking for new threads created by {}.", endedTestClass.getName()); + for (ThreadKey threadKey : threadKeys) { + if (!previousThreadKeys.contains(threadKey)) { + newThreadsCounter++; + LOG.warn("Tests in class {} created thread id {} with name '{}'", endedTestClass.getSimpleName(), + threadKey.getThreadId(), threadKey.getThreadName()); + } + } + if (newThreadsCounter > 0) { + LOG.warn("Summary: Tests in class {} created {} new threads", endedTestClass.getName(), + newThreadsCounter); + } + } + + return threadKeys; + } + + /** + * Unique key for a thread + * Based on thread id and it's identity hash code + * + * Both thread id and identity hash code have chances of getting reused, + * so this solution helps mitigate that issue. + */ + private static class ThreadKey { + private final long threadId; + private final int threadIdentityHashCode; + private final String threadName; + + private ThreadKey(long threadId, int threadIdentityHashCode, String threadName) { + this.threadId = threadId; + this.threadIdentityHashCode = threadIdentityHashCode; + this.threadName = threadName; + } + + static ThreadKey of(Thread thread) { + return new ThreadKey(thread.getId(), System.identityHashCode(thread), thread.toString()); + } + + public long getThreadId() { + return threadId; + } + + public String getThreadName() { + return threadName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ThreadKey threadKey = (ThreadKey) o; + return threadId == threadKey.threadId && threadIdentityHashCode == threadKey.threadIdentityHashCode; + } + + @Override + public int hashCode() { + return Objects.hash(threadId, threadIdentityHashCode); + } + } +} diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLocalStateCleaner.java b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLocalStateCleaner.java new file mode 100644 index 0000000000000..3183960df8ec4 --- /dev/null +++ b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLocalStateCleaner.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Objects; +import java.util.function.BiConsumer; +import org.apache.commons.lang3.ThreadUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.reflect.MethodUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cleans up thread local state for all threads for a given thread local instance. + */ +public final class ThreadLocalStateCleaner { + private static final Logger LOG = LoggerFactory.getLogger(ThreadLocalStateCleaner.class); + public static final ThreadLocalStateCleaner INSTANCE = new ThreadLocalStateCleaner(); + private static final Method GET_THREADLOCAL_MAP_METHOD = MethodUtils + .getMatchingMethod(ThreadLocal.class, "getMap", Thread.class); + + static { + GET_THREADLOCAL_MAP_METHOD.setAccessible(true); + } + + private volatile Method removeThreadlocalMethod; + private volatile Method getThreadlocalEntryMethod; + private volatile Field threadLocalEntryValueField; + + // enforce singleton + private ThreadLocalStateCleaner() { + + } + + // use reflection to clear the state of the given thread local and thread + public void cleanupThreadLocal(ThreadLocal threadLocal, Thread thread, + BiConsumer cleanedValueListener) { + Objects.nonNull(threadLocal); + Objects.nonNull(thread); + try { + Object threadLocalMap = GET_THREADLOCAL_MAP_METHOD.invoke(threadLocal, thread); + if (threadLocalMap != null) { + if (cleanedValueListener != null) { + callCleanedValueListener(threadLocal, thread, cleanedValueListener, threadLocalMap); + } + if (removeThreadlocalMethod == null) { + removeThreadlocalMethod = MethodUtils.getMatchingMethod( + threadLocalMap.getClass(), "remove", ThreadLocal.class); + removeThreadlocalMethod.setAccessible(true); + } + removeThreadlocalMethod.invoke(threadLocalMap, threadLocal); + } + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.warn("Cannot cleanup thread local", e); + } + } + + private void callCleanedValueListener(ThreadLocal threadLocal, Thread thread, + BiConsumer cleanedValueListener, Object threadLocalMap) + throws IllegalAccessException, InvocationTargetException { + T currentValue = getCurrentValue(threadLocal, threadLocalMap); + if (currentValue != null) { + cleanedValueListener.accept(thread, currentValue); + } + } + + public T getThreadLocalValue(ThreadLocal threadLocal, Thread thread) + throws InvocationTargetException, IllegalAccessException { + Objects.nonNull(threadLocal); + Objects.nonNull(thread); + Object threadLocalMap = GET_THREADLOCAL_MAP_METHOD.invoke(threadLocal, thread); + if (threadLocalMap != null) { + return getCurrentValue(threadLocal, threadLocalMap); + } else { + return null; + } + } + + private T getCurrentValue(ThreadLocal threadLocal, Object threadLocalMap) throws IllegalAccessException, + InvocationTargetException { + if (getThreadlocalEntryMethod == null) { + getThreadlocalEntryMethod = MethodUtils.getMatchingMethod( + threadLocalMap.getClass(), "getEntry", ThreadLocal.class); + getThreadlocalEntryMethod.setAccessible(true); + } + Object entry = getThreadlocalEntryMethod.invoke(threadLocalMap, threadLocal); + if (entry != null) { + if (threadLocalEntryValueField == null) { + threadLocalEntryValueField = FieldUtils.getField(entry.getClass(), "value", + true); + } + return (T) threadLocalEntryValueField.get(entry); + } + return null; + } + + // cleanup thread local state on all active threads + public void cleanupThreadLocal(ThreadLocal threadLocal, BiConsumer cleanedValueListener) { + Objects.nonNull(threadLocal); + for (Thread thread : ThreadUtils.getAllThreads()) { + cleanupThreadLocal(threadLocal, thread, cleanedValueListener); + } + } +} diff --git a/buildtools/src/test/java/org/apache/pulsar/tests/FastThreadLocalStateCleanerTest.java b/buildtools/src/test/java/org/apache/pulsar/tests/FastThreadLocalStateCleanerTest.java new file mode 100644 index 0000000000000..2fd8c4bbbe2c7 --- /dev/null +++ b/buildtools/src/test/java/org/apache/pulsar/tests/FastThreadLocalStateCleanerTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import org.testng.annotations.Test; + +public class FastThreadLocalStateCleanerTest { + private static final class MagicNumberWrapper { + private final int value; + + private MagicNumberWrapper(int value) { + this.value = value; + } + + public int intValue() { + return value; + } + } + + final FastThreadLocal magicNumberThreadLocal = new FastThreadLocal() { + @Override + protected MagicNumberWrapper initialValue() throws Exception { + return new MagicNumberWrapper(42); + } + }; + final FastThreadLocalStateCleaner cleaner = new FastThreadLocalStateCleaner(object -> + object.getClass() == MagicNumberWrapper.class); + + @Test + public void testThreadLocalStateCleanupInCurrentThread() { + magicNumberThreadLocal.set(new MagicNumberWrapper(44)); + assertEquals(magicNumberThreadLocal.get().intValue(), 44); + cleaner.cleanupAllFastThreadLocals(Thread.currentThread(), ((thread, o) -> { + System.out.println("Cleaning up " + thread + " value " + o); + })); + assertEquals(magicNumberThreadLocal.get().intValue(), 42); + } + + @Test + public void testThreadLocalStateCleanupInCurrentAndOtherThread() throws InterruptedException, ExecutionException { + magicNumberThreadLocal.set(new MagicNumberWrapper(44)); + assertEquals(magicNumberThreadLocal.get().intValue(), 44); + + CountDownLatch numberHasBeenSet = new CountDownLatch(1); + CountDownLatch shutdownLatch = new CountDownLatch(1); + CompletableFuture valueAfterReset = new CompletableFuture<>(); + Thread thread = new Thread(() -> { + try { + magicNumberThreadLocal.set(new MagicNumberWrapper(45)); + assertEquals(magicNumberThreadLocal.get().intValue(), 45); + numberHasBeenSet.countDown(); + shutdownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + valueAfterReset.complete(magicNumberThreadLocal.get()); + } + }); + thread.start(); + numberHasBeenSet.await(); + Set cleanedThreads = new HashSet<>(); + cleaner.cleanupAllFastThreadLocals((t, currentValue) -> { + cleanedThreads.add(t); + }); + shutdownLatch.countDown(); + assertEquals(magicNumberThreadLocal.get().intValue(), 42); + assertEquals(valueAfterReset.get().intValue(), 42); + assertEquals(cleanedThreads.size(), 2); + assertTrue(cleanedThreads.contains(thread)); + assertTrue(cleanedThreads.contains(Thread.currentThread())); + } + + @Test + public void testThreadLocalStateCleanupInFastThreadLocalThread() throws InterruptedException, ExecutionException { + CountDownLatch numberHasBeenSet = new CountDownLatch(1); + CountDownLatch shutdownLatch = new CountDownLatch(1); + CompletableFuture valueAfterReset = new CompletableFuture<>(); + Thread thread = new FastThreadLocalThread(() -> { + try { + magicNumberThreadLocal.set(new MagicNumberWrapper(45)); + assertEquals(magicNumberThreadLocal.get().intValue(), 45); + numberHasBeenSet.countDown(); + shutdownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + valueAfterReset.complete(magicNumberThreadLocal.get()); + } + }); + thread.start(); + numberHasBeenSet.await(); + Set cleanedThreads = new HashSet<>(); + cleaner.cleanupAllFastThreadLocals((t, currentValue) -> { + cleanedThreads.add(t); + }); + shutdownLatch.countDown(); + assertEquals(valueAfterReset.get().intValue(), 42); + assertTrue(cleanedThreads.contains(thread)); + } + +} \ No newline at end of file diff --git a/buildtools/src/test/java/org/apache/pulsar/tests/RetryAnalyzerTest.java b/buildtools/src/test/java/org/apache/pulsar/tests/RetryAnalyzerTest.java new file mode 100644 index 0000000000000..90f19e8053513 --- /dev/null +++ b/buildtools/src/test/java/org/apache/pulsar/tests/RetryAnalyzerTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.testng.annotations.Test; + +public class RetryAnalyzerTest { + private static final int RETRY_COUNT = 3; + + public static class TestRetryAnalyzer extends RetryAnalyzer { + public TestRetryAnalyzer() { + setCount(RETRY_COUNT); + } + } + int invocationCountA; + int invocationCountB; + int invocationCountC; + + @Test(retryAnalyzer = TestRetryAnalyzer.class) + void testMethodA() { + invocationCountA++; + if (invocationCountA < RETRY_COUNT) { + throw new IllegalStateException("Sample failure to trigger retry."); + } + } + + @Test(retryAnalyzer = TestRetryAnalyzer.class) + void testMethodB() { + invocationCountB++; + if (invocationCountB < RETRY_COUNT) { + throw new IllegalStateException("Sample failure to trigger retry."); + } + } + + @Test(retryAnalyzer = TestRetryAnalyzer.class) + void testMethodC() { + invocationCountC++; + if (invocationCountC < RETRY_COUNT) { + throw new IllegalStateException("Sample failure to trigger retry."); + } + } +} \ No newline at end of file diff --git a/buildtools/src/test/java/org/apache/pulsar/tests/ThreadLocalStateCleanerTest.java b/buildtools/src/test/java/org/apache/pulsar/tests/ThreadLocalStateCleanerTest.java new file mode 100644 index 0000000000000..612dd7811638c --- /dev/null +++ b/buildtools/src/test/java/org/apache/pulsar/tests/ThreadLocalStateCleanerTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import org.testng.annotations.Test; + +public class ThreadLocalStateCleanerTest { + final ThreadLocal magicNumberThreadLocal = ThreadLocal.withInitial(() -> 42); + + @Test + public void testThreadLocalStateCleanupInCurrentThread() { + magicNumberThreadLocal.set(44); + assertEquals(magicNumberThreadLocal.get().intValue(), 44); + ThreadLocalStateCleaner.INSTANCE.cleanupThreadLocal(magicNumberThreadLocal, Thread.currentThread(), null); + assertEquals(magicNumberThreadLocal.get().intValue(), 42); + } + + private static class ThreadValueEntry { + private final Thread thread; + private final Object value; + + private ThreadValueEntry(Thread thread, Object value) { + this.thread = thread; + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ThreadValueEntry that = (ThreadValueEntry) o; + return Objects.equals(thread, that.thread) && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(thread, value); + } + } + + + @Test + public void testThreadLocalStateCleanupInCurrentAndOtherThread() throws InterruptedException, ExecutionException { + magicNumberThreadLocal.set(44); + assertEquals(magicNumberThreadLocal.get().intValue(), 44); + + CountDownLatch numberHasBeenSet = new CountDownLatch(1); + CountDownLatch shutdownLatch = new CountDownLatch(1); + CompletableFuture valueAfterReset = new CompletableFuture<>(); + Thread thread = new Thread(() -> { + try { + magicNumberThreadLocal.set(45); + assertEquals(magicNumberThreadLocal.get().intValue(), 45); + numberHasBeenSet.countDown(); + shutdownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + valueAfterReset.complete(magicNumberThreadLocal.get()); + } + }); + thread.start(); + numberHasBeenSet.await(); + Set replacedValues = new HashSet<>(); + ThreadLocalStateCleaner.INSTANCE.cleanupThreadLocal(magicNumberThreadLocal, (t, currentValue) -> { + replacedValues.add(new ThreadValueEntry(t, currentValue)); + }); + shutdownLatch.countDown(); + assertEquals(magicNumberThreadLocal.get().intValue(), 42); + assertEquals(valueAfterReset.get().intValue(), 42); + assertEquals(replacedValues.size(), 2); + assertTrue(replacedValues.contains(new ThreadValueEntry(thread, 45))); + assertTrue(replacedValues.contains(new ThreadValueEntry(Thread.currentThread(), 44))); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0028702d2b802..c53400d8a1098 100644 --- a/pom.xml +++ b/pom.xml @@ -1189,7 +1189,7 @@ flexible messaging model and an intuitive client API. listener - org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener + org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener,org.apache.pulsar.tests.FailFastNotifier,org.apache.pulsar.tests.MockitoCleanupListener,org.apache.pulsar.tests.FastThreadLocalCleanupListener,org.apache.pulsar.tests.ThreadLeakDetectorListener diff --git a/pulsar-io/flume/pom.xml b/pulsar-io/flume/pom.xml index 1e6830ab54579..24bf3ca3993d0 100644 --- a/pulsar-io/flume/pom.xml +++ b/pulsar-io/flume/pom.xml @@ -73,18 +73,6 @@ avro-ipc 1.8.1 - - junit - junit - test - 4.10 - - - org.mockito - mockito-all - test - 1.9.0 - org.apache.curator curator-framework diff --git a/pulsar-io/netty/pom.xml b/pulsar-io/netty/pom.xml index bc5ef9a37761f..81af29807f1fd 100644 --- a/pulsar-io/netty/pom.xml +++ b/pulsar-io/netty/pom.xml @@ -72,7 +72,6 @@ org.testng testng - 6.13.1 test