Skip to content

Commit

Permalink
feat: add snappy compressor db for indexer (#5)
Browse files Browse the repository at this point in the history
* feat: add snappy compressor db for indexer

* deps: tidy

* fix: nil getter, test

* feat: compat mode for snappydb

* fix: better handling for snappy error
  • Loading branch information
kjessec authored Nov 24, 2021
1 parent a2d6343 commit 2ba226c
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 3 deletions.
38 changes: 38 additions & 0 deletions bin/v0.34.x/db/snappy/snappy_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package snappy

import (
"github.com/golang/snappy"
tmdb "github.com/tendermint/tm-db"
)

var _ tmdb.Batch = (*SnappyBatch)(nil)

type SnappyBatch struct {
batch tmdb.Batch
}

func NewSnappyBatch(batch tmdb.Batch) *SnappyBatch {
return &SnappyBatch{
batch: batch,
}
}

func (s *SnappyBatch) Set(key, value []byte) error {
return s.batch.Set(key, snappy.Encode(nil, value))
}

func (s *SnappyBatch) Delete(key []byte) error {
return s.batch.Delete(key)
}

func (s *SnappyBatch) Write() error {
return s.batch.Write()
}

func (s *SnappyBatch) WriteSync() error {
return s.batch.WriteSync()
}

func (s *SnappyBatch) Close() error {
return s.batch.Close()
}
115 changes: 115 additions & 0 deletions bin/v0.34.x/db/snappy/snappy_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package snappy

import (
"encoding/json"
"github.com/golang/snappy"
"github.com/pkg/errors"
tmdb "github.com/tendermint/tm-db"
"sync"
)

const (
CompatModeEnabled = iota
CompatModeDisabled
)

var (
errIteratorNotSupported = errors.New("iterator unsupported")
errUnknownData = errors.New("unknown format")
)

var _ tmdb.DB = (*SnappyDB)(nil)

// SnappyDB implements a tmdb.DB overlay with snappy compression/decompression
// Iterator is NOT supported -- main purpose of this library is to support indexer.db,
// which never makes use of iterators anyway
// NOTE: implement when needed
// NOTE2: monitor mem pressure, optimize by pre-allocating dst buf when there is bottleneck
type SnappyDB struct {
db tmdb.DB
mtx *sync.Mutex
compatMode int
}

func NewSnappyDB(db tmdb.DB, compatMode int) *SnappyDB {
return &SnappyDB{
mtx: new(sync.Mutex),
db: db,
compatMode: compatMode,
}
}

func (s *SnappyDB) Get(key []byte) ([]byte, error) {
if item, err := s.db.Get(key); err != nil {
return nil, err
} else if item == nil && err == nil {
return nil, nil
} else {
decoded, decodeErr := snappy.Decode(nil, item)

// if snappy decode fails, try to replace the underlying
// only recover & replace when the blob is a valid json
if s.compatMode == CompatModeEnabled {
if decodeErr != nil {
if json.Valid(item) {
s.mtx.Lock()
// run item by Set() to encode & replace
_ = s.db.Set(key, item)
defer s.mtx.Unlock()

return item, nil
} else {
return nil, errUnknownData
}
} else {
return decoded, nil
}
}

return decoded, decodeErr
}
}

func (s *SnappyDB) Has(key []byte) (bool, error) {
return s.db.Has(key)
}

func (s *SnappyDB) Set(key []byte, value []byte) error {
return s.db.Set(key, snappy.Encode(nil, value))
}

func (s *SnappyDB) SetSync(key []byte, value []byte) error {
return s.Set(key, value)
}

func (s *SnappyDB) Delete(key []byte) error {
return s.db.Delete(key)
}

func (s *SnappyDB) DeleteSync(key []byte) error {
return s.Delete(key)
}

func (s *SnappyDB) Iterator(start, end []byte) (tmdb.Iterator, error) {
return nil, errIteratorNotSupported
}

func (s *SnappyDB) ReverseIterator(start, end []byte) (tmdb.Iterator, error) {
return nil, errIteratorNotSupported
}

func (s *SnappyDB) Close() error {
return s.db.Close()
}

func (s *SnappyDB) NewBatch() tmdb.Batch {
return NewSnappyBatch(s.db.NewBatch())
}

func (s *SnappyDB) Print() error {
return s.db.Print()
}

func (s *SnappyDB) Stats() map[string]string {
return s.db.Stats()
}
92 changes: 92 additions & 0 deletions bin/v0.34.x/db/snappy/snappy_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package snappy

import (
"github.com/stretchr/testify/assert"
tmjson "github.com/tendermint/tendermint/libs/json"
tendermint "github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"
"io/ioutil"
"os"
"testing"
)

func TestSnappyDB(t *testing.T) {
snappy := NewSnappyDB(db.NewMemDB(), CompatModeEnabled)

assert.Nil(t, snappy.Set([]byte("test"), []byte("testValue")))

var v []byte
var err error

// nil buffer test
v, err = snappy.Get([]byte("non-existing"))
assert.Nil(t, v)
assert.Nil(t, err)

v, err = snappy.Get([]byte("test"))
assert.Nil(t, err)
assert.Equal(t, []byte("testValue"), v)

assert.Nil(t, snappy.Delete([]byte("test")))
v, err = snappy.Get([]byte("test"))
assert.Nil(t, v)
assert.Nil(t, err)

// iterator is not supported
var it db.Iterator
it, err = snappy.Iterator([]byte("start"), []byte("end"))
assert.Nil(t, it)
assert.Equal(t, errIteratorNotSupported, err)

it, err = snappy.ReverseIterator([]byte("start"), []byte("end"))
assert.Nil(t, it)
assert.Equal(t, errIteratorNotSupported, err)

// batched store is compressed as well
var batch db.Batch
batch = snappy.NewBatch()

assert.Nil(t, batch.Set([]byte("key"), []byte("batchedValue")))
assert.Nil(t, batch.Write())
assert.Nil(t, batch.Close())

v, err = snappy.Get([]byte("key"))
assert.Equal(t, []byte("batchedValue"), v)

batch = snappy.NewBatch()
assert.Nil(t, batch.Delete([]byte("key")))
assert.Nil(t, batch.Write())
assert.Nil(t, batch.Close())

v, err = snappy.Get([]byte("key"))
assert.Nil(t, v)
assert.Nil(t, err)
}

func TestSnappyDBCompat(t *testing.T) {
mdb := db.NewMemDB()
testKey := []byte("testKey")

nocompat := NewSnappyDB(mdb, CompatModeDisabled)
indexSampleTx(nocompat, testKey)

nocompatResult, _ := nocompat.Get(testKey)

compat := NewSnappyDB(mdb, CompatModeEnabled)
compatResult, _ := compat.Get(testKey)
assert.Equal(t, nocompatResult, compatResult)

nocompatResult2, _ := nocompat.Get(testKey)
assert.Equal(t, compatResult, nocompatResult2)
}

func indexSampleTx(mdb db.DB, key []byte) {
block := &tendermint.Block{}
blockFile, _ := os.Open("../../indexer/fixtures/block_4814775.json")
blockJSON, _ := ioutil.ReadAll(blockFile)
if err := tmjson.Unmarshal(blockJSON, block); err != nil {
panic(err)
}

_ = mdb.Set(key, blockJSON)
}
2 changes: 1 addition & 1 deletion bin/v0.34.x/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cosmos/cosmos-sdk v0.44.2
github.com/cosmos/iavl v0.17.1
github.com/gogo/protobuf v1.3.3
github.com/golang/snappy v0.0.4
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/gateway v1.1.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down
5 changes: 4 additions & 1 deletion bin/v0.34.x/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/gorilla/mux"
tm "github.com/tendermint/tendermint/types"
tmdb "github.com/tendermint/tm-db"
"github.com/terra-money/mantlemint-provider-v0.34.x/db/snappy"
"github.com/terra-money/mantlemint-provider-v0.34.x/mantlemint"
"net/http"
"time"
Expand All @@ -23,8 +24,10 @@ func NewIndexer(dbName, path string) (*Indexer, error) {
return nil, indexerDBError
}

indexerDBCompressed := snappy.NewSnappyDB(indexerDB, snappy.CompatModeEnabled)

return &Indexer{
db: indexerDB,
db: indexerDBCompressed,
indexerTags: []string{},
indexers: []IndexFunc{},
}, nil
Expand Down
1 change: 0 additions & 1 deletion bin/v0.34.x/indexer/tx/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,4 @@ func TestIndexTx(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, txns)
fmt.Println(string(txns))

}

0 comments on commit 2ba226c

Please sign in to comment.