Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADR-038 Part 1: WriteListener, listen.KVStore, MultiStore and KVStore updates #8551

Merged
merged 15 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (x/upgrade) [\#8743](https://github.com/cosmos/cosmos-sdk/pull/8743) Add tracking module versions as per ADR-041
* (types) [\#8962](https://github.com/cosmos/cosmos-sdk/issues/8962) Add `Abs()` method to `sdk.Int`.
* (x/bank) [\#8950](https://github.com/cosmos/cosmos-sdk/pull/8950) Improve efficiency on supply updates.
* (store) [\#8012](https://github.com/cosmos/cosmos-sdk/pull/8012) Implementation of ADR-038 WriteListener and listen.KVStore

### Bug Fixes

Expand Down
37 changes: 22 additions & 15 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
// set bool indicates if it was a set; true: set, false: delete
OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte)
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
```

Expand Down Expand Up @@ -72,15 +72,20 @@ func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVP
}

// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) {
func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error {
kvPair := new(types.StoreKVPair)
kvPair.StoreKey = storeKey.Name()
kvPair.Set = set
kvPair.Delete = Delete
kvPair.Key = key
kvPair.Value = value
if by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair); err == nil {
wl.writer.Write(by)
by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
if err != nil {
return err
}
if _, err := wl.writer.Write(by); err != nil {
return err
}
return nil
}
```

Expand Down Expand Up @@ -110,20 +115,22 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteL
func (s *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
s.parent.Set(key, value)
s.onWrite(true, key, value)
s.onWrite(false, key, value)
}

// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (s *Store) Delete(key []byte) {
s.parent.Delete(key)
s.onWrite(false, key, nil)
s.onWrite(true, key, nil)
}

// onWrite writes a KVStore operation to all of the WriteListeners
func (s *Store) onWrite(set bool, key, value []byte) {
func (s *Store) onWrite(delete bool, key, value []byte) {
for _, l := range s.listeners {
l.OnWrite(s.parentStoreKey, set, key, value)
if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
// log error
}
}
}
```
Expand All @@ -140,9 +147,9 @@ type MultiStore interface {
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool

// SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey
// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
SetListeners(key StoreKey, listeners []WriteListener)
AddListeners(key StoreKey, listeners []WriteListener)
}
```

Expand Down Expand Up @@ -342,7 +349,7 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru
case <-quitChan:
return
case by := <-fss.srcChan:
append(fss.stateCache, by)
fss.stateCache = append(fss.stateCache, by)
}
}
}()
Expand Down Expand Up @@ -380,7 +387,7 @@ We will add a new method to the `BaseApp` to enable the registration of `Streami
func (app *BaseApp) RegisterHooks(s StreamingService) {
// set the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.SetListeners(key, lis)
app.cms.AddListeners(key, lis)
}
// register the streaming service hooks within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
Expand All @@ -398,7 +405,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
...

// Call the streaming service hooks with the BeginBlock messages
for _ hook := range app.hooks {
for _, hook := range app.hooks {
hook.ListenBeginBlock(app.deliverState.ctx, req, res)
}

Expand Down Expand Up @@ -445,7 +452,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
}

// Call the streaming service hooks with the DeliverTx messages
for _, hook := range app.hook {
for _, hook := range app.hooks {
hook.ListenDeliverTx(app.deliverState.ctx, req, res)
}

Expand Down
56 changes: 38 additions & 18 deletions docs/core/proto-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
- [Output](#cosmos.bank.v1beta1.Output)
- [Params](#cosmos.bank.v1beta1.Params)
- [SendEnabled](#cosmos.bank.v1beta1.SendEnabled)
- [Supply](#cosmos.bank.v1beta1.Supply)

- [cosmos/bank/v1beta1/genesis.proto](#cosmos/bank/v1beta1/genesis.proto)
- [Balance](#cosmos.bank.v1beta1.Balance)
Expand Down Expand Up @@ -133,6 +132,9 @@
- [CommitInfo](#cosmos.base.store.v1beta1.CommitInfo)
- [StoreInfo](#cosmos.base.store.v1beta1.StoreInfo)

- [cosmos/base/store/v1beta1/listening.proto](#cosmos/base/store/v1beta1/listening.proto)
- [StoreKVPair](#cosmos.base.store.v1beta1.StoreKVPair)

- [cosmos/base/store/v1beta1/snapshot.proto](#cosmos/base/store/v1beta1/snapshot.proto)
- [SnapshotIAVLItem](#cosmos.base.store.v1beta1.SnapshotIAVLItem)
- [SnapshotItem](#cosmos.base.store.v1beta1.SnapshotItem)
Expand Down Expand Up @@ -1563,23 +1565,6 @@ sendable).




<a name="cosmos.bank.v1beta1.Supply"></a>

### Supply
Supply represents a struct that passively keeps track of the total supply
amounts in the network.
This message is deprecated now that supply is indexed by denom.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| `total` | [cosmos.base.v1beta1.Coin](#cosmos.base.v1beta1.Coin) | repeated | |





<!-- end messages -->

<!-- end enums -->
Expand Down Expand Up @@ -2200,6 +2185,41 @@ between a store name and the commit ID.



<!-- end messages -->

<!-- end enums -->

<!-- end HasExtensions -->

<!-- end services -->



<a name="cosmos/base/store/v1beta1/listening.proto"></a>
<p align="right"><a href="#top">Top</a></p>

## cosmos/base/store/v1beta1/listening.proto



<a name="cosmos.base.store.v1beta1.StoreKVPair"></a>

### StoreKVPair
StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes)
It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| `store_key` | [string](#string) | | the store key for the KVStore this pair originates from |
| `delete` | [bool](#bool) | | true indicates a delete operation, false indicates a set operation |
| `key` | [bytes](#bytes) | | |
| `value` | [bytes](#bytes) | | |





<!-- end messages -->

<!-- end enums -->
Expand Down
13 changes: 13 additions & 0 deletions proto/cosmos/base/store/v1beta1/listening.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package cosmos.base.store.v1beta1;

option go_package = "github.com/cosmos/cosmos-sdk/store/types";

// StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes)
// It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes
message StoreKVPair {
string store_key = 1; // the store key for the KVStore this pair originates from
bool delete = 2; // true indicates a delete operation, false indicates a set operation
bytes key = 3;
bytes value = 4;
}
16 changes: 16 additions & 0 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (ms multiStore) CacheWrapWithTrace(_ io.Writer, _ sdk.TraceContext) sdk.Cac
panic("not implemented")
}

func (ms multiStore) CacheWrapWithListeners(_ store.StoreKey, _ []store.WriteListener) store.CacheWrap {
panic("not implemented")
}

func (ms multiStore) TracingEnabled() bool {
panic("not implemented")
}
Expand All @@ -43,6 +47,14 @@ func (ms multiStore) SetTracer(w io.Writer) sdk.MultiStore {
panic("not implemented")
}

func (ms multiStore) AddListeners(key store.StoreKey, listeners []store.WriteListener) {
panic("not implemented")
}

func (ms multiStore) ListeningEnabled(key store.StoreKey) bool {
panic("not implemented")
}

func (ms multiStore) Commit() sdk.CommitID {
panic("not implemented")
}
Expand Down Expand Up @@ -131,6 +143,10 @@ func (kv kvStore) CacheWrapWithTrace(w io.Writer, tc sdk.TraceContext) sdk.Cache
panic("not implemented")
}

func (kv kvStore) CacheWrapWithListeners(_ store.StoreKey, _ []store.WriteListener) store.CacheWrap {
panic("not implemented")
}

func (kv kvStore) GetStoreType() sdk.StoreType {
panic("not implemented")
}
Expand Down
6 changes: 6 additions & 0 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/internal/conv"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
Expand Down Expand Up @@ -146,6 +147,11 @@ func (store *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types
return NewStore(tracekv.NewStore(store, w, tc))
}

// CacheWrapWithListeners implements the CacheWrapper interface.
func (store *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return NewStore(listenkv.NewStore(store, storeKey, listeners))
}

//----------------------------------------
// Iteration

Expand Down
42 changes: 37 additions & 5 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Store struct {

traceWriter io.Writer
traceContext types.TraceContext

listeners map[types.StoreKey][]types.WriteListener
}

var _ types.CacheMultiStore = Store{}
Expand All @@ -35,20 +37,28 @@ var _ types.CacheMultiStore = Store{}
func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
listeners map[types.StoreKey][]types.WriteListener,
) Store {
cms := Store{
db: cachekv.NewStore(store),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
keys: keys,
traceWriter: traceWriter,
traceContext: traceContext,
listeners: listeners,
}

for key, store := range stores {
var cacheWrapped types.CacheWrap
if cms.TracingEnabled() {
cms.stores[key] = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
} else {
cacheWrapped = store.CacheWrap()
}
if cms.ListeningEnabled(key) {
cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key])
} else {
cms.stores[key] = store.CacheWrap()
cms.stores[key] = cacheWrapped
}
}

Expand All @@ -59,10 +69,10 @@ func NewFromKVStore(
// CacheWrapper objects. Each CacheWrapper store is a branched store.
func NewStore(
db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey,
traceWriter io.Writer, traceContext types.TraceContext,
traceWriter io.Writer, traceContext types.TraceContext, listeners map[types.StoreKey][]types.WriteListener,
) Store {

return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext)
return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext, listeners)
}

func newCacheMultiStoreFromCMS(cms Store) Store {
Expand All @@ -71,7 +81,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store {
stores[k] = v
}

return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext)
return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, cms.listeners)
}

// SetTracer sets the tracer for the MultiStore that the underlying
Expand Down Expand Up @@ -102,6 +112,23 @@ func (cms Store) TracingEnabled() bool {
return cms.traceWriter != nil
}

// AddListeners adds listeners for a specific KVStore
func (cms Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) {
if ls, ok := cms.listeners[key]; ok {
cms.listeners[key] = append(ls, listeners...)
} else {
cms.listeners[key] = listeners
}
}

// ListeningEnabled returns if listening is enabled for a specific KVStore
func (cms Store) ListeningEnabled(key types.StoreKey) bool {
if ls, ok := cms.listeners[key]; ok {
return len(ls) != 0
}
return false
}

// GetStoreType returns the type of the store.
func (cms Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
Expand All @@ -125,6 +152,11 @@ func (cms Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac
return cms.CacheWrap()
}

// CacheWrapWithListeners implements the CacheWrapper interface.
func (cms Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
return cms.CacheWrap()
}

// Implements MultiStore.
func (cms Store) CacheMultiStore() types.CacheMultiStore {
return newCacheMultiStoreFromCMS(cms)
Expand Down
6 changes: 6 additions & 0 deletions store/dbadapter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
)
Expand Down Expand Up @@ -85,5 +86,10 @@ func (dsa Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca
return cachekv.NewStore(tracekv.NewStore(dsa, w, tc))
}

// CacheWrapWithListeners implements the CacheWrapper interface.
func (dsa Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return cachekv.NewStore(listenkv.NewStore(dsa, storeKey, listeners))
}

// dbm.DB implements KVStore so we can CacheKVStore it.
var _ types.KVStore = Store{}
Loading