Skip to content

Commit

Permalink
Makes schemas configurable in adapters (close #791)
Browse files Browse the repository at this point in the history
Run scalafmt

Add strictly typed properties for schemas in adapter configs

Add adapters config to stream enrich

Duplicate adapter configuration in each SpecHelpers

Remove duplicated configuration

Update include_current.txt configuration in common test resources
  • Loading branch information
matus-tomlein committed Jun 29, 2023
1 parent 6a11d15 commit fa2c343
Show file tree
Hide file tree
Showing 52 changed files with 1,509 additions and 432 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ lazy val streamCommon = project
.settings(streamCommonBuildSettings)
.settings(libraryDependencies ++= streamCommonDependencies)
.settings(excludeDependencies ++= exclusions)
.dependsOn(common)
.dependsOn(common % "test->test;compile->compile")

lazy val streamKinesis = project
.in(file("modules/stream/kinesis"))
Expand Down Expand Up @@ -105,7 +105,7 @@ lazy val commonFs2 = project
.settings(Defaults.itSettings)
.configs(IntegrationTest)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common)
.dependsOn(common % "test->test;compile->compile")


lazy val pubsub = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry

import com.snowplowanalytics.snowplow.enrich.pubsub.{Enrich, EnrichSpec}

import org.joda.time.DateTime

@State(Scope.Thread)
Expand Down Expand Up @@ -75,7 +73,24 @@ object EtlPipelineBench {
@Setup(Level.Trial)
def setup(): Unit = {
dateTime = DateTime.parse("2010-06-30T01:20+02:00")
adapterRegistry = new AdapterRegistry()
adapterRegistry = new AdapterRegistry(
callrailConfig = callrailConfig,
cloudfrontAccessLogConfig = cloudfrontAccessLogConfig,
googleAnalyticsConfig = googleAnalyticsConfig,
hubspotConfig = hubspotConfig,
mailchimpConfig = mailchimpConfig,
mailgunConfig = mailgunConfig,
mandrillConfig = mandrillConfig,
marketoConfig = marketoConfig,
olarkConfig = olarkConfig,
pagerdutyConfig = pagerdutyConfig,
pingdomConfig = pingdomConfig,
sendgridConfig = sendgridConfig,
statusGatorConfig = statusGatorConfig,
unbounceConfig = unbounceConfig,
urbanAirshipConfig = urbanAirshipConfig,
veroConfig = veroConfig
)
enrichmentRegistryId = EnrichmentRegistry[Id]()
enrichmentRegistryIo = EnrichmentRegistry[IO]()
clientId = Client[Id, Json](Resolver(List(), None), CirceValidator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,25 @@ object Environment {
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http))
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
(remoteAdaptersHttpClient, remoteAdapters) <- prepareRemoteAdapters[F](file.remoteAdapters, ec, metrics)
adapterRegistry = new AdapterRegistry(remoteAdapters)
adapterRegistry = new AdapterRegistry(
remoteAdapters,
callrailConfig = file.adapters.callrail,
cloudfrontAccessLogConfig = file.adapters.cloudfrontAccessLog,
googleAnalyticsConfig = file.adapters.googleAnalytics,
hubspotConfig = file.adapters.hubspot,
mailchimpConfig = file.adapters.mailchimp,
mailgunConfig = file.adapters.mailgun,
mandrillConfig = file.adapters.mandrill,
marketoConfig = file.adapters.marketo,
olarkConfig = file.adapters.olark,
pagerdutyConfig = file.adapters.pagerduty,
pingdomConfig = file.adapters.pingdom,
sendgridConfig = file.adapters.sendgrid,
statusGatorConfig = file.adapters.statusgator,
unbounceConfig = file.adapters.unbounce,
urbanAirshipConfig = file.adapters.urbanAirship,
veroConfig = file.adapters.vero
)
sem <- Resource.eval(Semaphore(1L))
assetsState <- Resource.eval(Assets.State.make[F](blocker, sem, clts, assets))
shifter <- ShiftExecution.ofSingleThread[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import cats.data.EitherT
import cats.effect.Sync

import _root_.io.circe.{Decoder, Encoder}
import _root_.io.circe.config.syntax._
import _root_.io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder}

import com.typesafe.config.{Config => TSConfig, ConfigFactory}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
AdaptersConfig,
Concurrency,
Experimental,
FeatureFlags,
Expand All @@ -46,6 +46,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
* @param telemetry configuration for telemetry
* @param featureFlags to activate/deactivate enrich features
* @param experimental configuration for experimental features
* @param adapters configuration for adapters
*/
final case class ConfigFile(
input: Input,
Expand All @@ -56,7 +57,8 @@ final case class ConfigFile(
monitoring: Monitoring,
telemetry: Telemetry,
featureFlags: FeatureFlags,
experimental: Option[Experimental]
experimental: Option[Experimental],
adapters: AdaptersConfig
)

object ConfigFile {
Expand All @@ -67,16 +69,16 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _, _, _, _, _) if aup._1 <= 0L =>
case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _) if output.streamName.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _) if t.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _) if topicName.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if topicName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
}
Expand All @@ -88,9 +90,12 @@ object ConfigFile {
* Command line parameters have the highest priority.
* Then the provided file.
* Then the application.conf file.
* Then the application-default.conf file.
*/
private def configFileFallbacks(in: TSConfig): TSConfig =
ConfigFactory.load(in.withFallback(ConfigFactory.load()))
private def configFileFallbacks(in: TSConfig): TSConfig = {
val defaultConf = ConfigFactory.load("application-default.conf")
ConfigFactory.load(in.withFallback(ConfigFactory.load().withFallback(defaultConf)))
}

def parse[F[_]: Sync](raw: EncodedHoconOrPath): EitherT[F, String, ConfigFile] =
ParsedConfigs.parseEncodedOrPath(raw, configFileFallbacks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import scala.concurrent.duration.{Duration, FiniteDuration}

import _root_.io.circe.{Decoder, DecodingFailure, Encoder}
import _root_.io.circe.generic.extras.semiauto._
import _root_.io.circe.config.syntax._
import _root_.io.circe.DecodingFailure

import org.http4s.{ParseFailure, Uri}

import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline.{FeatureFlags => CommonFeatureFlags}
import com.snowplowanalytics.snowplow.enrich.common.adapters._

object io {

Expand Down Expand Up @@ -512,4 +511,94 @@ object io {
legacyEnrichmentOrder = ff.legacyEnrichmentOrder
)
}

/**
 * Configuration of the individual adapters, one field per adapter.
 *
 * An instance is held in the `adapters` field of `ConfigFile`, and each field
 * is handed to the `AdapterRegistry` constructor parameter for that adapter.
 *
 * Declared `final`, like `ConfigFile`: this is a plain data holder that is not
 * meant to be subclassed.
 *
 * @param callrail configuration for the Callrail adapter
 * @param cloudfrontAccessLog configuration for the Cloudfront access log adapter
 * @param googleAnalytics configuration for the Google Analytics adapter
 * @param hubspot configuration for the Hubspot adapter
 * @param mailchimp configuration for the Mailchimp adapter
 * @param mailgun configuration for the Mailgun adapter
 * @param mandrill configuration for the Mandrill adapter
 * @param marketo configuration for the Marketo adapter
 * @param olark configuration for the Olark adapter
 * @param pagerduty configuration for the Pagerduty adapter
 * @param pingdom configuration for the Pingdom adapter
 * @param sendgrid configuration for the Sendgrid adapter
 * @param statusgator configuration for the StatusGator adapter
 * @param unbounce configuration for the Unbounce adapter
 * @param urbanAirship configuration for the Urban Airship adapter
 * @param vero configuration for the Vero adapter
 */
final case class AdaptersConfig(
  callrail: CallrailConfig,
  cloudfrontAccessLog: CloudfrontAccessLogConfig,
  googleAnalytics: GoogleAnalyticsConfig,
  hubspot: HubspotConfig,
  mailchimp: MailchimpConfig,
  mailgun: MailgunConfig,
  mandrill: MandrillConfig,
  marketo: MarketoConfig,
  olark: OlarkConfig,
  pagerduty: PagerdutyConfig,
  pingdom: PingdomConfig,
  sendgrid: SendgridConfig,
  statusgator: StatusGatorConfig,
  unbounce: UnbounceConfig,
  urbanAirship: UrbanAirshipConfig,
  vero: VeroConfig
)

/**
 * Circe codecs for [[AdaptersConfig]] and for every per-adapter config it contains.
 *
 * All instances are produced with semi-automatic derivation
 * (`deriveConfiguredDecoder` / `deriveConfiguredEncoder`).
 * NOTE(review): the derivation presumably relies on an implicit circe
 * `Configuration` provided by the enclosing scope - confirm.
 */
object AdaptersConfig {

  // Codecs for the individual adapter configs, in alphabetical order.
  // They are declared ahead of the aggregate codecs that need them.

  implicit val callrailConfigDecoder: Decoder[CallrailConfig] =
    deriveConfiguredDecoder[CallrailConfig]
  implicit val callrailConfigEncoder: Encoder[CallrailConfig] =
    deriveConfiguredEncoder[CallrailConfig]

  implicit val cloudfrontAccessLogConfigDecoder: Decoder[CloudfrontAccessLogConfig] =
    deriveConfiguredDecoder[CloudfrontAccessLogConfig]
  implicit val cloudfrontAccessLogConfigEncoder: Encoder[CloudfrontAccessLogConfig] =
    deriveConfiguredEncoder[CloudfrontAccessLogConfig]

  implicit val googleAnalyticsConfigDecoder: Decoder[GoogleAnalyticsConfig] =
    deriveConfiguredDecoder[GoogleAnalyticsConfig]
  implicit val googleAnalyticsConfigEncoder: Encoder[GoogleAnalyticsConfig] =
    deriveConfiguredEncoder[GoogleAnalyticsConfig]

  implicit val hubspotConfigDecoder: Decoder[HubspotConfig] =
    deriveConfiguredDecoder[HubspotConfig]
  implicit val hubspotConfigEncoder: Encoder[HubspotConfig] =
    deriveConfiguredEncoder[HubspotConfig]

  implicit val mailchimpConfigDecoder: Decoder[MailchimpConfig] =
    deriveConfiguredDecoder[MailchimpConfig]
  implicit val mailchimpConfigEncoder: Encoder[MailchimpConfig] =
    deriveConfiguredEncoder[MailchimpConfig]

  implicit val mailgunConfigDecoder: Decoder[MailgunConfig] =
    deriveConfiguredDecoder[MailgunConfig]
  implicit val mailgunConfigEncoder: Encoder[MailgunConfig] =
    deriveConfiguredEncoder[MailgunConfig]

  implicit val mandrillConfigDecoder: Decoder[MandrillConfig] =
    deriveConfiguredDecoder[MandrillConfig]
  implicit val mandrillConfigEncoder: Encoder[MandrillConfig] =
    deriveConfiguredEncoder[MandrillConfig]

  implicit val marketoConfigDecoder: Decoder[MarketoConfig] =
    deriveConfiguredDecoder[MarketoConfig]
  implicit val marketoConfigEncoder: Encoder[MarketoConfig] =
    deriveConfiguredEncoder[MarketoConfig]

  implicit val olarkConfigDecoder: Decoder[OlarkConfig] =
    deriveConfiguredDecoder[OlarkConfig]
  implicit val olarkConfigEncoder: Encoder[OlarkConfig] =
    deriveConfiguredEncoder[OlarkConfig]

  implicit val pagerdutyConfigDecoder: Decoder[PagerdutyConfig] =
    deriveConfiguredDecoder[PagerdutyConfig]
  implicit val pagerdutyConfigEncoder: Encoder[PagerdutyConfig] =
    deriveConfiguredEncoder[PagerdutyConfig]

  implicit val pingdomConfigDecoder: Decoder[PingdomConfig] =
    deriveConfiguredDecoder[PingdomConfig]
  implicit val pingdomConfigEncoder: Encoder[PingdomConfig] =
    deriveConfiguredEncoder[PingdomConfig]

  implicit val sendgridConfigDecoder: Decoder[SendgridConfig] =
    deriveConfiguredDecoder[SendgridConfig]
  implicit val sendgridConfigEncoder: Encoder[SendgridConfig] =
    deriveConfiguredEncoder[SendgridConfig]

  implicit val statusgatorConfigDecoder: Decoder[StatusGatorConfig] =
    deriveConfiguredDecoder[StatusGatorConfig]
  implicit val statusgatorConfigEncoder: Encoder[StatusGatorConfig] =
    deriveConfiguredEncoder[StatusGatorConfig]

  implicit val unbounceConfigDecoder: Decoder[UnbounceConfig] =
    deriveConfiguredDecoder[UnbounceConfig]
  implicit val unbounceConfigEncoder: Encoder[UnbounceConfig] =
    deriveConfiguredEncoder[UnbounceConfig]

  implicit val urbanAirshipConfigDecoder: Decoder[UrbanAirshipConfig] =
    deriveConfiguredDecoder[UrbanAirshipConfig]
  implicit val urbanAirshipConfigEncoder: Encoder[UrbanAirshipConfig] =
    deriveConfiguredEncoder[UrbanAirshipConfig]

  implicit val veroConfigDecoder: Decoder[VeroConfig] =
    deriveConfiguredDecoder[VeroConfig]
  implicit val veroConfigEncoder: Encoder[VeroConfig] =
    deriveConfiguredEncoder[VeroConfig]

  // Aggregate codecs, built from the per-adapter codecs declared above.

  implicit val adaptersConfigDecoder: Decoder[AdaptersConfig] =
    deriveConfiguredDecoder[AdaptersConfig]
  implicit val adaptersConfigEncoder: Encoder[AdaptersConfig] =
    deriveConfiguredEncoder[AdaptersConfig]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.loaders._
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent.toPartiallyEnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers._
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers._
import com.snowplowanalytics.snowplow.eventgen.runGen
import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => GenSdkEvent}
import org.specs2.matcher.MustMatchers.{left => _, right => _}
Expand Down Expand Up @@ -173,7 +175,24 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO {

val rng: Random = new scala.util.Random(1L)

val adapterRegistry = new AdapterRegistry()
val adapterRegistry = new AdapterRegistry(
callrailConfig = callrailConfig,
cloudfrontAccessLogConfig = cloudfrontAccessLogConfig,
googleAnalyticsConfig = googleAnalyticsConfig,
hubspotConfig = hubspotConfig,
mailchimpConfig = mailchimpConfig,
mailgunConfig = mailgunConfig,
mandrillConfig = mandrillConfig,
marketoConfig = marketoConfig,
olarkConfig = olarkConfig,
pagerdutyConfig = pagerdutyConfig,
pingdomConfig = pingdomConfig,
sendgridConfig = sendgridConfig,
statusGatorConfig = statusGatorConfig,
unbounceConfig = unbounceConfig,
urbanAirshipConfig = urbanAirshipConfig,
veroConfig = veroConfig
)
val enrichmentReg = EnrichmentRegistry[IO]()
val igluCentral = Registry.IgluCentral
val client = IgluCirceClient.parseDefault[IO](json"""
Expand Down
Loading

0 comments on commit fa2c343

Please sign in to comment.