From 47b74be932f9bfc2c154be19a1f7aecb1134e1ca Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 8 Apr 2023 10:52:59 +0100 Subject: [PATCH] common: SQL enrichment: get connection only if request not cached (close #765) --- .../registry/sqlquery/DbExecutor.scala | 16 +-- .../enrichments/registry/sqlquery/Input.scala | 4 +- .../sqlquery/SqlQueryEnrichment.scala | 115 +++++++++--------- .../SqlQueryEnrichmentIntegrationTest.scala | 2 +- 4 files changed, 66 insertions(+), 71 deletions(-) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/DbExecutor.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/DbExecutor.scala index 19c9026e7..7ad325aa1 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/DbExecutor.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/DbExecutor.scala @@ -230,20 +230,16 @@ object DbExecutor { /** * Transform [[Input.PlaceholderMap]] to None if not enough input values were extracted * This prevents db from start building a statement while not failing event enrichment - * @param placeholderMap some IntMap with extracted values or None if it is known already that not - * all values were extracted - * @return Some unchanged value if all placeholders were filled, None otherwise + * @param intMap The extracted values from the event + * @return Whether all placeholders were filled */ - def allPlaceholdersFilled( + def allPlaceholdersAreFilled( connection: Connection, sql: String, - placeholderMap: Input.PlaceholderMap - ): Either[Throwable, Input.PlaceholderMap] = + intMap: Input.ExtractedValueMap + ): Either[Throwable, Boolean] = getPlaceholderCount(connection, sql).map { placeholderCount => - placeholderMap match { - case Some(intMap) if intMap.keys.size == placeholderCount => placeholderMap - case _ => None - } + if (intMap.keys.size == placeholderCount) true else false } def getConnection[F[_]: Monad: DbExecutor](dataSource: DataSource, blocker: BlockerF[F]): F[Either[Throwable, Connection]] = diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/Input.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/Input.scala index 7f0aaf88a..b64819bb8 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/Input.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/Input.scala @@ -198,11 +198,13 @@ object Input { */ type ExtractedValue = StatementPlaceholder#Value + type ExtractedValueMap = IntMap[ExtractedValue] + /** * Optional Int-indexed Map of [[ExtractedValue]]s * None means some values were not found and SQL Enrichment shouldn't performed */ - type PlaceholderMap = Option[IntMap[ExtractedValue]] + type PlaceholderMap = Option[ExtractedValueMap] /** * Get data out of all JSON contexts matching `schemaCriterion` diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/SqlQueryEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/SqlQueryEnrichment.scala index 19803edad..255fedff3 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/SqlQueryEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/sqlquery/SqlQueryEnrichment.scala @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory import java.sql.Connection import javax.sql.DataSource -import scala.collection.immutable.IntMap /** Lets us create an SqlQueryConf from a Json */ object SqlQueryEnrichment extends ParseableEnrichment { @@ -140,26 +139,67 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock]( unstructEvent: Option[SelfDescribingData[Json]] ): F[ValidatedNel[FailureDetails.EnrichmentFailure, List[SelfDescribingData[Json]]]] = { val contexts = for { - connection <- EitherT(DbExecutor.getConnection[F](dataSource, blocker)) - .leftMap(t => NonEmptyList.of("Error while getting the connection from the data source", t.toString)) - result <- EitherT(ResourceF[F].use(connection)(closeConnection)(lookup(event, derivedContexts, customContexts, unstructEvent, _))) + placeholders <- buildPlaceHolderMap(inputs, event, derivedContexts, customContexts, unstructEvent) + result <- maybeLookup(placeholders) } yield result contexts.leftMap(failureDetails).value.map(_.toValidated) } - private def lookup( - event: EnrichedEvent, - derivedContexts: List[SelfDescribingData[Json]], - customContexts: List[SelfDescribingData[Json]], - unstructEvent: Option[SelfDescribingData[Json]], - connection: Connection - ): F[Either[NonEmptyList[String], List[SelfDescribingData[Json]]]] = - (for { - placeholders <- buildPlaceHolderMap(inputs, event, derivedContexts, customContexts, unstructEvent) - filledPlaceholders <- fillPlaceholders(connection, query.sql, placeholders) - result <- getResult(filledPlaceholders, connection) - } yield result).value + private def maybeLookup(placeholders: Input.PlaceholderMap): EitherT[F, NonEmptyList[String], List[SelfDescribingData[Json]]] = + placeholders match { + case Some(intMap) => + EitherT { + sqlQueryEvaluator.evaluateForKey( + intMap, + getResult = () => runLookup(intMap) + ) + }.leftMap { t => + NonEmptyList.of("Error while executing the sql lookup", t.toString) + } + case None => + EitherT.rightT(Nil) + } + + private def runLookup(intMap: Input.ExtractedValueMap): F[Either[Throwable, List[SelfDescribingData[Json]]]] = { + val eitherT = for { + connection <- EitherT(DbExecutor.getConnection[F](dataSource, blocker)) + result <- EitherT(ResourceF[F].use(connection)(closeConnection)(maybeRunWithConnection(_, intMap))) + } yield result + eitherT.value + } + + // We now have a connection. But time has passed since we last checked the cache, and another + // fiber might have run a query while we were waiting for the connection. So we check the cache + // one last time before hitting the database. + private def maybeRunWithConnection( + connection: Connection, + intMap: Input.ExtractedValueMap + ): F[Either[Throwable, List[SelfDescribingData[Json]]]] = + sqlQueryEvaluator.evaluateForKey( + intMap, + getResult = () => runWithConnection(connection, intMap) + ) + + private def runWithConnection( + connection: Connection, + intMap: Input.ExtractedValueMap + ): F[Either[Throwable, List[SelfDescribingData[Json]]]] = { + val eitherT = DbExecutor.allPlaceholdersAreFilled(connection, query.sql, intMap).toEitherT[F].flatMap { + case false => + // Not enough values were extracted from the event + EitherT.rightT[F, Throwable](List.empty[SelfDescribingData[Json]]) + case true => + for { + sqlQuery <- DbExecutor.createStatement(connection, query.sql, intMap).toEitherT[F] + resultSet <- DbExecutor[F].execute(sqlQuery) + context <- DbExecutor[F].convert(resultSet, output.json.propertyNames) + result <- output.envelope(context).toEitherT[F] + } yield result + } + + shifter.shift(eitherT.value) + } private def buildPlaceHolderMap( inputs: List[Input], @@ -175,49 +215,6 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock]( NonEmptyList.of("Error while building the map of placeholders", t.toString) } - private def fillPlaceholders( - connection: Connection, - sql: String, - placeholders: Input.PlaceholderMap - ): EitherT[F, NonEmptyList[String], Input.PlaceholderMap] = - EitherT(blocker.blockOn(Monad[F].pure(DbExecutor.allPlaceholdersFilled(connection, sql, placeholders)))) - .leftMap { t => - NonEmptyList.of("Error while filling the placeholders", t.toString) - } - - private def getResult( - placeholders: Input.PlaceholderMap, - connection: Connection - ): EitherT[F, NonEmptyList[String], List[SelfDescribingData[Json]]] = - placeholders match { - case Some(intMap) => - EitherT { - sqlQueryEvaluator.evaluateForKey( - intMap, - getResult = () => shifter.shift(query(connection, intMap).value) - ) - } - .leftMap { t => - NonEmptyList.of("Error while executing the query/getting the results", t.toString) - } - case None => - EitherT.rightT[F, NonEmptyList[String]](List.empty[SelfDescribingData[Json]]) - } - - /** - * Perform SQL query and convert result to JSON object - * @param intMap map with values extracted from inputs and ready to be set placeholders in - * prepared statement - * @return validated list of Self-describing contexts - */ - def query(connection: Connection, intMap: IntMap[Input.ExtractedValue]): EitherT[F, Throwable, List[SelfDescribingData[Json]]] = - for { - sqlQuery <- DbExecutor.createStatement(connection, query.sql, intMap).toEitherT[F] - resultSet <- DbExecutor[F].execute(sqlQuery) - context <- DbExecutor[F].convert(resultSet, output.json.propertyNames) - result <- output.envelope(context).toEitherT[F] - } yield result - private def failureDetails(errors: NonEmptyList[String]) = errors.map { error => val message = FailureDetails.EnrichmentFailureMessage.Simple(error) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/sqlquery/SqlQueryEnrichmentIntegrationTest.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/sqlquery/SqlQueryEnrichmentIntegrationTest.scala index f6ca07d70..538a42ce0 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/sqlquery/SqlQueryEnrichmentIntegrationTest.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/sqlquery/SqlQueryEnrichmentIntegrationTest.scala @@ -457,7 +457,7 @@ class SqlQueryEnrichmentIntegrationTest extends Specification { context must beLeft.like { case NonEmptyList(one, two :: Nil) - if one.toString.contains("Error while getting the connection from the data source") && + if one.toString.contains("Error while executing the sql lookup") && two.toString.contains("FATAL: password authentication failed for user") => ok case left => ko(s"error(s) don't contain the expected error messages: $left")