Skip to content

Commit

Permalink
For structured data, add the ability to skip schemas matching a criterion.
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Mar 7, 2024
1 parent 50cddf9 commit 40bea85
Show file tree
Hide file tree
Showing 2 changed files with 344 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package com.snowplowanalytics.snowplow.loaders.transform

import cats.Monoid
import cats.implicits._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

/**
Expand Down Expand Up @@ -43,12 +43,16 @@ object TabledEntity {
* Extracts which TabledEntities will need to be included in the batch for this event, and the
* corresponding sub-versions (e.g. *-1-1). Entities whose schema matches any criterion in
* `entitiesToSkip` are left out of the result.
*/
def forEvent(event: Event): Map[TabledEntity, Set[SchemaSubVersion]] = {
val ue = event.unstruct_event.data.map(sdj => Map(forUnstructEvent(sdj.schema) -> Set(keyToSubVersion(sdj.schema))))
def forEvent(event: Event, entitiesToSkip: List[SchemaCriterion]): Map[TabledEntity, Set[SchemaSubVersion]] = {
val ue = event.unstruct_event.data
.filterNot(entity => entitiesToSkip.exists(_.matches(entity.schema)))
.map(sdj => Map(forUnstructEvent(sdj.schema) -> Set(keyToSubVersion(sdj.schema))))

val contexts = (event.contexts.data ++ event.derived_contexts.data).map { sdj =>
Map(forContext(sdj.schema) -> Set(keyToSubVersion(sdj.schema)))
}
val contexts = (event.contexts.data ++ event.derived_contexts.data)
.filterNot(entity => entitiesToSkip.exists(_.matches(entity.schema)))
.map { sdj =>
Map(forContext(sdj.schema) -> Set(keyToSubVersion(sdj.schema)))
}

Monoid[Map[TabledEntity, Set[SchemaSubVersion]]].combineAll(contexts ++ ue)
}
Expand Down
Loading

0 comments on commit 40bea85

Please sign in to comment.