Skip to content

Commit

Permalink
#368 monitor - tryout task queueing through mongodb collection
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarc committed Dec 24, 2023
1 parent dcd5cb7 commit 1f66b3e
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 0 deletions.
3 changes: 3 additions & 0 deletions server/src/main/scala/kpn/database/base/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import kpn.server.monitor.domain.MonitorRouteChange
import kpn.server.monitor.domain.MonitorRouteChangeGeometry
import kpn.server.monitor.domain.MonitorRouteReference
import kpn.server.monitor.domain.MonitorRouteState
import kpn.server.monitor.domain.MonitorTask
import org.mongodb.scala.MongoCollection

import scala.reflect.ClassTag
Expand Down Expand Up @@ -89,6 +90,8 @@ trait Database {

def monitorRelations: DatabaseCollection[MonitorRelation]

def monitorTasks: DatabaseCollection[MonitorTask]

def statistics: DatabaseCollection[StatisticLongValues]

def status: DatabaseCollection[WithStringId]
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/scala/kpn/database/base/DatabaseImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import kpn.server.monitor.domain.MonitorRouteChange
import kpn.server.monitor.domain.MonitorRouteChangeGeometry
import kpn.server.monitor.domain.MonitorRouteReference
import kpn.server.monitor.domain.MonitorRouteState
import kpn.server.monitor.domain.MonitorTask
import org.mongodb.scala.MongoCollection
import org.mongodb.scala.MongoDatabase

Expand Down Expand Up @@ -147,6 +148,10 @@ class DatabaseImpl(val database: MongoDatabase) extends Database {
new DatabaseCollectionImpl(database.getCollection[MonitorRelation]("monitor-relations"))
}

override def monitorTasks: DatabaseCollection[MonitorTask] = {
new DatabaseCollectionImpl(database.getCollection[MonitorTask]("monitor-tasks"))
}

override def statistics: DatabaseCollection[StatisticLongValues] = {
new DatabaseCollectionImpl(database.getCollection[StatisticLongValues]("statistics"))
}
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/scala/kpn/server/monitor/domain/MonitorTask.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package kpn.server.monitor.domain

import kpn.api.base.ObjectId
import kpn.api.base.WithObjectId

case class MonitorTask(
_id: ObjectId,
priority: Long,
message: String
) extends WithObjectId
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package kpn.server.monitor.tasks

import kpn.api.base.ObjectId
import kpn.core.doc.NetworkInfoDoc
import kpn.core.util.Log
import kpn.database.base.Database
import kpn.database.util.Mongo
import kpn.server.monitor.domain.MonitorTask
import org.mongodb.scala.model.Aggregates.filter
import org.mongodb.scala.model.Aggregates.limit
import org.mongodb.scala.model.Aggregates.project
import org.mongodb.scala.model.Aggregates.sort
import org.mongodb.scala.model.Sorts.ascending
import org.mongodb.scala.model.Sorts.orderBy

import java.lang.Thread.sleep

object MonitorTaskLoopTool {
def main(args: Array[String]): Unit = {
Mongo.executeIn("kpn-monitor-2") { database =>
new MonitorTaskLoopTool(database).taskProcessingLoop()
}
}
}

class MonitorTaskLoopTool(database: Database) {

private val log = Log(classOf[MonitorTaskLoopTool])
private var savedObserver: Option[MonitorTaskObserver] = None
private var abort = false

def taskProcessingLoop(): Unit = {
simulateExternalMonitorShutdown()
log.info("start task processing loop")
do {
processAllTasks()
waitForNewTask()
} while (abort == false)

log.info(s"end of task processing loop")
System.exit(0)
}

private def processAllTasks(): Unit = {
log.info(s"processing pending tasks")
while (processTask()) {}
}

private def processTask(): Boolean = {
val pipeline = Seq(
sort(orderBy(ascending("priority"), ascending("_id"))),
limit(1)
)
val task = database.monitorTasks.optionAggregate[MonitorTask](pipeline, log)
task match {
case None => false // no more tasks to process
case Some(task) =>
log.info(s" process task: ${task.message}")
database.monitorTasks.deleteByObjectId(task._id)
true // task done, can continue with next task
}
}

private def waitForNewTask(): Unit = {
val observer = new MonitorTaskObserver()
savedObserver = Some(observer)
try {
database.monitorTasks.native.watch().first().subscribe(observer)
observer.await()
}
finally {
savedObserver = None
}
}

private def simulateExternalMonitorShutdown(): Unit = {
new Thread() {
override def run(): Unit = {
sleep(60 * 1000)
log.info("abort request after 1 minute")
abort = true
savedObserver match {
case None => log.info("abort: no saved observer!")
case Some(observer) => observer.cancel()
}
}
}.start()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kpn.server.monitor.tasks

import kpn.core.util.Log
import kpn.server.monitor.domain.MonitorTask
import org.mongodb.scala.model.changestream.ChangeStreamDocument
import org.mongodb.scala.Observer
import org.mongodb.scala.Subscription

import java.util.concurrent.CountDownLatch

case class MonitorTaskObserver() extends Observer[ChangeStreamDocument[MonitorTask]] {

private val latch = new CountDownLatch(1)
private var savedSubscription: Option[Subscription] = None
private val log = Log(classOf[MonitorTaskObserver])

override def onSubscribe(subscription: Subscription): Unit = {
subscription.request(1)
savedSubscription = Some(subscription)
}

override def onNext(changeDocument: ChangeStreamDocument[MonitorTask]): Unit = {
}

override def onError(throwable: Throwable): Unit = {
log.error(s"Something went wrong while waiting for new task: '$throwable")
cancelSubscription()
}

override def onComplete(): Unit = {
cancelSubscription()
}

def cancel(): Unit = {
cancelSubscription()
}

def await(): Unit = {
latch.await()
}

def isSubscribed: Boolean = {
savedSubscription match {
case None => false
case Some(subscription) => !subscription.isUnsubscribed
}
}

private def cancelSubscription(): Unit = {
savedSubscription match {
case Some(subscription) => subscription.cancel()
case None =>
}
latch.countDown()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package kpn.server.monitor.tasks

import kpn.api.base.ObjectId
import kpn.database.base.Database
import kpn.database.util.Mongo
import kpn.server.monitor.domain.MonitorTask

object MonitorTaskWriterTool {
def main(args: Array[String]): Unit = {
Mongo.executeIn("kpn-monitor-2") { database =>
new MonitorTaskWriterTool(database).writeTasks()
}
}
}

class MonitorTaskWriterTool(database: Database) {
def writeTasks(): Unit = {
database.monitorTasks.save(MonitorTask(ObjectId(), 1, "one"))
database.monitorTasks.save(MonitorTask(ObjectId(), 2, "three"))
database.monitorTasks.save(MonitorTask(ObjectId(), 1, "two"))
}
}

0 comments on commit 1f66b3e

Please sign in to comment.