Skip to content

Commit

Permalink
Common: fix ClassCastException with multi-thread access (close #278)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jul 12, 2020
1 parent 56516f4 commit 48e4ce8
Show file tree
Hide file tree
Showing 29 changed files with 229 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ package apirequest

import java.util.UUID

import cats.{Eval, Id, Monad}
import cats.{Id, Monad}
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.effect.Sync
import cats.implicits._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
Expand Down Expand Up @@ -215,45 +214,6 @@ sealed trait CreateApiRequestEnrichment[F[_]] {
object CreateApiRequestEnrichment {
def apply[F[_]](implicit ev: CreateApiRequestEnrichment[F]): CreateApiRequestEnrichment[F] = ev

implicit def syncCreateApiRequestEnrichment[F[_]: Sync: HttpClient](
implicit CLM: CreateLruMap[F, String, (Either[Throwable, Json], Long)]
): CreateApiRequestEnrichment[F] =
new CreateApiRequestEnrichment[F] {
override def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] =
CLM
.create(conf.cache.size)
.map(c =>
ApiRequestEnrichment(
conf.schemaKey,
conf.inputs,
conf.api,
conf.outputs,
conf.cache.ttl,
c
)
)
}

implicit def evalCreateApiRequestEnrichment(
implicit CLM: CreateLruMap[Eval, String, (Either[Throwable, Json], Long)],
HTTP: HttpClient[Eval]
): CreateApiRequestEnrichment[Eval] =
new CreateApiRequestEnrichment[Eval] {
override def create(conf: ApiRequestConf): Eval[ApiRequestEnrichment[Eval]] =
CLM
.create(conf.cache.size)
.map(c =>
ApiRequestEnrichment(
conf.schemaKey,
conf.inputs,
conf.api,
conf.outputs,
conf.cache.ttl,
c
)
)
}

implicit def idCreateApiRequestEnrichment(
implicit CLM: CreateLruMap[Id, String, (Either[Throwable, Json], Long)],
HTTP: HttpClient[Id]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common

import cats.Eval
import cats.Id
import cats.implicits._
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.core.SelfDescribingData
Expand Down Expand Up @@ -57,9 +57,8 @@ object SpecHelpers {
}"""

/** Builds an Iglu client from the above Iglu configuration. */
val client: Client[Eval, Json] = Client
.parseDefault[Eval](igluConfig)
.value
val client: Client[Id, Json] = Client
.parseDefault[Id](igluConfig)
.value
.getOrElse(throw new RuntimeException("invalid resolver configuration"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ package com.snowplowanalytics.snowplow.enrich.common
package adapters
package registry

import cats.Eval
import cats.Id
import cats.data.NonEmptyList
import cats.syntax.option._

import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup._
import com.snowplowanalytics.snowplow.badrows._
import org.joda.time.DateTime
Expand Down Expand Up @@ -96,7 +97,7 @@ class CallrailAdapterSpec extends Specification with DataTables with ValidatedMa
"nuid" -> "-"
)
val payload = CollectorPayload(Shared.api, params, None, None, Shared.source, Shared.context)
val actual = CallrailAdapter.toRawEvents[Eval](payload, SpecHelpers.client).value
val actual = CallrailAdapter.toRawEvents[Id](payload, SpecHelpers.client)

val expectedJson =
"""|{
Expand Down Expand Up @@ -159,7 +160,7 @@ class CallrailAdapterSpec extends Specification with DataTables with ValidatedMa
def e2 = {
val params = toNameValuePairs()
val payload = CollectorPayload(Shared.api, params, None, None, Shared.source, Shared.context)
val actual = CallrailAdapter.toRawEvents[Eval](payload, SpecHelpers.client).value
val actual = CallrailAdapter.toRawEvents[Id](payload, SpecHelpers.client)

actual must beInvalid(
NonEmptyList.one(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -133,7 +133,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -184,7 +184,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -238,7 +238,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -293,7 +293,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -352,7 +352,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -412,7 +412,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
val payload = loader.toCollectorPayload(input, processor)

val actual = payload.map(
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client).value)
_.map(CloudfrontAccessLogAdapter.toRawEvents(_, SpecHelpers.client))
)

val expectedJson =
Expand Down Expand Up @@ -478,7 +478,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
Shared.source,
Shared.context
)
val actual = CloudfrontAccessLogAdapter.toRawEvents(payload, SpecHelpers.client).value
val actual = CloudfrontAccessLogAdapter.toRawEvents(payload, SpecHelpers.client)

actual must beInvalid(
NonEmptyList
Expand All @@ -503,7 +503,7 @@ class CloudfrontAccessLogAdapterSpec extends Specification with DataTables with
Shared.source,
Shared.context
)
val actual = CloudfrontAccessLogAdapter.toRawEvents(payload, SpecHelpers.client).value
val actual = CloudfrontAccessLogAdapter.toRawEvents(payload, SpecHelpers.client)

actual must beInvalid(
NonEmptyList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali

def e1 = {
val payload = CollectorPayload(api, Nil, None, None, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)
actual must beInvalid(
NonEmptyList.one(
FailureDetails.AdapterFailure.InputData("body", None, "empty body")
Expand All @@ -87,7 +87,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e2 = {
val body = "dl=docloc"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)
actual must beInvalid(
NonEmptyList.one(
FailureDetails.AdapterFailure.InputData(
Expand All @@ -102,7 +102,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e3 = {
val body = "t=unknown&dl=docloc"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)
actual must beInvalid(
NonEmptyList.of(
FailureDetails.AdapterFailure
Expand All @@ -119,7 +119,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e4 = {
val body = "t=pageview&dh=host&dp=path"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedJson =
"""|{
Expand All @@ -144,7 +144,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e5 = {
val body = "t=pageview&dh=host&cid=id&v=version"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand Down Expand Up @@ -174,7 +174,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e6 = {
val body = "t=pageview&dp=path&uip=ip"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand All @@ -200,7 +200,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e7 = {
val body = "t=item&in=name&ip=12.228&iq=12&aip=0"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand Down Expand Up @@ -236,7 +236,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e8 = {
val body = "t=exception&exd=desc&exf=1&dh=host"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand All @@ -261,7 +261,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e9 = {
val body = "t=transaction&ti=tr&cu=EUR&pr12id=ident&pr12cd42=val"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand Down Expand Up @@ -294,7 +294,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e10 = {
val body = "t=pageview&dp=path&il12pi42id=s&il12pi42cd36=dim"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand Down Expand Up @@ -322,7 +322,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e11 = {
val body = "t=screenview&cd=name&cd12=dim"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand All @@ -347,7 +347,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e12 = {
val body = "t=pageview&dp=path&pr1id=s1&pr2id=s2&pr1cd1=v1&pr1cd2=v2"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand Down Expand Up @@ -381,7 +381,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e13 = {
val body = "t=pageview&dp=path&promoa=action&promo12id=id"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedUE =
"""|{
Expand Down Expand Up @@ -409,7 +409,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
def e14 = {
val body = "t=pageview&dh=host&dp=path\nt=pageview&dh=host&dp=path"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedJson =
"""|{
Expand All @@ -436,7 +436,7 @@ class GoogleAnalyticsAdapterSpec extends Specification with DataTables with Vali
val body =
"t=pageview&dh=host&dp=path&cu=EUR&il1pi1pr=1&il1pi1nm=name1&il1pi1ps=1&il1pi1ca=cat1&il1pi1id=id1&il1pi1br=brand1&il1pi2pr=2&il1pi2nm=name2&il1pi2ps=2&il1pi2ca=cat2&il1pi2id=id2&il1pi2br=brand2&il2pi1pr=21&il2pi1nm=name21&il2pi1ps=21&il2pi1ca=cat21&il2pi1id=id21&il2pi1br=brand21"
val payload = CollectorPayload(api, Nil, None, body.some, source, context)
val actual = toRawEvents(payload, SpecHelpers.client).value
val actual = toRawEvents(payload, SpecHelpers.client)

val expectedJson =
"""|{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class HubSpotAdapterSpec extends Specification with DataTables with ValidatedMat
Shared.context
)
)
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client).value must beValid(expected)
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client) must beValid(expected)
}

def e4 = {
Expand All @@ -116,15 +116,15 @@ class HubSpotAdapterSpec extends Specification with DataTables with ValidatedMat
HubSpotAdapter.EventSchemaMap,
"no schema associated with the provided type parameter at index 0"
)
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client).value must beInvalid(
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client) must beInvalid(
NonEmptyList.one(expected)
)
}

def e5 = {
val payload =
CollectorPayload(Shared.api, Nil, ContentType.some, None, Shared.cljSource, Shared.context)
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client).value must beInvalid(
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client) must beInvalid(
NonEmptyList.one(
FailureDetails.AdapterFailure
.InputData("body", None, "empty body: not events to process")
Expand All @@ -135,7 +135,7 @@ class HubSpotAdapterSpec extends Specification with DataTables with ValidatedMat
def e6 = {
val payload =
CollectorPayload(Shared.api, Nil, None, "stub".some, Shared.cljSource, Shared.context)
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client).value must beInvalid(
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client) must beInvalid(
NonEmptyList.one(
FailureDetails.AdapterFailure.InputData(
"contentType",
Expand All @@ -156,7 +156,7 @@ class HubSpotAdapterSpec extends Specification with DataTables with ValidatedMat
Shared.cljSource,
Shared.context
)
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client).value must beInvalid(
HubSpotAdapter.toRawEvents(payload, SpecHelpers.client) must beInvalid(
NonEmptyList.one(
FailureDetails.AdapterFailure
.InputData("contentType", ct, "expected application/json")
Expand Down
Loading

0 comments on commit 48e4ce8

Please sign in to comment.