Skip to content

Commit

Permalink
KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Par…
Browse files Browse the repository at this point in the history
…tition tests (apache#8209)

Reviewers: Manikumar Reddy <[email protected]>
  • Loading branch information
rajinisivaram authored Mar 4, 2020
1 parent c2ec974 commit f35e649
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 24 deletions.
53 changes: 33 additions & 20 deletions core/src/test/scala/unit/kafka/utils/MockScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Time
* Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time).
*/
class MockScheduler(val time: Time) extends Scheduler {

/* a priority queue of tasks ordered by next execution time */
private val tasks = new PriorityQueue[MockTask]()

Expand All @@ -44,10 +44,11 @@ class MockScheduler(val time: Time) extends Scheduler {
def startup(): Unit = {}

def shutdown(): Unit = {
this synchronized {
tasks.foreach(_.fun())
tasks.clear()
}
var currTask: Option[MockTask] = None
do {
currTask = poll(_ => true)
currTask.foreach(_.fun())
} while (currTask.nonEmpty)
}

/**
Expand All @@ -56,28 +57,26 @@ class MockScheduler(val time: Time) extends Scheduler {
* If you are using the scheduler associated with a MockTime instance this call be triggered automatically.
*/
def tick(): Unit = {
this synchronized {
val now = time.milliseconds
while(tasks.nonEmpty && tasks.head.nextExecution <= now) {
/* pop and execute the task with the lowest next execution time */
val curr = tasks.dequeue
val now = time.milliseconds
var currTask: Option[MockTask] = None
/* pop and execute the task with the lowest next execution time if ready */
do {
currTask = poll(_.nextExecution <= now)
currTask.foreach { curr =>
curr.fun()
/* if the task is periodic, reschedule it and re-enqueue */
if(curr.periodic) {
curr.nextExecution += curr.period
this.tasks += curr
add(curr)
}
}
}
} while (currTask.nonEmpty)
}

def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[Unit] = {
var task : MockTask = null
this synchronized {
task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
tasks += task
tick()
}
val task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
add(task)
tick()
task
}

Expand All @@ -86,7 +85,21 @@ class MockScheduler(val time: Time) extends Scheduler {
tasks.clear()
}
}


private def poll(predicate: MockTask => Boolean): Option[MockTask] = {
this synchronized {
if (tasks.nonEmpty && predicate.apply(tasks.head))
Some(tasks.dequeue)
else
None
}
}

private def add(task: MockTask): Unit = {
this synchronized {
tasks += task
}
}
}

case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long, time: Time) extends ScheduledFuture[Unit] {
Expand Down
42 changes: 38 additions & 4 deletions core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package kafka.utils

import org.junit.Assert._
import java.util.Properties
import java.util.concurrent.atomic._
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import org.junit.{After, Before, Test}
import kafka.utils.TestUtils.retry
import java.util.Properties
import org.junit.Assert._
import org.junit.{After, Before, Test}

class SchedulerTest {

Expand Down Expand Up @@ -129,4 +131,36 @@ class SchedulerTest {
assertTrue(!(scheduler.taskRunning(log.producerExpireCheck)))
}

}
/**
* Verify that scheduler lock is not held when invoking task method, allowing new tasks to be scheduled
* when another is being executed. This is required to avoid deadlocks when:
* a) Thread1 executes a task which attempts to acquire LockA
* b) Thread2 holding LockA attempts to schedule a new task
*/
@Test(timeout = 15000)
def testMockSchedulerLocking(): Unit = {
val initLatch = new CountDownLatch(1)
val completionLatch = new CountDownLatch(2)
val taskLatches = List(new CountDownLatch(1), new CountDownLatch(1))
def scheduledTask(taskLatch: CountDownLatch): Unit = {
initLatch.countDown()
assertTrue("Timed out waiting for latch", taskLatch.await(30, TimeUnit.SECONDS))
completionLatch.countDown()
}
mockTime.scheduler.schedule("test1", () => scheduledTask(taskLatches.head), delay=1)
val tickExecutor = Executors.newSingleThreadScheduledExecutor()
try {
tickExecutor.scheduleWithFixedDelay(() => mockTime.sleep(1), 0, 1, TimeUnit.MILLISECONDS)

// wait for first task to execute and then schedule the next task while the first one is running
assertTrue(initLatch.await(10, TimeUnit.SECONDS))
mockTime.scheduler.schedule("test2", () => scheduledTask(taskLatches(1)), delay = 1)

taskLatches.foreach(_.countDown())
assertTrue("Tasks did not complete", completionLatch.await(10, TimeUnit.SECONDS))

} finally {
tickExecutor.shutdownNow()
}
}
}

0 comments on commit f35e649

Please sign in to comment.