From d5bb9026ea4887f12bb14963438a4e0de5390255 Mon Sep 17 00:00:00 2001 From: Antonio Navarro Perez Date: Wed, 14 Feb 2024 11:46:19 +0100 Subject: [PATCH] fix(storage): Refactor improving performance and atomicity. (#8) --- .github/golangci.yaml | 2 - .github/workflows/test.yaml | 4 +- Makefile | 2 +- cmd/start.go | 9 +- fetch/fetch.go | 32 ++-- fetch/fetch_test.go | 189 +++++++++++--------- fetch/mocks_test.go | 48 +---- fetch/types.go | 17 +- internal/mock/filters.go | 45 +++++ internal/mock/storage.go | 100 +++++++++++ serve/filters/manager.go | 6 +- serve/filters/manager_test.go | 15 +- serve/filters/mocks/mocks.go | 74 -------- serve/filters/subscription/block_test.go | 9 +- serve/filters/types.go | 10 -- serve/handlers/subs/subs_test.go | 37 ++-- serve/jsonrpc.go | 16 +- storage/pebble.go | 136 ++++++++++++++ storage/pebble/pebble.go | 51 ------ storage/pebble/pebble_test.go | 76 -------- storage/{storage_test.go => pebble_test.go} | 33 ++-- storage/storage.go | 90 ---------- storage/types.go | 51 ++++++ 23 files changed, 538 insertions(+), 514 deletions(-) create mode 100644 internal/mock/filters.go create mode 100644 internal/mock/storage.go delete mode 100644 serve/filters/mocks/mocks.go create mode 100644 storage/pebble.go delete mode 100644 storage/pebble/pebble.go delete mode 100644 storage/pebble/pebble_test.go rename storage/{storage_test.go => pebble_test.go} (83%) delete mode 100644 storage/storage.go create mode 100644 storage/types.go diff --git a/.github/golangci.yaml b/.github/golangci.yaml index 7539e04b..cade26c4 100644 --- a/.github/golangci.yaml +++ b/.github/golangci.yaml @@ -7,8 +7,6 @@ run: modules-download-mode: readonly allow-parallel-runners: false go: "" - build-tags: - - testmocks skip-dirs: - serve/filters/mocks diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9afd7321..83a11d24 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -15,7 +15,7 @@ jobs: uses: actions/checkout@v4 - name: Go test - run: go test -tags "testmocks" -shuffle=on -coverprofile coverage.out -timeout 5m ./... + run: go test -shuffle=on -coverprofile coverage.out -timeout 5m ./... test-with-race: runs-on: ubuntu-latest @@ -29,4 +29,4 @@ jobs: uses: actions/checkout@v4 - name: Go race test - run: go test -tags "testmocks" -race -shuffle=on -timeout 5m ./... + run: go test -race -shuffle=on -timeout 5m ./... diff --git a/Makefile b/Makefile index 9c155e6d..0d55d775 100644 --- a/Makefile +++ b/Makefile @@ -22,4 +22,4 @@ fixalign: .PHONY: test test: go clean -testcache - go test -v -tags "testmocks" ./... \ No newline at end of file + go test -v ./... \ No newline at end of file diff --git a/cmd/start.go b/cmd/start.go index 0e760703..ff8da971 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -6,13 +6,14 @@ import ( "flag" "fmt" + "github.com/peterbourgon/ff/v3/ffcli" + "go.uber.org/zap" + "github.com/gnolang/tx-indexer/client" "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/fetch" "github.com/gnolang/tx-indexer/serve" "github.com/gnolang/tx-indexer/storage" - "github.com/peterbourgon/ff/v3/ffcli" - "go.uber.org/zap" ) const ( @@ -112,7 +113,7 @@ func (c *startCfg) exec(ctx context.Context) error { } // Create a DB instance - db, err := storage.New(c.dbPath) + db, err := storage.NewPebble(c.dbPath) if err != nil { return fmt.Errorf("unable to open storage DB, %w", err) } @@ -165,7 +166,7 @@ func (c *startCfg) exec(ctx context.Context) error { // setupJSONRPC sets up the JSONRPC instance func setupJSONRPC( listenAddress string, - db *storage.Storage, + db *storage.Pebble, em *events.Manager, logger *zap.Logger, ) *serve.JSONRPC { diff --git a/fetch/fetch.go b/fetch/fetch.go index f3b59893..a646f4cc 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -8,10 +8,12 @@ import ( "sort" "time" - storageErrors "github.com/gnolang/tx-indexer/storage/errors" - "github.com/gnolang/tx-indexer/types" queue "github.com/madz-lab/insertion-queue" "go.uber.org/zap" + + "github.com/gnolang/tx-indexer/storage" + storageErrors "github.com/gnolang/tx-indexer/storage/errors" + "github.com/gnolang/tx-indexer/types" ) const ( @@ -22,7 +24,7 @@ const ( // Fetcher is an instance of the block indexer // fetcher type Fetcher struct { - storage Storage + storage storage.Storage client Client events Events @@ -38,7 +40,7 @@ type Fetcher struct { // New creates a new data fetcher instance // that gets blockchain data from a remote chain func New( - storage Storage, + storage storage.Storage, client Client, events Events, opts ...Option, @@ -176,9 +178,11 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { // Pop the next chunk f.chunkBuffer.PopFront() + wb := f.storage.WriteBatch() + // Save the fetched data for blockIndex, block := range item.chunk.blocks { - if saveErr := f.storage.SaveBlock(block); saveErr != nil { + if saveErr := wb.SetBlock(block); saveErr != nil { // This is a design choice that really highlights the strain // of keeping legacy testnets running. Current TM2 testnets // have blocks / transactions that are no longer compatible @@ -189,21 +193,21 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { continue } - f.logger.Debug("Saved block data", zap.Int64("number", block.Height)) + f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height)) // Get block results txResults := item.chunk.results[blockIndex] // Save the fetched transaction results for _, txResult := range txResults { - if err := f.storage.SaveTx(txResult); err != nil { + if err := wb.SetTx(txResult); err != nil { f.logger.Error("unable to save tx", zap.String("err", err.Error())) continue } f.logger.Debug( - "Saved tx", + "Added tx to batch", zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())), ) } @@ -218,15 +222,23 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { } f.logger.Info( - "Saved block and tx data for range", + "Added to batch block and tx data for range", zap.Int64("from", item.chunkRange.from), zap.Int64("to", item.chunkRange.to), ) // Save the latest height data - if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil { + if err := wb.SetLatestHeight(item.chunkRange.to); err != nil { + if rErr := wb.Rollback(); rErr != nil { + return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr) + } + return fmt.Errorf("unable to save latest height info, %w", err) } + + if err := wb.Commit(); err != nil { + return fmt.Errorf("error persisting block information into storage, %w", err) + } } } } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 9e38fe3d..18dd6921 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -13,13 +13,16 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/state" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/std" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + clientTypes "github.com/gnolang/tx-indexer/client/types" "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/internal/mock" + "github.com/gnolang/tx-indexer/storage" storageErrors "github.com/gnolang/tx-indexer/storage/errors" indexerTypes "github.com/gnolang/tx-indexer/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func TestFetcher_FetchTransactions_Invalid(t *testing.T) { @@ -31,8 +34,8 @@ func TestFetcher_FetchTransactions_Invalid(t *testing.T) { var ( fetchErr = errors.New("random DB error") - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, fetchErr }, } @@ -84,31 +87,35 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { latestSaved = int64(0) - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { if latestSaved == 0 { return 0, storageErrors.ErrNotFound } return latestSaved, nil }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - latestSaved = block.Height + latestSaved = block.Height - return nil - }, - saveTxFn: func(result *types.TxResult) error { - savedTxs = append(savedTxs, result) + return nil + }, + SetTxFn: func(result *types.TxResult) error { + savedTxs = append(savedTxs, result) - return nil + return nil + }, + } }, } @@ -235,31 +242,35 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { latestSaved = int64(0) - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { if latestSaved == 0 { return 0, storageErrors.ErrNotFound } return latestSaved, nil }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - latestSaved = block.Height + latestSaved = block.Height - return nil - }, - saveTxFn: func(result *types.TxResult) error { - savedTxs = append(savedTxs, result) + return nil + }, + SetTxFn: func(result *types.TxResult) error { + savedTxs = append(savedTxs, result) - return nil + return nil + }, + } }, } @@ -405,25 +416,29 @@ func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) { }, } - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, storageErrors.ErrNotFound }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - return nil - }, - saveTxFn: func(_ *types.TxResult) error { - t.Fatalf("should not save txs") + return nil + }, + SetTxFn: func(_ *types.TxResult) error { + t.Fatalf("should not save txs") - return nil + return nil + }, + } }, } @@ -452,7 +467,7 @@ func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) { Block: blocks[num], }, nil }, - getBlockResultsFn: func(num int64) (*core_types.ResultBlockResults, error) { + getBlockResultsFn: func(_ int64) (*core_types.ResultBlockResults, error) { t.Fatalf("should not request results") return nil, nil @@ -507,25 +522,29 @@ func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) { }, } - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, storageErrors.ErrNotFound }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) - - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() - } + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } - return nil - }, - saveTxFn: func(_ *types.TxResult) error { - t.Fatalf("should not save txs") + return nil + }, + SetTxFn: func(_ *types.TxResult) error { + t.Fatalf("should not save txs") - return nil + return nil + }, + } }, } @@ -623,25 +642,29 @@ func TestFetcher_InvalidBlocks(t *testing.T) { }, } - mockStorage = &mockStorage{ - getLatestSavedHeightFn: func() (int64, error) { + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (int64, error) { return 0, storageErrors.ErrNotFound }, - saveBlockFn: func(block *types.Block) error { - savedBlocks = append(savedBlocks, block) + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } + + return fmt.Errorf("unable to save block %d", block.Height) + }, + SetTxFn: func(_ *types.TxResult) error { + t.Fatalf("should not save txs") - // Check if all blocks are saved - if block.Height == int64(blockNum) { - // At this point, we can cancel the process - cancelFn() + return nil + }, } - - return fmt.Errorf("unable to save block %d", block.Height) - }, - saveTxFn: func(_ *types.TxResult) error { - t.Fatalf("should not save txs") - - return nil }, } diff --git a/fetch/mocks_test.go b/fetch/mocks_test.go index c7735bfc..dc966054 100644 --- a/fetch/mocks_test.go +++ b/fetch/mocks_test.go @@ -2,57 +2,11 @@ package fetch import ( core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" - "github.com/gnolang/gno/tm2/pkg/bft/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" "github.com/gnolang/tx-indexer/events" ) -type ( - getLatestHeightDelegate func() (int64, error) - saveLatestHeightDelegate func(int64) error - saveBlockDelegate func(*types.Block) error - saveTxDelegate func(*types.TxResult) error -) - -type mockStorage struct { - getLatestSavedHeightFn getLatestHeightDelegate - saveLatestHeightFn saveLatestHeightDelegate - saveBlockFn saveBlockDelegate - saveTxFn saveTxDelegate -} - -func (m *mockStorage) GetLatestHeight() (int64, error) { - if m.getLatestSavedHeightFn != nil { - return m.getLatestSavedHeightFn() - } - - return 0, nil -} - -func (m *mockStorage) SaveLatestHeight(blockNum int64) error { - if m.saveLatestHeightFn != nil { - return m.saveLatestHeightFn(blockNum) - } - - return nil -} - -func (m *mockStorage) SaveTx(tx *types.TxResult) error { - if m.saveTxFn != nil { - return m.saveTxFn(tx) - } - - return nil -} - -func (m *mockStorage) SaveBlock(block *types.Block) error { - if m.saveBlockFn != nil { - return m.saveBlockFn(block) - } - - return nil -} - type ( getLatestBlockNumberDelegate func() (int64, error) getBlockDelegate func(int64) (*core_types.ResultBlock, error) diff --git a/fetch/types.go b/fetch/types.go index 8bbcdb18..80a0cac0 100644 --- a/fetch/types.go +++ b/fetch/types.go @@ -2,26 +2,11 @@ package fetch import ( core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" - "github.com/gnolang/gno/tm2/pkg/bft/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" "github.com/gnolang/tx-indexer/events" ) -// Storage defines the transaction storage interface -type Storage interface { - // GetLatestHeight returns the latest block height from the storage - GetLatestHeight() (int64, error) - - // SaveLatestHeight saves the latest block height to the storage - SaveLatestHeight(int64) error - - // SaveBlock saves the block to the permanent storage - SaveBlock(block *types.Block) error - - // SaveTx saves the transaction to the permanent storage - SaveTx(tx *types.TxResult) error -} - // Client defines the interface for the node (client) communication type Client interface { // GetLatestBlockNumber returns the latest block height from the chain diff --git a/internal/mock/filters.go b/internal/mock/filters.go new file mode 100644 index 00000000..ddc8b5d2 --- /dev/null +++ b/internal/mock/filters.go @@ -0,0 +1,45 @@ +package mock + +import ( + "github.com/gnolang/tx-indexer/events" +) + +type ( + writeDataFnDelegate func(any) error +) + +type Conn struct { + WriteDataFn writeDataFnDelegate +} + +func (m *Conn) WriteData(data any) error { + if m.WriteDataFn != nil { + return m.WriteDataFn(data) + } + + return nil +} + +type ( + subscribeDelegate func([]events.Type) *events.Subscription + cancelSubscriptionDelegate func(events.SubscriptionID) +) + +type Events struct { + SubscribeFn subscribeDelegate + CancelSubscriptionFn cancelSubscriptionDelegate +} + +func (m *Events) Subscribe(eventTypes []events.Type) *events.Subscription { + if m.SubscribeFn != nil { + return m.SubscribeFn(eventTypes) + } + + return nil +} + +func (m *Events) CancelSubscription(id events.SubscriptionID) { + if m.CancelSubscriptionFn != nil { + m.CancelSubscriptionFn(id) + } +} diff --git a/internal/mock/storage.go b/internal/mock/storage.go new file mode 100644 index 00000000..083c432a --- /dev/null +++ b/internal/mock/storage.go @@ -0,0 +1,100 @@ +package mock + +import ( + "github.com/gnolang/gno/tm2/pkg/bft/types" + + "github.com/gnolang/tx-indexer/storage" +) + +var _ storage.Storage = &Storage{} + +type Storage struct { + GetLatestSavedHeightFn func() (int64, error) + GetWriteBatchFn func() storage.Batch + GetBlockFn func(int64) (*types.Block, error) + GetTxFn func([]byte) (*types.TxResult, error) +} + +func (m *Storage) GetLatestHeight() (int64, error) { + if m.GetLatestSavedHeightFn != nil { + return m.GetLatestSavedHeightFn() + } + + return 0, nil +} + +// GetBlock fetches the block by its number +func (m *Storage) GetBlock(blockNum int64) (*types.Block, error) { + if m.GetBlockFn != nil { + return m.GetBlockFn(blockNum) + } + + panic("not implemented") +} + +// GetTx fetches the tx using its hash +func (m *Storage) GetTx(tx []byte) (*types.TxResult, error) { + if m.GetTxFn != nil { + return m.GetTxFn(tx) + } + + panic("not implemented") +} + +// WriteBatch provides a batch intended to do a write action that +// can be cancelled or committed all at the same time +func (m *Storage) WriteBatch() storage.Batch { + if m.GetWriteBatchFn != nil { + return m.GetWriteBatchFn() + } + + panic("not implemented") +} + +func (m *Storage) Close() error { + return nil +} + +type WriteBatch struct { + SetLatestHeightFn func(int64) error + SetBlockFn func(*types.Block) error + SetTxFn func(*types.TxResult) error +} + +// SetLatestHeight saves the latest block height to the storage +func (mb *WriteBatch) SetLatestHeight(h int64) error { + if mb.SetLatestHeightFn != nil { + return mb.SetLatestHeightFn(h) + } + + return nil +} + +// SetBlock saves the block to the permanent storage +func (mb *WriteBatch) SetBlock(block *types.Block) error { + if mb.SetBlockFn != nil { + return mb.SetBlockFn(block) + } + + return nil +} + +// SetTx saves the transaction to the permanent storage +func (mb *WriteBatch) SetTx(tx *types.TxResult) error { + if mb.SetTxFn != nil { + return mb.SetTxFn(tx) + } + + return nil +} + +// Commit stores all the provided info on the storage and make +// it available for other storage readers +func (mb *WriteBatch) Commit() error { + return nil +} + +// Rollback rollbacks the operation not persisting the provided changes +func (mb *WriteBatch) Rollback() error { + return nil +} diff --git a/serve/filters/manager.go b/serve/filters/manager.go index c5313e40..d5897183 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -6,17 +6,19 @@ import ( "time" "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/filters/filter" filterSubscription "github.com/gnolang/tx-indexer/serve/filters/subscription" + "github.com/gnolang/tx-indexer/storage" commonTypes "github.com/gnolang/tx-indexer/types" ) // Manager manages all running filters type Manager struct { ctx context.Context - storage Storage // temporarily unused + storage storage.Storage // temporarily unused events Events filters *filterMap subscriptions *subscriptionMap @@ -26,7 +28,7 @@ type Manager struct { // NewFilterManager creates new filter manager object func NewFilterManager( ctx context.Context, - storage Storage, + storage storage.Storage, events Events, opts ...Option, ) *Manager { diff --git a/serve/filters/manager_test.go b/serve/filters/manager_test.go index cfabce54..7e8314d6 100644 --- a/serve/filters/manager_test.go +++ b/serve/filters/manager_test.go @@ -7,12 +7,13 @@ import ( "time" tm2Types "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/internal/mock" "github.com/gnolang/tx-indexer/serve/filters/filter" - "github.com/gnolang/tx-indexer/serve/filters/mocks" "github.com/gnolang/tx-indexer/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // generateBlocks generates dummy blocks @@ -39,7 +40,7 @@ func Test_BlockFilters(t *testing.T) { filterManager := NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, events.NewManager(), ) @@ -77,7 +78,7 @@ func Test_NewBlockEvents(t *testing.T) { blocks = generateBlocks(t, 10) blockCh = make(chan events.Event) - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{ SubCh: blockCh, @@ -89,7 +90,7 @@ func Test_NewBlockEvents(t *testing.T) { // Init filter manager filterManager := NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) @@ -163,7 +164,7 @@ func Test_FilterCleanup(t *testing.T) { // Create filter manager filterManager := NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, events.NewManager(), WithCleanupInterval(10*time.Millisecond), ) diff --git a/serve/filters/mocks/mocks.go b/serve/filters/mocks/mocks.go deleted file mode 100644 index b2256665..00000000 --- a/serve/filters/mocks/mocks.go +++ /dev/null @@ -1,74 +0,0 @@ -//go:build testmocks - -package mocks - -import ( - tm2Types "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/tx-indexer/events" -) - -type ( - writeDataFnDelegate func(any) error -) - -type MockConn struct { - WriteDataFn writeDataFnDelegate -} - -func (m *MockConn) WriteData(data any) error { - if m.WriteDataFn != nil { - return m.WriteDataFn(data) - } - - return nil -} - -type ( - getBlockDelegate func(int64) (*tm2Types.Block, error) - getTxDelegate func([]byte) (*tm2Types.TxResult, error) -) - -type MockStorage struct { - GetBlockFn getBlockDelegate - GetTxFn getTxDelegate -} - -func (m *MockStorage) GetBlock(blockNum int64) (*tm2Types.Block, error) { - if m.GetBlockFn != nil { - return m.GetBlockFn(blockNum) - } - - return nil, nil -} - -func (m *MockStorage) GetTx(hash []byte) (*tm2Types.TxResult, error) { - if m.GetTxFn != nil { - return m.GetTxFn(hash) - } - - return nil, nil -} - -type ( - subscribeDelegate func([]events.Type) *events.Subscription - cancelSubscriptionDelegate func(events.SubscriptionID) -) - -type MockEvents struct { - SubscribeFn subscribeDelegate - CancelSubscriptionFn cancelSubscriptionDelegate -} - -func (m *MockEvents) Subscribe(eventTypes []events.Type) *events.Subscription { - if m.SubscribeFn != nil { - return m.SubscribeFn(eventTypes) - } - - return nil -} - -func (m *MockEvents) CancelSubscription(id events.SubscriptionID) { - if m.CancelSubscriptionFn != nil { - m.CancelSubscriptionFn(id) - } -} diff --git a/serve/filters/subscription/block_test.go b/serve/filters/subscription/block_test.go index 2cdef697..1ef5e9b3 100644 --- a/serve/filters/subscription/block_test.go +++ b/serve/filters/subscription/block_test.go @@ -4,11 +4,12 @@ import ( "testing" "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/tx-indexer/serve/encode" - "github.com/gnolang/tx-indexer/serve/filters/mocks" - "github.com/gnolang/tx-indexer/serve/spec" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/gnolang/tx-indexer/internal/mock" + "github.com/gnolang/tx-indexer/serve/encode" + "github.com/gnolang/tx-indexer/serve/spec" ) func TestBlockSubscription_WriteResponse(t *testing.T) { @@ -29,7 +30,7 @@ func TestBlockSubscription_WriteResponse(t *testing.T) { expectedBlockResponse := spec.NewJSONSubscribeResponse("", encodedResponse) - mockConn := &mocks.MockConn{ + mockConn := &mock.Conn{ WriteDataFn: func(data any) error { capturedWrite = data diff --git a/serve/filters/types.go b/serve/filters/types.go index b3d6ff6c..f89dcd65 100644 --- a/serve/filters/types.go +++ b/serve/filters/types.go @@ -20,16 +20,6 @@ type Events interface { CancelSubscription(events.SubscriptionID) } -// Storage represents the permanent storage abstraction -// required by the filter manager -type Storage interface { - // GetBlock fetches the block by its number - GetBlock(int64) (*types.Block, error) - - // GetTx fetches the tx using its hash - GetTx([]byte) (*types.TxResult, error) -} - // Filter interface is used for different filter types type Filter interface { // GetType returns the filter type diff --git a/serve/handlers/subs/subs_test.go b/serve/handlers/subs/subs_test.go index 44ef45dc..a62b573b 100644 --- a/serve/handlers/subs/subs_test.go +++ b/serve/handlers/subs/subs_test.go @@ -10,17 +10,18 @@ import ( "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/internal/mock" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/filters" "github.com/gnolang/tx-indexer/serve/filters/filter" - "github.com/gnolang/tx-indexer/serve/filters/mocks" "github.com/gnolang/tx-indexer/serve/filters/subscription" "github.com/gnolang/tx-indexer/serve/metadata" "github.com/gnolang/tx-indexer/serve/spec" indexerTypes "github.com/gnolang/tx-indexer/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // generateBlocks generates dummy blocks @@ -63,8 +64,8 @@ func TestNewBlockFilter_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, - &mocks.MockEvents{ + &mock.Storage{}, + &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, @@ -132,8 +133,8 @@ func TestUninstallFilter_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, - &mocks.MockEvents{ + &mock.Storage{}, + &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, @@ -156,8 +157,8 @@ func TestUninstallFilter_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, - &mocks.MockEvents{ + &mock.Storage{}, + &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, @@ -236,7 +237,7 @@ func TestGetFilterChanges_Valid(t *testing.T) { eventsCh = make(chan events.Event) - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{ ID: events.SubscriptionID(1), @@ -248,7 +249,7 @@ func TestGetFilterChanges_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) @@ -358,7 +359,7 @@ func TestSubscribe_InvalidParams(t *testing.T) { getWSConnectionFn: func(wsID string) conns.WSConnection { require.Equal(t, id, wsID) - return &mocks.MockConn{} // connection found + return &mock.Conn{} // connection found }, } ) @@ -397,7 +398,7 @@ func TestSubscribe_Valid(t *testing.T) { WebSocketID: &connID, } - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{ ID: events.SubscriptionID(1), @@ -407,7 +408,7 @@ func TestSubscribe_Valid(t *testing.T) { } writtenData = make([]any, 0) - mockConn = &mocks.MockConn{ + mockConn = &mock.Conn{ WriteDataFn: func(data any) error { defer wg.Done() writtenData = append(writtenData, data) @@ -426,7 +427,7 @@ func TestSubscribe_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) @@ -578,13 +579,13 @@ func TestUnsubscribe_Valid(t *testing.T) { WebSocketID: &connID, } - mockEvents = &mocks.MockEvents{ + mockEvents = &mock.Events{ SubscribeFn: func(_ []events.Type) *events.Subscription { return &events.Subscription{} }, } - mockConn = &mocks.MockConn{} + mockConn = &mock.Conn{} mockConnFetcher = &mockConnectionFetcher{ getWSConnectionFn: func(id string) conns.WSConnection { @@ -597,7 +598,7 @@ func TestUnsubscribe_Valid(t *testing.T) { fm := filters.NewFilterManager( context.Background(), - &mocks.MockStorage{}, + &mock.Storage{}, mockEvents, ) diff --git a/serve/jsonrpc.go b/serve/jsonrpc.go index 9a42f89f..31a32735 100644 --- a/serve/jsonrpc.go +++ b/serve/jsonrpc.go @@ -9,6 +9,13 @@ import ( "net/http" "time" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/google/uuid" + "github.com/olahol/melody" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/conns/wsconn" "github.com/gnolang/tx-indexer/serve/filters" @@ -20,12 +27,7 @@ import ( "github.com/gnolang/tx-indexer/serve/writer" httpWriter "github.com/gnolang/tx-indexer/serve/writer/http" wsWriter "github.com/gnolang/tx-indexer/serve/writer/ws" - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - "github.com/google/uuid" - "github.com/olahol/melody" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" + "github.com/gnolang/tx-indexer/storage" ) const ( @@ -188,7 +190,7 @@ func (j *JSONRPC) RegisterBlockEndpoints(db block.Storage) { ) } -func (j *JSONRPC) RegisterSubEndpoints(db filters.Storage) { +func (j *JSONRPC) RegisterSubEndpoints(db storage.Storage) { fm := filters.NewFilterManager(context.Background(), db, j.events) subsHandler := subs.NewHandler( diff --git a/storage/pebble.go b/storage/pebble.go new file mode 100644 index 00000000..d651138e --- /dev/null +++ b/storage/pebble.go @@ -0,0 +1,136 @@ +package storage + +import ( + "errors" + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/gnolang/gno/tm2/pkg/bft/types" + + storageErrors "github.com/gnolang/tx-indexer/storage/errors" +) + +var _ Storage = &Pebble{} + +// Pebble is the instance of an embedded storage +type Pebble struct { + db *pebble.DB +} + +// NewPebble creates a new storage instance at the given path +func NewPebble(path string) (*Pebble, error) { + db, err := pebble.Open(path, &pebble.Options{ + // TODO: EventListener + // Start with defaults + }) + if err != nil { + return nil, fmt.Errorf("unable to create DB, %w", err) + } + + return &Pebble{ + db: db, + }, nil +} + +// GetLatestHeight fetches the latest saved height from storage +func (s *Pebble) GetLatestHeight() (int64, error) { + height, c, err := s.db.Get(latestHeightKey) + if errors.Is(err, pebble.ErrNotFound) { + return 0, storageErrors.ErrNotFound + } + + if err != nil { + return 0, err + } + + defer c.Close() + + return decodeInt64(height), nil +} + +// GetBlock fetches the specified block from storage, if any +func (s *Pebble) GetBlock(blockNum int64) (*types.Block, error) { + block, c, err := s.db.Get(append(blockPrefix, encodeInt64(blockNum)...)) + if errors.Is(err, pebble.ErrNotFound) { + return nil, storageErrors.ErrNotFound + } + + if err != nil { + return nil, err + } + + defer c.Close() + + return decodeBlock(block) +} + +// GetTx fetches the specified tx result from storage, if any +func (s *Pebble) GetTx(txHash []byte) (*types.TxResult, error) { + tx, c, err := s.db.Get(append(txResultKey, txHash...)) + if errors.Is(err, pebble.ErrNotFound) { + return nil, storageErrors.ErrNotFound + } + + if err != nil { + return nil, err + } + + defer c.Close() + + return decodeTx(tx) +} + +func (s *Pebble) WriteBatch() Batch { + return &PebbleBatch{ + b: s.db.NewBatch(), + } +} + +func (s *Pebble) Close() error { + return s.db.Close() +} + +var _ Batch = &PebbleBatch{} + +type PebbleBatch struct { + b *pebble.Batch +} + +func (b *PebbleBatch) SetLatestHeight(h int64) error { + return b.b.Set(latestHeightKey, encodeInt64(h), pebble.NoSync) +} + +func (b *PebbleBatch) SetBlock(block *types.Block) error { + eb, err := encodeBlock(block) + if err != nil { + return err + } + + return b.b.Set( + append(blockPrefix, encodeInt64(block.Height)...), + eb, + pebble.NoSync, + ) +} + +func (b *PebbleBatch) SetTx(tx *types.TxResult) error { + encodedTx, err := encodeTx(tx) + if err != nil { + return err + } + + return b.b.Set( + append(txResultKey, tx.Tx.Hash()...), + encodedTx, + pebble.NoSync, + ) +} + +func (b *PebbleBatch) Commit() error { + return b.b.Commit(pebble.Sync) +} + +// Rollback closes the pebble batch without persisting any data. error output is always nil. +func (b *PebbleBatch) Rollback() error { + return b.b.Close() +} diff --git a/storage/pebble/pebble.go b/storage/pebble/pebble.go deleted file mode 100644 index c1156f6c..00000000 --- a/storage/pebble/pebble.go +++ /dev/null @@ -1,51 +0,0 @@ -package pebble - -import ( - "errors" - - "github.com/cockroachdb/pebble" - storageErrors "github.com/gnolang/tx-indexer/storage/errors" -) - -type Storage struct { - db *pebble.DB -} - -// NewDB initializes a new pebble DB instance at the given path -func NewDB(path string) (*Storage, error) { - db, err := pebble.Open(path, &pebble.Options{}) - if err != nil { - return nil, err - } - - return &Storage{ - db: db, - }, nil -} - -// Set stores a value for the given key -func (p *Storage) Set(key, value []byte) error { - return p.db.Set(key, value, pebble.Sync) -} - -// Get retrieves the value for a given key -func (p *Storage) Get(key []byte) ([]byte, error) { - value, closer, err := p.db.Get(key) - if errors.Is(err, pebble.ErrNotFound) { - // Wrap the error - return nil, storageErrors.ErrNotFound - } - - if err != nil { - return nil, err - } - - defer closer.Close() - - return value, nil -} - -// Close closes the database connection -func (p *Storage) Close() error { - return p.db.Close() -} diff --git a/storage/pebble/pebble_test.go b/storage/pebble/pebble_test.go deleted file mode 100644 index 26f54aff..00000000 --- a/storage/pebble/pebble_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package pebble - -import ( - "crypto/rand" - "testing" - - storageErrors "github.com/gnolang/tx-indexer/storage/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// generateRandomBytes generates random bytes -func generateRandomBytes(t *testing.T, length int) []byte { - t.Helper() - - bytes := make([]byte, length) - _, err := rand.Read(bytes) - require.NoError(t, err) - - return bytes -} - -// generateRandomPairs generates random key value pairs -func generateRandomPairs(t *testing.T, count int) map[string][]byte { - t.Helper() - - pairs := make(map[string][]byte, count) - - for i := 0; i < count; i++ { - key := generateRandomBytes(t, 8) - value := generateRandomBytes(t, 32) - - pairs[string(key)] = value - } - - return pairs -} - -func TestPebble_GetMissingItem(t *testing.T) { - t.Parallel() - - // Initialize the pebble DB - store, err := NewDB(t.TempDir()) - require.NoError(t, err) - - defer store.Close() - - // Fetch a non-existent value - _, err = store.Get([]byte("non_existent_key")) - if !assert.ErrorIs(t, err, storageErrors.ErrNotFound) { - t.Errorf("Expected error not found when getting non-existent key") - } -} - -func TestPebble_WriteRead(t *testing.T) { - t.Parallel() - - // Initialize the pebble DB - store, err := NewDB(t.TempDir()) - require.NoError(t, err) - - defer store.Close() - - pairs := generateRandomPairs(t, 50) - - for key, value := range pairs { - // Set the key - require.NoError(t, store.Set([]byte(key), value)) - - // Get the value - retrievedValue, err := store.Get([]byte(key)) - require.NoError(t, err) - - assert.Equal(t, value, retrievedValue) - } -} diff --git a/storage/storage_test.go b/storage/pebble_test.go similarity index 83% rename from storage/storage_test.go rename to storage/pebble_test.go index 436106c7..5242d0a1 100644 --- a/storage/storage_test.go +++ b/storage/pebble_test.go @@ -7,15 +7,16 @@ import ( "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/std" - storageErrors "github.com/gnolang/tx-indexer/storage/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + storageErrors "github.com/gnolang/tx-indexer/storage/errors" ) func TestStorage_New(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NotNil(t, s) assert.NoError(t, err) @@ -25,7 +26,7 @@ func TestStorage_New(t *testing.T) { func TestStorage_LatestHeight(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NoError(t, err) defer func() { @@ -39,7 +40,10 @@ func TestStorage_LatestHeight(t *testing.T) { // Save the latest height and grab it for i := int64(0); i < 100; i++ { - require.NoError(t, s.SaveLatestHeight(i)) + b := s.WriteBatch() + + require.NoError(t, b.SetLatestHeight(i)) + require.NoError(t, b.Commit()) latest, err = s.GetLatestHeight() @@ -51,7 +55,7 @@ func TestStorage_LatestHeight(t *testing.T) { func TestStorage_Block(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NoError(t, err) defer func() { @@ -61,12 +65,16 @@ func TestStorage_Block(t *testing.T) { blocks := generateRandomBlocks(t, 100) // Save the blocks and fetch them + b := s.WriteBatch() for _, block := range blocks { - assert.NoError(t, s.SaveBlock(block)) + assert.NoError(t, b.SetBlock(block)) + } + + require.NoError(t, b.Commit()) + for _, block := range blocks { savedBlock, err := s.GetBlock(block.Height) require.NoError(t, err) - assert.Equal(t, block, savedBlock) } } @@ -74,7 +82,7 @@ func TestStorage_Block(t *testing.T) { func TestStorage_Tx(t *testing.T) { t.Parallel() - s, err := New(t.TempDir()) + s, err := NewPebble(t.TempDir()) require.NoError(t, err) defer func() { @@ -83,13 +91,18 @@ func TestStorage_Tx(t *testing.T) { txs := generateRandomTxs(t, 100) + wb := s.WriteBatch() + // Save the txs and fetch them for _, tx := range txs { - assert.NoError(t, s.SaveTx(tx)) + assert.NoError(t, wb.SetTx(tx)) + } + require.NoError(t, wb.Commit()) + + for _, tx := range txs { savedTx, err := s.GetTx(tx.Tx.Hash()) require.NoError(t, err) - assert.Equal(t, tx, savedTx) } } diff --git a/storage/storage.go b/storage/storage.go deleted file mode 100644 index 6a4acac4..00000000 --- a/storage/storage.go +++ /dev/null @@ -1,90 +0,0 @@ -package storage - -import ( - "fmt" - - "github.com/gnolang/gno/tm2/pkg/bft/types" - "github.com/gnolang/tx-indexer/storage/pebble" -) - -// Storage is the instance of an embedded storage -type Storage struct { - db *pebble.Storage -} - -// New creates a new storage instance at the given path -func New(path string) (*Storage, error) { - db, err := pebble.NewDB(path) - if err != nil { - return nil, fmt.Errorf("unable to create DB, %w", err) - } - - return &Storage{ - db: db, - }, nil -} - -func (s *Storage) Close() error { - return s.db.Close() -} - -// GetLatestHeight fetches the latest saved height from storage -func (s *Storage) GetLatestHeight() (int64, error) { - height, err := s.db.Get(latestHeightKey) - if err != nil { - return 0, err - } - - return decodeInt64(height), nil -} - -// SaveLatestHeight saves the latest height to storage -func (s *Storage) SaveLatestHeight(height int64) error { - return s.db.Set(latestHeightKey, encodeInt64(height)) -} - -// GetBlock fetches the specified block from storage, if any -func (s *Storage) GetBlock(blockNum int64) (*types.Block, error) { - block, err := s.db.Get(append(blockPrefix, encodeInt64(blockNum)...)) - if err != nil { - return nil, err - } - - return decodeBlock(block) -} - -// SaveBlock saves the specified block to storage -func (s *Storage) SaveBlock(block *types.Block) error { - encodedBlock, err := encodeBlock(block) - if err != nil { - return err - } - - return s.db.Set( - append(blockPrefix, encodeInt64(block.Height)...), - encodedBlock, - ) -} - -// GetTx fetches the specified tx result from storage, if any -func (s *Storage) GetTx(txHash []byte) (*types.TxResult, error) { - tx, err := s.db.Get(append(txResultKey, txHash...)) - if err != nil { - return nil, err - } - - return decodeTx(tx) -} - -// SaveTx saves the specified tx result to storage -func (s *Storage) SaveTx(tx *types.TxResult) error { - encodedTx, err := encodeTx(tx) - if err != nil { - return err - } - - return s.db.Set( - append(txResultKey, tx.Tx.Hash()...), - encodedTx, - ) -} diff --git a/storage/types.go b/storage/types.go new file mode 100644 index 00000000..ead829be --- /dev/null +++ b/storage/types.go @@ -0,0 +1,51 @@ +package storage + +import ( + "io" + + "github.com/gnolang/gno/tm2/pkg/bft/types" +) + +// Storage represents the permanent storage abstraction +// for reading and writing operations +type Storage interface { + Reader + Writer +} + +// Reader defines the transaction storage interface for read methods +type Reader interface { + io.Closer + // GetLatestHeight returns the latest block height from the storage + GetLatestHeight() (int64, error) + + // GetBlock fetches the block by its number + GetBlock(int64) (*types.Block, error) + + // GetTx fetches the tx using its hash + GetTx([]byte) (*types.TxResult, error) +} + +// Writer defines the transaction storage interface for write methods +type Writer interface { + io.Closer + // WriteBatch provides a batch intended to do a write action that + // can be cancelled or committed all at the same time + WriteBatch() Batch +} + +type Batch interface { + // SetLatestHeight saves the latest block height to the storage + SetLatestHeight(int64) error + // SetBlock saves the block to the permanent storage + SetBlock(block *types.Block) error + // SetTx saves the transaction to the permanent storage + SetTx(tx *types.TxResult) error + + // Commit stores all the provided info on the storage and make + // it available for other storage readers + Commit() error + + // Rollback rollbacks the operation not persisting the provided changes + Rollback() error +}