Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): Refactor improving performance and atomicity. #8

Merged
merged 4 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ run:
modules-download-mode: readonly
allow-parallel-runners: false
go: ""
build-tags:
- testmocks
skip-dirs:
- serve/filters/mocks

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: actions/checkout@v4

- name: Go test
run: go test -tags "testmocks" -shuffle=on -coverprofile coverage.out -timeout 5m ./...
run: go test -shuffle=on -coverprofile coverage.out -timeout 5m ./...

test-with-race:
runs-on: ubuntu-latest
Expand All @@ -29,4 +29,4 @@ jobs:
uses: actions/checkout@v4

- name: Go race test
run: go test -tags "testmocks" -race -shuffle=on -timeout 5m ./...
run: go test -race -shuffle=on -timeout 5m ./...
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ fixalign:
.PHONY: test
test:
go clean -testcache
go test -v -tags "testmocks" ./...
go test -v ./...
9 changes: 5 additions & 4 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"flag"
"fmt"

"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"

"github.com/gnolang/tx-indexer/client"
"github.com/gnolang/tx-indexer/events"
"github.com/gnolang/tx-indexer/fetch"
"github.com/gnolang/tx-indexer/serve"
"github.com/gnolang/tx-indexer/storage"
"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c *startCfg) exec(ctx context.Context) error {
}

// Create a DB instance
db, err := storage.New(c.dbPath)
db, err := storage.NewPebble(c.dbPath)
if err != nil {
return fmt.Errorf("unable to open storage DB, %w", err)
}
Expand Down Expand Up @@ -165,7 +166,7 @@ func (c *startCfg) exec(ctx context.Context) error {
// setupJSONRPC sets up the JSONRPC instance
func setupJSONRPC(
listenAddress string,
db *storage.Storage,
db *storage.Pebble,
em *events.Manager,
logger *zap.Logger,
) *serve.JSONRPC {
Expand Down
32 changes: 22 additions & 10 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"sort"
"time"

storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
queue "github.com/madz-lab/insertion-queue"
"go.uber.org/zap"

"github.com/gnolang/tx-indexer/storage"
storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
)

const (
Expand All @@ -22,7 +24,7 @@ const (
// Fetcher is an instance of the block indexer
// fetcher
type Fetcher struct {
storage Storage
storage storage.Storage
client Client
events Events

Expand All @@ -38,7 +40,7 @@ type Fetcher struct {
// New creates a new data fetcher instance
// that gets blockchain data from a remote chain
func New(
storage Storage,
storage storage.Storage,
client Client,
events Events,
opts ...Option,
Expand Down Expand Up @@ -176,9 +178,11 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
// Pop the next chunk
f.chunkBuffer.PopFront()

wb := f.storage.WriteBatch()

// Save the fetched data
for blockIndex, block := range item.chunk.blocks {
if saveErr := f.storage.SaveBlock(block); saveErr != nil {
if saveErr := wb.SetBlock(block); saveErr != nil {
// This is a design choice that really highlights the strain
// of keeping legacy testnets running. Current TM2 testnets
// have blocks / transactions that are no longer compatible
Expand All @@ -189,21 +193,21 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
continue
}

f.logger.Debug("Saved block data", zap.Int64("number", block.Height))
f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))

// Get block results
txResults := item.chunk.results[blockIndex]

// Save the fetched transaction results
for _, txResult := range txResults {
if err := f.storage.SaveTx(txResult); err != nil {
if err := wb.SetTx(txResult); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))

continue
}

f.logger.Debug(
"Saved tx",
"Added tx to batch",
zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
)
}
Expand All @@ -218,15 +222,23 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
}

f.logger.Info(
"Saved block and tx data for range",
"Added to batch block and tx data for range",
zivkovicmilos marked this conversation as resolved.
Show resolved Hide resolved
zap.Int64("from", item.chunkRange.from),
zap.Int64("to", item.chunkRange.to),
)

// Save the latest height data
if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil {
if err := wb.SetLatestHeight(item.chunkRange.to); err != nil {
if rErr := wb.Rollback(); rErr != nil {
return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr)
}

return fmt.Errorf("unable to save latest height info, %w", err)
}

if err := wb.Commit(); err != nil {
return fmt.Errorf("error persisting block information into storage, %w", err)
}
}
}
}
Expand Down
Loading
Loading