Skip to content

Commit

Permalink
Skip schemas when resolving types
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Mar 11, 2024
1 parent 40bea85 commit 86e3319
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
*/
package com.snowplowanalytics.snowplow.loaders.transform

import cats.data.NonEmptyList
import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.snowplow.badrows.FailureDetails

object NonAtomicFields {
Expand Down Expand Up @@ -47,18 +50,41 @@ object NonAtomicFields {

def resolveTypes[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]]
entities: Map[TabledEntity, Set[SchemaSubVersion]],
entitiesToSkip: List[SchemaCriterion]
): F[Result] =
entities.toList
.traverse { case (tabledEntity, subVersions) =>
SchemaProvider
.fetchSchemasWithSameModel(resolver, TabledEntity.toSchemaKey(tabledEntity, subVersions.max))
.map(TypedTabledEntity.build(tabledEntity, subVersions, _))
.map(schemas => filterOutMatchingSubversions(entitiesToSkip, schemas))
.map(filteredSchemas => buildTypedEntities(tabledEntity, subVersions, filteredSchemas))
.leftMap(ColumnFailure(tabledEntity, subVersions, _))
.value
}
.map { eithers =>
val (failures, good) = eithers.separate
Result(good, failures)
Result(good.flatten, failures)
}

private def filterOutMatchingSubversions(
entitiesToSkip: List[SchemaCriterion],
schemas: NonEmptyList[SelfDescribingSchema[Schema]]
): Option[NonEmptyList[SelfDescribingSchema[Schema]]] =
if (entitiesToSkip.nonEmpty) {
schemas.filterNot { sdd =>
entitiesToSkip.exists(_.matches(sdd.self.schemaKey))
}.toNel
} else {
Some(schemas)
}

private def buildTypedEntities(
tabledEntity: TabledEntity,
subVersions: Set[(Int, Int)],
filteredSchemas: Option[NonEmptyList[SelfDescribingSchema[Schema]]]
): Option[TypedTabledEntity] =
filteredSchemas.map { schemas =>
TypedTabledEntity.build(tabledEntity, subVersions, schemas)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "myschema",
"format": "jsonschema",
"version": "9-0-0"
},
"type": "object",
"properties": {
"col_z": {"type": "string"}
},
"required": ["col_z"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -105,7 +105,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -140,7 +140,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -176,7 +176,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -214,7 +214,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -253,7 +253,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -271,7 +271,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity2 -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(2)) and
(fields.map(_.tabledEntity) must contain(allOf(tabledEntity1, tabledEntity2)))
Expand Down Expand Up @@ -304,7 +304,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -354,7 +354,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -405,7 +405,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -421,7 +421,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 9))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -441,7 +441,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -461,7 +461,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand Down
Loading

0 comments on commit 86e3319

Please sign in to comment.