From 2ba226c1ce48f7face080a4469c47c07c81ad70e Mon Sep 17 00:00:00 2001 From: Jesse Chung Date: Wed, 24 Nov 2021 16:41:48 +0900 Subject: [PATCH] feat: add snappy compressor db for indexer (#5) * feat: add snappy compressor db for indexer * deps: tidy * fix: nil getter, test * feat: compat mode for snappydb * fix: better handling for snappy error --- bin/v0.34.x/db/snappy/snappy_batch.go | 38 ++++++++ bin/v0.34.x/db/snappy/snappy_db.go | 115 ++++++++++++++++++++++++ bin/v0.34.x/db/snappy/snappy_db_test.go | 92 +++++++++++++++++++ bin/v0.34.x/go.mod | 2 +- bin/v0.34.x/indexer/indexer.go | 5 +- bin/v0.34.x/indexer/tx/tx_test.go | 1 - 6 files changed, 250 insertions(+), 3 deletions(-) create mode 100644 bin/v0.34.x/db/snappy/snappy_batch.go create mode 100644 bin/v0.34.x/db/snappy/snappy_db.go create mode 100644 bin/v0.34.x/db/snappy/snappy_db_test.go diff --git a/bin/v0.34.x/db/snappy/snappy_batch.go b/bin/v0.34.x/db/snappy/snappy_batch.go new file mode 100644 index 0000000..21054a9 --- /dev/null +++ b/bin/v0.34.x/db/snappy/snappy_batch.go @@ -0,0 +1,38 @@ +package snappy + +import ( + "github.com/golang/snappy" + tmdb "github.com/tendermint/tm-db" +) + +var _ tmdb.Batch = (*SnappyBatch)(nil) + +type SnappyBatch struct { + batch tmdb.Batch +} + +func NewSnappyBatch(batch tmdb.Batch) *SnappyBatch { + return &SnappyBatch{ + batch: batch, + } +} + +func (s *SnappyBatch) Set(key, value []byte) error { + return s.batch.Set(key, snappy.Encode(nil, value)) +} + +func (s *SnappyBatch) Delete(key []byte) error { + return s.batch.Delete(key) +} + +func (s *SnappyBatch) Write() error { + return s.batch.Write() +} + +func (s *SnappyBatch) WriteSync() error { + return s.batch.WriteSync() +} + +func (s *SnappyBatch) Close() error { + return s.batch.Close() +} diff --git a/bin/v0.34.x/db/snappy/snappy_db.go b/bin/v0.34.x/db/snappy/snappy_db.go new file mode 100644 index 0000000..b76030c --- /dev/null +++ b/bin/v0.34.x/db/snappy/snappy_db.go @@ -0,0 +1,115 @@ +package snappy + +import ( + "encoding/json" + "github.com/golang/snappy" + "github.com/pkg/errors" + tmdb "github.com/tendermint/tm-db" + "sync" +) + +const ( + CompatModeEnabled = iota + CompatModeDisabled +) + +var ( + errIteratorNotSupported = errors.New("iterator unsupported") + errUnknownData = errors.New("unknown format") +) + +var _ tmdb.DB = (*SnappyDB)(nil) + +// SnappyDB implements a tmdb.DB overlay with snappy compression/decompression +// Iterator is NOT supported -- main purpose of this library is to support indexer.db, +// which never makes use of iterators anyway +// NOTE: implement when needed +// NOTE2: monitor mem pressure, optimize by pre-allocating dst buf when there is bottleneck +type SnappyDB struct { + db tmdb.DB + mtx *sync.Mutex + compatMode int +} + +func NewSnappyDB(db tmdb.DB, compatMode int) *SnappyDB { + return &SnappyDB{ + mtx: new(sync.Mutex), + db: db, + compatMode: compatMode, + } +} + +func (s *SnappyDB) Get(key []byte) ([]byte, error) { + if item, err := s.db.Get(key); err != nil { + return nil, err + } else if item == nil && err == nil { + return nil, nil + } else { + decoded, decodeErr := snappy.Decode(nil, item) + + // if snappy decode fails, try to replace the underlying + // only recover & replace when the blob is a valid json + if s.compatMode == CompatModeEnabled { + if decodeErr != nil { + if json.Valid(item) { + s.mtx.Lock() + // run item by Set() to encode & replace + _ = s.db.Set(key, item) + defer s.mtx.Unlock() + + return item, nil + } else { + return nil, errUnknownData + } + } else { + return decoded, nil + } + } + + return decoded, decodeErr + } +} + +func (s *SnappyDB) Has(key []byte) (bool, error) { + return s.db.Has(key) +} + +func (s *SnappyDB) Set(key []byte, value []byte) error { + return s.db.Set(key, snappy.Encode(nil, value)) +} + +func (s *SnappyDB) SetSync(key []byte, value []byte) error { + return s.Set(key, value) +} + +func (s *SnappyDB) Delete(key []byte) error { + return s.db.Delete(key) +} + +func (s *SnappyDB) DeleteSync(key []byte) error { + return s.Delete(key) +} + +func (s *SnappyDB) Iterator(start, end []byte) (tmdb.Iterator, error) { + return nil, errIteratorNotSupported +} + +func (s *SnappyDB) ReverseIterator(start, end []byte) (tmdb.Iterator, error) { + return nil, errIteratorNotSupported +} + +func (s *SnappyDB) Close() error { + return s.db.Close() +} + +func (s *SnappyDB) NewBatch() tmdb.Batch { + return NewSnappyBatch(s.db.NewBatch()) +} + +func (s *SnappyDB) Print() error { + return s.db.Print() +} + +func (s *SnappyDB) Stats() map[string]string { + return s.db.Stats() +} diff --git a/bin/v0.34.x/db/snappy/snappy_db_test.go b/bin/v0.34.x/db/snappy/snappy_db_test.go new file mode 100644 index 0000000..1ea4c09 --- /dev/null +++ b/bin/v0.34.x/db/snappy/snappy_db_test.go @@ -0,0 +1,92 @@ +package snappy + +import ( + "github.com/stretchr/testify/assert" + tmjson "github.com/tendermint/tendermint/libs/json" + tendermint "github.com/tendermint/tendermint/types" + db "github.com/tendermint/tm-db" + "io/ioutil" + "os" + "testing" +) + +func TestSnappyDB(t *testing.T) { + snappy := NewSnappyDB(db.NewMemDB(), CompatModeEnabled) + + assert.Nil(t, snappy.Set([]byte("test"), []byte("testValue"))) + + var v []byte + var err error + + // nil buffer test + v, err = snappy.Get([]byte("non-existing")) + assert.Nil(t, v) + assert.Nil(t, err) + + v, err = snappy.Get([]byte("test")) + assert.Nil(t, err) + assert.Equal(t, []byte("testValue"), v) + + assert.Nil(t, snappy.Delete([]byte("test"))) + v, err = snappy.Get([]byte("test")) + assert.Nil(t, v) + assert.Nil(t, err) + + // iterator is not supported + var it db.Iterator + it, err = snappy.Iterator([]byte("start"), []byte("end")) + assert.Nil(t, it) + assert.Equal(t, errIteratorNotSupported, err) + + it, err = snappy.ReverseIterator([]byte("start"), []byte("end")) + assert.Nil(t, it) + assert.Equal(t, errIteratorNotSupported, err) + + // batched store is compressed as well + var batch db.Batch + batch = snappy.NewBatch() + + assert.Nil(t, batch.Set([]byte("key"), []byte("batchedValue"))) + assert.Nil(t, batch.Write()) + assert.Nil(t, batch.Close()) + + v, err = snappy.Get([]byte("key")) + assert.Equal(t, []byte("batchedValue"), v) + + batch = snappy.NewBatch() + assert.Nil(t, batch.Delete([]byte("key"))) + assert.Nil(t, batch.Write()) + assert.Nil(t, batch.Close()) + + v, err = snappy.Get([]byte("key")) + assert.Nil(t, v) + assert.Nil(t, err) +} + +func TestSnappyDBCompat(t *testing.T) { + mdb := db.NewMemDB() + testKey := []byte("testKey") + + nocompat := NewSnappyDB(mdb, CompatModeDisabled) + indexSampleTx(nocompat, testKey) + + nocompatResult, _ := nocompat.Get(testKey) + + compat := NewSnappyDB(mdb, CompatModeEnabled) + compatResult, _ := compat.Get(testKey) + assert.Equal(t, nocompatResult, compatResult) + + nocompatResult2, _ := nocompat.Get(testKey) + assert.Equal(t, compatResult, nocompatResult2) +} + +func indexSampleTx(mdb db.DB, key []byte) { + block := &tendermint.Block{} + blockFile, _ := os.Open("../../indexer/fixtures/block_4814775.json") + blockJSON, _ := ioutil.ReadAll(blockFile) + if err := tmjson.Unmarshal(blockJSON, block); err != nil { + panic(err) + } + + _ = mdb.Set(key, blockJSON) +} diff --git a/bin/v0.34.x/go.mod b/bin/v0.34.x/go.mod index bcb30c4..02b783e 100644 --- a/bin/v0.34.x/go.mod +++ b/bin/v0.34.x/go.mod @@ -6,6 +6,7 @@ require ( github.com/cosmos/cosmos-sdk v0.44.2 github.com/cosmos/iavl v0.17.1 github.com/gogo/protobuf v1.3.3 + github.com/golang/snappy v0.0.4 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 @@ -52,7 +53,6 @@ require ( github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/gateway v1.1.0 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/orderedcode v0.0.1 // indirect github.com/gorilla/handlers v1.5.1 // indirect diff --git a/bin/v0.34.x/indexer/indexer.go b/bin/v0.34.x/indexer/indexer.go index 437550a..fed8a83 100644 --- a/bin/v0.34.x/indexer/indexer.go +++ b/bin/v0.34.x/indexer/indexer.go @@ -5,6 +5,7 @@ import ( "github.com/gorilla/mux" tm "github.com/tendermint/tendermint/types" tmdb "github.com/tendermint/tm-db" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/snappy" "github.com/terra-money/mantlemint-provider-v0.34.x/mantlemint" "net/http" "time" @@ -23,8 +24,10 @@ func NewIndexer(dbName, path string) (*Indexer, error) { return nil, indexerDBError } + indexerDBCompressed := snappy.NewSnappyDB(indexerDB, snappy.CompatModeEnabled) + return &Indexer{ - db: indexerDB, + db: indexerDBCompressed, indexerTags: []string{}, indexers: []IndexFunc{}, }, nil diff --git a/bin/v0.34.x/indexer/tx/tx_test.go b/bin/v0.34.x/indexer/tx/tx_test.go index 8413466..4f997f4 100644 --- a/bin/v0.34.x/indexer/tx/tx_test.go +++ b/bin/v0.34.x/indexer/tx/tx_test.go @@ -49,5 +49,4 @@ func TestIndexTx(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, txns) fmt.Println(string(txns)) - }