
Release/0.3.0 #56

Merged (7 commits) on Jan 16, 2024
9 changes: 9 additions & 0 deletions CHANGELOG
@@ -1,3 +1,12 @@
Version 0.3.0 (2024-01-15)
--------------------------
Add StatsD metrics testing (#47)
Use short `_schema_version` field from analytics-sdk (#55)
Add coldswap class to manage concurrent resource access (#53)
Unit tests for ConfigParser (#51)
Add badrows serializer checking max size (#49)
Extend coverage of Transform.transformEventUnstructured unit tests (#50)

Version 0.2.1 (2023-12-12)
--------------------------
Reinstate shutdown timeout on pubsub source (#44)
2 changes: 1 addition & 1 deletion README.md
@@ -28,7 +28,7 @@ Licensed under the [Snowplow Community License](https://docs.snowplow.io/communi
[build-image]: https://github.com/snowplow-incubator/common-streams/workflows/CI/badge.svg
[build]: https://github.com/snowplow-incubator/common-streams/actions/workflows/ci.yml

- [release-image]: https://img.shields.io/badge/release-0.2.1-blue.svg?style=flat
+ [release-image]: https://img.shields.io/badge/release-0.3.0-blue.svg?style=flat
[releases]: https://github.com/snowplow-incubator/common-streams/releases

[license]: https://docs.snowplow.io/docs/contributing/community-license-faq/
13 changes: 7 additions & 6 deletions build.sbt
@@ -13,11 +13,11 @@ lazy val root = project
.aggregate(
streams,
kinesis,
- kinesisIT,
kafka,
pubsub,
runtimeCommon,
- loadersCommon
+ loadersCommon,
+ IT
)

lazy val streams: Project = project
@@ -51,16 +51,17 @@ lazy val kinesis: Project = project
.settings(libraryDependencies ++= Dependencies.kinesisDependencies)
.dependsOn(streams)

- lazy val kinesisIT: Project = project
+ lazy val IT: Project = project
.settings(
- name := "kinesis-it"
+ name := "it"
)
- .withId("kinesis-it")
- .in(file("modules/kinesis-it"))
+ .withId("it")
+ .in(file("modules/it"))
.settings(BuildSettings.buildSettings)
.settings(BuildSettings.publishSettings)
.settings(BuildSettings.mimaSettings)
.dependsOn(kinesis)
+ .dependsOn(runtimeCommon)
.settings(
publish / skip := true,
publishLocal / skip := true,
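With the kinesis-only integration module replaced by a single `it` project, the integration suite can presumably be run via the new project id (assuming Docker is available for the testcontainers-based specs below):

sbt it/test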
116 changes: 116 additions & 0 deletions MetricsSpec.scala
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.runtime

import java.net.Socket
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import cats.effect.{IO, Ref, Resource}
import cats.effect.testing.specs2.CatsResource
import org.specs2.mutable.SpecificationLike
import org.testcontainers.containers.GenericContainer

import retry.syntax.all._
import retry.RetryPolicies

class MetricsSpec extends CatsResource[IO, (GenericContainer[_], StatsdAPI[IO])] with SpecificationLike {

override val resource: Resource[IO, (GenericContainer[_], StatsdAPI[IO])] =
for {
statsd <- Statsd.resource(TestMetrics.getClass.getSimpleName)
socket <- Resource.eval(IO.blocking(new Socket(statsd.getHost(), statsd.getMappedPort(8126))))
statsdApi <- StatsdAPI.resource[IO](socket)
} yield (statsd, statsdApi)

override def is = s2"""
MetricsSpec should
deliver metrics to statsd $e1
"""

def e1 = withResource { case (statsd @ _, statsdApi) =>
for {
t <- TestMetrics.impl
_ <- t.count(100)
_ <- t.time(10.seconds)
f <- t.report.compile.drain.start
_ <- IO.sleep(150.millis)
counters <- statsdApi
.get(Metrics.MetricType.Count)
.retryingOnFailures(
v => IO.pure(v.contains("snowplow.counter")),
RetryPolicies.constantDelay[IO](10.milliseconds),
(v, _) => IO.println(s"Retry fetching metrics. Not ready: $v")
)
gauges <- statsdApi
.get(Metrics.MetricType.Gauge)
.retryingOnFailures(
v => IO.pure(v.contains("snowplow.timer")),
RetryPolicies.constantDelay[IO](10.milliseconds),
(v, _) => IO.println(s"Retry fetching metrics. Not ready: $v")
)
_ <- f.cancel
} yield List(
counters.get("statsd.metrics_received") must beSome(2),
counters.get("snowplow.counter") must beSome(100),
gauges must haveSize(1),
gauges.get("snowplow.timer") must beSome(10)
).reduce(_ and _)

}
}

object TestMetrics {

case class TestMetrics(
ref: Ref[IO, TestState],
emptyState: TestState,
config: Option[Metrics.StatsdConfig]
) extends Metrics[IO, TestState](ref, emptyState, config) {
def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c))
def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t))
}

def impl = Ref[IO]
.of(TestState.empty)
.map { ref =>
TestMetrics(
ref,
TestState.empty,
Some(
Metrics.StatsdConfig(
"localhost",
8125,
Map.empty,
100.millis,
""
)
)
)
}

case class TestState(counter: Int, timer: FiniteDuration) extends Metrics.State {
override def toKVMetrics: List[Metrics.KVMetric] = List(
Count(counter),
Timer(timer)
)
}
object TestState {
def empty = TestState(0, 0.seconds)
}

case class Count(v: Int) extends Metrics.KVMetric {
val key = "snowplow.counter"
val value = v.toString()
val metricType = Metrics.MetricType.Count
}

case class Timer(v: FiniteDuration) extends Metrics.KVMetric {
val key = "snowplow.timer"
val value = v.toSeconds.toString()
val metricType = Metrics.MetricType.Gauge
}
}
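TestMetrics above doubles as a template for application code: subclass Metrics[F, S] with a state that knows how to render itself as KVMetrics. A minimal sketch, assuming only the API exercised in this spec (the class names and the "app.events_good" key are hypothetical):

import cats.effect.{IO, Ref}

// Hypothetical application-side metrics, mirroring TestMetrics above.
case class AppState(good: Int) extends Metrics.State {
  override def toKVMetrics: List[Metrics.KVMetric] = List(GoodCount(good))
}

case class GoodCount(v: Int) extends Metrics.KVMetric {
  val key        = "app.events_good" // hypothetical metric key
  val value      = v.toString
  val metricType = Metrics.MetricType.Count
}

class AppMetrics(ref: Ref[IO, AppState], config: Option[Metrics.StatsdConfig])
    extends Metrics[IO, AppState](ref, AppState(0), config) {
  // Increment the counter; the inherited `report` stream flushes it to StatsD
  // on the configured period, as e1 above demonstrates.
  def goodEvent: IO[Unit] = ref.update(s => s.copy(good = s.good + 1))
}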
46 changes: 46 additions & 0 deletions Statsd.scala
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.runtime

import scala.jdk.CollectionConverters._
import cats.effect.{IO, Resource}
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.Wait
import com.github.dockerjava.api.model.ExposedPort
import com.github.dockerjava.api.model.Ports

object Statsd {

def resource(
loggerName: String
): Resource[IO, GenericContainer[_]] =
Resource.make {
val statsd: GenericContainer[_] = new GenericContainer("statsd/statsd:v0.10.1")
statsd.addExposedPort(8126)
statsd.setWaitStrategy(Wait.forLogMessage("""^(.*)server is up(.+)$""", 1))
statsd.withCreateContainerCmdModifier { cmd =>
val statsPort = 8125
cmd.withExposedPorts((cmd.getExposedPorts().toList :+ ExposedPort.udp(statsPort)).asJava)
val ports = cmd.getHostConfig().getPortBindings()
ports.bind(ExposedPort.udp(statsPort), Ports.Binding.bindPort(statsPort))
cmd.getHostConfig().withPortBindings(ports)
()
}
IO.blocking(start(statsd, loggerName))
}(ls => IO.blocking(ls.stop()))

private def start(statsd: GenericContainer[_], loggerName: String): GenericContainer[_] = {
statsd.start()
val logger = LoggerFactory.getLogger(loggerName)
val logs = new Slf4jLogConsumer(logger)
statsd.followOutput(logs)
statsd
}
}
39 changes: 39 additions & 0 deletions StatsdAPI.scala
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.runtime

import java.io._
import java.net._
import scala.jdk.CollectionConverters._
import cats.implicits._
import cats.effect.{Resource, Sync}
import io.circe.parser._

trait StatsdAPI[F[_]] {
def get(metricType: Metrics.MetricType): F[Map[String, Int]]
}

object StatsdAPI {
def resource[F[_]: Sync](socket: Socket): Resource[F, StatsdAPI[F]] = for {
output <- Resource.eval(Sync[F].delay(new PrintWriter(socket.getOutputStream(), true)))
input <- Resource.eval(Sync[F].delay(new BufferedReader(new InputStreamReader(socket.getInputStream()))))
} yield new StatsdAPI[F] {
def get(metricType: Metrics.MetricType): F[Map[String, Int]] = for {
_ <- Sync[F].blocking(output.println(showMetric(metricType)))
json <-
Sync[F].blocking(input.lines().iterator().asScala.takeWhile(!_.toLowerCase().contains("end")).mkString("\n").replaceAll("'", "\""))
res <- Sync[F].fromEither(decode[Map[String, Int]](json))
_ <- Sync[F].delay(println(s"""StatsD metrics received: ${res.mkString(", ")}"""))
} yield res

private[this] def showMetric(metricType: Metrics.MetricType) = metricType match {
case Metrics.MetricType.Count => "counters"
case Metrics.MetricType.Gauge => "gauges"
}
}
}
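Putting the two test helpers together: the admin protocol used by `get` is line-based, writing "counters" or "gauges" to management port 8126 and parsing the quasi-JSON reply, which statsd terminates with an END line. A sketch (not part of this diff; the logger name is arbitrary) that starts the container, connects to its mapped admin port, and reads back the counters map:

import java.net.Socket
import cats.effect.{IO, Resource}

// Sketch: read all counters from a throwaway statsd container.
val readCounters: IO[Map[String, Int]] =
  (for {
    statsd <- Statsd.resource("statsd-sketch")
    socket <- Resource.eval(IO.blocking(new Socket(statsd.getHost(), statsd.getMappedPort(8126))))
    api    <- StatsdAPI.resource[IO](socket)
  } yield api).use(_.get(Metrics.MetricType.Count))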
1 change: 1 addition & 0 deletions modules/kafka/src/main/resources/reference.conf
@@ -14,6 +14,7 @@ snowplow.defaults {
producerConf: {
"client.id": null # invalid value MUST be overridden by the application
}
+ maxRecordSize: 1000000
}
}
}
1 change: 1 addition & 0 deletions modules/kinesis/src/main/resources/reference.conf
@@ -20,6 +20,7 @@ snowplow.defaults: {
}
recordLimit: 500
byteLimit: 5242880
+ maxRecordSize: 1000000
}
}
}
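Downstream applications can presumably override this default in their own config in the usual HOCON way. The keys between snowplow.defaults and the sink block are elided in this diff, so the path in this sketch is a placeholder:

snowplow.defaults: {
  output.kinesis: {          # placeholder path; the real nesting is not shown in this diff
    maxRecordSize: 1048576   # hypothetical override of the 1000000 default
  }
}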
35 changes: 35 additions & 0 deletions BadRowsSerializer.scala
@@ -0,0 +1,35 @@
package com.snowplowanalytics.snowplow.loaders.transform

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor}

import java.nio.charset.StandardCharsets
import java.time.Instant

object BadRowsSerializer {

/**
* If the serialized input bad row reaches or exceeds the provided max size in bytes, return a
* serialized SizeViolation bad row with the original payload trimmed. Otherwise, return the original serialized bad row.
*/
def withMaxSize(
badRow: BadRow,
processor: Processor,
maxSize: Int
): Array[Byte] = {
val originalSerialized = badRow.compactByteArray
val originalSizeBytes = originalSerialized.length

if (originalSizeBytes >= maxSize) {
val trimmedPayload = new String(originalSerialized, 0, maxSize / 10, StandardCharsets.UTF_8)
BadRow
.SizeViolation(
processor,
Failure.SizeViolation(Instant.now(), maxSize, originalSizeBytes, "Badrow exceeds allowed max size"),
Payload.RawPayload(trimmedPayload)
)
.compactByteArray
} else {
originalSerialized
}
}
}
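A usage sketch (the processor name and version and the 1 MB limit are illustrative; 1000000 matches the new maxRecordSize defaults above). When the serialized row is at least maxSize bytes, the result is a SizeViolation row whose payload keeps only the first maxSize / 10 bytes of the original:

import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}

// Sketch: cap every serialized bad row before handing it to a sink.
def serializeForSink(badRow: BadRow): Array[Byte] =
  BadRowsSerializer.withMaxSize(badRow, Processor("my-loader", "0.3.0"), maxSize = 1000000)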
50 changes: 50 additions & 0 deletions BadRowsSerializerSpec.scala
@@ -0,0 +1,50 @@
package com.snowplowanalytics.snowplow.loaders.transform

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.badrows.Payload
import com.snowplowanalytics.snowplow.badrows.Processor
import io.circe.parser.decode
import org.specs2.Specification

import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.UUID

class BadRowsSerializerSpec extends Specification {

private val processor = Processor("test-app", "0.1.0")
private val maxSize = 3000

def is = s2"""
Bad row serializer should
return original bad row if max size is not exceeded $e1
return SizeViolation bad row if max size is exceeded $e2
"""

def e1 = {
val inputBadRow = loaderError(Event.minimal(UUID.randomUUID(), Instant.now(), "0.1.0", "0.1.0"))
val output = serialize(inputBadRow)

decode[SelfDescribingData[BadRow]](output).map(_.data) must beRight(inputBadRow)
}

def e2 = {
val inputBadRow = loaderError(Event.minimal(UUID.randomUUID(), Instant.now(), "0.1.0", "0.1.0").copy(mkt_source = Some("A" * 1000)))
val output = serialize(inputBadRow)

decode[SelfDescribingData[BadRow]](output).map(_.data) must beRight.like { case sizeViolation: BadRow.SizeViolation =>
sizeViolation.failure.maximumAllowedSizeBytes must beEqualTo(maxSize) and
(sizeViolation.payload.event.size must beEqualTo(300)) // maxSize (3000) divided by 10
}
}

private def serialize(badRow: BadRow): String = {
val output = BadRowsSerializer.withMaxSize(badRow, processor, maxSize)
new String(output, StandardCharsets.UTF_8)
}

private def loaderError(event: Event) =
BadRow.LoaderRuntimeError(processor, failure = "Some runtime loader error message", payload = Payload.LoaderPayload(event))
}