Skip to content

Commit

Permalink
Add recovery steps
Browse files Browse the repository at this point in the history
  • Loading branch information
lukeindykiewicz committed Nov 11, 2020
1 parent 8c16d84 commit 73ef80b
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,62 @@ import fs2.Stream
import cats.effect.Concurrent
import cats.effect.Timer
import blobstore.Path
import blobstore.implicits.GetOps
import io.circe.JsonObject
import java.time.Instant
import java.util.UUID
import io.circe.generic.auto._, io.circe.parser._
import cats.effect.Sync
import cats.syntax.all._

object Recover {

// Location that `recoverFailedInserts` lists to find the failed inserts to replay.
// TODO: hardcode proper GCS path.
// NOTE(review): the bucket name below contains `dev1`, so this is presumably a development bucket — confirm before use.
val DeadQueuePath = Path("gs://sp-storage-loader-failed-inserts-dev1-com_snplow_eng_gcp/dead_queue")

/**
 * Replays failed inserts found under [[DeadQueuePath]].
 *
 * Every object listed under [[DeadQueuePath]] is read in full, passed through `recover`,
 * and then either reported (when recovery failed) or handed to `resources.pubSubProducer`.
 *
 * @param resources provides the blob `store` that is listed/read and the `pubSubProducer`
 *                  that recovered events are produced to
 * @return a stream emitting one `Unit` per object found under [[DeadQueuePath]]
 */
def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
  resources
    .store
    .list(DeadQueuePath)
    .evalMap(resources.store.getContents)
    .map(recover)
    .evalMap {
      // `delay` suspends the println inside F; `pure` would run it eagerly while building the effect.
      case Left(e) => Sync[F].delay(println(s"Error: $e")) // TODO: use logger
      // The value returned by the producer is not needed here, only the effect.
      case Right(read) => resources.pubSubProducer.produce(read).void
    }

/**
 * Event id, ETL timestamp and payload kept as a plain string.
 *
 * NOTE(review): carries the same three fields as `Combined` and is not referenced in the
 * visible part of this file — confirm whether it is still needed.
 */
case class SimpleEventContainer(
  eventId: UUID,
  etlTstamp: Instant,
  payload: String
)

// TODO: filter only the time period that is needed (based on names in GCS bucket)
// TODO: filter only failures that are due to an invalid column
// TODO: count successfully recovered events (?)
/**
 * Turns the raw text of a failed-insert bad row into an `EventContainer`.
 *
 * The text is parsed with `stringToFailedInsertBadRow`; on success the embedded payload is
 * passed through `fix` and `stringToJson`, and the event id and ETL timestamp are carried
 * over unchanged. A parsing failure is returned as `Left` with the error text.
 */
def recover: String => Either[String, EventContainer] = { badRow =>
  for {
    failed <- stringToFailedInsertBadRow(badRow)
  } yield EventContainer(failed.eventId, failed.etlTstamp, stringToJson(fix(failed.payload)))
}

/** Top level of a failed-insert bad row as decoded from JSON: a `schema` string and a `data` object. */
case class FailedInsert(
  schema: String,
  data: Data
)

/** The `data` object of a [[FailedInsert]]; `payload` is the original event, itself JSON encoded as a string. */
case class Data(
  payload: String
)

/** The fields decoded from the string held in [[Data]]'s `payload`. */
case class Payload(
  eventId: UUID,
  etlTstamp: Instant
)

/** Event id and ETL timestamp taken from a decoded [[Payload]], together with the raw payload string they came from. */
case class Combined(
  eventId: UUID,
  etlTstamp: Instant,
  payload: String
)

/**
 * Decodes a failed-insert bad row in two steps.
 *
 * First the outer document is decoded as [[FailedInsert]] to obtain the `data.payload`
 * string; that string is then decoded as [[Payload]]. The result combines the decoded
 * event id and ETL timestamp with the untouched payload string. A decoding error from
 * either step is returned as `Left`, rendered with `toString`.
 */
def stringToFailedInsertBadRow: String => Either[String, Combined] = { input =>
  decode[FailedInsert](input)
    .map(_.data.payload)
    .flatMap { rawPayload =>
      decode[Payload](rawPayload).map(ids => Combined(ids.eventId, ids.etlTstamp, rawPayload))
    }
    .left
    .map(_.toString)
}

/**
 * Replaces every occurrence of `_%` in the given text with `_percentage`.
 *
 * The replacement is purely textual: it applies wherever the two characters occur,
 * not only inside JSON keys.
 */
def fix: String => String =
  text => text.replace("_%", "_percentage")

// NOTE(review): this stub has the same name as the public
// `recover: String => Either[String, EventContainer]` defined earlier in this object;
// it looks like a superseded leftover — confirm and remove.
private def recover: Byte => EventContainer = ???
// Not implemented yet: `???` throws scala.NotImplementedError when this is evaluated.
// The public `recover` calls it to turn the fixed payload string into a JsonObject.
def stringToJson: String => JsonObject = ???

}
17 changes: 17 additions & 0 deletions repeater/src/test/resources/failed_inserts.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.badrows/loader_recovery_error/jsonschema/1-0-0",
"data": {
"processor": {
"artifact": "snowplow-bigquery-repeater",
"version": "0.6.0-rc7"
},
"failure": {
"error": {
"message": "no such field.",
"location": "unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0",
"reason": "invalid"
}
},
"payload": "{\"eventId\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etlTstamp\":\"2020-11-10T16:56:39.283Z\",\"payload\":{\"geo_country\":\"DE\",\"event\":\"unstruct\",\"user_ipaddress\":\"18.194.133.57\",\"event_version\":\"1-0-0\",\"geo_city\":\"Frankfurt am Main\",\"platform\":\"srv\",\"event_id\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etl_tstamp\":\"2020-11-10T16:56:39.283Z\",\"geo_latitude\":50.1188,\"v_collector\":\"ssc-2.0.0-googlepubsub\",\"collector_tstamp\":\"2020-11-10T16:56:37.401Z\",\"event_vendor\":\"com.snplow.eng.gcp\",\"network_userid\":\"70ec6f9e-cfe9-46c7-8b9c-fab280ea120e\",\"geo_region\":\"HE\",\"geo_timezone\":\"Europe/Berlin\",\"event_format\":\"jsonschema\",\"geo_zipcode\":\"60313\",\"useragent\":\"Go-http-client/2.0\",\"event_name\":\"luke_test_percentage\",\"dvce_created_tstamp\":\"2020-11-10T16:56:37.115Z\",\"dvce_sent_tstamp\":\"2020-11-10T16:56:37.117Z\",\"geo_longitude\":8.6843,\"v_tracker\":\"golang-2.3.0\",\"derived_tstamp\":\"2020-11-10T16:56:37.399Z\",\"app_id\":\"test_percentage_luke\",\"geo_region_name\":\"Hesse\",\"event_fingerprint\":\"b9f667befa875fcae1bedd55e463330e\",\"v_etl\":\"beam-enrich-1.4.1-common-1.4.1\",\"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0\":[{\"device_family\":\"Other\",\"os_family\":\"Other\",\"useragent_family\":\"Go-http-client\",\"os_major\":null,\"os_minor\":null,\"os_patch\":null,\"os_patch_minor\":null,\"os_version\":\"Other\",\"useragent_major\":\"2\",\"useragent_minor\":\"0\",\"useragent_patch\":null,\"useragent_version\":\"Go-http-client 
2.0\"}],\"contexts_com_iab_snowplow_spiders_and_robots_1_0_0\":[{\"category\":\"SPIDER_OR_ROBOT\",\"primary_impact\":\"UNKNOWN\",\"reason\":\"FAILED_UA_INCLUDE\",\"spider_or_robot\":true}],\"contexts_nl_basjes_yauaa_context_1_0_1\":[{\"device_class\":\"Unknown\",\"agent_build\":null,\"agent_class\":\"Special\",\"agent_information_email\":null,\"agent_information_url\":null,\"agent_language\":null,\"agent_language_code\":null,\"agent_name\":\"Go-http-client\",\"agent_name_version\":\"Go-http-client 2.0\",\"agent_name_version_major\":\"Go-http-client 2\",\"agent_security\":null,\"agent_uuid\":null,\"agent_version\":\"2.0\",\"agent_version_major\":\"2\",\"anonymized\":null,\"carrier\":null,\"device_brand\":\"Unknown\",\"device_cpu\":null,\"device_cpu_bits\":null,\"device_firmware_version\":null,\"device_name\":\"Unknown\",\"device_version\":null,\"facebook_carrier\":null,\"facebook_device_class\":null,\"facebook_device_name\":null,\"facebook_device_version\":null,\"facebook_fbop\":null,\"facebook_fbss\":null,\"facebook_operating_system_name\":null,\"facebook_operating_system_version\":null,\"g_sa_installation_id\":null,\"hacker_attack_vector\":null,\"hacker_toolkit\":null,\"i_e_compatibility_name_version\":null,\"i_e_compatibility_name_version_major\":null,\"i_e_compatibility_version\":null,\"i_e_compatibility_version_major\":null,\"kobo_affiliate\":null,\"kobo_platform_id\":null,\"layout_engine_build\":null,\"layout_engine_class\":\"Unknown\",\"layout_engine_name\":\"Unknown\",\"layout_engine_name_version\":\"Unknown ??\",\"layout_engine_name_version_major\":\"Unknown ??\",\"layout_engine_version\":\"??\",\"layout_engine_version_major\":\"??\",\"network_type\":null,\"operating_system_class\":\"Unknown\",\"operating_system_name\":\"Unknown\",\"operating_system_name_version\":\"Unknown ??\",\"operating_system_name_version_major\":\"Unknown 
??\",\"operating_system_version\":\"??\",\"operating_system_version_build\":null,\"operating_system_version_major\":\"??\",\"webview_app_name\":null,\"webview_app_name_version_major\":null,\"webview_app_version\":null,\"webview_app_version_major\":null}],\"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0\":{\"availability_%\":\"10\",\"name\":\"test01\"}}}"
}
}
124 changes: 124 additions & 0 deletions repeater/src/test/resources/payload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{
"eventId": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
"etlTstamp": "2020-11-10T16:56:39.283Z",
"payload": {
"geo_country": "DE",
"event": "unstruct",
"user_ipaddress": "18.194.133.57",
"event_version": "1-0-0",
"geo_city": "Frankfurt am Main",
"platform": "srv",
"event_id": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
"etl_tstamp": "2020-11-10T16:56:39.283Z",
"geo_latitude": 50.1188,
"v_collector": "ssc-2.0.0-googlepubsub",
"collector_tstamp": "2020-11-10T16:56:37.401Z",
"event_vendor": "com.snplow.eng.gcp",
"network_userid": "70ec6f9e-cfe9-46c7-8b9c-fab280ea120e",
"geo_region": "HE",
"geo_timezone": "Europe/Berlin",
"event_format": "jsonschema",
"geo_zipcode": "60313",
"useragent": "Go-http-client/2.0",
"event_name": "luke_test_percentage",
"dvce_created_tstamp": "2020-11-10T16:56:37.115Z",
"dvce_sent_tstamp": "2020-11-10T16:56:37.117Z",
"geo_longitude": 8.6843,
"v_tracker": "golang-2.3.0",
"derived_tstamp": "2020-11-10T16:56:37.399Z",
"app_id": "test_percentage_luke",
"geo_region_name": "Hesse",
"event_fingerprint": "b9f667befa875fcae1bedd55e463330e",
"v_etl": "beam-enrich-1.4.1-common-1.4.1",
"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0": [
{
"device_family": "Other",
"os_family": "Other",
"useragent_family": "Go-http-client",
"os_major": null,
"os_minor": null,
"os_patch": null,
"os_patch_minor": null,
"os_version": "Other",
"useragent_major": "2",
"useragent_minor": "0",
"useragent_patch": null,
"useragent_version": "Go-http-client 2.0"
}
],
"contexts_com_iab_snowplow_spiders_and_robots_1_0_0": [
{
"category": "SPIDER_OR_ROBOT",
"primary_impact": "UNKNOWN",
"reason": "FAILED_UA_INCLUDE",
"spider_or_robot": true
}
],
"contexts_nl_basjes_yauaa_context_1_0_1": [
{
"device_class": "Unknown",
"agent_build": null,
"agent_class": "Special",
"agent_information_email": null,
"agent_information_url": null,
"agent_language": null,
"agent_language_code": null,
"agent_name": "Go-http-client",
"agent_name_version": "Go-http-client 2.0",
"agent_name_version_major": "Go-http-client 2",
"agent_security": null,
"agent_uuid": null,
"agent_version": "2.0",
"agent_version_major": "2",
"anonymized": null,
"carrier": null,
"device_brand": "Unknown",
"device_cpu": null,
"device_cpu_bits": null,
"device_firmware_version": null,
"device_name": "Unknown",
"device_version": null,
"facebook_carrier": null,
"facebook_device_class": null,
"facebook_device_name": null,
"facebook_device_version": null,
"facebook_fbop": null,
"facebook_fbss": null,
"facebook_operating_system_name": null,
"facebook_operating_system_version": null,
"g_sa_installation_id": null,
"hacker_attack_vector": null,
"hacker_toolkit": null,
"i_e_compatibility_name_version": null,
"i_e_compatibility_name_version_major": null,
"i_e_compatibility_version": null,
"i_e_compatibility_version_major": null,
"kobo_affiliate": null,
"kobo_platform_id": null,
"layout_engine_build": null,
"layout_engine_class": "Unknown",
"layout_engine_name": "Unknown",
"layout_engine_name_version": "Unknown ??",
"layout_engine_name_version_major": "Unknown ??",
"layout_engine_version": "??",
"layout_engine_version_major": "??",
"network_type": null,
"operating_system_class": "Unknown",
"operating_system_name": "Unknown",
"operating_system_name_version": "Unknown ??",
"operating_system_name_version_major": "Unknown ??",
"operating_system_version": "??",
"operating_system_version_build": null,
"operating_system_version_major": "??",
"webview_app_name": null,
"webview_app_name_version_major": null,
"webview_app_version": null,
"webview_app_version_major": null
}
],
"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0": {
"availability_%": "10",
"name": "test01"
}
}
}
124 changes: 124 additions & 0 deletions repeater/src/test/resources/payload_fixed.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{
"eventId": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
"etlTstamp": "2020-11-10T16:56:39.283Z",
"payload": {
"geo_country": "DE",
"event": "unstruct",
"user_ipaddress": "18.194.133.57",
"event_version": "1-0-0",
"geo_city": "Frankfurt am Main",
"platform": "srv",
"event_id": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
"etl_tstamp": "2020-11-10T16:56:39.283Z",
"geo_latitude": 50.1188,
"v_collector": "ssc-2.0.0-googlepubsub",
"collector_tstamp": "2020-11-10T16:56:37.401Z",
"event_vendor": "com.snplow.eng.gcp",
"network_userid": "70ec6f9e-cfe9-46c7-8b9c-fab280ea120e",
"geo_region": "HE",
"geo_timezone": "Europe/Berlin",
"event_format": "jsonschema",
"geo_zipcode": "60313",
"useragent": "Go-http-client/2.0",
"event_name": "luke_test_percentage",
"dvce_created_tstamp": "2020-11-10T16:56:37.115Z",
"dvce_sent_tstamp": "2020-11-10T16:56:37.117Z",
"geo_longitude": 8.6843,
"v_tracker": "golang-2.3.0",
"derived_tstamp": "2020-11-10T16:56:37.399Z",
"app_id": "test_percentage_luke",
"geo_region_name": "Hesse",
"event_fingerprint": "b9f667befa875fcae1bedd55e463330e",
"v_etl": "beam-enrich-1.4.1-common-1.4.1",
"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0": [
{
"device_family": "Other",
"os_family": "Other",
"useragent_family": "Go-http-client",
"os_major": null,
"os_minor": null,
"os_patch": null,
"os_patch_minor": null,
"os_version": "Other",
"useragent_major": "2",
"useragent_minor": "0",
"useragent_patch": null,
"useragent_version": "Go-http-client 2.0"
}
],
"contexts_com_iab_snowplow_spiders_and_robots_1_0_0": [
{
"category": "SPIDER_OR_ROBOT",
"primary_impact": "UNKNOWN",
"reason": "FAILED_UA_INCLUDE",
"spider_or_robot": true
}
],
"contexts_nl_basjes_yauaa_context_1_0_1": [
{
"device_class": "Unknown",
"agent_build": null,
"agent_class": "Special",
"agent_information_email": null,
"agent_information_url": null,
"agent_language": null,
"agent_language_code": null,
"agent_name": "Go-http-client",
"agent_name_version": "Go-http-client 2.0",
"agent_name_version_major": "Go-http-client 2",
"agent_security": null,
"agent_uuid": null,
"agent_version": "2.0",
"agent_version_major": "2",
"anonymized": null,
"carrier": null,
"device_brand": "Unknown",
"device_cpu": null,
"device_cpu_bits": null,
"device_firmware_version": null,
"device_name": "Unknown",
"device_version": null,
"facebook_carrier": null,
"facebook_device_class": null,
"facebook_device_name": null,
"facebook_device_version": null,
"facebook_fbop": null,
"facebook_fbss": null,
"facebook_operating_system_name": null,
"facebook_operating_system_version": null,
"g_sa_installation_id": null,
"hacker_attack_vector": null,
"hacker_toolkit": null,
"i_e_compatibility_name_version": null,
"i_e_compatibility_name_version_major": null,
"i_e_compatibility_version": null,
"i_e_compatibility_version_major": null,
"kobo_affiliate": null,
"kobo_platform_id": null,
"layout_engine_build": null,
"layout_engine_class": "Unknown",
"layout_engine_name": "Unknown",
"layout_engine_name_version": "Unknown ??",
"layout_engine_name_version_major": "Unknown ??",
"layout_engine_version": "??",
"layout_engine_version_major": "??",
"network_type": null,
"operating_system_class": "Unknown",
"operating_system_name": "Unknown",
"operating_system_name_version": "Unknown ??",
"operating_system_name_version_major": "Unknown ??",
"operating_system_version": "??",
"operating_system_version_build": null,
"operating_system_version_major": "??",
"webview_app_name": null,
"webview_app_name_version_major": null,
"webview_app_version": null,
"webview_app_version_major": null
}
],
"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0": {
"availability_percentage": "10",
"name": "test01"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.snowplowanalytics.snowplow.storage.bigquery.repeater

import org.specs2.Specification
import scala.io.Source

/**
 * Specification for `Recover.fix`: applying it to the `payload.json` test resource must
 * yield exactly the content of the `payload_fixed.json` test resource.
 */
class RecoverSpec extends Specification {

  def is =
    s2"""
fix wrong column name $e1
"""

  def e1 = {
    val in        = readResource("payload.json")
    val recovered = Recover.fix(in)
    val out       = readResource("payload_fixed.json")

    recovered ==== out
  }

  // Reads a whole classpath resource as a string and closes the underlying source afterwards,
  // so the test does not leak the open stream.
  private def readResource(name: String): String = {
    val source = Source.fromResource(name)
    try source.mkString
    finally source.close()
  }

}

0 comments on commit 73ef80b

Please sign in to comment.