DRAFT failed inserts recovery #139

Draft · lukeindykiewicz wants to merge 28 commits into master from feature/failed-inserts-recovery
Conversation

@lukeindykiewicz (Author):

This is not a ready solution; it's a stub of a solution that should contain the most important parts. The PR is to verify the concept and the direction it follows.

@dilyand (Contributor) left a comment:

Looks like it should work @lukeindykiewicz !

@benjben (Contributor) left a comment:

👍

object Recover {

  //TODO: hardcode proper GCS path
  val DeadQueuePath = Path("gs://sp-storage-loader-failed-inserts-dev1-com_snplow_eng_gcp/dead_queue")
Contributor:

Should not appear on a public repo like this.

Author:

Good point, thanks. Do you have an idea where to put it for such a one-off job? An env var feels right, but I would prefer not to touch TF for this task.

Author:

Thanks Dilyan. This means changes in TF, unfortunately.

Contributor:

Sorry, not sure I follow. Why does it mean changes in TF? Isn't it the same bucket that is being passed in as an argument to Repeater on deploy?

Contributor:

Well, there are only two possibilities for configuration:

  • either it's hard-coded
  • or it needs to be provided, meaning TF

Contributor:

(Dilyan's answer appeared while I was writing)

Author:

Thanks @dilyand. You might be right that we already have it! Awesome, thanks!

Author:

Force-pushed to remove the hardcoded path.

resources
  .store
  .list(DeadQueuePath)
  .evalMap(resources.store.getContents)
Contributor:

I'm not familiar with Store.getContents. Does all the data need to fit into memory? If yes, are we sure that the data is not too big?

Author:

I assumed that every single file should fit in memory without problems. Is that a wrong assumption?

Contributor:

Maybe you can read the files line by line to be sure?

Author:

Yes, I definitely can. I just thought these files are pretty small. Thank you both!
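
A minimal sketch of the line-by-line approach, assuming the store is an fs2-blobstore Store exposing a byte stream via get (the chunk size and exact signature are assumptions, and getLines is a hypothetical helper, not the PR's code):

import blobstore.{Path, Store}
import fs2.{Stream, text}

// Hypothetical helper: stream a dead-queue file and split it into lines,
// instead of loading the whole file into memory with getContents.
def getLines[F[_]](store: Store[F], path: Path): Stream[F, String] =
  store
    .get(path, 4096)            // Stream[F, Byte], read in small chunks
    .through(text.utf8Decode)   // bytes -> UTF-8 strings
    .through(text.lines)        // strings -> individual lines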

import cats.effect.Sync
import cats.syntax.all._

object Recover {
Contributor:

Don't we want a main() to be able to recover already existing bad rows with the percentage issue?

Author:

What do you mean by a main()? It's added to main in Repeater.scala for the repeater:
https://github.com/snowplow-incubator/snowplow-bigquery-loader/pull/139/files#diff-d0e5513b7035a18d3239b7e5b227c63eb1db4386df406d89c23b3b6e7e331cb5

Do you have something else in mind?

@benjben (Contributor) commented Nov 12, 2020:

I thought that there were two use cases:

  1. auto-recover failed inserts live
  2. recover already existing bad rows

The main() in Repeater.scala does the first one but I struggle to see where 2) happens.

Author:

This job is only for point 2). The main starts the Recover stream, which does the job. Am I missing something?

Contributor:

My understanding is that Repeater is a long-running Scala app that reads from Pub/Sub and writes to BQ. How will you make it run as a "batch" job to just read GCS?

Contributor:

As I see it, recover in the repeater should only be called for current failed inserts, not for the whole history.

Author:

I added a stream to Repeater that will run next to the main flow. It will read the data from GCS and write to Pub/Sub, kind of similar to what the repeater does, but the other way round. It will read all the data in the filtered buckets, recover it, write to Pub/Sub, and that's it: the stream will stop. All other streams in the main flow will continue to work normally. I'm only adding functionality, not changing the existing one.

It's here: https://github.com/snowplow-incubator/snowplow-bigquery-loader/pull/139/files#diff-d0e5513b7035a18d3239b7e5b227c63eb1db4386df406d89c23b3b6e7e331cb5R55
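
For reference, a minimal sketch of how such a one-off stream could run next to the long-running flow with fs2 (mainFlow and recoverFailedInserts are placeholder names, not necessarily the PR's exact wiring):

import cats.effect.Concurrent
import fs2.Stream

// Sketch: run the finite recovery stream in the background of the main flow.
// When the recovery stream finishes, the main streams keep running as before.
def run[F[_]: Concurrent](
  mainFlow: Stream[F, Unit],
  recoverFailedInserts: Stream[F, Unit]
): Stream[F, Unit] =
  mainFlow.concurrently(recoverFailedInserts)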

Contributor:

It means that we will start the repeater and will expect it to go and try to recover failed inserts from weeks ago. I guess what is troubling me is that we want to use a long-running streaming app to perform a batch job (for this very use case).

Author:

That's why it's a one-off job. There is nothing wrong with this solution, imho. We just add a recovery feature to the app so we don't have to spin up a new app and bother with all the infrastructure code and the trouble of standing up something new. The long-term goal would be to use the recovery project for this purpose, but currently it cannot recover events in this part of the pipeline.

@benjben (Contributor) left a comment:

Instead of having the BQ loader write failed inserts to GCS, and then the BQ loader reading these files from GCS, can't we directly try to recover on the fly and then write to GCS only if recovery was not successful?

def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
  resources
    .store
    .list(DeadQueuePath)
Contributor:

Do we have one Path for each batch of failed inserts, or is it always the same? In the latter case, if failed inserts keep being added, we will always try to reprocess them all. I didn't see a place where we remove a successfully recovered file.

Author:

Good point. It's not removed at the moment; I'll probably ask support to remove it after a successful recovery. To avoid reading the same file twice, the filtering part (in a TODO) will ensure we only recover the folder once.
I don't think there will be problems like every second event failing.
I rather assume that all will be ok, or all will fail (if there is a mistake in the recovery).

Contributor:

To avoid reading the same file twice, the filtering part (in a TODO) will ensure we only recover the folder once.

So you want to maintain a manifest outside of the BQ loader?

Author:

No. I will simply filter for the dates on which the failed events occurred. I will not do it for all past and current events in dead_queue, only for particular dates.
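
A minimal sketch of what that filter could look like, assuming the dead_queue object keys embed a date segment; the dates and the contains check are placeholders, not the PR's actual values:

// Hypothetical filter: only recover folders for the days on which the
// failed inserts happened; everything else in dead_queue is skipped.
val datesToRecover = Set("2020-11-09", "2020-11-10")

def shouldRecover(objectKey: String): Boolean =
  datesToRecover.exists(date => objectKey.contains(date))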

@lukeindykiewicz (Author):

Instead of having the BQ loader write failed inserts to GCS, and then the BQ loader reading these files from GCS, can't we directly try to recover on the fly and then write to GCS only if recovery was not successful?

We would do it like you say, but these events are already there and waiting for us. The wrong column name is already corrected and we only need to reprocess the past failed events.

@benjben self-requested a review on November 12, 2020 09:02
@lukeindykiewicz force-pushed the feature/failed-inserts-recovery branch from 73ef80b to 6ddd52e on November 12, 2020 09:06
@benjben (Contributor) commented Nov 12, 2020:

Maybe I'm just missing some context. Do we just want to run it once and then remove the recover part and go back to the normal job?

@lukeindykiewicz (Author):

Maybe I'm just missing some context. Do we just want to run it once and then remove the recover part and go back to the normal job?

Yes, that's exactly what we're planning to do.

@benjben (Contributor) commented Nov 12, 2020:

OK, sorry, I totally missed that. I thought that the plan was also to add auto-recovery for _%. My bad!

@lukeindykiewicz (Author):

No worries. Thanks for asking questions, Ben!

@chuwy (Contributor) left a comment:

I agree with @benjben that it feels a bit hacky to combine this job with the repeater, but I also respect the time pressure, so I agree it might be justified. But I also see two possible correctness mistakes:

  1. The payload: the original one does not contain the eventId, etlTstamp and payload keys. If you look at the EventContainer decoder, you'll see that it's just the content of payload. I might be missing something, but that's how I remember it.
  2. The global _% replace. The chance of corrupting data is small, but still there; I would strongly recommend switching to keys only.

Also, I strongly recommend using the BadRow type for parsing.

@@ -0,0 +1,124 @@
{
  "eventId": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
  "etlTstamp": "2020-11-10T16:56:39.283Z",
Contributor:

I might be missing something, but I don't think there are eventId and etlTstamp anywhere in repeater's output.

Contributor:

So I'm also not sure where data like this should go; it's not a bad row (not an SDJ), nor a failed insert that can be forwarded to BQ.

def recover: String => Either[String, EventContainer] =
  b =>
    stringToFailedInsertBadRow(b).map { ev =>
      val fixed = fix(ev.payload)
Contributor:

This can potentially corrupt a lot of good data: if there are any values with _% it will change them for no reason. It should operate only on keys.
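
A minimal sketch of a keys-only version, assuming the payload is a circe Json; fixKey stands in for whatever rename the recovery actually needs and is not the PR's code:

import io.circe.{Json, JsonObject}

// Sketch: apply the fix to object keys only, recursing into nested objects
// and arrays, so values that merely contain "_%" are left untouched.
def fixKeysOnly(fixKey: String => String)(json: Json): Json =
  json
    .mapObject { obj =>
      JsonObject.fromIterable(
        obj.toList.map { case (k, v) => fixKey(k) -> fixKeysOnly(fixKey)(v) }
      )
    }
    .mapArray(_.map(fixKeysOnly(fixKey)))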
