Skip to content

Commit

Permalink
common: SQL enrichment: get connection only if request not cached (close
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and benjben committed Apr 11, 2023
1 parent 47c2c5f commit 47b74be
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,20 +230,16 @@ object DbExecutor {
/**
* Transform [[Input.PlaceholderMap]] to None if not enough input values were extracted
* This prevents db from start building a statement while not failing event enrichment
* @param placeholderMap some IntMap with extracted values or None if it is known already that not
* all values were extracted
* @return Some unchanged value if all placeholders were filled, None otherwise
* @param intMap The extracted values from the event
* @return Whether all placeholders were filled
*/
def allPlaceholdersFilled(
def allPlaceholdersAreFilled(
connection: Connection,
sql: String,
placeholderMap: Input.PlaceholderMap
): Either[Throwable, Input.PlaceholderMap] =
intMap: Input.ExtractedValueMap
): Either[Throwable, Boolean] =
getPlaceholderCount(connection, sql).map { placeholderCount =>
placeholderMap match {
case Some(intMap) if intMap.keys.size == placeholderCount => placeholderMap
case _ => None
}
if (intMap.keys.size == placeholderCount) true else false
}

def getConnection[F[_]: Monad: DbExecutor](dataSource: DataSource, blocker: BlockerF[F]): F[Either[Throwable, Connection]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,13 @@ object Input {
*/
type ExtractedValue = StatementPlaceholder#Value

type ExtractedValueMap = IntMap[ExtractedValue]

/**
* Optional Int-indexed Map of [[ExtractedValue]]s
* None means some values were not found and SQL Enrichment shouldn't performed
*/
type PlaceholderMap = Option[IntMap[ExtractedValue]]
type PlaceholderMap = Option[ExtractedValueMap]

/**
* Get data out of all JSON contexts matching `schemaCriterion`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory

import java.sql.Connection
import javax.sql.DataSource
import scala.collection.immutable.IntMap

/** Lets us create an SqlQueryConf from a Json */
object SqlQueryEnrichment extends ParseableEnrichment {
Expand Down Expand Up @@ -140,26 +139,67 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock](
unstructEvent: Option[SelfDescribingData[Json]]
): F[ValidatedNel[FailureDetails.EnrichmentFailure, List[SelfDescribingData[Json]]]] = {
val contexts = for {
connection <- EitherT(DbExecutor.getConnection[F](dataSource, blocker))
.leftMap(t => NonEmptyList.of("Error while getting the connection from the data source", t.toString))
result <- EitherT(ResourceF[F].use(connection)(closeConnection)(lookup(event, derivedContexts, customContexts, unstructEvent, _)))
placeholders <- buildPlaceHolderMap(inputs, event, derivedContexts, customContexts, unstructEvent)
result <- maybeLookup(placeholders)
} yield result

contexts.leftMap(failureDetails).value.map(_.toValidated)
}

private def lookup(
event: EnrichedEvent,
derivedContexts: List[SelfDescribingData[Json]],
customContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
connection: Connection
): F[Either[NonEmptyList[String], List[SelfDescribingData[Json]]]] =
(for {
placeholders <- buildPlaceHolderMap(inputs, event, derivedContexts, customContexts, unstructEvent)
filledPlaceholders <- fillPlaceholders(connection, query.sql, placeholders)
result <- getResult(filledPlaceholders, connection)
} yield result).value
private def maybeLookup(placeholders: Input.PlaceholderMap): EitherT[F, NonEmptyList[String], List[SelfDescribingData[Json]]] =
placeholders match {
case Some(intMap) =>
EitherT {
sqlQueryEvaluator.evaluateForKey(
intMap,
getResult = () => runLookup(intMap)
)
}.leftMap { t =>
NonEmptyList.of("Error while executing the sql lookup", t.toString)
}
case None =>
EitherT.rightT(Nil)
}

private def runLookup(intMap: Input.ExtractedValueMap): F[Either[Throwable, List[SelfDescribingData[Json]]]] = {
val eitherT = for {
connection <- EitherT(DbExecutor.getConnection[F](dataSource, blocker))
result <- EitherT(ResourceF[F].use(connection)(closeConnection)(maybeRunWithConnection(_, intMap)))
} yield result
eitherT.value
}

// We now have a connection. But time has passed since we last checked the cache, and another
// fiber might have run a query while we were waiting for the connection. So we check the cache
// one last time before hitting the database.
private def maybeRunWithConnection(
connection: Connection,
intMap: Input.ExtractedValueMap
): F[Either[Throwable, List[SelfDescribingData[Json]]]] =
sqlQueryEvaluator.evaluateForKey(
intMap,
getResult = () => runWithConnection(connection, intMap)
)

private def runWithConnection(
connection: Connection,
intMap: Input.ExtractedValueMap
): F[Either[Throwable, List[SelfDescribingData[Json]]]] = {
val eitherT = DbExecutor.allPlaceholdersAreFilled(connection, query.sql, intMap).toEitherT[F].flatMap {
case false =>
// Not enough values were extracted from the event
EitherT.rightT[F, Throwable](List.empty[SelfDescribingData[Json]])
case true =>
for {
sqlQuery <- DbExecutor.createStatement(connection, query.sql, intMap).toEitherT[F]
resultSet <- DbExecutor[F].execute(sqlQuery)
context <- DbExecutor[F].convert(resultSet, output.json.propertyNames)
result <- output.envelope(context).toEitherT[F]
} yield result
}

shifter.shift(eitherT.value)
}

private def buildPlaceHolderMap(
inputs: List[Input],
Expand All @@ -175,49 +215,6 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock](
NonEmptyList.of("Error while building the map of placeholders", t.toString)
}

private def fillPlaceholders(
connection: Connection,
sql: String,
placeholders: Input.PlaceholderMap
): EitherT[F, NonEmptyList[String], Input.PlaceholderMap] =
EitherT(blocker.blockOn(Monad[F].pure(DbExecutor.allPlaceholdersFilled(connection, sql, placeholders))))
.leftMap { t =>
NonEmptyList.of("Error while filling the placeholders", t.toString)
}

private def getResult(
placeholders: Input.PlaceholderMap,
connection: Connection
): EitherT[F, NonEmptyList[String], List[SelfDescribingData[Json]]] =
placeholders match {
case Some(intMap) =>
EitherT {
sqlQueryEvaluator.evaluateForKey(
intMap,
getResult = () => shifter.shift(query(connection, intMap).value)
)
}
.leftMap { t =>
NonEmptyList.of("Error while executing the query/getting the results", t.toString)
}
case None =>
EitherT.rightT[F, NonEmptyList[String]](List.empty[SelfDescribingData[Json]])
}

/**
* Perform SQL query and convert result to JSON object
* @param intMap map with values extracted from inputs and ready to be set placeholders in
* prepared statement
* @return validated list of Self-describing contexts
*/
def query(connection: Connection, intMap: IntMap[Input.ExtractedValue]): EitherT[F, Throwable, List[SelfDescribingData[Json]]] =
for {
sqlQuery <- DbExecutor.createStatement(connection, query.sql, intMap).toEitherT[F]
resultSet <- DbExecutor[F].execute(sqlQuery)
context <- DbExecutor[F].convert(resultSet, output.json.propertyNames)
result <- output.envelope(context).toEitherT[F]
} yield result

private def failureDetails(errors: NonEmptyList[String]) =
errors.map { error =>
val message = FailureDetails.EnrichmentFailureMessage.Simple(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ class SqlQueryEnrichmentIntegrationTest extends Specification {

context must beLeft.like {
case NonEmptyList(one, two :: Nil)
if one.toString.contains("Error while getting the connection from the data source") &&
if one.toString.contains("Error while executing the sql lookup") &&
two.toString.contains("FATAL: password authentication failed for user") =>
ok
case left => ko(s"error(s) don't contain the expected error messages: $left")
Expand Down

0 comments on commit 47b74be

Please sign in to comment.