

proposal for reusable Process components #1154

Closed

Conversation


@nikhilsaraf nikhilsaraf commented Apr 18, 2019

Added a section on reusable Process components. This is a first draft; if it makes sense, I can integrate it better into the doc.

Link to doc, see Outputs section.

@nikhilsaraf nikhilsaraf requested a review from tomquisel April 18, 2019 02:22
@nikhilsaraf nikhilsaraf self-assigned this Apr 18, 2019
@tomquisel tomquisel left a comment


@nikhilsaraf I love this direction! I think we should do something close to this.

A few thoughts:

  • I think it'd be great if we could use type assertions to ensure that Processors are hooked up properly, rather than the hand-rolled checking you're proposing. Do you see any reason that wouldn't be possible? Here's a basic example of how this could work (a rough sketch also follows this list).
  • Obviously, right now Processors operate at the level of io.Reader / io.Writer objects, since full ledgers may not fit in memory. I think it'd really simplify the logic of Processor steps if they could operate on one transaction at a time, and the ProcessorPipeline would do the hard work of streaming data in and out of the per-transaction Processors. That gets us to a place where we can have process functions like you're proposing, which take maps rather than streams.
  • As far as high-level organization, I think we can replace the existing filter step and process step with just this pipeline system. Each Processor in the new proposed system can do either filtering or processing. We probably also want to split Stores out from the processing package, since I think they're pretty independent.
  • I think ratchet is a good example of the kind of system we want to build. What are your thoughts on it? Should we use it directly? Should we build something similar ourselves? Is there some superior alternative out there?
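
For reference, here is a minimal, hypothetical sketch of the kind of runtime type-assertion check Go allows when wiring stages together; the interface and function names are illustrative only and not part of the proposal doc:

package pipeline

import "fmt"

// Hypothetical capability interfaces that concrete Processors could implement.
type PaymentEmitter interface {
    EmitsPayments() bool
}

type PaymentConsumer interface {
    ConsumesPayments() bool
}

// checkWiring uses type assertions at pipeline-construction time to verify
// that the upstream stage produces what the downstream stage expects.
func checkWiring(upstream, downstream interface{}) error {
    if _, ok := upstream.(PaymentEmitter); !ok {
        return fmt.Errorf("upstream processor %T does not emit payments", upstream)
    }
    if _, ok := downstream.(PaymentConsumer); !ok {
        return fmt.Errorf("downstream processor %T does not consume payments", downstream)
    }
    return nil
}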

@bartekn @MonsieurNicolas I'd love your thoughts too.

@nikhilsaraf if you want to elaborate this PR based on the ideas above, feel free to go for it!

@nikhilsaraf

@tomquisel that's great feedback!

some thoughts:

1. Type-Assertions

  • I'm all for adding more type assertions! -- we probably also need to represent what a field's type will be to do this assertion so that the ProcessingPipeline can validate that types are consistent.
  • Maybe we can augment the definition of a field to be a (fieldName string, type string) instead of just a fieldName string? (A small sketch follows below.)
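
For illustration, the augmented field definition could look something like this (the struct and field names are hypothetical, not part of the proposal):

// Field pairs a field name with its expected type so the ProcessingPipeline
// can validate that producers and consumers agree on types.
type Field struct {
    Name string // e.g. "paymentAmount"
    Type string // e.g. "int64"
}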

2. Pipeline

tl;dr: have ledger-level Processors along with transaction-level Processors

  • operating on one transaction at a time can be very useful and will make things simpler
  • my concern is that any Processor that wants to do some ledger-specific aggregation (like asset-stats) will probably save data in memory to compute its aggregated values. Moreover, it will probably need to finish streaming the full ledger before it can compute those aggregate values. Lastly, any Processor that follows an aggregation Processor and uses the aggregate value would need the aggregation Processor to finish before it can be kicked off. Example: median payment amount + 1. The median payment amount can only "finish" once it has seen the full ledger, and the +1 operation can only happen once it knows the median payment amount.
  • we could approach the above by splitting the concept of a processor in two: a ledger-level Processor and a transaction-level Processor (a rough interface sketch follows this list). Ledger-level processing is done with streaming, whereas transaction-level processing can be done in memory.
  • The runner for transaction-level processors can itself be a ledger-level Processor that handles streaming in and out of the transaction-level processors, as you described.
  • The pipeline would block on each ledger-level Processor until it is completed before moving on to the next ledger-level Processor because ledger-level processors need full views of the ledger.
  • Transaction-level Processors do not need to see a full ledger before they can output their value and in fact can be run in parallel using goroutines and channels.
  • we would want to model Processors as transaction-level Processors as much as possible since they don't block. example: average size of payments can be modeled as transaction-level Processors
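
To make the split concrete, here is a rough sketch, assuming the ledger level keeps the existing io.Reader / io.Writer streaming contract; the interface names and method signatures are placeholders, not a final design:

package pipeline

import "io"

// LedgerLevelProcessor streams a whole ledger and may buffer state in memory
// to compute per-ledger aggregates before writing its output.
type LedgerLevelProcessor interface {
    ProcessLedger(r io.Reader, w io.Writer) error
}

// TransactionLevelProcessor operates on one in-memory transaction at a time
// and never needs to see the whole ledger, so it doesn't block.
type TransactionLevelProcessor interface {
    ProcessTransaction(tx map[string]interface{}) (map[string]interface{}, error)
}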

3. High-Level Organization

Agreed

4. Ratchet

  • ratchet seems pretty cool and looks like it does a lot of what we want to do. We should set aside time to investigate how we can use it and/or build on top of it instead of starting from scratch.
  • my concern is that there are not many commits and I don't know how well-maintained the project is, since most of the code was written a while ago. I'd imagine we may need to make some modifications to it. It's MIT licensed, so we should consider what that means for us if we were to run our own fork of it, either now or in the future.

@tomquisel

@nikhilsaraf this is great!

Type Assertions

Thinking about it, trying to use Go's type system for checking that Processors are hooked up properly is going to be either overly complex or inflexible. So, considering all the options, I like your original requiredFields() and fieldUpdates() idea. We'll have to make some conventions around what values get their own fields vs. what gets put in a nested object.

Pipeline

  • I love your idea of ledger-level Processors and transaction-level Processors, and using a ledger-level one to handle running all the transaction-level ones. LedgerProcessor and TransactionProcessor seem like natural names.
  • I think we can avoid any blocking or need to synchronize between the ledger-level processors. We can use io.TeeReader to read through the input ledger a single time and copy the input in parallel to all the LedgerProcessors (a minimal sketch follows this list).
  • Will LedgerProcessors follow the same contract as the TransactionProcessors? That is, will they have requiredFields() and fieldUpdates()? It seems a little strange, since most of them will take an ingest/io/LedgerReader, but the answer is probably yes: in chains of LedgerProcessors, the later LedgerProcessors in the chain will probably take some other kind of input (like stats about assets used in that ledger), and we'll need to keep track of those different input & output types in the same way as for TransactionProcessors.
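
Here is a minimal, standard-library-only sketch of the io.TeeReader idea: the ledger stream is read once, and a second consumer receives a copy through an io.Pipe. The processor bodies are stand-ins (io.Copy to io.Discard), not real LedgerProcessors:

package main

import (
    "io"
    "strings"
    "sync"
)

func main() {
    ledgerSource := strings.NewReader("ledger bytes ...") // stand-in for the real ledger stream

    pr, pw := io.Pipe()
    // Everything read from tee is also written to pw, so a second
    // LedgerProcessor can consume the same bytes via pr.
    tee := io.TeeReader(ledgerSource, pw)

    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        io.Copy(io.Discard, tee) // LedgerProcessor A drives the single read
        pw.Close()               // signal end of input to the copy
    }()
    go func() {
        defer wg.Done()
        io.Copy(io.Discard, pr) // LedgerProcessor B sees the same stream
    }()
    wg.Wait()
}

One caveat with this pattern: io.Pipe is unbuffered, so a slow downstream consumer blocks the shared read; in practice each LedgerProcessor may need its own buffered copy.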

Simplifying the pipeline

One alternate idea for simplifying the data pipeline lifecycle is to create and tear down a new pipeline with each new Ledger or State that comes in. Presumably Processors communicate with channels, and the end of input from a LedgerReader or StateReader can be signaled by closing the channel. A TransactionProcessor can then achieve anything that a LedgerProcessor would achieve by keeping track of state during its lifetime (like the list of all payment amounts in the example above), and then having a Finish method (like in Ratchet) that would output the final results down its output channel. This allows us to only think in terms of TransactionProcessors.

Here's the proposed lifecycle for processing when a new Ledger comes in (largely based on Ratchet):

  • the LedgerPipeline creates new instances of all TransactionProcessors in its pipeline, and creates new communication channels linking all the processors together.
  • the LedgerPipeline uses its input LedgerReader to read Transaction, TransactionResult, and TransactionMeta structs for each transaction in the ledger.
  • For each transaction, LedgerPipeline sends those 3 pieces of info down the channel(s) for its first stage of TransactionProcessors
  • The transactions flow from processor to processor via channels, eventually getting written out by the final processors in the pipeline tree
  • Once the last transaction is sent to the first stage of processors, the pipeline closes those channels
  • each processor detects the close of its input channel, calls Finish, and closes its output channel. The call to Finish enables per-ledger computations like median.
  • Once all processors have finished and all channels have closed, the LedgerPipeline destroys all its TransactionProcessors
  • the pipeline begins anew when a new Ledger comes in. Repeat!

The flow for ArchiveLedgerReader is the same, except TransactionMetas are missing.

The flow is the same for processing StateReader, except LedgerEntry structs are passed into the first stage of StateProcessors rather than Transaction, TransactionResult, and TransactionMeta.
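
To make the close/Finish hand-off concrete, here is a minimal sketch of a single stage in that lifecycle using plain maps and channels; the stage and the aggregate it computes (a transaction count) are purely illustrative:

package main

import "fmt"

type Data map[string]interface{}

// runStage consumes transactions until its input channel is closed, then
// performs its per-ledger "Finish" step and closes its own output channel.
func runStage(in <-chan Data, out chan<- Data) {
    count := 0
    for tx := range in { // ends when the pipeline closes `in`
        count++
        out <- tx // pass the transaction downstream unchanged
    }
    // Finish: emit a per-ledger aggregate once all input has been seen.
    out <- Data{"transactionCount": count}
    close(out)
}

func main() {
    in := make(chan Data)
    out := make(chan Data)
    go runStage(in, out)

    go func() {
        for i := 0; i < 3; i++ {
            in <- Data{"tx": i}
        }
        close(in) // end of ledger: triggers the stage's Finish step
    }()

    for d := range out {
        fmt.Println(d)
    }
}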

Pipeline conclusion

Thinking it through, I'm leaning towards the alternate approach. @nikhilsaraf I'd love your thoughts.

We end up with a TransactionProcessor something like this:

// Data is the map of named fields passed between processors.
type Data map[string]interface{}

type TransactionProcessor interface {
    // RequiredFields lists the fields this processor needs in its input.
    RequiredFields() []string
    // UpdatedFields describes the fields this processor adds or modifies.
    UpdatedFields() map[string]FieldAction
    // Process handles one transaction's Data, emitting results on outputChan.
    Process(input Data, outputChan chan Data, killChan chan error)
    // Finish runs after the input channel closes, enabling per-ledger results like a median.
    Finish() (outputChan chan Data, killChan chan error)
}
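
For illustration, here is a hypothetical processor implementing that interface, computing the median payment amount from the earlier example. FieldAction is not defined above, so a placeholder is assumed, and the field names and conventions are made up; this is a sketch, not part of the proposal:

package pipeline

import (
    "fmt"
    "sort"
)

// FieldAction is referenced by the interface but not yet defined; assume a
// simple string type for this sketch.
type FieldAction string

const FieldAdded FieldAction = "added"

// medianPaymentProcessor keeps per-ledger state and emits its aggregate in Finish.
type medianPaymentProcessor struct {
    amounts []int64 // reset naturally when the pipeline is rebuilt per ledger
}

var _ TransactionProcessor = (*medianPaymentProcessor)(nil)

func (p *medianPaymentProcessor) RequiredFields() []string {
    return []string{"paymentAmount"} // hypothetical field name
}

func (p *medianPaymentProcessor) UpdatedFields() map[string]FieldAction {
    return map[string]FieldAction{"medianPaymentAmount": FieldAdded}
}

func (p *medianPaymentProcessor) Process(input Data, outputChan chan Data, killChan chan error) {
    amount, ok := input["paymentAmount"].(int64)
    if !ok {
        killChan <- fmt.Errorf("paymentAmount missing or not an int64")
        return
    }
    p.amounts = append(p.amounts, amount)
    outputChan <- input // pass the transaction through unchanged
}

// Finish runs after the input channel closes; it emits the per-ledger median
// on a fresh output channel and closes it.
func (p *medianPaymentProcessor) Finish() (outputChan chan Data, killChan chan error) {
    outputChan = make(chan Data, 1)
    killChan = make(chan error, 1) // part of the proposed signature; unused here
    sort.Slice(p.amounts, func(i, j int) bool { return p.amounts[i] < p.amounts[j] })
    if len(p.amounts) > 0 {
        outputChan <- Data{"medianPaymentAmount": p.amounts[len(p.amounts)/2]}
    }
    close(outputChan)
    return outputChan, killChan
}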

Ratchet

I think we can learn a lot from Ratchet, but it's probably not suited to our workload. This is a great guide on how to use it.

These seem to be the main challenges with using Ratchet:

  • Ratchet is built around passing JSON between DataProcessors, which is slow and makes converting our XDR structs into and out of it awkward
  • The last commit was 2.5 years ago. We could hit major issues in production and have a very hard time debugging it.

@bartekn bartekn added the ingest New ingestion system label May 17, 2019
@tomquisel tomquisel commented May 29, 2019

Great proposal and discussion! @nikhilsaraf @bartekn I'm closing it out since I think we've iterated a few more steps since then. Feel free to reopen if I'm wrong.

@tomquisel tomquisel closed this May 29, 2019
@bartekn bartekn deleted the update_horizon_ingest_design_pipeline branch November 14, 2019 19:55