Skip to content

Commit

Permalink
fix(storage): Refactor improving performance and atomicity. (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajnavarro authored Feb 14, 2024
1 parent 9fef618 commit d5bb902
Show file tree
Hide file tree
Showing 23 changed files with 538 additions and 514 deletions.
2 changes: 0 additions & 2 deletions .github/golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ run:
modules-download-mode: readonly
allow-parallel-runners: false
go: ""
build-tags:
- testmocks
skip-dirs:
- serve/filters/mocks

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: actions/checkout@v4

- name: Go test
run: go test -tags "testmocks" -shuffle=on -coverprofile coverage.out -timeout 5m ./...
run: go test -shuffle=on -coverprofile coverage.out -timeout 5m ./...

test-with-race:
runs-on: ubuntu-latest
Expand All @@ -29,4 +29,4 @@ jobs:
uses: actions/checkout@v4

- name: Go race test
run: go test -tags "testmocks" -race -shuffle=on -timeout 5m ./...
run: go test -race -shuffle=on -timeout 5m ./...
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ fixalign:
.PHONY: test
test:
go clean -testcache
go test -v -tags "testmocks" ./...
go test -v ./...
9 changes: 5 additions & 4 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"flag"
"fmt"

"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"

"github.com/gnolang/tx-indexer/client"
"github.com/gnolang/tx-indexer/events"
"github.com/gnolang/tx-indexer/fetch"
"github.com/gnolang/tx-indexer/serve"
"github.com/gnolang/tx-indexer/storage"
"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c *startCfg) exec(ctx context.Context) error {
}

// Create a DB instance
db, err := storage.New(c.dbPath)
db, err := storage.NewPebble(c.dbPath)
if err != nil {
return fmt.Errorf("unable to open storage DB, %w", err)
}
Expand Down Expand Up @@ -165,7 +166,7 @@ func (c *startCfg) exec(ctx context.Context) error {
// setupJSONRPC sets up the JSONRPC instance
func setupJSONRPC(
listenAddress string,
db *storage.Storage,
db *storage.Pebble,
em *events.Manager,
logger *zap.Logger,
) *serve.JSONRPC {
Expand Down
32 changes: 22 additions & 10 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"sort"
"time"

storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
queue "github.com/madz-lab/insertion-queue"
"go.uber.org/zap"

"github.com/gnolang/tx-indexer/storage"
storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
)

const (
Expand All @@ -22,7 +24,7 @@ const (
// Fetcher is an instance of the block indexer
// fetcher
type Fetcher struct {
storage Storage
storage storage.Storage
client Client
events Events

Expand All @@ -38,7 +40,7 @@ type Fetcher struct {
// New creates a new data fetcher instance
// that gets blockchain data from a remote chain
func New(
storage Storage,
storage storage.Storage,
client Client,
events Events,
opts ...Option,
Expand Down Expand Up @@ -176,9 +178,11 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
// Pop the next chunk
f.chunkBuffer.PopFront()

wb := f.storage.WriteBatch()

// Save the fetched data
for blockIndex, block := range item.chunk.blocks {
if saveErr := f.storage.SaveBlock(block); saveErr != nil {
if saveErr := wb.SetBlock(block); saveErr != nil {
// This is a design choice that really highlights the strain
// of keeping legacy testnets running. Current TM2 testnets
// have blocks / transactions that are no longer compatible
Expand All @@ -189,21 +193,21 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
continue
}

f.logger.Debug("Saved block data", zap.Int64("number", block.Height))
f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))

// Get block results
txResults := item.chunk.results[blockIndex]

// Save the fetched transaction results
for _, txResult := range txResults {
if err := f.storage.SaveTx(txResult); err != nil {
if err := wb.SetTx(txResult); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))

continue
}

f.logger.Debug(
"Saved tx",
"Added tx to batch",
zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
)
}
Expand All @@ -218,15 +222,23 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
}

f.logger.Info(
"Saved block and tx data for range",
"Added to batch block and tx data for range",
zap.Int64("from", item.chunkRange.from),
zap.Int64("to", item.chunkRange.to),
)

// Save the latest height data
if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil {
if err := wb.SetLatestHeight(item.chunkRange.to); err != nil {
if rErr := wb.Rollback(); rErr != nil {
return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr)
}

return fmt.Errorf("unable to save latest height info, %w", err)
}

if err := wb.Commit(); err != nil {
return fmt.Errorf("error persisting block information into storage, %w", err)
}
}
}
}
Expand Down
Loading

0 comments on commit d5bb902

Please sign in to comment.