Skip to content

Commit

Permalink
New Build Applications: Create pipeline with ingest SDK (#687)
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Jun 24, 2024
1 parent 900dc7f commit a837fe6
Show file tree
Hide file tree
Showing 3 changed files with 359 additions and 0 deletions.
7 changes: 7 additions & 0 deletions docs/building-apps/ingest-sdk/_category_.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"position": 55,
"label": "Build Custom Network Ingestion Pipeline",
"link": {
"type": "generated-index"
}
}
290 changes: 290 additions & 0 deletions docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
---
title: Ingestion Pipeline Sample Code
sidebar_position: 30
---

Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion SDK](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.

This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies).

Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline`

### `go.mod`

<CodeExample>

```
module example/pipeline
go 1.22
toolchain go1.22.1
require (
github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58
github.com/zeromq/goczmq v4.1.0+incompatible
)
```

</CodeExample>

### `main.go`

<CodeExample>

```go
package main

import (
"context"
"encoding/json"
"io"
"log"
"os"
"os/signal"

"github.com/pkg/errors"
"github.com/stellar/go/amount"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/zeromq/goczmq"
)

// Application specifics
type AppPayment struct {
Timestamp uint
BuyerAccountId string
SellerAccountId string
AssetCode string
Amount string
}

// General stream topology
type Message struct {
Payload []byte
}

type Processor interface {
Process(context.Context, Message) error
}

type Publisher interface {
Subscribe(receiver Processor)
}

// Ingestion Pipeline Processors
type ZeroMQOutboundAdapter struct {
Publisher *goczmq.Sock
}

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
_, err := adapter.Publisher.Write(msg.Payload)
return err
}

type AppPaymentTransformer struct {
processors []Processor
networkPassPhrase string
}

func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
transformer.processors = append(transformer.processors, receiver)
}

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
ledgerCloseMeta := xdr.LedgerCloseMeta{}
err := ledgerCloseMeta.UnmarshalBinary(msg.Payload)
if err != nil {
return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta")
}

ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
}

closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

// scan all transactions in a ledger for payments to derive new model from
transaction, err := ledgerTxReader.Read()
for ; err == nil; transaction, err = ledgerTxReader.Read() {
for _, op := range transaction.Envelope.Operations() {
switch op.Body.Type {
case xdr.OperationTypePayment:
networkPayment := op.Body.MustPaymentOp()
myPayment := AppPayment{
Timestamp: closeTime,
BuyerAccountId: networkPayment.Destination.Address(),
SellerAccountId: op.SourceAccount.Address(),
AssetCode: networkPayment.Asset.StringCanonical(),
Amount: amount.String(networkPayment.Amount),
}
jsonBytes, err := json.Marshal(myPayment)
if err != nil {
return err
}

for _, processor := range transformer.processors {
processor.Process(ctx, Message{Payload: jsonBytes})
}
}
}
}
if err != io.EOF {
return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
}
return nil
}

type CaptiveCoreInboundAdapter struct {
TomlParams ledgerbackend.CaptiveCoreTomlParams
processors []Processor
historyArchiveURLs []string
coreConfigNetworkTemplate []byte
}

func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)
}

func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error {
// Setup captive core config to use the Pubnet network
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams)
if err != nil {
return errors.Wrap(err, "error creating captive core toml")
}

captiveConfig := ledgerbackend.CaptiveCoreConfig{
BinaryPath: adapter.TomlParams.CoreBinaryPath,
HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs,
Context: ctx,
Toml: captiveCoreToml,
}

// Create a new captive core backend, the origin of ingestion pipeline
captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig)
if err != nil {
return errors.Wrap(err, "error creating captive core instance")
}

// Create a client to the network's history archives
historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "my_app",
Context: ctx,
},
})

if err != nil {
return errors.Wrap(err, "error creating history archive client")
}

// Acquire the most recent ledger on network
latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive)
if err != nil {
return errors.Wrap(err, "error getting latest ledger")
}

// Tell the captive core instance to emit LedgerCloseMeta starting at
// latest network ledger and continuing indefintely, streaming.
if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil {
return errors.Wrap(err, "error preparing captive core ledger range")
}

// Run endless loop that receives LedgerCloseMeta from captive core for each new
// ledger generated by the network and pushes it to next processors in pipeline
for nextLedger := latestNetworkLedger; true; nextLedger++ {
ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}

payload, err := ledgerCloseMeta.MarshalBinary()
if err != nil {
return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger)
}

log.Printf("Processing Ledger %v", nextLedger)
for _, processor := range adapter.processors {
if err := processor.Process(ctx, Message{Payload: payload}); err != nil {
return errors.Wrapf(err, "failed to process ledger %d", nextLedger)
}
}
}
return nil
}

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()

// create the inbound 'source of origin' adapter,
// imports network data using this captive core config
networkInboundAdapter := &CaptiveCoreInboundAdapter{
historyArchiveURLs: network.TestNetworkhistoryArchiveURLs,
coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig,
TomlParams: ledgerbackend.CaptiveCoreTomlParams{
NetworkPassphrase: network.TestNetworkPassphrase,
HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
UseDB: true,
CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH
},
}

// create the app transformer to convert network data to application data model
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase}

// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
if err != nil {
log.Printf("error creating 0MQ publisher: %v", err)
return
}
defer publisher.Destroy()
outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

// wire up the ingestion pipeline and let it run
appTransformer.Subscribe(outboundAdapter)
networkInboundAdapter.Subscribe(appTransformer)
log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx))
}
```

</CodeExample>

### `distributed_payment_subsciber.py`

A Python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. Make sure to `pip install pyzmq`

<CodeExample>

```python
import sys
import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting next 10 payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")

for request in range(10):

message = socket.recv()
json_object = json.loads(message)
json_formatted_str = json.dumps(json_object, indent=2)
print(f"Received payment:\n\n{json_formatted_str}")

```

</CodeExample>
62 changes: 62 additions & 0 deletions docs/building-apps/ingest-sdk/overview.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
---
title: Overview
sidebar_position: 10
---

This tutorial walks through how an application can leverage common streaming data patterns to ingest Stellar network transaction data using a few select packages from the Stellar Go Repo [github.com/stellar/go](https://github.com/stellar/go/blob/master/) collectively known as the 'Ingestion' SDK:

## The Ingestion SDK packages

- `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string
- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects
- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer-centric `LedgerTransaction` model
- `github.com/stellar/go/ingest/ledgerbackend` provides the captive core backend implementation
- `github.com/stellar/go/network` provides convenient pre-configured settings for Testnet and Mainnet networks
- `github.com/stellar/go/xdr` a complete Golang binding to the Stellar network data model

## Ingestion project setup

### Project requirements

To build an example streaming network ingestion pipeline from live Stellar network transaction data, you'll need:

- A developer workstation with [Go](https://go.dev/learn/) programming language runtime installed
- An IDE to edit Go code, [VSCode](https://code.visualstudio.com/download) is good if one is needed
- A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline`
- `stellar-core` must be [installed](https://developers.stellar.org/network/core-node/admin-guide/installation) on your workstation and available on your o/s PATH

The [Stellar network data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own data model internally:

<CodeExample>

```
::AppPayment
Timestamp: uint
BuyerAccountId: string
SellerAccountId: string
AssetCode: string
Amount: string
}
```

</CodeExample>

The example application will run a [network ingestion pipeline](https://github.com/stellar/go/blob/master/ingest/doc.go) to derive a smaller `ApplicationPayment` model from the [Stellar network transaction data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) as 'source of origin' and thus enable the application to avoid large compute resources that would have been required for maintaining storage of the full Stellar network data model.

The ingestion pipeline will perform three distinct stream processor roles:

### Inbound Adapter

Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top-level aggregate in the Stellar data model of which all [Stellar network transaction data](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline.

### Transformer

Subscribes to receive `LedgerCloseMeta` from the pipeline. Uses the Go SDK package [github.com/stellar/go/xdr](https://github.com/stellar/go/tree/master/xdr) to parse the nested network data model for payment operations and convert those into a new instance of application data model `ApplicationPayment` instances. Publishes `ApplicationPayment` to the pipeline.

### Outbound Adapter

Acts as the termination of the pipeline, it subscribes to receive `ApplicationPayment` and publishes the data off the pipeline and to an external data store, a ZeroMQ Publisher Socket, which is essentially a message broker.

### Summary

Refer to [Ingestion Pipeline Sample Application](ingestion-pipeline-code.mdx) for complete code demonstrating usage of the 'ingestion' SDK packages to create these adapters and transformers and run a live pipeline against the Stellar network.

0 comments on commit a837fe6

Please sign in to comment.