Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For structured data add ability to skip schemas matching criterion. #61

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ package com.snowplowanalytics.snowplow.loaders.transform

import cats.effect.Sync
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import com.snowplowanalytics.snowplow.badrows.FailureDetails

object NonAtomicFields {
Expand Down Expand Up @@ -47,18 +48,54 @@ object NonAtomicFields {

/**
 * Resolves the given entities into typed entities, skipping schemas that match `filterCriteria`.
 *
 * For each entity the steps are:
 *   1. Drop sub-versions whose schema key matches any criterion; drop the entity entirely if no
 *      sub-version is left.
 *   1. Fetch schemas via `SchemaProvider.fetchSchemasWithSameModel`, keyed on the highest remaining
 *      sub-version, and build a `TypedTabledEntity` from them.
 *   1. Apply the same filter again to the `mergedVersions` of the built entity.
 *
 * A failed lookup is converted to a `ColumnFailure` carrying the entity and its (already
 * filtered) sub-versions. Successes and failures are then split into a single `Result`.
 *
 * NOTE(review): the `entities` parameter line appears twice below (without and with a trailing
 * comma). This looks like the removed/added pair of a diff rendering -- confirm against the real
 * file.
 *
 * @param resolver
 *   Resolver passed to `SchemaProvider.fetchSchemasWithSameModel`
 * @param entities
 *   The sub-versions to resolve for each tabled entity
 * @param filterCriteria
 *   Criteria of schemas to skip; with an empty list no sub-version is filtered out
 * @return
 *   A `Result` built from the successfully typed entities and the failures
 */
def resolveTypes[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]]
entities: Map[TabledEntity, Set[SchemaSubVersion]],
filterCriteria: List[SchemaCriterion]
): F[Result] =
entities.toList
.map { case (tabledEntity, subVersions) =>
// First phase of entity filtering, before we fetch schemas from Iglu and create `TypedTabledEntity`.
// If all sub-versions are filtered out, whole family is removed.
tabledEntity -> filterSubVersions(filterCriteria, tabledEntity, subVersions)
}
.filter { case (_, subVersions) =>
// Remove whole schema family if there is no subversion left after filtering
subVersions.nonEmpty
}
.traverse { case (tabledEntity, subVersions) =>
SchemaProvider
.fetchSchemasWithSameModel(resolver, TabledEntity.toSchemaKey(tabledEntity, subVersions.max))
.map(TypedTabledEntity.build(tabledEntity, subVersions, _))
// Second phase of entity filtering.
// We can't do it sooner based on a result of `fetchSchemasWithSameModel` because we can't have 'holes' in Iglu schema family when building typed entities.
// Otherwise we may end up with invalid and incompatible merged schema model.
// Here `TypedTabledEntity` is already properly created using contiguous series of schemas, so we can try to skip some sub-versions.
.map { typedTabledEntity =>
val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
typedTabledEntity.copy(mergedVersions = filteredSubVersions)
}
.leftMap(ColumnFailure(tabledEntity, subVersions, _))
.value
}
.map { eithers =>
val (failures, good) = eithers.separate
Result(good, failures)
}

/**
 * Keeps only the sub-versions of `tabledEntity` whose schema key matches none of the criteria.
 *
 * @param filterCriteria
 *   Criteria of schemas to skip; when empty, `subVersions` is returned untouched
 * @param tabledEntity
 *   Entity used, together with each sub-version, to build the schema key being tested
 * @param subVersions
 *   Candidate sub-versions
 * @return
 *   The sub-versions that survive filtering (possibly empty)
 */
private def filterSubVersions(
  filterCriteria: List[SchemaCriterion],
  tabledEntity: TabledEntity,
  subVersions: Set[SchemaSubVersion]
): Set[SchemaSubVersion] =
  filterCriteria match {
    // Nothing to skip, so avoid building schema keys at all.
    case Nil => subVersions
    case criteria =>
      subVersions.filter { candidate =>
        doesNotMatchCriteria(criteria, TabledEntity.toSchemaKey(tabledEntity, candidate))
      }
  }

/**
 * Tells whether `schemaKey` is matched by none of the given criteria.
 *
 * @return
 *   `true` when no criterion matches the key (always `true` for an empty list)
 */
private def doesNotMatchCriteria(filterCriteria: List[SchemaCriterion], schemaKey: SchemaKey): Boolean =
  filterCriteria.forall(criterion => !criterion.matches(schemaKey))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "myschema",
"format": "jsonschema",
"version": "9-0-0"
},
"type": "object",
"properties": {
"col_z": {"type": "string"}
},
"required": ["col_z"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -105,7 +105,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -140,7 +140,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -176,7 +176,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -214,7 +214,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -253,7 +253,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -271,7 +271,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity2 -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(2)) and
(fields.map(_.tabledEntity) must contain(allOf(tabledEntity1, tabledEntity2)))
Expand Down Expand Up @@ -304,7 +304,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -354,7 +354,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand Down Expand Up @@ -405,7 +405,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
)
}

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(failures must beEmpty) and
(fields must haveSize(1)) and
(fields.head must beEqualTo(expected))
Expand All @@ -421,7 +421,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 9))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -441,7 +441,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand All @@ -461,7 +461,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {
tabledEntity -> Set((0, 0))
)

NonAtomicFields.resolveTypes(embeddedResolver, input).map { case NonAtomicFields.Result(fields, failures) =>
NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
(fields must beEmpty) and
(failures must haveSize(1)) and
(failures.head must beLike { case failure: NonAtomicFields.ColumnFailure =>
Expand Down
Loading
Loading