
exp/ingest: Ingest pipeline prototype #1264

Merged: 22 commits from ingest-pipeline into master, May 29, 2019

Conversation

@bartekn (Contributor) commented May 10, 2019

This PR was based on #1216. To see the pipeline code only, go here.

This is a prototype of the ingestion pipeline for the new ingestion system. It is the result of experimenting with different approaches (like #1154 or ratchet). In this prototype, the pipeline is a tree of nodes. Each node reads from a StateReadCloser (close #1309) provided by the previous node and writes to a StateWriteCloser that is converted into a StateReadCloser read by the following node, and so on.

Processors can save aggregated data in a Store shared across all processors in the pipeline. This means that processors can see data generated by processors in a different subtree.

In this design, state is never stored fully in memory. In fact, very little memory is used because all processors are started at the same time and read data as soon as it's available. Buffers are used to pass data between processors (read below), but in most cases (if there are no delays reading data from StateReadClosers) they will be empty most of the time. However, StateReadCloser and StateWriteCloser are generic enough that it would be possible to write an implementation that runs the pipeline on a cluster of machines in the future.

Classes and interfaces

(edit: ingest/filters below should be ingest/pipeline - unfortunately draw.io did not save my changes...)
[UML diagram: Copy of Ingest]

Most structs and methods have decent godoc but here's a quick summary:

Pipeline

Pipeline represents a processing pipeline. A user can add a processor tree to it using AddStateProcessorTree.

PipelineNode

A tree can be constructed using the Node helper method and the PipelineNode struct. See the examples below.

Store

Store allows storing artifacts to be used by subsequent nodes, e.g. aggregations.

multiWriteCloser

multiWriteCloser works like io.MultiWriter with two exceptions:

  • It sends data to writers concurrently.
  • It also allows Close()-ing streams.

It's used when a pipeline node has many children. It allows distributing written entries to all of the child nodes (and their workers).
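To make the behaviour concrete, here is a minimal sketch of such a concurrent multi-writer, assuming a simplified write-closer interface; the names and signatures below are illustrative placeholders, not the exact code in this diff:

package pipeline

// stateWriteCloser is a simplified stand-in for the StateWriteCloser
// interface used in this PR.
type stateWriteCloser interface {
    Write(entry interface{}) error
    Close() error
}

// multiWriteCloser fans every written entry out to all child writers.
type multiWriteCloser struct {
    writers []stateWriteCloser
}

// Write sends the entry to each child writer in its own goroutine, waits
// for all of them, and returns the first error encountered (if any).
func (m *multiWriteCloser) Write(entry interface{}) error {
    results := make(chan error, len(m.writers))
    for _, w := range m.writers {
        go func(w stateWriteCloser) {
            results <- w.Write(entry)
        }(w)
    }
    var firstErr error
    for range m.writers {
        if err := <-results; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

// Close closes every child writer so downstream readers receive io.EOF.
func (m *multiWriteCloser) Close() error {
    for _, w := range m.writers {
        if err := w.Close(); err != nil {
            return err
        }
    }
    return nil
}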

bufferedStateReadWriteCloser

bufferedStateReadWriteCloser is a buffered struct implementing both StateReadCloser and StateWriteCloser. It acts like a pipe between pipeline nodes. Consider a simple A -> B -> C pipeline:

  • A writes to StateWriteCloser (which in reality is bufferedStateReadWriteCloser).
  • B reads from StateReadCloser (which in reality is bufferedStateReadWriteCloser that A wrote to). B writes to StateWriteCloser (which in reality is a new bufferedStateReadWriteCloser).
  • C reads from StateReadCloser (bufferedStateReadWriteCloser) that B writes to.

bufferedStateReadWriteCloser maintains an internal buffer:

  • When the buffer is empty and something calls Read, it blocks until a Write adds data, OR returns io.EOF once the stream has been closed.
  • When the buffer is full and something Writes to it, it blocks until a Read removes data from it.

That way we can reason about memory usage and cap it at the sum of the maximum capacities of all buffers. Internally it uses a channel.
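As a rough illustration (not the code in this diff), a channel-backed buffer with these blocking semantics could look like the sketch below; the entry type and names are simplified placeholders:

package pipeline

import "io"

type bufferedReadWriteCloser struct {
    // The channel capacity bounds how many unread entries can pile up.
    buffer chan interface{}
}

func newBufferedReadWriteCloser(size int) *bufferedReadWriteCloser {
    return &bufferedReadWriteCloser{buffer: make(chan interface{}, size)}
}

// Read blocks until an entry is available, or returns io.EOF once the
// writing side has been closed and the buffer has been drained.
func (b *bufferedReadWriteCloser) Read() (interface{}, error) {
    entry, more := <-b.buffer
    if !more {
        return nil, io.EOF
    }
    return entry, nil
}

// Write blocks when the buffer is full, applying back pressure upstream.
func (b *bufferedReadWriteCloser) Write(entry interface{}) error {
    b.buffer <- entry
    return nil
}

// Close signals readers that no more entries will arrive.
func (b *bufferedReadWriteCloser) Close() error {
    close(b.buffer)
    return nil
}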

StateProcessor (interface)

Defines the method that processors must implement. Check the godoc.

Data flow and concurrency

Processors can be run concurrently. When this happens, the pipeline starts multiple workers running the same ProcessState(store *Store, readCloser io.StateReadCloser, writeCloser io.StateWriteCloser) (err error) method, but they all read from the same StateReadCloser and write to the same StateWriteCloser. What's more, each processor can send data to multiple children.
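For illustration, a minimal processor with that signature might look like the sketch below. The stand-in interfaces, the processor name, and the Read/Write signatures are assumptions made to keep the example self-contained; closing the shared writer is left to the pipeline here (see the worker sketch further down), and the real code may coordinate this differently:

package pipeline

import stdio "io"

// Simplified stand-ins for the interfaces defined in this PR.
type StateReadCloser interface {
    Read() (interface{}, error) // the real reader returns XDR ledger entries
    Close() error
}

type StateWriteCloser interface {
    Write(entry interface{}) error
    Close() error
}

type Store struct{} // shared key/value artifacts, omitted here

// PassthroughProcessor forwards every entry it reads, roughly what a
// passthrough processor in the demo below would do.
type PassthroughProcessor struct{}

func (p *PassthroughProcessor) ProcessState(store *Store, r StateReadCloser, w StateWriteCloser) error {
    for {
        entry, err := r.Read()
        if err == stdio.EOF {
            return nil // upstream closed its writer: no more entries
        }
        if err != nil {
            return err
        }
        if err := w.Write(entry); err != nil {
            return err
        }
    }
}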

The following diagram explains how bufferedStateReadWriteCloser and multiWriteCloser help achieve this:

[diagram: multiWriteCloser]

When multiple workers are started, a multiWriteCloser is created that writes to N bufferedStateReadWriteClosers, where N is the number of children of the current processor. The children then read from those bufferedStateReadWriteClosers.
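A rough sketch of that wiring, reusing the simplified stand-in interfaces from the processor sketch above; the function name, the error handling, and the decision to close the shared writer here are illustrative assumptions:

import "sync"

// StateProcessor is a simplified stand-in for the interface in this PR.
type StateProcessor interface {
    ProcessState(store *Store, r StateReadCloser, w StateWriteCloser) error
}

// runWorkers starts `jobs` goroutines that all run the same ProcessState
// against one shared reader and one shared writer, then closes the writer
// once every worker has finished so downstream readers receive io.EOF.
func runWorkers(p StateProcessor, store *Store, r StateReadCloser, w StateWriteCloser, jobs int) error {
    var wg sync.WaitGroup
    errs := make(chan error, jobs)

    for i := 0; i < jobs; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Workers share r and w, so entries are split between them
            // rather than duplicated.
            errs <- p.ProcessState(store, r, w)
        }()
    }

    wg.Wait()
    close(errs)
    for err := range errs {
        if err != nil {
            return err
        }
    }
    return w.Close()
}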

Demo

EDIT: You can now run the "Accounts for Signer" demo: go run -v ./exp/tools/accounts-for-signer/.

go test -v ./ingest/pipeline/ -run TestPipeline

The demo runs the following pipeline:

pipeline.Node(passthroughProcessor).
  Pipe(
    // Passes accounts only
    pipeline.Node(accountsOnlyFilter).
      Pipe(
        // Finds accounts for a single signer
        pipeline.Node(&AccountsForSignerProcessor{Signer: "GCS26OX27PF67V22YYCTBLW3A4PBFAL723QG3X3FQYEL56FXX2C7RX5G"}).
          Pipe(pipeline.Node(printAllProcessor)),

        // Counts accounts with prefix GA/GB/GC/GD and stores results in a store
        pipeline.Node(&CountPrefixProcessor{Prefix: "GA"}).
          Pipe(pipeline.Node(printCountersProcessor)),
        pipeline.Node(&CountPrefixProcessor{Prefix: "GB"}).
          Pipe(pipeline.Node(printCountersProcessor)),
        pipeline.Node(&CountPrefixProcessor{Prefix: "GC"}).
          Pipe(pipeline.Node(printCountersProcessor)),
        pipeline.Node(&CountPrefixProcessor{Prefix: "GD"}).
          Pipe(pipeline.Node(printCountersProcessor)),
      ),
    // Passes trust lines only
    pipeline.Node(trustLinesOnlyFilter).
      Pipe(pipeline.Node(printAllProcessor)),
  ),

That can be represented as a diagram:
[diagram: Demo pipeline]

After starting the demo, it will display updated stats every second, e.g.:

Alloc = 73 MiB	HeapAlloc = 73 MiB	Sys = 135 MiB	NumGC = 31	Goroutines = 154	NumCPU = 4

Duration: 45.68112544s

└ PassthroughProcessor read=274155 (queued=0 rps=6525) wrote=274165 (w/r ratio = 1.00004) concurrent=true jobs=20
  • • • • • • • • • • • • • • • • • • • • 
  └ EntryTypeFilter (LedgerEntryTypeAccount) read=274165 (queued=0 rps=6525) wrote=137083 (w/r ratio = 0.50000) concurrent=false jobs=1
    └ AccountsForSignerProcessor read=137082 (queued=0 rps=3262) wrote=1397 (w/r ratio = 0.01019) concurrent=false jobs=1
      └ PrintAllProcessor read=1397 (queued=0 rps=35) wrote=0 (w/r ratio = 0.00000) concurrent=false jobs=1
    └ CountPrefixProcessor (GA) read=137082 (queued=0 rps=3262) wrote=34459 (w/r ratio = 0.25138) concurrent=true jobs=20
      • • • • • • • • • • • • • • • • • • • • 
      └ PrintCountersProcessor read=34459 (queued=0 rps=781) wrote=0 (w/r ratio = 0.00000) concurrent=false jobs=1
    └ CountPrefixProcessor (GB) read=137082 (queued=0 rps=3262) wrote=34067 (w/r ratio = 0.24852) concurrent=true jobs=20
      • • • • • • • • • • • • • • • • • • • • 
      └ PrintCountersProcessor read=34067 (queued=0 rps=844) wrote=0 (w/r ratio = 0.00000) concurrent=false jobs=1
    └ CountPrefixProcessor (GC) read=137082 (queued=0 rps=3262) wrote=33953 (w/r ratio = 0.24768) concurrent=true jobs=20
      • • • • • • • • • • • • • • • • • • • • 
      └ PrintCountersProcessor read=33953 (queued=0 rps=807) wrote=0 (w/r ratio = 0.00000) concurrent=false jobs=1
    └ CountPrefixProcessor (GD) read=137082 (queued=0 rps=3262) wrote=34603 (w/r ratio = 0.25243) concurrent=true jobs=20
      • • • • • • • • • • • • • • • • • • • • 
      └ PrintCountersProcessor read=34603 (queued=0 rps=830) wrote=0 (w/r ratio = 0.00000) concurrent=false jobs=1
  └ EntryTypeFilter (LedgerEntryTypeTrustline) read=274165 (queued=0 rps=6525) wrote=137082 (w/r ratio = 0.50000) concurrent=false jobs=1
    └ PrintAllProcessor read=137082 (queued=0 rps=3262) wrote=0 (w/r ratio = 0.00000) concurrent=false jobs=1

Notes:

  • (dots) represent workers and are displayed only when a processor is run concurrently (currently 20 workers are started).
  • rps = reads per second.
  • queued = number of unread items queued in a buffer. You can uncomment time.Sleep in CountPrefixProcessor to observe what happens when the buffer fills up.

@bartekn added the ingest (New ingestion system) label on May 17, 2019
@bartekn force-pushed the ingest-pipeline branch from 5e64994 to c3753ec on May 22, 2019
@bartekn marked this pull request as ready for review on May 27, 2019
@bartekn changed the title from "Ingest pipeline prototype" to "exp/ingest: Ingest pipeline prototype" on May 27, 2019
@tomquisel (Contributor) left a comment:

Looks great 🎉 ! I added a few minor comments.

"github.com/stellar/go/xdr"
)

const bufferSize = 50000
Contributor:

I'm guessing you're planning on making this configurable later? Definitely not a blocker for the prototype.

Contributor Author:

Yes, we can make it configurable in the future. However, in a good pipeline the buffer mostly contains just a few elements, or the number of elements stays constant.


entry, more := <-b.buffer
if more {
b.readEntries++
Contributor:

It seems like incrementing b.readEntries may not be threadsafe? What if two goroutines call Read() at the same time?

Contributor Author:

OK, added a mutex protecting this variable. It's displayed in stats only (which are also not super accurate for multiple reasons) but this one was easy to fix.
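For context, the fix is presumably along these lines (the mutex and field names are assumptions, not the exact diff):

// Guard the stats counter so concurrent Read calls do not race on it.
b.statsMutex.Lock()
b.readEntries++
b.statsMutex.Unlock()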

return nil
}

var _ io.StateReadCloser = &bufferedStateReadWriteCloser{}
Contributor:

Maybe add a comment explaining that these ensure the interface is satisfied? I'm not sure how idiomatic this is.

Contributor Author:

Yes, this is idiomatic.
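A comment along the lines the reviewer suggests could look like this (the wording is only a suggestion):

// Compile-time check: the build fails if bufferedStateReadWriteCloser
// ever stops satisfying io.StateReadCloser.
var _ io.StateReadCloser = &bufferedStateReadWriteCloser{}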

// from the same StateReader and write to the same StateWriter.
// Example: you can calculate number of asset holders in a single processor but
// you can also start multiple processors that sum asset holders in a shared
// variable to calculate it faster.
Contributor:

I'm not sure about this example, wouldn't the shared variable need a mutex and so not be any faster than a single-threaded approach?

Contributor Author:

You're right, the example was a poor one. I changed it to a processor saving data into a DB (which makes sense if you need to do some data conversions first, e.g. strkey-encoding public keys, converting balances, etc.).

m.wroteEntries++
m.mutex.Unlock()

var wg sync.WaitGroup
Contributor:

Do we need the WaitGroup? It seems like we know exactly how many times to call err := <- results below, and that channel read will block correctly until all results have been read.

Contributor Author:

Good point, removed.

m.mutex.Lock()
defer m.mutex.Unlock()

m.closeAfter--
Contributor:

This could go below 0. Doesn't look like a bug, but may be confusing when debugging?

Contributor Author:

It now returns an error when the counter goes below zero.


for i := 1; i <= jobs; i++ {
wg.Add(1)
go func(reader io.StateReadCloser, writer io.StateWriteCloser) {
Contributor:

Do you need to pass these as arguments? It seems like it should be fine to use the variables scoped at the processStateNode level.

Contributor Author:

Good point, removed. I do it automatically to prevent bugs like Figure 8 in this paper. However, you're right that it's not needed in this case.
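For readers unfamiliar with the bug class being referenced, here is a small, self-contained illustration of the loop-variable capture gotcha (relevant to Go versions before 1.22); it is not code from this PR:

package main

import (
    "fmt"
    "sync"
)

func main() {
    jobs := []string{"a", "b", "c"}
    var wg sync.WaitGroup

    for _, job := range jobs {
        wg.Add(1)
        // Passing job as an argument pins its value per goroutine. Before
        // Go 1.22, capturing the loop variable inside the closure instead
        // could make every goroutine observe the last element.
        go func(job string) {
            defer wg.Done()
            fmt.Println(job)
        }(job)
    }
    wg.Wait()
}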

s.values = make(map[string]interface{})
}

func (s *Store) Put(name string, value interface{}) {
Contributor:

How will this interface generalize to a postgres Store?

Contributor Author (@bartekn, May 29, 2019):

I added a short note to Store in the UML diagram I sent yesterday. Store is responsible for storing artifacts or sharing data between processors in a single pipeline. For example, let's say you want to calculate the average XLM balance. You can create a StateProcessor that calculates this value and then saves it to the Store (because there's no other way to pass data down the pipeline than by using StateWriteCloser). Then another processor will save this value to a DB.
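A rough sketch of that hand-off; Put matches the method shown in this diff, while Get, the key name, and the persistence helper are illustrative assumptions:

// In the aggregating processor, once averageBalance has been computed:
store.Put("avgXLMBalance", averageBalance)

// In a later processor in the same pipeline (Get is assumed here to
// return the stored value as an interface{}):
if avg, ok := store.Get("avgXLMBalance").(int64); ok {
    saveAverageToDB(avg) // hypothetical helper persisting the value
}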

return true
}

func (n *SimpleProcessor) CallCount() int {
Contributor:

I would expect CallCount just to read a value rather than increment it.

Contributor Author:

Changed the name as it was confusing. It's important that this is atomic.
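For illustration only, an atomic increment avoids the race the author mentions; the field and method names below are assumptions, not the renamed code:

import "sync/atomic"

type SimpleProcessor struct {
    callCount int32
}

// Multiple workers may invoke the processor at the same time, so the
// counter must be incremented atomically.
func (n *SimpleProcessor) incrementAndReturnCallCount() int {
    return int(atomic.AddInt32(&n.callCount, 1))
}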

@bartekn bartekn merged commit cca9f0a into master May 29, 2019
@bartekn bartekn deleted the ingest-pipeline branch May 29, 2019 13:13
Labels: ingest (New ingestion system)
Projects: None yet
Development: successfully merging this pull request may close: Change StateReader to StateReadCloser
3 participants