This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Kafka last mile batching & retrying processEvent #273

Closed · mariusandra opened this issue Mar 23, 2021 · 24 comments

mariusandra (Collaborator) commented Mar 23, 2021

The worst thing a data processing company can do is to lose data, yet that's what we're currently capable of :).

Imagine an export plugin (say bigquery). If for whatever reason the export fails (a rat ate a network cable) while we're processing an event, that event will never be exported. Worse, we also won't know whether it was exported or not, unless we diff the uuids in the source and destination databases and see what's missing. (See #269 for one alternative solution.)

To get around this, we need some retrying logic for processEvent. There is an existing issue for a dead letter queue, which partially covers this use case. However there could be a broader solution that solves this as well.

Both segment and rudderstack also came up against this issue, and fixed it by implementing a last mile queue on top of a relational database (mysql for segment, postgres for rudderstack):

Basically, events come in through Kafka and end up in a postgres/mysql table. For segment this table is write-heavy with expiring timestamps (it's only read from if something must be retried); I'm not sure how it works for rudderstack.

We might need to do the same, unless some technology other than postgres is a better fit.

Browsing around, I came across this project: https://github.com/graphile/worker - a job queue for PostgreSQL running on Node.js.

It's not 100% what segment does (you also read from it), but it looks pretty sweet and could be a decent, easy-to-implement stopgap solution:

const { run, quickAddJob } = require("graphile-worker");

async function main() {
  // Run a worker to execute jobs:
  const runner = await run({
    connectionString: "postgres:///my_db",
    concurrency: 5,
    // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
    noHandleSignals: false,
    pollInterval: 1000,
    // you can set the taskList or taskDirectory but not both
    taskList: {
      hello: async (payload, helpers) => {
        const { name } = payload;
        helpers.logger.info(`Hello, ${name}`);
      },
    },
    // or:
    //   taskDirectory: `${__dirname}/tasks`,
  });

  // Or add a job to be executed:
  await quickAddJob(
    // makeWorkerUtils options
    { connectionString: "postgres:///my_db" },

    // Task identifier
    "hello",

    // Payload
    { name: "Bobby Tables" },
  );

  // If the worker exits (whether through fatal error or otherwise), this
  // promise will resolve/reject:
  await runner.promise;
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

Similar to the celery worker, we'd just listen for jobs and send them to piscina for handling. We'd then have to reorient the Kafka reader so that it feeds tasks into the system when there aren't enough of them already in the queue.
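For illustration only, here's a rough sketch of that wiring, assuming a Piscina pool whose worker file runs the plugin chain (the file name, task name and payload shape are made up):

const Piscina = require("piscina");
const { run } = require("graphile-worker");

// Hypothetical worker file that runs processEvent for a single event
const piscina = new Piscina({ filename: `${__dirname}/worker.js` });

async function startRetryWorker() {
  return run({
    connectionString: process.env.DATABASE_URL,
    concurrency: 5,
    taskList: {
      // Task name and payload shape are assumptions for this sketch
      processEventRetry: async (payload, helpers) => {
        await piscina.run({ event: payload.event });
        helpers.logger.info(`Retried event ${payload.event.uuid}`);
      },
    },
  });
}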

For performance, they claim:

graphile-worker is not intended to replace extremely high performance dedicated job queues, it's intended to be a very easy way to get a reasonably performant job queue up and running with Node.js and PostgreSQL. But this doesn't mean it's a slouch by any means - it achieves an average latency from triggering a job in one process to executing it in another of under 3ms, and a 12-core database server can process around 10,000 jobs per second.

That's 2x what we're experiencing during peak load. It should be horizontally scalable, yet definitely not as fast as segment's implementation, which is closer to a dead letter queue.

This could also be an optional extra that you enable, and it could also work with celery. For cloud, I'd definitely hook it up to a different database than the main Heroku one. If for nothing else, then just for data locality.

This library also has crontab support with some pretty nice guarantees that we'd be happy to give as well (see #15 and #68):

  • guarantees (thanks to ACID-compliant transactions) that no duplicate task schedules will occur
  • can backfill missed jobs if desired (e.g. if the Worker wasn't running when the job was due to be scheduled)
  • schedules tasks using Graphile Worker's regular job queue, so you get all the regular features such as exponential back-off on failure.
macobo (Contributor) commented Mar 24, 2021

Won't dive into the implementation proposal yet, as there are quite a few open questions we should consider:

  1. When do we want to retry a message? Is it only on explicit errors, is it timeouts as well, do plugin authors have other ways of controlling this?
  2. How frequently will the retries occur?
  3. How many retries in general?
  4. Are timeout-based retries and error-based retries equivalent (e.g. do we retry more times? more frequently?)
  5. Do we show information about the failures to the user somehow?
  6. Do they have an option to retry?
  7. Timeouts: How much time will we give each plugin or plugins in general to do its thing?
  8. Are retries message-based or plugin*message-based?
  9. Say a user has 3 plugins, with the middle one consistently failing. What do we want to happen? Skip ingesting anything until the middle one succeeds after retries? Or ignore that plugin in the flow, run the others, then run the middle one after the fact, out of order, and delete the event?
  10. How important are exactly-once semantics here? Say plugin server becomes unresponsive for a while, we trigger a retry and the plugin server becomes responsive again. Would we ingest the events twice? Say for a bigquery plugin, would it result in two rows in their database?
  11. Do we retry scheduled plugins?
  12. If we retry scheduled plugins, how should plugin authors handle that?
  13. Is the order events are sent to plugins important (e.g. do we need to guarantee that an event at time T is seen by a plugin before one at T + 1 day)?
  14. Say a plugin makes some progress before running into e.g. a timeout. Is this a valid scenario, what sorts of guarantees can we provide plugin authors around resuming progress when retrying?
  15. What services will we assume will be always available? Kafka? The DB used as a retry queue? ..?
  16. How do we want to handle "kafka.produce" errors? Do we retry the whole message together with the plugins?

Note that we should answer from a 2022-2023 posthog perspective here even if we won't implement all of these immediately. This sort of architecture is really hard to change after the fact if we get some fundamental assumption incorrect.

mariusandra (Collaborator, Author) commented:

That's a bunch of good questions! I don't know the answers to them all either, but to keep the ideas flowing, I'll give my first quick thoughts. And thanks for helping figure out what we really need to build!

  1. When do we want to retry a message? Is it only on explicit errors, is it timeouts as well, do plugin authors have other ways of controlling this?

Segment has an option to throw specific errors to indicate what you want to happen. Basically you throw a RetryError if it should be retried, otherwise not. I'd do something like that as well (a rough sketch follows the list):

  • throw RetryError -> we retry processing this event again with some backoff
  • throw any other error -> plugin gets ignored, event is passed through, error gets logged
  • return null -> stop executing plugins / discard event
  • return event -> pass the event on to the next plugin
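A minimal sketch of what those semantics could look like from a plugin author's side, assuming a RetryError class exported by the plugin scaffold (the endpoint and property names are made up):

import { RetryError } from "@posthog/plugin-scaffold"; // assumed export, for illustration

export async function processEvent(event, meta) {
  if (event.properties?.$discard_me) {
    return null; // stop executing plugins / discard the event
  }
  try {
    // made-up enrichment endpoint
    const response = await fetch("https://example.com/enrich", { method: "POST", body: JSON.stringify(event) });
    event.properties = { ...event.properties, ...(await response.json()) };
  } catch (error) {
    // RetryError -> retried with backoff; any other error -> plugin ignored, event passed through, error logged
    throw new RetryError("enrichment service unavailable");
  }
  return event; // pass the event on to the next plugin
}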
  2. How frequently will the retries occur?
  3. How many retries in general?

"Segment retries 9 times over the course of 4 hours. This increases the number of attempts for messages, so we try to re-deliver them another 4 times after some backoff."

Something like that as a default and we adapt as needed? 4 hours seems a bit on the lower side though given our current scale and potential random outages.

  4. Are timeout-based retries and error-based retries equivalent (e.g. do we retry more times? more frequently?)

We might need to move in a direction where too many timeouts disable a plugin altogether, as such out-of-control plugins might harm the system.

  5. Do we show information about the failures to the user somehow?

We can take inspiration from this delivery issues dashboard.

  6. Do they have an option to retry?

Nope.

  7. Timeouts: How much time will we give each plugin or plugins in general to do its thing?

We could eventually bill by CPU time (even if totally idle and awaiting 🤑), so it wouldn't really matter. 30sec for processEvent, 1min for runEveryMinute and 60min for runEveryHour? We could even have a runForever method, which gets no timeouts and at the end of the day is basically billed like a heroku dyno? I imagine several import/export plugins might need or want a continuously running worker.

  8. Are retries message-based or plugin*message-based?

I'd say plugin*message based. So if you have 3 plugins: ["currency", "geoip", "export"] and "geoip" errors, it should retry from that spot, skipping "currency".
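Illustrative sketch only, not a concrete proposal: resuming the chain from the failing plugin rather than from the start.

// `plugins` is an ordered list like ["currency", "geoip", "export"]
async function runPluginChain(event, plugins, startIndex = 0) {
  for (let i = startIndex; i < plugins.length; i++) {
    try {
      event = await plugins[i].processEvent(event);
      if (!event) {
        return null; // a plugin discarded the event
      }
    } catch (error) {
      // hypothetical helper: persist the partially processed event plus the index to resume from
      await scheduleRetry({ event, resumeFromIndex: i });
      return null;
    }
  }
  return event;
}

async function scheduleRetry(job) {
  // stand-in for writing to whatever retry queue we end up with
  console.log("queued retry, resuming from plugin", job.resumeFromIndex);
}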

  9. Say a user has 3 plugins, with the middle one consistently failing. What do we want to happen? Skip ingesting anything until the middle one succeeds after retries? Or ignore that plugin in the flow, run the others, then run the middle one after the fact, out of order, and delete the event?

Skip ingesting anything until the middle one succeeds after retries.

  10. How important are exactly-once semantics here? Say plugin server becomes unresponsive for a while, we trigger a retry and the plugin server becomes responsive again. Would we ingest the events twice? Say for a bigquery plugin, would it result in two rows in their database?

We shouldn't make guarantees we can't keep. I'd say any export should be best-effort and unique by uuid.

  11. Do we retry scheduled plugins?
  12. If we retry scheduled plugins, how should plugin authors handle that?

Possibly only if throwing a RetryError

  13. Is the order events are sent to plugins important (e.g. do we need to guarantee that an event at time T is seen by a plugin before one at T + 1 day)?

"No, Segment can’t guarantee the order in which the events are delivered to an endpoint" and neither can't we.

  14. Say a plugin makes some progress before running into e.g. a timeout. Is this a valid scenario, what sorts of guarantees can we provide plugin authors around resuming progress when retrying?

They can use the cache and storage apis to store their state and resume if restarted.

macobo (Contributor) commented Mar 24, 2021

This is very good context, especially around RetryErrors and scheduled plugins.

The only thing I disagree with is:

I'd say plugin*message based. So if you have 3 plugins: ["currency", "geoip", "export"] and "geoip" errors, it should retry from that spot, skipping "currency".

From a semantics standpoint I agree; this would be ideal.

However, thinking of how to implement this, we would need to write the message "down" into persistent storage N_Plugins times, which is going to be a major driver of $cost_of_storage.
We could approximate this with some in-memory queuing, but we couldn't guarantee that a plugin which already ran wouldn't get called twice.

How about this?

  • For every message, on retriable failure we store what plugin caused the retry.
  • We expose (an estimation of) if this plugin has been processed successfully previously as part of the meta argument?

Proposed semantics

To summarize the rest, the semantics would come out to be something like:

  1. Before we start processing a message (and committing offsets), we store the message and a timestamp in a persistent queue/db (see the sketch after this list).
  2. On successful processing runs we store the success and some meta information about the event as well (time/plugin) in the queue/db.
  3. If a message has not been processed in $LIMITED_AMOUNT_OF_TIME, it gets retried.
  4. If a plugin fails with a RetryError, times out or fails from a plugin-server internal error (e.g. restart), we set the message to be retried at a later date together with information about the failure.
  5. If a plugin fails with an error thrown from the plugin itself, the plugin gets ignored during processing.
  6. We retry with an exponential backoff a $LIMITED_NUMBER of times.
  7. As retries get processed, we expose (an estimation of) if this plugin has been processed successfully previously as part of the meta argument. Note: this might change, see discussions below
  8. We might have an internal delivery issues dashboard, inspired by segment.
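To make the bookkeeping concrete, a row in that queue/db might track something like the following (a sketch; field names are illustrative, not a schema proposal):

interface QueuedMessage {
  uuid: string; // event uuid
  payload: string; // the serialized message
  firstSeenAt: Date; // written before we commit the Kafka offset
  status: "processing" | "succeeded" | "awaiting_retry" | "dead_lettered";
  attempt: number; // drives exponential backoff and the $LIMITED_NUMBER cap
  retryAt: Date | null; // when to try again
  lastFailedPlugin: string | null; // exposed (approximately) via the meta argument
}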

This should allow us to create a system that basically functions like the current one, except:

  1. We do writes to a queue/DB with start, retries and successes per message
  2. Have a process reading from the queue and putting messages to be retried into kafka to be processed again by plugin-server

In my head it should work as expected for the different classes of plugins:

  1. Synchronization plugins
  2. Data munging plugins
  3. Webhooks

WDYT?

mariusandra (Collaborator, Author) commented:

  • For every message, on retriable failure we store what plugin caused the retry.
  • We expose (an estimation of) if this plugin has been processed successfully previously as part of the meta argument?

I'm not fully sure why we can't just directly skip the already processed plugins on a best-effort basis? Requiring every other plugin author to add if (meta.alreadyProcessedMaybe) return event feels like an unnecessary burden and a leakage of implementation details.

macobo (Contributor) commented Mar 24, 2021

Good question, I tried answering this earlier but let's try again.

It's because of the difference in expectations between data munging plugins and webhooks/synchronization plugins.

A data munging plugin (e.g. a currency converter) does not care about retries; it needs to "do" its thing every time.
For webhooks/synchronization plugins, the retry attribute matters a lot.

As to why we are not storing the processed message after every plugin to avoid this added complexity:

However, thinking of how to implement this, we would need to write the message "down" into persistent storage N_Plugins times, which is going to be a major driver of $cost_of_storage.

You can think of our cost of infrastructure as increasing linearly with $bytes_written_somewhere, and that being the main cost point (besides devs). Saving the message per plugin would increase the bill significantly (we'd be going from writing the message somewhere 3 times to 3 + N_Plugins times).

Even if we did all that, there's no guarantee we wouldn't do double-deliveries. A plugin worker stalling after doing the $write-to-bigquery but before the $write-to-queue is an easy example of this.

Another alternative here is separating these two plugin types somehow, but that also ends up with a lot of extra constraints.

macobo (Contributor) commented Mar 24, 2021

I guess you could store the state of the message before $step_of_retry_failure in the DB and proceed from there.

This brings up a new question though: what if the plugins or their order changes? I'd expect the retry to go through the "new" pipeline rather than the old.

mariusandra (Collaborator, Author) commented:

I'd imagine we'd write the transient state of the event only if something fails and needs to be retried. Effectively a dead letter queue. That shouldn't add too much extra write overhead, except for when large outages happen. It could/would be best-effort, meaning that if something in the order of plugins changes, all bets are off, and if something is retried twice (failure between $write-to-bigquery and $write-to-retry-queue), no big deal.

macobo (Contributor) commented Mar 24, 2021

I'd imagine we'd write the transient state of the event only if something fails and needs to be retried. Effectively a dead letter queue.

Not sure I understand where you're proposing these writes would go.

Are you describing a separate solution compared to the thing described under Proposed semantics here or something different? Why do we need a separate solution for this?

If you're proposing the same solution, what would the semantics be? Do we only write things after failures or eagerly after every plugin?

mariusandra (Collaborator, Author) commented:

I haven't confirmed that these semantics are exactly what we will implement, so I'm just thinking abstractly in terms of product here and not proposing anything specific. These semantics get deep into implementation details already in step 2 ("what will that look like?"), so I haven't had the chance to fully play them through nor poke holes in them.

macobo (Contributor) commented Mar 24, 2021

Ack, will wait until you have specific feedback.

You are correct that the "Proposed semantics" section had some implementation/algorithm thinking involved. This is because some edge cases only became obvious to me when thinking through the whole data flow and trying to work through the things that can break at every step.

mariusandra (Collaborator, Author) commented:

I've been reading through segment's centrifuge blog post to get ideas.

They have an interesting system that took 9 months to build. Some facts from there:

  1. Two immutable append-only tables. One for jobs and one for job state transitions. "Of course, we also want to keep track of what state each job is in, whether it is waiting to be delivered, in the process of executing, or awaiting retry. For that, we use a separate table, the job_state_transitions table. " and "because Centrifuge is mostly accepting and delivering new data, we don’t end up reading from the database. Instead, we can cache most new items in memory, and then age entries out of cache as they are delivered."
  2. Directors that handle the jobs. When a director boots up, it asks for its own personal database from the JobDB manager, runs CREATE statements to boot up a schema and then uses it as its own personal cache. Nobody else uses this same database while this director is running.
  3. Directors get a fresh new DB every 30min, no DELETE statements are run and thus cache invalidation is extremely efficient. "By the time 30 minutes have passed, 99.9% of all events have either failed or been delivered, and a small subset are currently in the process of being retried. The manager is then responsible for pairing a small drainer process with each JobDB, which will migrate currently retrying jobs into another database before fully dropping the existing tables. "

I'm not sure if, how, or when we should build something this complex.

mariusandra (Collaborator, Author) commented:

How's this for a crazy idea? For last mile batching after kafka, how about using... Kafka?

Currently for PostHog cloud, the plugin server runs on 4-core instances (4096 ECS CPU). Each of these is able to process 10k events per minute, when sequentially processing kafka batches of size 1. Beyond that the batch size (and parallelism) increases, all the way up to 4 events per batch during normal cloud hours. The most we've seen on cloud is ~40 events per batch during huge congestions.

So how's this for a strategy: we increase the number of Kafka consumers per server and limit the batch size to always be just 1.
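A hedged sketch of the idea, assuming kafkajs (broker, topic and group names are illustrative); each consumer handles one message at a time, which is effectively a batch size of 1:

const { Kafka } = require("kafkajs");

// Hypothetical stand-in for handing an event to piscina / the plugin VMs
async function handleEvent(event) {
  console.log("processing", event.uuid);
}

async function startConsumers(count) {
  const kafka = new Kafka({ brokers: ["localhost:9092"] });
  for (let i = 0; i < count; i++) {
    const consumer = kafka.consumer({ groupId: "plugin-ingestion" });
    await consumer.connect();
    await consumer.subscribe({ topic: "events_plugin_ingestion" });
    await consumer.run({
      // eachMessage hands us one message at a time, so a single slow event
      // no longer blocks a whole batch of 40
      eachMessage: async ({ message }) => {
        await handleEvent(JSON.parse(message.value.toString()));
      },
    });
  }
}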

Is this a bad idea? I feel like this is slightly rebellious, but it could work. Am I missing something fundamental @fuziontech @macobo?

On a 4-core server, that gives us a theoretical maximum processing+ingestion rate of 40k events/minute (1.7B events/month), unless we overprovision to better handle async plugins.

Since Kafka right now supports thousands of partitions and is very soon moving to a model that supports millions of partitions, we can probably increase our consumer count from the low single digits we have now to low double digits... and eventually add a comma or two.

If even then we exceed the feasible number of consumers, we can implement app level sharding. Either sharding by topic (round-robin in capture.py) or even by deploying new Kafka clusters and adding those to the mix. However Kafka will most likely support over a million consumers before we reach this point.

What would this "one event per batch" give us?

We will trade some raw throughput for increased control and safety.

Currently if we have one consumer and a batch of 40, with one 30sec event in the middle, ingestion is frozen. With the "consumer per thread" model, we could have processed and committed 39 events, while leaving that one hanging for longer.

That one hanging event? We could commit it anyway after 3 seconds and throw its metadata onto a separate retry queue. We'd check that queue after X seconds to see if the event has completed in the background or needs to be retried.

Unless we overprovision consumers to threads (piscina is configured to handle 10 parallel tasks per thread after all), we would take a hit in our ingestion speed. However the flexibility we gain would be worth it.

And if we bill per plugin-second, we should be good anyway :).


This might sound like we should also just make the entire plugin server singlethreaded (only 1 core to play with) and launch a dozen of them, but that's a bad idea. Thinking a few steps ahead, I'd still prefer having multiple cores per plugin server. The reason is simple: we will have different types of tasks a plugin server can do. Scheduled tasks. Processing events. Processing retries. Replying to HTTP requests (webhooks). Distributing frontend plugins. Etc.

Imagine a dozen plugin servers running. We'll want to somehow distribute the workload. For example, with 16 cores in a cluster, have 10 cores process events, 2 run scheduled tasks, 1 deal with retries and 1 handle web requests. Coordinating 4 servers with 4 cores feels a lot easier than coordinating 16 servers with 1 core. It becomes more apparent if you increase the scale: it feels easier to coordinate 80 cores on 20 servers than 80 cores on 80 servers.

This coordination is a topic for another issue, but shouldn't be too hard to implement. We already use redlock, which designates one server as the "scheduler". Instead we should just designate one server as "coordinator" and let it tell the others what to do... and unless told otherwise, every server just processes events. Once that single-coordinator model gets too complicated, we can implement some version of Raft... or do a reverse-kafka and implement Zookeeper :D.

macobo (Contributor) commented Apr 8, 2021

Feeling a bit lost here.

That one hanging event? We could commit it anyway after 3 seconds and throw its metadata onto a separate retry queue. We'd check that queue after X seconds to see if the event has completed in the background or needs to be retried.

Can you clarify a bit: Where would that queue live and how would we make sure the task has completed when it's processed?

mariusandra (Collaborator, Author) commented:

I'm not sure and that's an open question. Either kafka again (though there's no delayed consumption support there, requiring app-level fun) or it could be a postgres queue like https://github.com/graphile/worker

The requirements for this slow backup ingestion are different here compared to normal fast ingestion. The main point is to just keep these events somewhere and to not lose data.

mariusandra (Collaborator, Author) commented Apr 9, 2021

Consumer batching is now implemented here, though I haven't properly tested it yet. No retries either.

macobo (Contributor) commented Apr 9, 2021

My 2c: This issue is now tackling two (IMO) separate problems.

  1. What are the desired semantics for retries and how to build out a scalable system for them
  2. Performance-tuning ingestion so slow plugins/retries don't block ingestion (introduced in #273 (comment) above)

I don't have input for 2 because:

  • It depends on the solution to (1): "how do we detect a message needs to be retried".
  • For performance-tuning I don't think anyone but the extensibility team will have great context. As usual with tuning, the process should be: establish metrics, find a pattern, figure out the cause and solve it.

As for (1), I don't really have anything to add which I didn't cover here. To summarize the architectural components of it:

  • Have a write-heavy DB which receives writes when a message starts to be processed and when it finishes/fails
  • We acknowledge kafka offsets once the message has been written to the write-heavy DB
  • Other worker(s) handle retries by reading from the write-heavy DB

There are still some questions open re semantics - N_messages vs N_messages*M_plugins writes, exposing things as meta arguments, but the gist there stays the same.

fuziontech (Member) commented:

Hi - slowpoke to this distributed queue party, but just finished reading segment's centrifuge architecture and :chef_kiss: That's quite nice.

They nailed it. The key here is the queues per <source, destination>, but for us it would be a queue per <team, plugin>.

They use mysql for their JobDB which is a great choice. Especially considering that they are running Director on ECS where your services have to be stateless. The downside of this is that they have some extra complexity around having a Manager service that is responsible for spinning up and down a dedicated RDS instance for every instance of the Manager. Then they have to have the Director grab a lock on the DB from Consul.

So for us, we are planning on moving over to K8s for everything. This means that we can use different kinds of deployments like StatefulSets, which means we can have volumes associated with our services. This is how you can run Postgres on Kubernetes. What we can do then is just have a volume associated with each plugin server with a Sqlite DB on it that is the JobDB. No locks or manager necessary. Alternatively we can set up a mysql container in the same pod as the plugin server to accomplish the same thing with only slightly more complexity (1 more container per pod for the plugin server deployment).

In the meantime we could basically build towards this goal but abstract out the JobDB side and spin up a larger Mysql RDS instance that all of the plugin-servers talk to as the queue per <team, plugin> storage layer. Maybe have each plugin-server have an id and a db per id on mysql for isolation. This would work out as long as we are on ECS. This is optional; we could just build directly for K8s since we will need to support that deployment strategy anyway.

Something else we would need to do is to build out the state machine for handling payloads that need to be handled later. For cloud this is pretty straightforward with another Kafka topic ending up in S3. As for on-prem, we will probably need to use S3 or GCS unless we can allocate plenty of EBS for the cluster. We would then need to update plugin-server to occasionally retry from S3 if there are any payloads there waiting to be reprocessed.
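Purely as a sketch of the local JobDB idea, assuming better-sqlite3 on a volume mounted into the plugin-server pod (the table layout is made up):

const Database = require("better-sqlite3");

const db = new Database("/var/lib/plugin-server/jobdb.sqlite");
db.exec(`
  CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    team_id INTEGER NOT NULL,
    plugin_config_id INTEGER NOT NULL,
    payload TEXT NOT NULL,                        -- serialized event
    state TEXT NOT NULL DEFAULT 'awaiting_retry', -- awaiting_retry | delivered | evicted_to_s3
    retry_at INTEGER NOT NULL                     -- unix epoch seconds
  )
`);

// "queue per <team, plugin>": pull the next due job for one such queue
const nextJob = db
  .prepare(
    `SELECT * FROM jobs
     WHERE team_id = ? AND plugin_config_id = ? AND state = 'awaiting_retry' AND retry_at <= ?
     ORDER BY retry_at LIMIT 1`
  )
  .get(2, 7, Math.floor(Date.now() / 1000));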

Overview: [architecture diagram attached in the original issue]

mariusandra (Collaborator, Author) commented:

Why is mysql better for this than postgres?

fuziontech (Member) commented:

I have seen Mysql scream at basic key-value workloads like this, where you are not doing any joins, just basic inserts and reads, granted usually with more reads than writes. We used it heavily for this kind of thing at Uber. To be fair, Postgres would work just fine here too. I've just never seen anyone use Postgres as the core for this kind of thing; it's almost always for some OLAP thing. For something like this (simpler, higher TPS, no joins) it always seems to be built on mysql. I can't find any solid benchmarks comparing the two to validate that though.

Don't get me wrong, Postgres is my favorite RDBMS, but anytime you don't need those sophisticated functions and features, Mysql has done the trick nicely. It's also a relatively lighter and simpler db.

To be clear though, I am advocating something like sqlite or rocksdb, local to the plugin-service, so that we don't need to manage db instances separately from the plugin-service or have some sort of lock to bind the plugin-service to the db instance.

mariusandra (Collaborator, Author) commented:

Approach nr 42 (I lost count)

This issue is indeed mixing two topics (1. the case of one plugin slowing down ingestion; 2. the case of retrying event processing). Proposed solutions to both of them need some sort of "retry queue". So here's another idea for a queue.

Our priority is data integrity first, speed second. As long as we don't lose anything... and retries do happen in a timely manner, we can store this retry queue wherever it's easiest to keep... and where we're sure it won't run out of space.

💡 So why not just store the retry queue in the same place where we eventually store the events?

That means Postgres and Clickhouse.

With postgres ingestion, it'd be just another random table "posthog_retryqueue".
With clickhouse it would be a table with collapsing merge tree and a TTL of something. 48h maybe? TBD.

Something simple like:

create table posthog_retryqueue (
  created_at timestamp,
  uuid uuid,
  plugin_config_id integer,
  retry_at timestamp,
  retry_count integer,
  retry_type varchar(200),
  retry_payload text -- actually json, but no need to deserialize it in the db
)

In the plugin server, we will use redlock to select a random server that gets the role of "queue cleaner". This one server just periodically runs select * where retry_at < now() and runs the required functions in the relevant VMs with the payload.

<sidenote>
Why does just one server get the job? Ease of implementation, and to avoid conflicts where multiple servers work on the same retry. It would be easy to avoid with postgres with a few "locked_at" fields, but not so with clickhouse. We'd have to implement something like "uuid % 3 == 2" on our side. This is not a first priority. We'll need a quorum system between plugin servers soon anyway, to divide them between scheduled jobs, event processing jobs, retry jobs, etc. The parallelism can wait for that.
</sidenote>
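A minimal sketch of what that "queue cleaner" loop could look like, assuming the postgres variant of the table above and node-postgres (the VM hand-off is a stand-in):

const { Pool } = require("pg");

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Hypothetical hook into the plugin VMs, standing in for "run onRetry in the relevant VM"
async function runRetryInVm(row) {
  console.log(`retrying ${row.retry_type} for plugin config ${row.plugin_config_id}`);
}

async function cleanQueueOnce() {
  const { rows } = await pool.query(
    "SELECT * FROM posthog_retryqueue WHERE retry_at < now() ORDER BY retry_at LIMIT 100"
  );
  for (const row of rows) {
    await runRetryInVm(row);
  }
}

// the redlock-elected "queue cleaner" would run this on an interval
setInterval(() => void cleanQueueOnce(), 30 * 1000);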

Inside the plugins, we'll need support for different types of retries (events, batches, etc). This db-backed queue would be universal and support all of them. The interface to interact with the queue would be pretty simple:

// inside blabla-plugin/index.js

async function sendToAPI(event: PluginEvent, meta: PluginMeta) {
  try {
    await fetch(...)
  } catch (error) {
    meta.retry("sendToAPI", event)
  }
}

export function onRetry(retryType: string, retryPayload: any, meta: PluginMeta) {
  if (retryType === "sendToAPI") {
    sendToAPI(retryPayload, meta)
  }
}

export function processEvent(event: PluginEvent, meta: PluginMeta) {
  void sendToAPI(event, meta) // void to ignore the promise and run in the background
  return event
}

Running meta.retry("sendToAPI", event) will schedule the retry to run with increasing backoff. For example: 10 sec, 30sec, 1min, 5min, 15min, 60min, 2h, 4h, 16h, 24h (last retry).
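As a worked example of that schedule (the values mirror the list above; the cut-off behaviour is an assumption):

// Backoff schedule from above, in seconds
const RETRY_BACKOFF_SECONDS = [10, 30, 60, 5 * 60, 15 * 60, 60 * 60, 2 * 3600, 4 * 3600, 16 * 3600, 24 * 3600];

function nextRetryAt(retryCount) {
  if (retryCount >= RETRY_BACKOFF_SECONDS.length) {
    return null; // out of retries: give up (or evict to long-term storage)
  }
  return new Date(Date.now() + RETRY_BACKOFF_SECONDS[retryCount] * 1000);
}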


I originally also had an idea to use Kafka as a "database" queue, where a consumer would just produce the event back into the topic if its run_at time hadn't arrived yet. However, seeing how big of a pipe Kafka could be, we'd just be munching network traffic like we're at an all-you-can-eat chinese buffet. So that probably won't work.

fuziontech (Member) commented:

This sounds like a great first cut design on a retry system...but

This will get really messy on clickhouse when there are a lot of writes to the retry table.

Imagine bigquery goes down and we end up sending a ton of events to be retried to clickhouse... and then bigquery comes back up, but intermittently: some events succeed and some don't. Even if we have the modulus coordination system for grabbing and distributing events to plugin workers, how are you going to update the statuses on clickhouse? You'll have to rewrite the rows with an updated retry_at repeatedly, which will cause duplicate rows to be written (and then merged later). Even with the collapsing merge tree I think this might get ugly. Every retry query to grab new events is going to have to scan a ton of disk to pick up the most recent version of retry-able events.

An even more extreme example comes from the fanout we have in the event -> plugins flow. What if AWS goes down, taking down 6 services that plugins depend on? We'd be writing 6 * event count rows to clickhouse and reading that number * the number of retries, which could get pretty big. This feels much more OLTP than OLAP to me. I'd almost rather throw this all into a giant Postgres db to start out with and hope nothing huge happens until we code the flush-to-S3 logic.

It does feel like this is the simplest way to get going, but I think the mysql/postgres/rocksdb (local to plugin workers) -> s3 way might be a better solution in the longer term. Maybe we can, as a first step, just set up the postgres solution for both EE and OSS? After a certain number of retries or a timeout we can then evict to CH or S3 for long-term retry logic. Later we can swap out the big PG instance for a local OLTP db on the plugin server?

I do love that this solution is leaning in on clickhouse. There's a great article on AWS leaning into Dynamo in the same way.

mariusandra (Collaborator, Author) commented:

The conclusion I reach from all of the above is that there's no best approach.

Thus I made an abstract approach with swappable retry queues in PR #325. It is still very WIP, needs a layer of refactoring and has not gone through any kind of load testing. But it kind of works.

So far I've implemented two different retry queues:

  • FS: A horribly inefficient way with a .txt file in the filesystem. Useful for tests only.
  • Graphile: A slightly better way using graphile/worker to store the retry in postgres.

We can stack as many retry queues as we want. If one throws an error while queueing, we'll just use the next one. The last one could always be an S3 or SQS fallback. S3 might be slow to read back from, but who cares as long as we don't lose data :).
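Conceptually (this is not the actual PR #325 code), the stacking could look something like this:

interface EnqueuedRetry {
  pluginConfigId: number;
  type: string;
  payload: any;
  retryAt: Date;
}

interface RetryQueue {
  enqueue(retry: EnqueuedRetry): Promise<void>;
}

class RetryQueueManager implements RetryQueue {
  constructor(private queues: RetryQueue[]) {}

  async enqueue(retry: EnqueuedRetry): Promise<void> {
    for (const queue of this.queues) {
      try {
        await queue.enqueue(retry);
        return; // the first queue that accepts the job wins
      } catch (error) {
        // fall through to the next queue (e.g. Graphile -> S3)
      }
    }
    throw new Error("All retry queues failed to accept the job");
  }
}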

We can also still have a clickhouse fallback. I think we can do it without modifying any rows, and reading all new data just once. That surely can't be too much to ask from it? :)

fuziontech (Member) commented:

The conclusion I reach from all of the above is that there's no best approach.

The curse of engineering: Tradeoffs ⚖️

This is glorious and I love it. Basically we get to get it done and leave the path open to improve it later...Nice. We aren't even limiting ourselves to a local vs global solution. Nice.

mariusandra (Collaborator, Author) commented:

We can close this now.

In summary, we didn't implement last mile batching before Kafka. Instead two things changed, and there's a clear next step:

  1. Kafka now always gives us max 40 events at a time. Now in the worst case we retry these 40 events. This is down from batches of ~400 events that we had before.
  2. We have app-level primitives for background jobs. Thus if something needs to be retried (plugins failed, timed out, etc), we have job queues that will take care of storing these failed events in a persistent database and then retrying them.
  3. The next step is to implement some dead letter queue (#256) to take the rare events whose processing takes longer than the others out of the ingestion hot path... or to implement something like an exportEvents function in plugins (#404) that abstracts away all the queueing, batching, retrying and other complicated async logic.
