From eb961af5c7325e1fbecd8b10aa01671253b423ca Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Tue, 12 Mar 2024 15:11:44 +0100 Subject: [PATCH 1/4] fixes loki --- app/models/voxelytics/LokiClient.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/models/voxelytics/LokiClient.scala b/app/models/voxelytics/LokiClient.scala index 1ea3a7f3d9..3fdb925ba2 100644 --- a/app/models/voxelytics/LokiClient.scala +++ b/app/models/voxelytics/LokiClient.scala @@ -195,7 +195,7 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im entry => ((entry \ "vx" \ "workflow_hash").as[String], (entry \ "vx" \ "run_name").as[String], - (entry \ "pid").as[Long]) + (entry \ "pid").as[Long].toString) ) .toList).toFox streams <- Fox.serialCombined(logEntryGroups)( @@ -212,7 +212,7 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im Json.stringify( Json.obj( "level" -> (entry \ "level").as[String], - "pid" -> (entry \ "pid").as[Long], + "pid" -> (entry \ "pid").as[Long].toString, "logger_name" -> (entry \ "vx" \ "logger_name").as[String], "vx_workflow_hash" -> (entry \ "vx" \ "workflow_hash").as[String], "vx_run_name" -> (entry \ "vx" \ "run_name").as[String], @@ -221,13 +221,13 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im "host" -> (entry \ "host").as[String], "program" -> (entry \ "program").as[String], "func_name" -> (entry \ "vx" \ "func_name").as[String], - "line" -> (entry \ "vx" \ "line").as[Long], + "line" -> (entry \ "vx" \ "line").as[Long].toString, "path" -> (entry \ "vx" \ "path").as[String], "process_name" -> (entry \ "vx" \ "process_name").as[String], "thread_name" -> (entry \ "vx" \ "thread_name").as[String], "vx_version" -> (entry \ "vx" \ "version").as[String], "user" -> (entry \ "vx" \ "user").as[String], - "pgid" -> (entry \ "vx" \ "process_group_id").as[Long] + "pgid" -> (entry \ "vx" \ "process_group_id").as[Long].toString )) ).toFox } yield From 940031b29721589baf4da170235f45a9750af828 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 13 Mar 2024 11:31:00 +0100 Subject: [PATCH 2/4] fixes loki --- CHANGELOG.unreleased.md | 1 + app/models/voxelytics/LokiClient.scala | 176 ++++++++++++------------- 2 files changed, 88 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index d93a84d6de..7ab0d11fed 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -48,6 +48,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released - Fixed small styling error with NML drag and drop uploading. [#7641](https://github.com/scalableminds/webknossos/pull/7641) - Fixed a bug where the annotation list would show teams the annotation is shared with multiple times. [#7659](https://github.com/scalableminds/webknossos/pull/7659) - Fixed incorrect menu position that could occur sometimes when clicking the ... button next to a segment. [#7680](https://github.com/scalableminds/webknossos/pull/7680) +- Fixed an error in the Loki integration to support Loki 2.9+. []() ### Removed diff --git a/app/models/voxelytics/LokiClient.scala b/app/models/voxelytics/LokiClient.scala index 3fdb925ba2..1d6e218487 100644 --- a/app/models/voxelytics/LokiClient.scala +++ b/app/models/voxelytics/LokiClient.scala @@ -1,7 +1,5 @@ package models.voxelytics -import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.pattern.after import com.scalableminds.util.mvc.MimeTypes import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.Fox @@ -11,6 +9,8 @@ import com.typesafe.scalalogging.LazyLogging import models.voxelytics.VoxelyticsLogLevel.VoxelyticsLogLevel import net.liftweb.common.Box.tryo import net.liftweb.common.Full +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.pattern.after import play.api.http.{HeaderNames, Status} import play.api.libs.json.{JsArray, JsObject, JsValue, Json} import utils.{ObjectId, WkConf} @@ -26,12 +26,6 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im private lazy val conf = wkConf.Voxelytics.Loki private lazy val enabled = wkConf.Features.voxelyticsEnabled && conf.uri.nonEmpty - - private val POLLING_INTERVAL = 1 second - private val LOG_TIME_BATCH_INTERVAL = 1 days - private val LOG_ENTRY_QUERY_BATCH_SIZE = 5000 - private val LOG_ENTRY_INSERT_BATCH_SIZE = 1000 - private lazy val serverStartupFuture: Fox[Unit] = { for { _ <- bool2Fox(enabled) ?~> "Loki is not enabled." @@ -39,45 +33,10 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im _ <- pollUntilServerStartedUp(Instant.in(conf.startupTimeout)) ~> 500 } yield () } - - private def pollUntilServerStartedUp(until: Instant): Fox[Unit] = { - def waitAndRecurse(until: Instant): Fox[Unit] = - for { - _ <- after(POLLING_INTERVAL, using = system.scheduler)(Future.successful(())) - _ <- bool2Fox(!until.isPast) ?~> s"Loki did not become ready within ${conf.startupTimeout}." - _ <- pollUntilServerStartedUp(until) - } yield () - - for { - isServerAvailableBox <- rpc(s"${conf.uri}/ready").request - .withMethod("GET") - .execute() - .flatMap(result => - if (Status.isSuccessful(result.status)) { - Fox.successful(true) - } else if (result.status >= 500 && result.status < 600) { - logger.debug(s"Loki status: ${result.status}") - Fox.successful(false) - } else { - Fox.failure(s"Unexpected error code from Loki ${result.status}.") - }) - .recoverWith({ - case e: java.net.ConnectException => - logger.debug(s"Loki connection exception: $e") - Fox.successful(false) - case e => - logger.error(s"Unexpected error $e") - Fox.failure("Unexpected error while trying to connect to Loki.", Full(e)) - }) - isServerAvailable <- isServerAvailableBox.toFox - _ <- if (!isServerAvailable) { - waitAndRecurse(until) - } else { - logger.info("Loki is available.") - Fox.successful(()) - } - } yield () - } + private val POLLING_INTERVAL = 1 second + private val LOG_TIME_BATCH_INTERVAL = 1 days + private val LOG_ENTRY_QUERY_BATCH_SIZE = 5000 + private val LOG_ENTRY_INSERT_BATCH_SIZE = 1000 def queryLogsBatched(runName: String, organizationId: ObjectId, @@ -137,48 +96,6 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im } } - private def queryLogs(runName: String, - organizationId: ObjectId, - taskName: Option[String], - minLevel: VoxelyticsLogLevel, - startTime: Instant, - endTime: Instant, - limit: Int): Fox[List[JsValue]] = - if (limit > 0) { - val levels = VoxelyticsLogLevel.sortedValues.drop(VoxelyticsLogLevel.sortedValues.indexOf(minLevel)) - - val logQLFilter = List( - taskName.map(t => s"""vx_task_name="$t""""), - Some(s"""level=~"(${levels.mkString("|")})"""") - ).flatten.mkString(" | ") - val logQL = - s"""{vx_run_name="$runName",wk_org="${organizationId.id}",wk_url="${wkConf.Http.uri}"} | json vx_task_name,level | $logQLFilter""" - - val queryString = - List("query" -> logQL, - "start" -> startTime.toString, - "end" -> endTime.toString, - "limit" -> limit.toString, - "direction" -> "backward") - .map(keyValueTuple => s"${keyValueTuple._1}=${java.net.URLEncoder.encode(keyValueTuple._2, "UTF-8")}") - .mkString("&") - for { - _ <- serverStartupFuture - res <- rpc(s"${conf.uri}/loki/api/v1/query_range?$queryString").silent.getWithJsonResponse[JsValue] - logEntries <- tryo( - (res \ "data" \ "result") - .as[List[JsValue]] - .flatMap( - stream => - (stream \ "values") - .as[List[(String, String)]] - .map(value => - Json.parse(value._2).as[JsObject] ++ (stream \ "stream").as[JsObject] ++ Json.obj( - "timestamp" -> Instant.fromNanosecondsString(value._1)))) - .sortBy(entry => (entry \ "timestamp").as[Long])).toFox - } yield logEntries - } else Fox.successful(List()) - def bulkInsertBatched(logEntries: List[JsValue], organizationId: ObjectId)(implicit ec: ExecutionContext): Fox[Unit] = for { _ <- Fox.serialCombined(logEntries.grouped(LOG_ENTRY_INSERT_BATCH_SIZE).toList)(bulkInsert(_, organizationId)) @@ -254,4 +171,85 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im } else { Fox.successful(()) } + + private def pollUntilServerStartedUp(until: Instant): Fox[Unit] = { + def waitAndRecurse(until: Instant): Fox[Unit] = + for { + _ <- after(POLLING_INTERVAL, using = system.scheduler)(Future.successful(())) + _ <- bool2Fox(!until.isPast) ?~> s"Loki did not become ready within ${conf.startupTimeout}." + _ <- pollUntilServerStartedUp(until) + } yield () + + for { + isServerAvailableBox <- rpc(s"${conf.uri}/ready").request + .withMethod("GET") + .execute() + .flatMap(result => + if (Status.isSuccessful(result.status)) { + Fox.successful(true) + } else if (result.status >= 500 && result.status < 600) { + logger.debug(s"Loki status: ${result.status}") + Fox.successful(false) + } else { + Fox.failure(s"Unexpected error code from Loki ${result.status}.") + }) + .recoverWith({ + case e: java.net.ConnectException => + logger.debug(s"Loki connection exception: $e") + Fox.successful(false) + case e => + logger.error(s"Unexpected error $e") + Fox.failure("Unexpected error while trying to connect to Loki.", Full(e)) + }) + isServerAvailable <- isServerAvailableBox.toFox + _ <- if (!isServerAvailable) { + waitAndRecurse(until) + } else { + logger.info("Loki is available.") + Fox.successful(()) + } + } yield () + } + + private def queryLogs(runName: String, + organizationId: ObjectId, + taskName: Option[String], + minLevel: VoxelyticsLogLevel, + startTime: Instant, + endTime: Instant, + limit: Int): Fox[List[JsValue]] = + if (limit > 0) { + val levels = VoxelyticsLogLevel.sortedValues.drop(VoxelyticsLogLevel.sortedValues.indexOf(minLevel)) + + val logQLFilter = List( + taskName.map(t => s"""vx_task_name="$t""""), + Some(s"""level=~"(${levels.mkString("|")})"""") + ).flatten.mkString(" | ") + val logQL = + s"""{vx_run_name="$runName",wk_org="${organizationId.id}",wk_url="${wkConf.Http.uri}"} | json vx_task_name,level | $logQLFilter""" + + val queryString = + List("query" -> logQL, + "start" -> startTime.toString, + "end" -> endTime.toString, + "limit" -> limit.toString, + "direction" -> "backward") + .map(keyValueTuple => s"${keyValueTuple._1}=${java.net.URLEncoder.encode(keyValueTuple._2, "UTF-8")}") + .mkString("&") + for { + _ <- serverStartupFuture + res <- rpc(s"${conf.uri}/loki/api/v1/query_range?$queryString").silent.getWithJsonResponse[JsValue] + logEntries <- tryo( + (res \ "data" \ "result") + .as[List[JsValue]] + .flatMap( + stream => + (stream \ "values") + .as[List[(String, String)]] + .map(value => + Json.parse(value._2).as[JsObject] ++ (stream \ "stream").as[JsObject] ++ Json.obj( + "timestamp" -> Instant.fromNanosecondsString(value._1)))) + .sortBy(entry => (entry \ "timestamp").as[Long])).toFox + } yield logEntries + } else Fox.successful(List()) } From 8d081121638654323b671d88cec7f97699e871da Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 13 Mar 2024 11:33:35 +0100 Subject: [PATCH 3/4] changelog --- CHANGELOG.unreleased.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 7ab0d11fed..82f2d7156e 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -48,7 +48,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released - Fixed small styling error with NML drag and drop uploading. [#7641](https://github.com/scalableminds/webknossos/pull/7641) - Fixed a bug where the annotation list would show teams the annotation is shared with multiple times. [#7659](https://github.com/scalableminds/webknossos/pull/7659) - Fixed incorrect menu position that could occur sometimes when clicking the ... button next to a segment. [#7680](https://github.com/scalableminds/webknossos/pull/7680) -- Fixed an error in the Loki integration to support Loki 2.9+. []() +- Fixed an error in the Loki integration to support Loki 2.9+. [#7684](https://github.com/scalableminds/webknossos/pull/7684) ### Removed From f3c69ae3ec2ae71de38738f33011b08399ff76e6 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Wed, 13 Mar 2024 11:36:26 +0100 Subject: [PATCH 4/4] reorder functions --- app/models/voxelytics/LokiClient.scala | 172 +++++++++++++------------ 1 file changed, 87 insertions(+), 85 deletions(-) diff --git a/app/models/voxelytics/LokiClient.scala b/app/models/voxelytics/LokiClient.scala index 1d6e218487..2d5ff69105 100644 --- a/app/models/voxelytics/LokiClient.scala +++ b/app/models/voxelytics/LokiClient.scala @@ -26,6 +26,12 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im private lazy val conf = wkConf.Voxelytics.Loki private lazy val enabled = wkConf.Features.voxelyticsEnabled && conf.uri.nonEmpty + + private val POLLING_INTERVAL = 1 second + private val LOG_TIME_BATCH_INTERVAL = 1 days + private val LOG_ENTRY_QUERY_BATCH_SIZE = 5000 + private val LOG_ENTRY_INSERT_BATCH_SIZE = 1000 + private lazy val serverStartupFuture: Fox[Unit] = { for { _ <- bool2Fox(enabled) ?~> "Loki is not enabled." @@ -33,10 +39,45 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im _ <- pollUntilServerStartedUp(Instant.in(conf.startupTimeout)) ~> 500 } yield () } - private val POLLING_INTERVAL = 1 second - private val LOG_TIME_BATCH_INTERVAL = 1 days - private val LOG_ENTRY_QUERY_BATCH_SIZE = 5000 - private val LOG_ENTRY_INSERT_BATCH_SIZE = 1000 + + private def pollUntilServerStartedUp(until: Instant): Fox[Unit] = { + def waitAndRecurse(until: Instant): Fox[Unit] = + for { + _ <- after(POLLING_INTERVAL, using = system.scheduler)(Future.successful(())) + _ <- bool2Fox(!until.isPast) ?~> s"Loki did not become ready within ${conf.startupTimeout}." + _ <- pollUntilServerStartedUp(until) + } yield () + + for { + isServerAvailableBox <- rpc(s"${conf.uri}/ready").request + .withMethod("GET") + .execute() + .flatMap(result => + if (Status.isSuccessful(result.status)) { + Fox.successful(true) + } else if (result.status >= 500 && result.status < 600) { + logger.debug(s"Loki status: ${result.status}") + Fox.successful(false) + } else { + Fox.failure(s"Unexpected error code from Loki ${result.status}.") + }) + .recoverWith({ + case e: java.net.ConnectException => + logger.debug(s"Loki connection exception: $e") + Fox.successful(false) + case e => + logger.error(s"Unexpected error $e") + Fox.failure("Unexpected error while trying to connect to Loki.", Full(e)) + }) + isServerAvailable <- isServerAvailableBox.toFox + _ <- if (!isServerAvailable) { + waitAndRecurse(until) + } else { + logger.info("Loki is available.") + Fox.successful(()) + } + } yield () + } def queryLogsBatched(runName: String, organizationId: ObjectId, @@ -96,6 +137,48 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im } } + private def queryLogs(runName: String, + organizationId: ObjectId, + taskName: Option[String], + minLevel: VoxelyticsLogLevel, + startTime: Instant, + endTime: Instant, + limit: Int): Fox[List[JsValue]] = + if (limit > 0) { + val levels = VoxelyticsLogLevel.sortedValues.drop(VoxelyticsLogLevel.sortedValues.indexOf(minLevel)) + + val logQLFilter = List( + taskName.map(t => s"""vx_task_name="$t""""), + Some(s"""level=~"(${levels.mkString("|")})"""") + ).flatten.mkString(" | ") + val logQL = + s"""{vx_run_name="$runName",wk_org="${organizationId.id}",wk_url="${wkConf.Http.uri}"} | json vx_task_name,level | $logQLFilter""" + + val queryString = + List("query" -> logQL, + "start" -> startTime.toString, + "end" -> endTime.toString, + "limit" -> limit.toString, + "direction" -> "backward") + .map(keyValueTuple => s"${keyValueTuple._1}=${java.net.URLEncoder.encode(keyValueTuple._2, "UTF-8")}") + .mkString("&") + for { + _ <- serverStartupFuture + res <- rpc(s"${conf.uri}/loki/api/v1/query_range?$queryString").silent.getWithJsonResponse[JsValue] + logEntries <- tryo( + (res \ "data" \ "result") + .as[List[JsValue]] + .flatMap( + stream => + (stream \ "values") + .as[List[(String, String)]] + .map(value => + Json.parse(value._2).as[JsObject] ++ (stream \ "stream").as[JsObject] ++ Json.obj( + "timestamp" -> Instant.fromNanosecondsString(value._1)))) + .sortBy(entry => (entry \ "timestamp").as[Long])).toFox + } yield logEntries + } else Fox.successful(List()) + def bulkInsertBatched(logEntries: List[JsValue], organizationId: ObjectId)(implicit ec: ExecutionContext): Fox[Unit] = for { _ <- Fox.serialCombined(logEntries.grouped(LOG_ENTRY_INSERT_BATCH_SIZE).toList)(bulkInsert(_, organizationId)) @@ -171,85 +254,4 @@ class LokiClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(im } else { Fox.successful(()) } - - private def pollUntilServerStartedUp(until: Instant): Fox[Unit] = { - def waitAndRecurse(until: Instant): Fox[Unit] = - for { - _ <- after(POLLING_INTERVAL, using = system.scheduler)(Future.successful(())) - _ <- bool2Fox(!until.isPast) ?~> s"Loki did not become ready within ${conf.startupTimeout}." - _ <- pollUntilServerStartedUp(until) - } yield () - - for { - isServerAvailableBox <- rpc(s"${conf.uri}/ready").request - .withMethod("GET") - .execute() - .flatMap(result => - if (Status.isSuccessful(result.status)) { - Fox.successful(true) - } else if (result.status >= 500 && result.status < 600) { - logger.debug(s"Loki status: ${result.status}") - Fox.successful(false) - } else { - Fox.failure(s"Unexpected error code from Loki ${result.status}.") - }) - .recoverWith({ - case e: java.net.ConnectException => - logger.debug(s"Loki connection exception: $e") - Fox.successful(false) - case e => - logger.error(s"Unexpected error $e") - Fox.failure("Unexpected error while trying to connect to Loki.", Full(e)) - }) - isServerAvailable <- isServerAvailableBox.toFox - _ <- if (!isServerAvailable) { - waitAndRecurse(until) - } else { - logger.info("Loki is available.") - Fox.successful(()) - } - } yield () - } - - private def queryLogs(runName: String, - organizationId: ObjectId, - taskName: Option[String], - minLevel: VoxelyticsLogLevel, - startTime: Instant, - endTime: Instant, - limit: Int): Fox[List[JsValue]] = - if (limit > 0) { - val levels = VoxelyticsLogLevel.sortedValues.drop(VoxelyticsLogLevel.sortedValues.indexOf(minLevel)) - - val logQLFilter = List( - taskName.map(t => s"""vx_task_name="$t""""), - Some(s"""level=~"(${levels.mkString("|")})"""") - ).flatten.mkString(" | ") - val logQL = - s"""{vx_run_name="$runName",wk_org="${organizationId.id}",wk_url="${wkConf.Http.uri}"} | json vx_task_name,level | $logQLFilter""" - - val queryString = - List("query" -> logQL, - "start" -> startTime.toString, - "end" -> endTime.toString, - "limit" -> limit.toString, - "direction" -> "backward") - .map(keyValueTuple => s"${keyValueTuple._1}=${java.net.URLEncoder.encode(keyValueTuple._2, "UTF-8")}") - .mkString("&") - for { - _ <- serverStartupFuture - res <- rpc(s"${conf.uri}/loki/api/v1/query_range?$queryString").silent.getWithJsonResponse[JsValue] - logEntries <- tryo( - (res \ "data" \ "result") - .as[List[JsValue]] - .flatMap( - stream => - (stream \ "values") - .as[List[(String, String)]] - .map(value => - Json.parse(value._2).as[JsObject] ++ (stream \ "stream").as[JsObject] ++ Json.obj( - "timestamp" -> Instant.fromNanosecondsString(value._1)))) - .sortBy(entry => (entry \ "timestamp").as[Long])).toFox - } yield logEntries - } else Fox.successful(List()) }