Skip to content

Commit

Permalink
review fixes/adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Nov 25, 2020
1 parent 94e3f10 commit 7476da1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 139 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ Ref: https://keepachangelog.com/en/1.0.0/

* (crypto) [\#7966](https://github.com/cosmos/cosmos-sdk/issues/7966) `Bip44Params` `String()` function now correctly returns the absolute HD path by adding the `m/` prefix.

### Features
* (SDK) [\#7888](https://github.com/cosmos/cosmos-sdk/issues/7888) [\#7889](https://github.com/cosmos/cosmos-sdk/issues/7889) State listening features proposed [ADR 038](./docs/architecture/adr-038-state-listening.md).

## [v0.40.0-rc3](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.40.0-rc3) - 2020-11-06

### Client Breaking
Expand Down
176 changes: 40 additions & 136 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,183 +19,87 @@ In addition to these request/response queries, it would be beneficial to have a

## Decision

We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores.
We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores.
We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream.

### Listening interface
In a new file- `store/types/listening.go`- we will create a `Listening` interface for streaming out an allowed subset of state changes from a KVStore.
The interface can be backed by a simple wrapper around any underlying `io.Writer`.
In a new file- `store/types/listening.go`- we will create a `WriteListener` interface for streaming out state changes from a KVStore.

```go
// Listening interface comprises the methods needed to filter and stream data out from a KVStore
type Listening interface {
io.Writer // the underlying io.Writer
Allowed(op Operation, key []byte) bool // method used to check if the Listener is allowed to listen to a specific state change
TraceContext() TraceContext // method to access this Listener's TraceContext
}
```

We will lift some of the private types currently being used by the `tracekv.Store` into public types at a new location- `store/types/tracing.go`- for use in the `Listening` interface
```go
type Operation string

const (
WriteOp Operation = "write"
ReadOp Operation = "read"
DeleteOp Operation = "delete"
IterKeyOp Operation = "iterKey"
IterValueOp Operation = "iterValue"
)

// TraceOperation implements a traced KVStore operation
type TraceOperation struct {
Operation Operation `json:"operation"`
Key string `json:"key"`
Value string `json:"value"`
Metadata map[string]interface{} `json:"metadata"`
// WriteListener interface for writing data out from a listenkv.Store
type WriteListener interface {
// if value is nil then it was deleted
OnWrite(key []byte, value []byte)
}
```

### Listener type
We will create a concrete implementation of the `Listening` interface in `store/types/listening.go`.
This implementation will be configurable with a list of allowed `Operation`s, a set of whitelisted keys and prefixes, and a set of blacklisted keys and prefixes.
We will create a concrete implementation of the `WriteListener` interface in `store/types/listening.go` that gob
encodes and writes the key value pair into an underlying `io.Writer`.

```go

// Listener is used to configure listening on specific keys of a KVStore
type Listener struct {
writer io.Writer
context TraceContext
allowedOperations map[Operation]struct{} // The operations which this listener is allowed to listen to
whitelistedKeys [][]byte // Keys explicitly allowed to be listened to
blacklistedKeys [][]byte // Keys explicitly disallowed to be listened to
whitelistedPrefixes [][]byte // Key prefixes explicitly allowed to be listened to
blacklistedPrefixes [][]byte // Key prefixes explicitly disallowed to be listened to
// GobWriteListener is used to configure listening to a KVStore using a gob encoding wrapper around an io.Writer
type GobWriteListener struct {
writer *gob.Encoder
}

...

// GetContext satisfies Listening interface
func (l *Listener) GetContext() TraceContext {
return l.context
}

// Write satisfies Listening interface
// it wraps around the underlying writer interface
func (l *Listener) Write(b []byte) (int, error) {
return l.writer.Write(b)
// NewGobWriteListener wraps a WriteListenerWriter around an io.Writer
func NewGobWriteListener(w io.Writer) *GobWriteListener {
return &GobWriteListener{
writer: gob.NewEncoder(w),
}
}

// Allowed satisfies Listening interface
// it returns whether or not the Listener is allowed to listen to the provided operation at the provided key
func (l *Listener) Allowed(op Operation, key []byte) bool {
// first check if the operation is allowed
if _, ok := l.allowedOperations[op]; !ok {
return false
}
// if there are no keys or prefixes in the whitelists then every key is allowed (unless disallowed in blacklists)
// if there are whitelisted keys or prefixes then only the keys which conform are allowed (unless disallowed in blacklists)
allowed := true
if len(l.whitelistedKeys) > 0 || len(l.whitelistedPrefixes) > 0 {
allowed = listsContain(l.whitelistedKeys, l.whitelistedPrefixes, key)
}
return allowed && !listsContain(l.blacklistedKeys, l.blacklistedPrefixes, key)
// KVPair for gob encoding
type KVPair struct {
Key []byte
Value []byte
}

func listsContain(keys, prefixes [][]byte, key []byte) bool {
for _, k := range keys {
if bytes.Equal(key, k) {
return true
}
}
for _, p := range prefixes {
if bytes.HasPrefix(key, p) {
return true
}
}
return false
// OnWrite satisfies WriteListener interface by writing out key-value gobs to the underlying io.Writer
func (l *Listener) OnWrite(key []byte, value []byte) {
l.writer.Encode(KVPair{Key: key, Value: value})
}
```

### ListenKVStore
We will create a new `Store` type `listenkv.Store` that the `MultiStore` wraps around a `KVStore` to enable state listening.
This is closely modeled after the `tracekv.Store` with the primary difference being that we can configure the `Store` with a set of `Listening` types
which direct the streaming of only certain allowed subsets of keys and/or operations to specific `io.Writer` destinations.
We can configure the `Store` with a set of `WriteListener`s which stream the output to specific destinations.

```go
// Store implements the KVStore interface with listening enabled.
// Operations are traced on each core KVStore call and written to any of the
// underlying listeners with the proper key and operation permissions
type Store struct {
parent types.KVStore
listeners []types.Listening
listeners []types.WriteListener
}

// NewStore returns a reference to a new traceKVStore given a parent
// KVStore implementation and a buffered writer.
func NewStore(parent types.KVStore, listeners []types.Listening) *Store {
func NewStore(parent types.KVStore, listeners []types.WriteListener) *Store {
return &Store{parent: parent, listeners: listeners}
}

...

// Get implements the KVStore interface. It traces a read operation and
// delegates a Get call to the parent KVStore.
func (tkv *Store) Get(key []byte) []byte {
value := tkv.parent.Get(key)
writeOperation(tkv.listeners, types.ReadOp, key, value)
return value
}

// Set implements the KVStore interface. It traces a write operation and
// delegates the Set call to the parent KVStore.
func (tkv *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
writeOperation(tkv.listeners, types.WriteOp, key, value)
onWrite(tkv.listeners, key, value)
tkv.parent.Set(key, value)
}

// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (tkv *Store) Delete(key []byte) {
writeOperation(tkv.listeners, types.DeleteOp, key, nil)
onWrite(tkv.listeners, key, nil)
tkv.parent.Delete(key)
}

// Has implements the KVStore interface. It delegates the Has call to the
// parent KVStore.
func (tkv *Store) Has(key []byte) bool {
return tkv.parent.Has(key)
}

...

// writeOperation writes a KVStore operation to the underlying io.Writer of
// every listener that has permissions to listen to that operation at the given key
// The TraceOperation is JSON-encoded with the `key` and `value` fields as base64 encoded strings
func writeOperation(listeners []types.Listening, op types.Operation, key, value []byte) {
// short circuit if there are no listeners so we don't waste time base64 encoding `key` and `value`
if len(listeners) == 0 {
return
}
traceOp := types.TraceOperation{
Operation: op,
Key: base64.StdEncoding.EncodeToString(key),
Value: base64.StdEncoding.EncodeToString(value),
}
// onWrite writes a KVStore operation to all of the WriteListeners
func onWrite(listeners []types.WriteListener, key, value []byte) {
for _, l := range listeners {
if !l.Allowed(op, key) {
continue
}
traceOp.Metadata = l.GetContext()
raw, err := json.Marshal(traceOp)
if err != nil {
panic(errors.Wrap(err, "failed to serialize listen operation"))
}
if _, err := l.Write(raw); err != nil {
panic(errors.Wrap(err, "failed to write listen operation"))
}
io.WriteString(l, "\n")
l.OnWrite(key, value)
}
}
```
Expand All @@ -212,8 +116,8 @@ type MultiStore interface {
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool

// SetListeners sets the listener set for the KVStore belonging to the provided StoreKey
SetListeners(key StoreKey, listeners []Listening)
// SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey
SetListeners(key StoreKey, listeners []WriteListener)

// CacheListening enables or disables KVStore listening at the cache layer
CacheListening(listen bool)
Expand All @@ -225,14 +129,14 @@ type CacheWrap interface {
...

// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(listeners []Listening) CacheWrap
CacheWrapWithListeners(listeners []WriteListener) CacheWrap
}

type CacheWrapper interface {
...

// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(listeners []Listening) CacheWrap
CacheWrapWithListeners(listeners []WriteListener) CacheWrap
}
```

Expand Down Expand Up @@ -264,7 +168,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
for k, v := range rs.stores {
stores[k] = v
}
var cacheListeners map[types.StoreKey][]types.Listening
var cacheListeners map[types.StoreKey][]types.WriteListener
if rs.cacheListening {
cacheListeners = rs.listeners
}
Expand All @@ -277,7 +181,7 @@ We will introduce and document mechanisms for exposing data from the above liste

#### Writing to file
We will document and provide examples of how to configure a listener to write out to a file.
No new type implementation will be needed, a `os.File` can be used as the underlying `io.Writer` for a listener.
No new type implementation will be needed, an `os.File` can be used as the underlying `io.Writer` for a `GobWriteListener`.

Writing to a file is the simplest approach for streaming the data out to consumers.
This approach also provide the advantages of being persistent and durable, and the files can be read directly
Expand All @@ -302,7 +206,7 @@ using the provided `AppOptions`. We will add two methods to the `BaseApp` to ena

```go
// SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey
func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.Listening) {
func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.WriteListener) {
app.cms.SetListeners(key, listeners)
}

Expand Down Expand Up @@ -363,8 +267,8 @@ func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) {
if err != nil {
tmos.Exit(err.Error())
}
// using single listener with all operations and keys permitted and no TraceContext
listener := storeTypes.NewDefaultStateListener(fileHandler, nil)
// using single io.Writer based listener
listener := storeTypes.NewGobWriteListener(fileHandler)
bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener})
}

Expand Down

0 comments on commit 7476da1

Please sign in to comment.