From f35e6496aa003dc0667dc338001a5b51be78fe76 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 4 Mar 2020 09:36:06 +0000 Subject: [PATCH] KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Partition tests (#8209) Reviewers: Manikumar Reddy --- .../unit/kafka/utils/MockScheduler.scala | 53 ++++++++++++------- .../unit/kafka/utils/SchedulerTest.scala | 42 +++++++++++++-- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index 75a089aec2231..cfbbe0f01a4c4 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Time * Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time). */ class MockScheduler(val time: Time) extends Scheduler { - + /* a priority queue of tasks ordered by next execution time */ private val tasks = new PriorityQueue[MockTask]() @@ -44,10 +44,11 @@ class MockScheduler(val time: Time) extends Scheduler { def startup(): Unit = {} def shutdown(): Unit = { - this synchronized { - tasks.foreach(_.fun()) - tasks.clear() - } + var currTask: Option[MockTask] = None + do { + currTask = poll(_ => true) + currTask.foreach(_.fun()) + } while (currTask.nonEmpty) } /** @@ -56,28 +57,26 @@ class MockScheduler(val time: Time) extends Scheduler { * If you are using the scheduler associated with a MockTime instance this call be triggered automatically. */ def tick(): Unit = { - this synchronized { - val now = time.milliseconds - while(tasks.nonEmpty && tasks.head.nextExecution <= now) { - /* pop and execute the task with the lowest next execution time */ - val curr = tasks.dequeue + val now = time.milliseconds + var currTask: Option[MockTask] = None + /* pop and execute the task with the lowest next execution time if ready */ + do { + currTask = poll(_.nextExecution <= now) + currTask.foreach { curr => curr.fun() /* if the task is periodic, reschedule it and re-enqueue */ if(curr.periodic) { curr.nextExecution += curr.period - this.tasks += curr + add(curr) } } - } + } while (currTask.nonEmpty) } - + def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[Unit] = { - var task : MockTask = null - this synchronized { - task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time) - tasks += task - tick() - } + val task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time) + add(task) + tick() task } @@ -86,7 +85,21 @@ class MockScheduler(val time: Time) extends Scheduler { tasks.clear() } } - + + private def poll(predicate: MockTask => Boolean): Option[MockTask] = { + this synchronized { + if (tasks.nonEmpty && predicate.apply(tasks.head)) + Some(tasks.dequeue) + else + None + } + } + + private def add(task: MockTask): Unit = { + this synchronized { + tasks += task + } + } } case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long, time: Time) extends ScheduledFuture[Unit] { diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index 201216c6d7291..42241dee71cfd 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -16,13 +16,15 @@ */ package kafka.utils -import org.junit.Assert._ +import java.util.Properties import java.util.concurrent.atomic._ +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} + import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} import kafka.server.{BrokerTopicStats, LogDirFailureChannel} -import org.junit.{After, Before, Test} import kafka.utils.TestUtils.retry -import java.util.Properties +import org.junit.Assert._ +import org.junit.{After, Before, Test} class SchedulerTest { @@ -129,4 +131,36 @@ class SchedulerTest { assertTrue(!(scheduler.taskRunning(log.producerExpireCheck))) } -} \ No newline at end of file + /** + * Verify that scheduler lock is not held when invoking task method, allowing new tasks to be scheduled + * when another is being executed. This is required to avoid deadlocks when: + * a) Thread1 executes a task which attempts to acquire LockA + * b) Thread2 holding LockA attempts to schedule a new task + */ + @Test(timeout = 15000) + def testMockSchedulerLocking(): Unit = { + val initLatch = new CountDownLatch(1) + val completionLatch = new CountDownLatch(2) + val taskLatches = List(new CountDownLatch(1), new CountDownLatch(1)) + def scheduledTask(taskLatch: CountDownLatch): Unit = { + initLatch.countDown() + assertTrue("Timed out waiting for latch", taskLatch.await(30, TimeUnit.SECONDS)) + completionLatch.countDown() + } + mockTime.scheduler.schedule("test1", () => scheduledTask(taskLatches.head), delay=1) + val tickExecutor = Executors.newSingleThreadScheduledExecutor() + try { + tickExecutor.scheduleWithFixedDelay(() => mockTime.sleep(1), 0, 1, TimeUnit.MILLISECONDS) + + // wait for first task to execute and then schedule the next task while the first one is running + assertTrue(initLatch.await(10, TimeUnit.SECONDS)) + mockTime.scheduler.schedule("test2", () => scheduledTask(taskLatches(1)), delay = 1) + + taskLatches.foreach(_.countDown()) + assertTrue("Tasks did not complete", completionLatch.await(10, TimeUnit.SECONDS)) + + } finally { + tickExecutor.shutdownNow() + } + } +}