Skip to content

Commit

Permalink
address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Oct 11, 2021
1 parent 9c87e96 commit 1a97585
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object Enrich {
env.source
.pauseWhen(env.pauseEnrich)
.evalTap(_ => env.metrics.rawCount)
.map(a => (a, env.getPayload(a)))
.fproduct(env.getPayload)
.through(enrichPipe)
.through(sinkResult(sinkOne(env), env.metrics.enrichLatency, env.streamsSettings.concurrency.output))
.through(env.checkpointer)
Expand Down Expand Up @@ -181,17 +181,18 @@ object Enrich {
.SizeViolation(
processor,
Failure.SizeViolation(Instant.now(), maxRecordSize, size, msg),
BadRowPayload.RawPayload(asStr.take(maxRecordSize / 10))
BadRowPayload.RawPayload(asStr.take(maxRecordSize * 8 / 10))
)
Left(br)
} else Right(asBytes)
}

def sinkResult[F[_]: Concurrent: Parallel, A](
/** @tparam B can be anything, there is no constraint on this type */
def sinkResult[F[_]: Concurrent: Parallel, B](
sink: Validated[BadRow, EnrichedEvent] => F[Unit],
trackLatency: Option[Long] => F[Unit],
concurrency: Int
): Pipe[F, (A, Result), A] =
): Pipe[F, (B, Result), B] =
_.parEvalMap(concurrency){ case (orig, (events, collectorTstamp)) => events.parTraverse_(sink).as(orig) <* trackLatency(collectorTstamp) }

def sinkOne[F[_]: Concurrent: Parallel, A](env: Environment[F, A])(result: Validated[BadRow, EnrichedEvent]): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Concurrency
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Client, Clients, Metrics}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Clients, Metrics}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

import scala.concurrent.ExecutionContext

Expand Down Expand Up @@ -74,6 +75,8 @@ import scala.concurrent.ExecutionContext
* @param piiAttributes fields from a PII event to use as output message attributes
* @param processor identifies enrich asset in bad rows
* @param streamsSettings parameters used to configure the streams
* @tparam A type emitted by the source (e.g. `ConsumerRecord` for PubSub).
* getPayload must be defined for this type, as well as a checkpointer
*/
final case class Environment[F[_], A](
igluClient: IgluClient[F, Json],
Expand Down Expand Up @@ -150,9 +153,9 @@ object Environment {
clts = Clients.init[F](http, clients)
igluClient <- IgluClient.parseDefault[F](parsedConfigs.igluJson).resource
metrics <- Resource.eval(metricsReporter[F](blocker, file))
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
download = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
pauseEnrich <- makePause[F]
assets <- Assets.State.make[F](blocker, pauseEnrich, assets, clts)
assets <- Assets.State.make[F](blocker, pauseEnrich, download, clts)
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, BlockerF.ofBlocker(blocker))
sentry <- file.monitoring.flatMap(_.sentry).map(_.dsn) match {
case Some(dsn) => Resource.eval[F, Option[SentryClient]](Sync[F].delay(Sentry.init(dsn.toString).some))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Authentication, Input, Output}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Client, Sink, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Sink, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

object Run {
private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Expand All @@ -56,7 +57,7 @@ object Run {
case Right(cfg) =>

ParsedConfigs.parse[F](cfg).fold (
err => Sync[F].delay(System.err.println(err)).as[ExitCode](ExitCode.Error),
err => Logger[F].error(s"CLI arguments valid but some of the configuration is not correct. Error: $err").as[ExitCode](ExitCode.Error),
parsed =>
Blocker[F].use { blocker =>
for {
Expand Down Expand Up @@ -113,7 +114,7 @@ object Run {
}
).flatten
case Left(error) =>
Sync[F].delay(System.err.println(error)) >> Sync[F].pure(ExitCode.Error)
Logger[F].error(s"CLI arguments are invalid. Error: $error") >> Sync[F].pure(ExitCode.Error)
}

private def initAttributedSink[F[_]: Concurrent: ContextShift: Timer](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.http4s.client.blaze.BlazeClientBuilder

import scala.concurrent.ExecutionContext

import Clients._

case class Clients[F[_]: ConcurrentEffect](clients: List[Client[F]]) {
/** Download a URI as a stream of bytes, using the appropriate client */
def download(uri: URI): Stream[F, Byte] =
Expand Down Expand Up @@ -59,9 +61,9 @@ object Clients {
case class DownloadingFailure(uri: URI) extends Throwable {
override def getMessage: String = s"Cannot download $uri"
}
}

trait Client[F[_]] {
val prefixes: List[String]
def download(uri: URI): Stream[F, Byte]
trait Client[F[_]] {
val prefixes: List[String]
def download(uri: URI): Stream[F, Byte]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ object TestEnvironment extends CatsIO {
g => goodRef.update(_ :+ g),
Some(p => piiRef.update(_ :+ p)),
b => badRef.update(_ :+ b),
_.flatMap(_ => Stream.eval(IO.unit)),
_.map(_ => ()),
identity,
None,
metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import blobstore.Path

import com.google.cloud.storage.StorageOptions

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Client
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

object GcsClient {

Expand Down

0 comments on commit 1a97585

Please sign in to comment.