Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make user's logger used everywhere #350

Merged
merged 3 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/it/scala/com/criteo/cuttle/DatabaseITest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package com.criteo.cuttle
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.temporal.ChronoUnit

import scala.concurrent.Future

import cats.effect.IO
import com.criteo.cuttle.Auth.User
import doobie.implicits._
import doobie.scalatest.IOChecker

import scala.concurrent.Future
import com.criteo.cuttle.Auth.User


class DatabaseITest extends DatabaseSuite with IOChecker with TestScheduling {
val dbConfig = DatabaseConfig(
Expand Down
10 changes: 9 additions & 1 deletion core/src/it/scala/com/criteo/cuttle/DatabaseSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
class DatabaseSuite extends FunSuite with BeforeAndAfter {
val dbName = "cuttle_it_test"

val queries: Queries = new Queries {}
implicit val logger: Logger = new Logger {
override def debug(message: => String): Unit = ()
override def info(message: => String): Unit = ()
override def warn(message: => String): Unit = ()
override def error(message: => String): Unit = ()
override def trace(message: => String): Unit = ()
}

val queries: Queries = Queries(logger)

private val dbConfig = DatabaseConfig(
Seq(DBLocation("localhost", 3388)),
Expand Down
23 changes: 8 additions & 15 deletions core/src/main/scala/com/criteo/cuttle/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.circe.parser._
import cats.data.NonEmptyList
import cats.implicits._
import cats.effect.{IO, Resource}
import doobie.util.log

import com.criteo.cuttle.ExecutionStatus._
import com.criteo.cuttle.events.{Event, JobSuccessForced}
Expand Down Expand Up @@ -65,18 +66,6 @@ object DatabaseConfig {
}

private[cuttle] object Database {

implicit val ExecutionStatusMeta: Meta[ExecutionStatus] =
Meta[Boolean].imap(x => if (x) ExecutionSuccessful else ExecutionFailed: ExecutionStatus) {
case ExecutionSuccessful => true
case ExecutionFailed => false
case x => sys.error(s"Unexpected ExecutionLog status to write in database: $x")
}

implicit val JsonMeta: Meta[Json] = Meta[String].imap(x => parse(x).fold(e => throw e, identity))(
x => x.noSpaces
)

val schemaEvolutions = List(
sql"""
CREATE TABLE executions (
Expand Down Expand Up @@ -213,7 +202,7 @@ private[cuttle] object Database {
private[cuttle] def reset(): Unit =
connections.clear()

def connect(dbConfig: DatabaseConfig): XA = {
def connect(dbConfig: DatabaseConfig)(implicit logger: Logger): XA = {
// FIXME we shouldn't use allocated as it's unsafe instead we have to flatMap on the Resource[HikariTransactor]
val (transactor, releaseIO) = newHikariTransactor(dbConfig).allocated.unsafeRunSync
logger.debug("Allocated new Hikari transactor")
Expand All @@ -229,8 +218,12 @@ private[cuttle] object Database {
}
}

private[cuttle] trait Queries {
import Database._
private[cuttle] case class Queries(logger: Logger) {
implicit val logHandler: log.LogHandler = DoobieLogsHandler(logger).handler

implicit val JsonMeta: Meta[Json] = Meta[String].imap(x => parse(x).fold(e => throw e, identity))(
x => x.noSpaces
)

def logExecution(e: ExecutionLog, logContext: ConnectionIO[String]): ConnectionIO[Int] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ case class DoobieLogsHandler(private val logger: Logger) {
implicit val handler: LogHandler = LogHandler {

case Success(s, a, e1, e2) =>
logger.debug(s"""
logger.trace(s"""
| Successful Statement Execution:
|
| ${s.lines.dropWhile(_.trim.isEmpty).mkString("\n ")}
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ private[cuttle] object ExecutionStreams {
private val maxExecutionLogSizeProp = "com.criteo.cuttle.maxExecutionLogSize"
private val maxExecutionLogSize = sys.props.get(maxExecutionLogSizeProp).map(_.toInt).getOrElse(524288)

logger.info(s"Transient execution streams go to $transientStorage")

private def logFile(id: ExecutionId): File = new File(transientStorage, id)

private def getWriter(id: ExecutionId): PrintWriter = {
Expand Down
16 changes: 10 additions & 6 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import java.util.{Timer, TimerTask}
import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.concurrent.{Future, Promise}
import scala.reflect.{classTag, ClassTag}
import scala.reflect.{ClassTag, classTag}
import scala.util._

import cats.Eq
import cats.effect.IO
import cats.implicits._
Expand All @@ -21,12 +20,12 @@ import io.circe._
import io.circe.java8.time._
import io.circe.syntax._
import lol.http.PartialService

import com.criteo.cuttle.Auth._
import com.criteo.cuttle.ExecutionStatus._
import com.criteo.cuttle.ThreadPools.{SideEffectThreadPool, _}
import com.criteo.cuttle.Metrics._
import com.criteo.cuttle.platforms.ExecutionPool
import doobie.util.Meta

/** The strategy to use to retry stuck executions.
*
Expand Down Expand Up @@ -77,6 +76,13 @@ object RetryStrategy {

private[cuttle] sealed trait ExecutionStatus
private[cuttle] object ExecutionStatus {
implicit val ExecutionStatusMeta: Meta[ExecutionStatus] =
Meta[Boolean].imap(x => if (x) ExecutionSuccessful else ExecutionFailed: ExecutionStatus) {
case ExecutionSuccessful => true
case ExecutionFailed => false
case x => sys.error(s"Unexpected ExecutionLog status to write in database: $x")
}

case object ExecutionSuccessful extends ExecutionStatus
case object ExecutionFailed extends ExecutionStatus
case object ExecutionRunning extends ExecutionStatus
Expand Down Expand Up @@ -413,9 +419,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla

private implicit val contextOrdering: Ordering[S#Context] = Ordering.by(c => c: SchedulingContext)

private val queries = new Queries {
val appLogger: Logger = logger
}
private val queries = Queries(logger)

// TODO: move to the scheduler
private[cuttle] val runningState = TMap.empty[Execution[S], Future[Completed]]
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/Logger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package com.criteo.cuttle
trait Logger {
def debug(message: => String): Unit
def info(message: => String): Unit
def warning(message: => String): Unit
def warn(message: => String): Unit
def error(message: => String): Unit
def trace(message: => String): Unit
}
15 changes: 6 additions & 9 deletions core/src/main/scala/com/criteo/cuttle/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import scala.concurrent._

import cats.effect.IO
import cats.free._
import doobie.imports._
import io.circe.Json
import io.circe.parser._
import doobie._

package cuttle {

Expand Down Expand Up @@ -61,12 +63,7 @@ package object cuttle {
*/
implicit def scopedExecutionContext(implicit execution: Execution[_]): ExecutionContext = execution.executionContext

/** Default implicit logger that output everything to __stdout__ */
implicit val logger = new Logger {
def logMe(message: => String, level: String) = println(s"${java.time.Instant.now}\t${level}\t${message}")
override def info(message: => String): Unit = logMe(message, "INFO")
override def debug(message: => String): Unit = logMe(message, "DEBUG")
override def warning(message: => String): Unit = logMe(message, "WARNING")
override def error(message: => String): Unit = logMe(message, "ERROR")
}
implicit val JsonMeta: Meta[Json] = Meta[String].imap(x => parse(x).fold(e => throw e, identity _))(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either adapt name to something more self-explanatory, or add a simple comment above. (You can also do both :) )

x => x.noSpaces
)
}
1 change: 1 addition & 0 deletions core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.scalatest.FunSuite
import com.criteo.cuttle.ThreadPools.Implicits.sideEffectThreadPool
import com.criteo.cuttle.ThreadPools.Implicits.sideEffectContextShift
import com.criteo.cuttle.ThreadPools._
import com.criteo.cuttle.Utils.logger

import com.criteo.cuttle.Metrics.Prometheus

Expand Down
11 changes: 11 additions & 0 deletions core/src/test/scala/com/criteo/cuttle/Utils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.criteo.cuttle

object Utils {
implicit val logger: Logger = new Logger {
override def debug(message: => String): Unit = ()
override def info(message: => String): Unit = ()
override def warn(message: => String): Unit = ()
override def error(message: => String): Unit = ()
override def trace(message: => String): Unit = ()
}
}
1 change: 1 addition & 0 deletions cron/src/it/scala/com/criteo/cuttle/cron/UIITest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.scalatest.{FlatSpec, Matchers}
import com.criteo.cuttle.Auth.User
import com.criteo.cuttle.{Job, _}
import com.criteo.cuttle.ThreadPools.Implicits.sideEffectThreadPool
import com.criteo.cuttle.cron.Utils.logger

/**
* These tests don't verify anything but the rendering for "/" and "/executions" pages.
Expand Down
12 changes: 12 additions & 0 deletions cron/src/it/scala/com/criteo/cuttle/cron/Utils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.criteo.cuttle.cron
import com.criteo.cuttle.Logger

object Utils {
implicit val logger: Logger = new Logger {
override def debug(message: => String): Unit = ()
override def info(message: => String): Unit = ()
override def warn(message: => String): Unit = ()
override def error(message: => String): Unit = ()
override def trace(message: => String): Unit = ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CronProject private[cuttle] (val name: String,
retryStrategy: RetryStrategy = CronProject.retryStrategy
): Unit = {
logger.info("Connecting to database")
implicit val transactor = com.criteo.cuttle.Database.connect(databaseConfig)
implicit val transactor = com.criteo.cuttle.Database.connect(databaseConfig)(logger)

logger.info("Creating Executor")
val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version)(retryStrategy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] {

override val name = "cron"

private val queries = new Queries {}
private val queries = Queries(logger)

private val state = CronState(logger)

private def logState = IO(logger.debug(state.toString()))
Expand Down
14 changes: 14 additions & 0 deletions examples/src/main/scala/com/criteo/cuttle/examples/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.criteo.cuttle

package object examples {

/** Default implicit logger that output everything to __stdout__ */
implicit val logger = new Logger {
def logMe(message: => String, level: String) = println(s"${java.time.Instant.now}\t${level}\t${message}")
override def info(message: => String): Unit = logMe(message, "INFO")
override def debug(message: => String): Unit = logMe(message, "DEBUG")
override def warn(message: => String): Unit = logMe(message, "WARNING")
override def error(message: => String): Unit = logMe(message, "ERROR")
override def trace(message: => String): Unit = logMe(message, "ERROR")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CuttleProject private[cuttle] (val name: String,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy,
paused: Boolean = false
): Unit = {
val xa = CuttleDatabase.connect(databaseConfig)
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)

if (paused) {
Expand Down Expand Up @@ -69,7 +69,7 @@ class CuttleProject private[cuttle] (val name: String,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
): (Service, () => Unit) = {
val xa = CuttleDatabase.connect(databaseConfig)
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)

val startScheduler = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import doobie.implicits._

private[timeseries] object Database {
import TimeSeriesUtils._
import com.criteo.cuttle.Database._

import intervals.{Interval, IntervalMap}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
Ok(Prometheus.serialize(metrics))

case GET at url"/api/executions/status/$kind?limit=$l&offset=$o&events=$events&sort=$sort&order=$order&jobs=$jobs" =>
logger.debug(s"Retreiving $kind executions with sse mode $events")
val jobIds = parseJobIds(jobs)
val limit = Try(l.toInt).toOption.getOrElse(25)
val offset = Try(o.toInt).toOption.getOrElse(0)
Expand Down Expand Up @@ -343,7 +342,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
"end" -> interval.hi.asJson
)
}
private val queries = new Queries {}
private val queries = Queries(project.logger)

private trait ExecutionPeriod {
val period: Interval[Instant]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
_pausedJobs()
}

private val queries = new Queries {
val appLogger: Logger = logger
}
private val queries = Queries(logger)

private[timeseries] def state: (State, Set[Backfill]) = atomic { implicit txn =>
(_state(), _backfills())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.criteo.cuttle.timeseries

import java.time.ZoneOffset.UTC
import java.time.temporal.ChronoUnit.HOURS
import java.time.{Duration, Instant, LocalDate}
import java.util.concurrent.TimeUnit

Expand All @@ -17,6 +16,7 @@ import com.wix.mysql.distribution.Version._
import com.criteo.cuttle.platforms.local._
import com.criteo.cuttle.timeseries.TimeSeriesUtils.{Run, TimeSeriesJob}
import com.criteo.cuttle.{Auth, Database => CuttleDatabase, _}
import Utils.logger

object TimeSeriesSchedulerIntegrationTests {
// TODO: turn this into a unit test. This is not done for now as the thread pool responsible for checking the lock on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import scala.concurrent.Future

import org.scalatest.FunSuite

import com.criteo.cuttle.{logger, Completed, Job, TestScheduling}
import com.criteo.cuttle.{Completed, Job, TestScheduling}
import com.criteo.cuttle.timeseries.JobState.{Done, Todo}
import com.criteo.cuttle.timeseries.TimeSeriesUtils.State
import com.criteo.cuttle.timeseries.intervals.{Interval, IntervalMap}
import com.criteo.cuttle.Utils.logger

class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
private val scheduling: TimeSeries = hourly(date"2017-03-25T02:00:00Z")
Expand Down Expand Up @@ -38,7 +39,7 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Done("")
)
)
val (stateSnapshot, newBackfills, completedBackfills) =
val (_, newBackfills, completedBackfills) =
scheduler.collectCompletedJobs(state, Set(backfill), completed = Set.empty)
assert(newBackfills.equals(Set(backfill)))
assert(completedBackfills.isEmpty)
Expand All @@ -51,7 +52,7 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Done("")
)
)
val (stateSnapshot, newBackfills, completedBackfills) = scheduler.collectCompletedJobs(
val (_, newBackfills, completedBackfills) = scheduler.collectCompletedJobs(
state,
Set(backfill),
completed = Set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.scalatest.FunSuite
import com.criteo.cuttle._
import com.criteo.cuttle.timeseries.intervals.Bound.{Finite, Top}
import com.criteo.cuttle.timeseries.intervals._
import com.criteo.cuttle.Utils.logger

class TimeSeriesSpec extends FunSuite with TestScheduling {
val scheduling: TimeSeries = hourly(date"2017-03-25T02:00:00Z")
Expand Down