Skip to content

Commit

Permalink
For structured data add ability to skip schemas matching criterion.
Browse files Browse the repository at this point in the history
Slightly different than filtering for unstructured that (which is done during transformation). It's only possible to filter out whole schema family, but not part of the family. That's because we can't have 'holes' in schema family when building typed entities. Otherwise we may end up with invalid and incompatible merged schema model.
  • Loading branch information
pondzix committed Mar 13, 2024
1 parent 50cddf9 commit 37e57f6
Show file tree
Hide file tree
Showing 4 changed files with 448 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ package com.snowplowanalytics.snowplow.loaders.transform

import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import com.snowplowanalytics.snowplow.badrows.FailureDetails

object NonAtomicFields {
Expand Down Expand Up @@ -47,10 +48,21 @@ object NonAtomicFields {

def resolveTypes[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]]
entities: Map[TabledEntity, Set[SchemaSubVersion]],
filterCriteria: List[SchemaCriterion]
): F[Result] =
entities.toList
.map { case (tabledEntity, subVersions) =>
tabledEntity -> filterSubVersions(filterCriteria, tabledEntity, subVersions)
}
.filter { case (_, subVersions) =>
// Remove whole schema family if there is no subversion left after filtering
subVersions.nonEmpty
}
.traverse { case (tabledEntity, subVersions) =>
// Even though we may filter some of the subVersions earlier, we still fetch all available from Iglu (same family ~> same model).
// It's because we can't have 'holes' in schema family when building typed entities. Otherwise we may end up with invalid and incompatible merged schema model.
// As a result, it's only possible to filter out whole schema family, but not part of the family.
SchemaProvider
.fetchSchemasWithSameModel(resolver, TabledEntity.toSchemaKey(tabledEntity, subVersions.max))
.map(TypedTabledEntity.build(tabledEntity, subVersions, _))
Expand All @@ -61,4 +73,22 @@ object NonAtomicFields {
val (failures, good) = eithers.separate
Result(good, failures)
}

private def filterSubVersions(
filterCriteria: List[SchemaCriterion],
tabledEntity: TabledEntity,
subVersions: Set[SchemaSubVersion]
): Set[SchemaSubVersion] =
if (filterCriteria.nonEmpty) {
subVersions
.filter { subVersion =>
val schemaKey = TabledEntity.toSchemaKey(tabledEntity, subVersion)
doesNotMatchCriteria(filterCriteria, schemaKey)
}
} else {
subVersions
}

private def doesNotMatchCriteria(filterCriteria: List[SchemaCriterion], schemaKey: SchemaKey): Boolean =
!filterCriteria.exists(_.matches(schemaKey))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "myschema",
"format": "jsonschema",
"version": "9-0-0"
},
"type": "object",
"properties": {
"col_z": {"type": "string"}
},
"required": ["col_z"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -105,7 +105,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -140,7 +140,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -176,7 +176,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -214,7 +214,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -253,7 +253,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -271,7 +271,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity2 -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(2)) and
(fields.map(_.tabledEntity) must contain(allOf(tabledEntity1, tabledEntity2)))
Expand Down Expand Up @@ -304,7 +304,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -354,7 +354,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -405,7 +405,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -421,7 +421,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 9))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -441,7 +441,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -461,7 +461,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand Down
Loading

0 comments on commit 37e57f6

Please sign in to comment.