From 3283ff52398090221510b054665d2fcfa4eeaa32 Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Thu, 12 Nov 2020 18:05:30 -0600
Subject: [PATCH 01/10] ADR-038 state listening

---
 docs/architecture/README.md                  |   3 +-
 docs/architecture/adr-038-state-listening.md | 369 +++++++++++++++++++
 2 files changed, 371 insertions(+), 1 deletion(-)
 create mode 100644 docs/architecture/adr-038-state-listening.md

diff --git a/docs/architecture/README.md b/docs/architecture/README.md
index a979f30e418c..19e6d6e2db49 100644
--- a/docs/architecture/README.md
+++ b/docs/architecture/README.md
@@ -73,4 +73,5 @@ Read about the [PROCESS](./PROCESS.md).
- [ADR 028: Public Key Addresses](./adr-028-public-key-addresses.md)
- [ADR 032: Typed Events](./adr-032-typed-events.md)
- [ADR 035: Rosetta API Support](./adr-035-rosetta-api-support.md)
-- [ADR 037: Governance Split Votes](./adr-037-gov-split-vote.md)
\ No newline at end of file
+- [ADR 037: Governance Split Votes](./adr-037-gov-split-vote.md)
+- [ADR 038: State Listening](./adr-038-state-listening.md)
\ No newline at end of file

diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md
new file mode 100644
index 000000000000..c4eaadd0a688
--- /dev/null
+++ b/docs/architecture/adr-038-state-listening.md
@@ -0,0 +1,369 @@

# ADR 038: KVStore state listening

## Changelog

- 11/23/2020: Initial draft

## Status

Proposed

## Abstract

This ADR defines a set of changes to enable state change listening of individual KVStores.

## Context

Currently, KVStore data can be remotely accessed through [Queries](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/messages-and-queries.md#queries) which proceed through Tendermint and the ABCI.
In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time.

## Decision

We will modify the MultiStore interface and its concrete (`basemulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores and routing the output to consumers.
We will also introduce two approaches for exposing the data to consumers: writing to files and writing to a gRPC stream.

### Listening interface
In a new file- `store/types/listening.go`- we will create a `Listening` interface for streaming out an allowed subset of state changes from a KVStore.
The interface can be backed by a simple wrapper around any underlying `io.Writer`.
```go
// Listening interface comprises the methods needed to filter and stream data out from a KVStore
type Listening interface {
	io.Writer // the underlying io.Writer
	Allowed(op Operation, key []byte) bool // method used to check if the Listener is allowed to listen to a specific state change
	GetContext() TraceContext // method to access this Listener's TraceContext
}
```

We will lift some of the private types currently being used by the `tracekv.Store` into public types at a new location- `store/types/tracing.go`- for use in the `Listening` interface.

```go
type Operation string

const (
	WriteOp Operation = "write"
	ReadOp Operation = "read"
	DeleteOp Operation = "delete"
	IterKeyOp Operation = "iterKey"
	IterValueOp Operation = "iterValue"
)

// TraceOperation implements a traced KVStore operation
type TraceOperation struct {
	Operation Operation `json:"operation"`
	Key string `json:"key"`
	Value string `json:"value"`
	Metadata map[string]interface{} `json:"metadata"`
}
```

### Listener type
We will create a concrete implementation of the `Listening` interface in `store/types/listening.go`.
This implementation will be configurable with a list of allowed `Operation`s, a set of whitelisted keys and prefixes, and a set of blacklisted keys and prefixes.

```go
// Listener is used to configure listening on specific keys of a KVStore
type Listener struct {
	writer io.Writer
	context TraceContext
	allowedOperations map[Operation]struct{} // The operations which this listener is allowed to listen to
	whitelistedKeys [][]byte // Keys explicitly allowed to be listened to
	blacklistedKeys [][]byte // Keys explicitly disallowed to be listened to
	whitelistedPrefixes [][]byte // Key prefixes explicitly allowed to be listened to
	blacklistedPrefixes [][]byte // Key prefixes explicitly disallowed to be listened to
}

...

// GetContext satisfies Listening interface
func (l *Listener) GetContext() TraceContext {
	return l.context
}

// Write satisfies Listening interface
// it wraps around the underlying writer interface
func (l *Listener) Write(b []byte) (int, error) {
	return l.writer.Write(b)
}

// Allowed satisfies Listening interface
// it returns whether the Listener is allowed to listen to the provided operation at the provided key
func (l *Listener) Allowed(op Operation, key []byte) bool {
	// first check if the operation is allowed
	if _, ok := l.allowedOperations[op]; !ok {
		return false
	}
	// if there are no keys or prefixes in the whitelists then every key is allowed (unless disallowed in blacklists)
	// if there are whitelisted keys or prefixes then only the keys which conform are allowed (unless disallowed in blacklists)
	allowed := true
	if len(l.whitelistedKeys) > 0 || len(l.whitelistedPrefixes) > 0 {
		allowed = listsContain(l.whitelistedKeys, l.whitelistedPrefixes, key)
	}
	return allowed && !listsContain(l.blacklistedKeys, l.blacklistedPrefixes, key)
}

// listsContain returns true if the key matches any of the exact keys or begins with any of the prefixes
func listsContain(keys, prefixes [][]byte, key []byte) bool {
	for _, k := range keys {
		if bytes.Equal(key, k) {
			return true
		}
	}
	for _, p := range prefixes {
		if bytes.HasPrefix(key, p) {
			return true
		}
	}
	return false
}
```

### ListenKVStore
We will create a new `Store` type `listenkv.Store` that the `MultiStore` wraps around a `KVStore` to enable state listening.
+This is closely modeled after the `tracekv.Store` with the primary difference being that we can configure the `Store` with a set of `Listening` types +which direct the streaming of only certain allowed subsets of keys and/or operations to specific `io.Writer` destinations. + +```go +// Store implements the KVStore interface with listening enabled. +// Operations are traced on each core KVStore call and written to any of the +// underlying listeners with the proper key and operation permissions +type Store struct { + parent types.KVStore + listeners []types.Listener +} + +// NewStore returns a reference to a new traceKVStore given a parent +// KVStore implementation and a buffered writer. +func NewStore(parent types.KVStore, listeners []types.Listener) *Store { + return &Store{parent: parent, listeners: listeners} +} + +... + +// Get implements the KVStore interface. It traces a read operation and +// delegates a Get call to the parent KVStore. +func (tkv *Store) Get(key []byte) []byte { + value := tkv.parent.Get(key) + + writeOperation(tkv.listeners, types.ReadOp, key, value) + return value +} + +// Set implements the KVStore interface. It traces a write operation and +// delegates the Set call to the parent KVStore. +func (tkv *Store) Set(key []byte, value []byte) { + types.AssertValidKey(key) + writeOperation(tkv.listeners, types.WriteOp, key, value) + tkv.parent.Set(key, value) +} + +// Delete implements the KVStore interface. It traces a write operation and +// delegates the Delete call to the parent KVStore. +func (tkv *Store) Delete(key []byte) { + writeOperation(tkv.listeners, types.DeleteOp, key, nil) + tkv.parent.Delete(key) +} + +// Has implements the KVStore interface. It delegates the Has call to the +// parent KVStore. +func (tkv *Store) Has(key []byte) bool { + return tkv.parent.Has(key) +} + +... + +// writeOperation writes a KVStore operation to the underlying io.Writer of +// every listener that has permissions to listen to that operation at the given key +// The TraceOperation is JSON-encoded with the `key` and `value` fields as base64 encoded strings +func writeOperation(listeners []types.Listener, op types.Operation, key, value []byte) { + traceOp := types.TraceOperation{ + Operation: op, + Key: base64.StdEncoding.EncodeToString(key), + Value: base64.StdEncoding.EncodeToString(value), + } + for _, l := range listeners { + if !l.Allowed(op, key) { + continue + } + traceOp.Metadata = l.Context + raw, err := json.Marshal(traceOp) + if err != nil { + panic(errors.Wrap(err, "failed to serialize listen operation")) + } + if _, err := l.Writer.Write(raw); err != nil { + panic(errors.Wrap(err, "failed to write listen operation")) + } + io.WriteString(l.Writer, "\n") + } +} +``` + +### MultiStore interface updates +We will update the `MultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`. +Additionally, we will update the `CacheWrap` and `CacheWrapper` interfaces to enable listening in the caching layer, and add a `MultiStore` method +to turn on or off this cache listening. + +```go +type MultiStore interface { + ... 

	// ListeningEnabled returns if listening is enabled for the KVStore belonging to the provided StoreKey
	ListeningEnabled(key StoreKey) bool

	// SetListeners sets the listener set for the KVStore belonging to the provided StoreKey
	SetListeners(key StoreKey, listeners []Listener)

	// CacheListening enables or disables KVStore listening at the cache layer
	CacheListening(listen bool)
}
```

```go
type CacheWrap interface {
	...

	// CacheWrapWithListeners recursively wraps again with listening enabled
	CacheWrapWithListeners(listeners []Listener) CacheWrap
}

type CacheWrapper interface {
	...

	// CacheWrapWithListeners recursively wraps again with listening enabled
	CacheWrapWithListeners(listeners []Listener) CacheWrap
}
```

### MultiStore implementation updates
We will modify all of the Stores and MultiStores to satisfy these new interfaces, and adjust the `rootmulti` MultiStore's `GetKVStore` method
to enable wrapping the returned `KVStore` with the `listenkv.Store`.

```go
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
	store := rs.stores[key].(types.KVStore)

	if rs.TracingEnabled() {
		store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
	}
	if rs.ListeningEnabled(key) {
		store = listenkv.NewStore(store, rs.listeners[key])
	}

	return store
}
```

We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable cache listening when `CacheListening` is turned on.

```go
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
	stores := make(map[types.StoreKey]types.CacheWrapper)
	for k, v := range rs.stores {
		stores[k] = v
	}
	var cacheListeners map[types.StoreKey][]types.Listener
	if rs.cacheListening {
		cacheListeners = rs.listeners
	}
	return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, cacheListeners)
}
```

### Exposing the data
We will introduce and document mechanisms for exposing data from the above listeners to external consumers.

#### Writing to file
We will document and provide examples of how to configure a listener to write out to a file.
No new type implementation is needed: an `os.File` can be used as the underlying `io.Writer` for a listener.
Writing to a file is the simplest approach for streaming the data out to consumers.
This approach also provides the advantages of being persistent and durable.
The files can be read directly, or an auxiliary streaming service can tail the files and serve the data remotely.
Without pruning, the file size can grow indefinitely; this will need to be managed by
the developer in an application- or even module-specific manner.

#### Writing to gRPC stream
We will implement an `io.Writer` type for exposing our listeners over a gRPC server stream.
Writing to a gRPC stream allows us to expose the data over the standard gRPC interface.
This interface can be exposed directly to consumers, or we can implement a message queue or streaming service logic on top.
Using gRPC provides us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client-side code generation, and interoperability with the many gRPC plugins and auxiliary services.
Proceeding through a gRPC intermediary will introduce additional overhead; in most cases this is not expected to be rate limiting, but in
instances where it is, the developer can implement a more performant streaming mechanism for state listening.
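To make the shape of this writer concrete, below is a minimal sketch of how a gRPC server stream could be adapted to the `io.Writer` interface the listeners expect. The `ListenResponse` message and `streamSender` interface are hypothetical stand-ins for the protobuf-generated stream types; the real names would come from `.proto` definitions that this ADR leaves unspecified:

```go
// ListenResponse is a stand-in for a protobuf-generated message carrying the
// raw bytes written by a listener; the real message would be defined in a .proto file
type ListenResponse struct {
	Data []byte
}

// streamSender abstracts the Send method of a protobuf-generated server stream;
// it is a hypothetical interface used only for this sketch
type streamSender interface {
	Send(*ListenResponse) error
}

// streamWriter adapts a gRPC server stream to the io.Writer interface
// expected as the underlying writer of a Listener
type streamWriter struct {
	stream streamSender
}

// Write satisfies io.Writer by forwarding each listener payload over the stream
func (w *streamWriter) Write(b []byte) (int, error) {
	if err := w.stream.Send(&ListenResponse{Data: b}); err != nil {
		return 0, err
	}
	return len(b), nil
}
```

A server-side stream handler could construct one of these per subscriber and register it as the `io.Writer` backing a `Listener` for the requested keys.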
+ +### Configuration +We will provide detailed documentation for how to configure the state listeners and their external streaming services from within an app's `AppCreator`, +using the provided `AppOptions`. + +e.g. SimApp with simple state streaming to files: + +```go +func NewSimApp( + logger log.Logger, db dbm.DB, traceStore io.Writer, loadLatest bool, skipUpgradeHeights map[int64]bool, + homePath string, invCheckPeriod uint, encodingConfig simappparams.EncodingConfig, + appOpts servertypes.AppOptions, baseAppOptions ...func(*baseapp.BaseApp), +) *SimApp { + + ... + + keys := sdk.NewKVStoreKeys( + authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, + minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, + govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, + evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, + ) + tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey) + memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey) + + // configure state listening capabilities using AppOptions + if cast.ToBool(appOpts.Get("simApp.listening")) { + writeDir := filepath.Clean(cast.ToString(appOpts.Get("simApp.listening.writeDir"))) + for _, key := range keys { + loadListener(bApp, writeDir, key) + } + for _, key := range tkeys { + loadListener(bApp, writeDir, key) + } + for _, key := range memKeys { + loadListener(bApp, writeDir, key) + } + bApp.SetCacheListening(cast.ToBool(appOpts.Get("simApp.cacheListening"))) + } + + ... + + return app +} + +// loadListener creates and adds to the BaseApp a listener that writes out to a file in the provided write directory +// The file is named after the StoreKey for the KVStore it listens to +func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) { + writePath := filepath.Join(writeDir, key.Name()) + // TODO: how to handle graceful file closure? + fileHandler, err := os.OpenFile(writePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0664) + if err != nil { + tmos.Exit(err.Error()) + } + // using single listener with all operations and keys permitted + listener := storeTypes.NewDefaultStateListener(fileHandler, nil) + bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener}) +} + +``` +## Consequences + +These changes will provide a means of subscribing to KVStore state changes in real time. 
+ +### Backwards Compatibility + +- This ADR changes the `MultiStore`, `CacheWrap`, and `CacheWrapper` interfaces, implementations supporting the previous version of these interfaces will not support the new ones + +### Positive + +- Ability to listen to KVStore state changes in real time and expose these events to external consumers + +### Negative + +- Changes `MultiStore`, `CacheWrap`, and `CacheWrapper` interfaces + +### Neutral + +- Introduces additional- but optional- complexity to configuring and running a cosmos app +- If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application From 7d797fabb05b1762b3f0c8a630e11bca6e8ef93f Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 23 Nov 2020 22:15:50 -0600 Subject: [PATCH 02/10] updates/fixes --- docs/architecture/adr-038-state-listening.md | 80 +++++++++++++------- 1 file changed, 51 insertions(+), 29 deletions(-) diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index c4eaadd0a688..7e83957b3559 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -10,7 +10,7 @@ Proposed ## Abstract -This ADR defines a set of changes to enable state change listening of individual KVStores. +This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers. ## Context @@ -19,8 +19,8 @@ In addition to these request/response queries, it would be beneficial to have a ## Decision -We will modify the MultiStore interface and its concrete (`basemulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores and routing the output to consumers. -We will also introduce two approaches for exposing the data to consumers: writing to files and writing to a gRPC stream. +We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores. +We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream. ### Listening interface In a new file- `store/types/listening.go`- we will create a `Listening` interface for streaming out an allowed subset of state changes from a KVStore. @@ -128,12 +128,12 @@ which direct the streaming of only certain allowed subsets of keys and/or operat // underlying listeners with the proper key and operation permissions type Store struct { parent types.KVStore - listeners []types.Listener + listeners []types.Listening } // NewStore returns a reference to a new traceKVStore given a parent // KVStore implementation and a buffered writer. -func NewStore(parent types.KVStore, listeners []types.Listener) *Store { +func NewStore(parent types.KVStore, listeners []types.Listening) *Store { return &Store{parent: parent, listeners: listeners} } @@ -143,7 +143,6 @@ func NewStore(parent types.KVStore, listeners []types.Listener) *Store { // delegates a Get call to the parent KVStore. 
func (tkv *Store) Get(key []byte) []byte { value := tkv.parent.Get(key) - writeOperation(tkv.listeners, types.ReadOp, key, value) return value } @@ -174,7 +173,11 @@ func (tkv *Store) Has(key []byte) bool { // writeOperation writes a KVStore operation to the underlying io.Writer of // every listener that has permissions to listen to that operation at the given key // The TraceOperation is JSON-encoded with the `key` and `value` fields as base64 encoded strings -func writeOperation(listeners []types.Listener, op types.Operation, key, value []byte) { +func writeOperation(listeners []types.Listening, op types.Operation, key, value []byte) { + // short circuit if there are no listeners so we don't waste time base64 encoding `key` and `value` + if len(listeners) == 0 { + return + } traceOp := types.TraceOperation{ Operation: op, Key: base64.StdEncoding.EncodeToString(key), @@ -184,15 +187,15 @@ func writeOperation(listeners []types.Listener, op types.Operation, key, value [ if !l.Allowed(op, key) { continue } - traceOp.Metadata = l.Context + traceOp.Metadata = l.GetContext() raw, err := json.Marshal(traceOp) if err != nil { panic(errors.Wrap(err, "failed to serialize listen operation")) } - if _, err := l.Writer.Write(raw); err != nil { + if _, err := l.Write(raw); err != nil { panic(errors.Wrap(err, "failed to write listen operation")) } - io.WriteString(l.Writer, "\n") + io.WriteString(l, "\n") } } ``` @@ -210,7 +213,7 @@ type MultiStore interface { ListeningEnabled(key StoreKey) bool // SetListeners sets the listener set for the KVStore belonging to the provided StoreKey - SetListeners(key StoreKey, listeners []Listener) + SetListeners(key StoreKey, listeners []Listening) // CacheListening enables or disables KVStore listening at the cache layer CacheListening(listen bool) @@ -219,23 +222,23 @@ type MultiStore interface { ```go type CacheWrap interface { - ... + ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []Listener) CacheWrap + CacheWrapWithListeners(listeners []Listening) CacheWrap } type CacheWrapper interface { ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []Listener) CacheWrap + CacheWrapWithListeners(listeners []Listening) CacheWrap } ``` ### MultiStore implementation updates -We will modify all of the Stores and MultiStores to satisfy these new interfaces, and adjust the `rootmulti` MultiStore's `GetKVStore` method -to enable wrapping the returned `KVStore` with the `listenkv.Store`. +We will modify all of the `Store` and `MultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method +to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on. ```go func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { @@ -252,7 +255,8 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { } ``` -We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable cache listening when `CacheListening` is turned on. +We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable listening +in the cache layer when `CacheListening` is turned on. 
```go
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
	stores := make(map[types.StoreKey]types.CacheWrapper)
	for k, v := range rs.stores {
		stores[k] = v
	}
	var cacheListeners map[types.StoreKey][]types.Listening
	if rs.cacheListening {
		cacheListeners = rs.listeners
	}
	return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, cacheListeners)
}
```

### Exposing the data
We will introduce and document mechanisms for exposing data from the above listeners to external consumers.

#### Writing to file
We will document and provide examples of how to configure a listener to write out to a file.
No new type implementation will be needed: an `os.File` can be used as the underlying `io.Writer` for a listener.

Writing to a file is the simplest approach for streaming the data out to consumers.
This approach also provides the advantages of being persistent and durable, and the files can be read directly
or an auxiliary streaming service can tail the files and serve the data remotely.

Without pruning, the file size can grow indefinitely; this will need to be managed by
the developer in an application- or even module-specific manner.

#### Writing to gRPC stream
We will implement and document an `io.Writer` type for exposing our listeners over a gRPC server stream.

Writing to a gRPC stream will allow us to expose the data over the standard gRPC interface.
This interface can be exposed directly to consumers, or we can implement a message queue or secondary streaming service on top.
Using gRPC will provide us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client-side code generation, and interoperability with the many gRPC plugins and auxiliary services.

Proceeding through a gRPC intermediary will introduce additional overhead; in most cases this is not expected to be rate limiting, but in
instances where it is, the developer can implement a more performant streaming mechanism for state listening.

### Configuration
We will provide detailed documentation on how to configure the state listeners and their external streaming services from within an app's `AppCreator`,
using the provided `AppOptions`.
We will add two methods to the `BaseApp` to enable this configuration: + +```go +// SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey +func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.Listening) { + app.cms.SetListeners(key, listeners) +} + +// SetCacheListening turns on or off listening at the cache layer +func (app *BaseApp) SetCacheListening(listening bool) { + app.cms.CacheListening(listening) +} +``` + +As a demonstration, we will implement the state watching features as part of SimApp. +For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function: -e.g. SimApp with simple state streaming to files: ```go func NewSimApp( @@ -341,7 +363,7 @@ func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) { if err != nil { tmos.Exit(err.Error()) } - // using single listener with all operations and keys permitted + // using single listener with all operations and keys permitted and no TraceContext listener := storeTypes.NewDefaultStateListener(fileHandler, nil) bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener}) } From 202df57d678be239d5e7fb911bc240da27e88296 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 25 Nov 2020 12:01:22 -0600 Subject: [PATCH 03/10] review fixes/adjustments --- CHANGELOG.md | 2 +- docs/architecture/adr-038-state-listening.md | 176 +++++-------------- 2 files changed, 41 insertions(+), 137 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12a2d94b31dc..13442e97c191 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -398,7 +398,7 @@ sure you are aware of any relevant breaking changes. * (x/auth) [\#5892](https://github.com/cosmos/cosmos-sdk/pull/5892) Add `RegisterKeyTypeCodec` to register new types (eg. keys) to the `auth` module internal amino codec. * (x/bank) [\#6536](https://github.com/cosmos/cosmos-sdk/pull/6536) Fix bug in `WriteGeneratedTxResponse` function used by multiple - REST endpoints. Now it writes a Tx in StdTx format. + REST endpoints. Now it writes a T===x in StdTx format. * (x/genutil) [\#5938](https://github.com/cosmos/cosmos-sdk/pull/5938) Fix `InitializeNodeValidatorFiles` error handling. * (x/gentx) [\#8183](https://github.com/cosmos/cosmos-sdk/pull/8183) change gentx cmd amount to arg from flag * (x/gov) [#7641](https://github.com/cosmos/cosmos-sdk/pull/7641) Fix tally calculation precision error. diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 7e83957b3559..6effecbe3a7a 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -19,108 +19,52 @@ In addition to these request/response queries, it would be beneficial to have a ## Decision -We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores. +We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores. We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream. ### Listening interface -In a new file- `store/types/listening.go`- we will create a `Listening` interface for streaming out an allowed subset of state changes from a KVStore. 
-The interface can be backed by a simple wrapper around any underlying `io.Writer`. +In a new file- `store/types/listening.go`- we will create a `WriteListener` interface for streaming out state changes from a KVStore. ```go -// Listening interface comprises the methods needed to filter and stream data out from a KVStore -type Listening interface { - io.Writer // the underlying io.Writer - Allowed(op Operation, key []byte) bool // method used to check if the Listener is allowed to listen to a specific state change - TraceContext() TraceContext // method to access this Listener's TraceContext -} -``` - -We will lift some of the private types currently being used by the `tracekv.Store` into public types at a new location- `store/types/tracing.go`- for use in the `Listening` interface -```go -type Operation string - -const ( - WriteOp Operation = "write" - ReadOp Operation = "read" - DeleteOp Operation = "delete" - IterKeyOp Operation = "iterKey" - IterValueOp Operation = "iterValue" -) - -// TraceOperation implements a traced KVStore operation -type TraceOperation struct { - Operation Operation `json:"operation"` - Key string `json:"key"` - Value string `json:"value"` - Metadata map[string]interface{} `json:"metadata"` +// WriteListener interface for writing data out from a listenkv.Store +type WriteListener interface { + // if value is nil then it was deleted + OnWrite(key []byte, value []byte) } ``` ### Listener type -We will create a concrete implementation of the `Listening` interface in `store/types/listening.go`. -This implementation will be configurable with a list of allowed `Operation`s, a set of whitelisted keys and prefixes, and a set of blacklisted keys and prefixes. +We will create a concrete implementation of the `WriteListener` interface in `store/types/listening.go` that gob +encodes and writes the key value pair into an underlying `io.Writer`. ```go - -// Listener is used to configure listening on specific keys of a KVStore -type Listener struct { - writer io.Writer - context TraceContext - allowedOperations map[Operation]struct{} // The operations which this listener is allowed to listen to - whitelistedKeys [][]byte // Keys explicitly allowed to be listened to - blacklistedKeys [][]byte // Keys explicitly disallowed to be listened to - whitelistedPrefixes [][]byte // Key prefixes explicitly allowed to be listened to - blacklistedPrefixes [][]byte // Key prefixes explicitly disallowed to be listened to +// GobWriteListener is used to configure listening to a KVStore using a gob encoding wrapper around an io.Writer +type GobWriteListener struct { + writer *gob.Encoder } -... 
- -// GetContext satisfies Listening interface -func (l *Listener) GetContext() TraceContext { - return l.context -} - -// Write satisfies Listening interface -// it wraps around the underlying writer interface -func (l *Listener) Write(b []byte) (int, error) { - return l.writer.Write(b) +// NewGobWriteListener wraps a WriteListenerWriter around an io.Writer +func NewGobWriteListener(w io.Writer) *GobWriteListener { + return &GobWriteListener{ + writer: gob.NewEncoder(w), + } } -// Allowed satisfies Listening interface -// it returns whether or not the Listener is allowed to listen to the provided operation at the provided key -func (l *Listener) Allowed(op Operation, key []byte) bool { - // first check if the operation is allowed - if _, ok := l.allowedOperations[op]; !ok { - return false - } - // if there are no keys or prefixes in the whitelists then every key is allowed (unless disallowed in blacklists) - // if there are whitelisted keys or prefixes then only the keys which conform are allowed (unless disallowed in blacklists) - allowed := true - if len(l.whitelistedKeys) > 0 || len(l.whitelistedPrefixes) > 0 { - allowed = listsContain(l.whitelistedKeys, l.whitelistedPrefixes, key) - } - return allowed && !listsContain(l.blacklistedKeys, l.blacklistedPrefixes, key) +// KVPair for gob encoding +type KVPair struct { + Key []byte + Value []byte } -func listsContain(keys, prefixes [][]byte, key []byte) bool { - for _, k := range keys { - if bytes.Equal(key, k) { - return true - } - } - for _, p := range prefixes { - if bytes.HasPrefix(key, p) { - return true - } - } - return false +// OnWrite satisfies WriteListener interface by writing out key-value gobs to the underlying io.Writer +func (l *Listener) OnWrite(key []byte, value []byte) { + l.writer.Encode(KVPair{Key: key, Value: value}) } ``` ### ListenKVStore We will create a new `Store` type `listenkv.Store` that the `MultiStore` wraps around a `KVStore` to enable state listening. -This is closely modeled after the `tracekv.Store` with the primary difference being that we can configure the `Store` with a set of `Listening` types -which direct the streaming of only certain allowed subsets of keys and/or operations to specific `io.Writer` destinations. +We can configure the `Store` with a set of `WriteListener`s which stream the output to specific destinations. ```go // Store implements the KVStore interface with listening enabled. @@ -128,74 +72,34 @@ which direct the streaming of only certain allowed subsets of keys and/or operat // underlying listeners with the proper key and operation permissions type Store struct { parent types.KVStore - listeners []types.Listening + listeners []types.WriteListener } // NewStore returns a reference to a new traceKVStore given a parent // KVStore implementation and a buffered writer. -func NewStore(parent types.KVStore, listeners []types.Listening) *Store { +func NewStore(parent types.KVStore, listeners []types.WriteListener) *Store { return &Store{parent: parent, listeners: listeners} } -... - -// Get implements the KVStore interface. It traces a read operation and -// delegates a Get call to the parent KVStore. -func (tkv *Store) Get(key []byte) []byte { - value := tkv.parent.Get(key) - writeOperation(tkv.listeners, types.ReadOp, key, value) - return value -} - // Set implements the KVStore interface. It traces a write operation and // delegates the Set call to the parent KVStore. 
func (tkv *Store) Set(key []byte, value []byte) { types.AssertValidKey(key) - writeOperation(tkv.listeners, types.WriteOp, key, value) + onWrite(tkv.listeners, key, value) tkv.parent.Set(key, value) } // Delete implements the KVStore interface. It traces a write operation and // delegates the Delete call to the parent KVStore. func (tkv *Store) Delete(key []byte) { - writeOperation(tkv.listeners, types.DeleteOp, key, nil) + onWrite(tkv.listeners, key, nil) tkv.parent.Delete(key) } -// Has implements the KVStore interface. It delegates the Has call to the -// parent KVStore. -func (tkv *Store) Has(key []byte) bool { - return tkv.parent.Has(key) -} - -... - -// writeOperation writes a KVStore operation to the underlying io.Writer of -// every listener that has permissions to listen to that operation at the given key -// The TraceOperation is JSON-encoded with the `key` and `value` fields as base64 encoded strings -func writeOperation(listeners []types.Listening, op types.Operation, key, value []byte) { - // short circuit if there are no listeners so we don't waste time base64 encoding `key` and `value` - if len(listeners) == 0 { - return - } - traceOp := types.TraceOperation{ - Operation: op, - Key: base64.StdEncoding.EncodeToString(key), - Value: base64.StdEncoding.EncodeToString(value), - } +// onWrite writes a KVStore operation to all of the WriteListeners +func onWrite(listeners []types.WriteListener, key, value []byte) { for _, l := range listeners { - if !l.Allowed(op, key) { - continue - } - traceOp.Metadata = l.GetContext() - raw, err := json.Marshal(traceOp) - if err != nil { - panic(errors.Wrap(err, "failed to serialize listen operation")) - } - if _, err := l.Write(raw); err != nil { - panic(errors.Wrap(err, "failed to write listen operation")) - } - io.WriteString(l, "\n") + l.OnWrite(key, value) } } ``` @@ -212,8 +116,8 @@ type MultiStore interface { // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey ListeningEnabled(key StoreKey) bool - // SetListeners sets the listener set for the KVStore belonging to the provided StoreKey - SetListeners(key StoreKey, listeners []Listening) + // SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey + SetListeners(key StoreKey, listeners []WriteListener) // CacheListening enables or disables KVStore listening at the cache layer CacheListening(listen bool) @@ -225,14 +129,14 @@ type CacheWrap interface { ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []Listening) CacheWrap + CacheWrapWithListeners(listeners []WriteListener) CacheWrap } type CacheWrapper interface { ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []Listening) CacheWrap + CacheWrapWithListeners(listeners []WriteListener) CacheWrap } ``` @@ -264,7 +168,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { for k, v := range rs.stores { stores[k] = v } - var cacheListeners map[types.StoreKey][]types.Listening + var cacheListeners map[types.StoreKey][]types.WriteListener if rs.cacheListening { cacheListeners = rs.listeners } @@ -277,7 +181,7 @@ We will introduce and document mechanisms for exposing data from the above liste #### Writing to file We will document and provide examples of how to configure a listener to write out to a file. -No new type implementation will be needed, a `os.File` can be used as the underlying `io.Writer` for a listener. 
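As a rough sketch of that wiring, assuming the `GobWriteListener` constructor proposed above and the `storeTypes`/`sdk` aliases used elsewhere in this document (the per-`StoreKey` file naming and permissions are illustrative only):

```go
// newFileListener is a minimal sketch of backing a GobWriteListener with a file:
// one append-mode file per StoreKey, named after the StoreKey it listens to.
// The caller is responsible for closing the returned file on shutdown.
func newFileListener(writeDir string, key sdk.StoreKey) (*storeTypes.GobWriteListener, *os.File, error) {
	f, err := os.OpenFile(filepath.Join(writeDir, key.Name()), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return nil, nil, err
	}
	return storeTypes.NewGobWriteListener(f), f, nil
}
```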
+No new type implementation will be needed, an `os.File` can be used as the underlying `io.Writer` for a `GobWriteListener`. Writing to a file is the simplest approach for streaming the data out to consumers. This approach also provide the advantages of being persistent and durable, and the files can be read directly @@ -302,7 +206,7 @@ using the provided `AppOptions`. We will add two methods to the `BaseApp` to ena ```go // SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey -func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.Listening) { +func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.WriteListener) { app.cms.SetListeners(key, listeners) } @@ -363,8 +267,8 @@ func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) { if err != nil { tmos.Exit(err.Error()) } - // using single listener with all operations and keys permitted and no TraceContext - listener := storeTypes.NewDefaultStateListener(fileHandler, nil) + // using single io.Writer based listener + listener := storeTypes.NewGobWriteListener(fileHandler) bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener}) } From 70042bb59cc1933ec766cdc5c9e749428371e92d Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 1 Dec 2020 14:53:07 -0600 Subject: [PATCH 04/10] review fixes/adjustments part 2 --- docs/architecture/adr-038-state-listening.md | 261 ++++++++++++++----- 1 file changed, 194 insertions(+), 67 deletions(-) diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 6effecbe3a7a..904ba64db51a 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -23,10 +23,11 @@ We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cac We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream. ### Listening interface + In a new file- `store/types/listening.go`- we will create a `WriteListener` interface for streaming out state changes from a KVStore. ```go -// WriteListener interface for writing data out from a listenkv.Store +// WriteListener interface for streaming data out from a listenkv.Store type WriteListener interface { // if value is nil then it was deleted OnWrite(key []byte, value []byte) @@ -34,35 +35,94 @@ type WriteListener interface { ``` ### Listener type -We will create a concrete implementation of the `WriteListener` interface in `store/types/listening.go` that gob -encodes and writes the key value pair into an underlying `io.Writer`. + +We will create two concrete implementation of the `WriteListener` interface in `store/types/listening.go`. 
One that writes out length-prefixed key-value pairs to an underlying `io.Writer`:

```go
// PrefixWriteListener is used to configure listening to a KVStore by writing out big endian length-prefixed
// key-value pairs to an io.Writer
type PrefixWriteListener struct {
	writer    io.Writer
	prefixBuf [6]byte
}

// NewPrefixWriteListener wraps a PrefixWriteListener around an io.Writer
func NewPrefixWriteListener(w io.Writer) *PrefixWriteListener {
	return &PrefixWriteListener{
		writer: w,
	}
}

// OnWrite satisfies the WriteListener interface by writing out big endian length-prefixed key-value pairs
// to an underlying io.Writer
// The first two bytes of the prefix encode the length of the key
// The last four bytes of the prefix encode the length of the value
// This WriteListener makes two assumptions
// 1) The key is no longer than 1<<16 - 1
// 2) The value is no longer than 1<<32 - 1
func (l *PrefixWriteListener) OnWrite(key []byte, value []byte) {
	keyLen := len(key)
	valLen := len(value)
	if keyLen > math.MaxUint16 || valLen > math.MaxUint32 {
		return
	}
	binary.BigEndian.PutUint16(l.prefixBuf[:2], uint16(keyLen))
	binary.BigEndian.PutUint32(l.prefixBuf[2:], uint32(valLen))
	l.writer.Write(l.prefixBuf[:])
	l.writer.Write(key)
	l.writer.Write(value)
}
```

And one that writes out newline-delineated key-length-prefixed key-value pairs to an underlying io.Writer:

```go
// NewlineWriteListener is used to configure listening to a KVStore by writing out big endian key-length-prefixed and
// newline delineated key-value pairs to an io.Writer
type NewlineWriteListener struct {
	writer    io.Writer
	keyLenBuf [2]byte
}

// NewNewlineWriteListener wraps a NewlineWriteListener around an io.Writer
func NewNewlineWriteListener(w io.Writer) *NewlineWriteListener {
	return &NewlineWriteListener{
		writer: w,
	}
}

var newline = []byte("\n")

// OnWrite satisfies WriteListener interface by writing out newline delineated big endian key-length-prefixed key-value
// pairs to the underlying io.Writer
// The first two bytes encode the length of the key
// Separate key-value pairs are newline delineated
// This WriteListener makes two assumptions
// 1) The key is no longer than 1<<16 - 1
// 2) The keys and values contain no newline characters
func (l *NewlineWriteListener) OnWrite(key []byte, value []byte) {
	keyLen := len(key)
	if keyLen > math.MaxUint16 {
		return
	}
	binary.BigEndian.PutUint16(l.keyLenBuf[:], uint16(keyLen))
	l.writer.Write(l.keyLenBuf[:])
	l.writer.Write(key)
	l.writer.Write(value)
	l.writer.Write(newline)
}
```

The former makes no assumptions about the presence of newline characters in keys or values, but values
must be no longer than 1<<32 - 1. The latter assumes newlines are not present in keys or values but can support any length
of value. Both assume keys are no longer than 1<<16 - 1. 
Newline delineation improves durability by enabling a consumer to orient
themselves at the start of a key-value pair at any point in the stream (e.g. tail a file); without character delineation, a consumer must start
at the beginning of the stream and never lose track of their position in the stream.

### ListenKVStore

We will create a new `Store` type `listenkv.Store` that the `MultiStore` wraps around a `KVStore` to enable state listening.
We can configure the `Store` with a set of `WriteListener`s which stream the output to specific destinations.

```go
// Store implements the KVStore interface with listening enabled.
// Operations are traced on each core KVStore call and written to any of the
// underlying listeners with the proper key and operation permissions
type Store struct {
	parent    types.KVStore
	listeners []types.WriteListener
}

// NewStore returns a reference to a new listenkv.Store given a parent
// KVStore implementation and a set of WriteListeners
func NewStore(parent types.KVStore, listeners []types.WriteListener) *Store {
	return &Store{parent: parent, listeners: listeners}
}

// Set implements the KVStore interface. It writes the operation out to the listeners and
// delegates the Set call to the parent KVStore.
func (tkv *Store) Set(key []byte, value []byte) {
	types.AssertValidKey(key)
	onWrite(tkv.listeners, key, value)
	tkv.parent.Set(key, value)
}

// Delete implements the KVStore interface. It writes the operation out to the listeners and
// delegates the Delete call to the parent KVStore.
func (tkv *Store) Delete(key []byte) {
	onWrite(tkv.listeners, key, nil)
	tkv.parent.Delete(key)
}

// onWrite writes a KVStore operation to all of the WriteListeners
// Note: the write out could be performed in a goroutine so that listeners do not block store operations
func onWrite(listeners []types.WriteListener, key, value []byte) {
	for _, l := range listeners {
		l.OnWrite(key, value)
	}
}
```

### MultiStore interface updates

We will update the `MultiStore` interface to allow us to wrap a set of listeners around a specific `KVStore`.
Additionally, we will update the `CacheWrap` and `CacheWrapper` interfaces to enable listening in the caching layer.

```go
type MultiStore interface {
	...

	// ListeningEnabled returns if listening is enabled for the KVStore belonging to the provided StoreKey
	ListeningEnabled(key StoreKey) bool

	// SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey
	// It appends the listeners to a current set, if one already exists
	SetListeners(key StoreKey, listeners []WriteListener)
}
```

```go
type CacheWrap interface {
	...

	// CacheWrapWithListeners recursively wraps again with listening enabled
	CacheWrapWithListeners(listeners []WriteListener) CacheWrap
}

type CacheWrapper interface {
	...

	// CacheWrapWithListeners recursively wraps again with listening enabled
	CacheWrapWithListeners(listeners []WriteListener) CacheWrap
}
```

### MultiStore implementation updates

We will modify all of the `Store` and `MultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method
to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on for that `Store`.

```go
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
	store := rs.stores[key].(types.KVStore)

	if rs.TracingEnabled() {
		store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
	}
	if rs.ListeningEnabled(key) {
		store = listenkv.NewStore(store, rs.listeners[key])
	}

	return store
}
```

We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to forward the listeners
to the cache layer and enable listening there.
```go func (rs *Store) CacheMultiStore() types.CacheMultiStore { @@ -168,54 +227,134 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { for k, v := range rs.stores { stores[k] = v } - var cacheListeners map[types.StoreKey][]types.WriteListener - if rs.cacheListening { - cacheListeners = rs.listeners - } - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, cacheListeners) + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners) } ``` -### Exposing the data +### Exposing the data + We will introduce and document mechanisms for exposing data from the above listeners to external consumers. #### Writing to file + We will document and provide examples of how to configure a listener to write out to a file. No new type implementation will be needed, an `os.File` can be used as the underlying `io.Writer` for a `GobWriteListener`. Writing to a file is the simplest approach for streaming the data out to consumers. -This approach also provide the advantages of being persistent and durable, and the files can be read directly +This approach also provide the advantages of being persistent and durable, and the files can be read directly, or an auxiliary streaming services can tail the files and serve the data remotely. Without pruning the file size can grow indefinitely, this will need to be managed by -the developer in an application or even module-specific manner. +the developer in an application or even module-specific manner (e.g. log rotation). #### Writing to gRPC stream + We will implement and document an `io.Writer` type for exposing our listeners over a gRPC server stream. Writing to a gRPC stream gRPC will allow us to expose the data over the standard gRPC interface. -This interface can be exposed directly to consumers or we can implement a message queue or secondary streaming service on top. -Using gRPC will provide us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client side code generation, and interoperability with the many gRPC plugins and auxillary services. +This interface can be exposed directly to consumers, or we can implement a message queue or secondary streaming service on top. +Using gRPC will provide us with all the regular advantages of gRPC and protobuf: versioning guarantees, +client side code generation, and interoperability with the many gRPC plugins and auxiliary services. Proceeding through a gRPC intermediate will provide additional overhead, in most cases this is not expected to be rate limiting but in instances where it is the developer can implement a more performant streaming mechanism for state listening. ### Configuration + We will provide detailed documentation on how to configure the state listeners and their external streaming services from within an app's `AppCreator`, -using the provided `AppOptions`. We will add two methods to the `BaseApp` to enable this configuration: +using the provided `AppOptions`. We will add a new method to the `BaseApp` to enable this configuration: ```go // SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.WriteListener) { app.cms.SetListeners(key, listeners) } +``` + +### TOML Configuration + +We will provide standard TOML configuration options for configuring the state listeners and their external streaming services. +Note: the actual namespace is TBD. 
+ +```toml +[store] + listeners = [ # if len(listeners) > 0 we are listening + "file", + "grpc" + ] +``` + +We will also provide a mapping of these TOML configuration options to helper functions for setting up the specified +streaming service. + +```go +// StreamingServiceConstructor is used to construct and load a WriteListener onto the provided BaseApp and expose it over a streaming service +type StreamingServiceConstructor func(bApp *BaseApp, opts servertypes.AppOptions, keys []sdk.StoreKey) error + +// StreamingServiceType enum for specifying the type of StreamingService +type StreamingServiceType int + +const ( + Unknown StreamingServiceType = iota + File + GRPC +) + +// NewStreamingServiceType returns the StreamingServiceType corresponding to the provided name +func NewStreamingServiceType(name string) StreamingServiceType { + switch strings.ToLower(name) { + case "file", "f": + return File + case "grpc": + return GRPC + default: + return Unknown + } +} + +// String returns the string name of a StreamingServiceType +func (sst StreamingServiceType) String() string { + switch sst { + case File: + return "file" + case GRPC: + return "grpc" + default: + return "" + } +} + +// StreamingServiceConstructorLookupTable is a mapping of StreamingServiceTypes to StreamingServiceConstructors +var StreamingServiceConstructorLookupTable = map[StreamingServiceType]StreamingServiceConstructor{ + File: FileStreamingConstructor, + GRPC: GRPCStreamingConstructor, +} + +// NewStreamingServiceConstructor returns the StreamingServiceConstructor corresponding to the provided name +func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, error) { + ssType := NewStreamingServiceType(name) + if ssType == Unknown { + return nil, fmt.Errorf("unrecognized streaming service name %s", name) + } + if constructor, ok := StreamingServiceConstructorLookupTable[ssType]; ok { + return constructor, nil + } + return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) +} -// SetCacheListening turns on or off listening at the cache layer -func (app *BaseApp) SetCacheListening(listening bool) { - app.cms.CacheListening(listening) +// FileStreamingConstructor is the StreamingServiceConstructor function for writing out to a file +func FileStreamingConstructor(bApp *BaseApp, opts servertypes.AppOptions, keys []sdk.StoreKey) error { + ... +} + +// GRPCStreamingConstructor is the StreamingServiceConstructor function for writing out to a gRPC stream +func GRPCStreamingConstructor(bApp *BaseApp, opts servertypes.AppOptions, keys []sdk.StoreKey) error { + ... } ``` +### Example configuration + As a demonstration, we will implement the state watching features as part of SimApp. 
For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function: @@ -237,42 +376,30 @@ func NewSimApp( ) tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey) memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey) - + + // collect the keys for the stores we wish to expose + storeKeys := make([]storeTypes.StoreKey, 0, len(keys)) + for _, key := range keys { + storeKeys = append(storeKeys, key) + } // configure state listening capabilities using AppOptions - if cast.ToBool(appOpts.Get("simApp.listening")) { - writeDir := filepath.Clean(cast.ToString(appOpts.Get("simApp.listening.writeDir"))) - for _, key := range keys { - loadListener(bApp, writeDir, key) - } - for _, key := range tkeys { - loadListener(bApp, writeDir, key) + listeners := cast.ToStringSlice(appOpts.Get("store.listeners")) + for _, listenerName := range listeners { + constructor, err := baseapp.NewStreamingServiceConstructor(listenerName) + if err != nil { + tmos.Exit(err.Error()) // or continue? } - for _, key := range memKeys { - loadListener(bApp, writeDir, key) + if err := constructor(bApp, appOpts, storeKeys); err != nil { + tmos.Exit(err.Error()) } - bApp.SetCacheListening(cast.ToBool(appOpts.Get("simApp.cacheListening"))) } ... return app } - -// loadListener creates and adds to the BaseApp a listener that writes out to a file in the provided write directory -// The file is named after the StoreKey for the KVStore it listens to -func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) { - writePath := filepath.Join(writeDir, key.Name()) - // TODO: how to handle graceful file closure? - fileHandler, err := os.OpenFile(writePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0664) - if err != nil { - tmos.Exit(err.Error()) - } - // using single io.Writer based listener - listener := storeTypes.NewGobWriteListener(fileHandler) - bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener}) -} - ``` + ## Consequences These changes will provide a means of subscribing to KVStore state changes in real time. From 20d938a6efc668402c52b3f63e19dc03312b70ff Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 15 Dec 2020 14:35:59 -0600 Subject: [PATCH 05/10] review updates part 3: refactor after review to coordinate store changes with block and tx messages and enable file pruning --- docs/architecture/adr-038-state-listening.md | 262 +++++++++++++++---- 1 file changed, 212 insertions(+), 50 deletions(-) diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 904ba64db51a..2438a0086955 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -20,7 +20,7 @@ In addition to these request/response queries, it would be beneficial to have a ## Decision We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores. -We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream. +We will also introduce the tooling for writing these state changes out to a file. ### Listening interface @@ -117,7 +117,7 @@ func (l *NewlineWriteListener) OnWrite(key []byte, value []byte) { The former makes no assumptions about the presence of newline characters in keys or values, but values must be no longer than 1<<32 - 1. 
The latter assumes newlines are not present in keys or values but can support any length -of value. Both assume keys are no longer than 1<<16 - 1. Newline delineation improves durability by enabling a consumer to orient +of value. Both assume keys are no longer than 1<<16 - 1. Newline delineation improves readability by enabling a consumer to orient themselves at the start of a key-value pair at any point in the stream (e.g. tail a file), without character delineation a consumer must start at the beginning of the stream and not lose track of their position in the stream. @@ -178,6 +178,7 @@ type MultiStore interface { ListeningEnabled(key StoreKey) bool // SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey + // It appends the listeners to a current set, if one already exists SetListeners(key StoreKey, listeners []WriteListener) } ``` @@ -233,63 +234,228 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { ### Exposing the data -We will introduce and document mechanisms for exposing data from the above listeners to external consumers. +We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers. -#### Writing to file +```go +// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI context +type StreamingService interface { + Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register + BeginBlockReq(req abci.RequestBeginBlock) // update the streaming service with the latest RequestBeginBlock message + BeginBlockResres abci.ResponseBeginBlock) // update the steaming service with the latest ResponseBeginBlock message + EndBlockReq(req abci.RequestEndBlock) // update the steaming service with the latest RequestEndBlock message + EndBlockRes(res abci.ResponseEndBlock) // update the steaming service with the latest ResponseEndBlock message + DeliverTxReq(req abci.RequestDeliverTx) // update the steaming service with the latest RequestDeliverTx message + DeliverTxRes(res abci.ResponseDeliverTx) // update the steaming service with the latest ResponseDeliverTx message +} +``` + +#### Writing state changes to files + +We will introduce an implementation of `StreamingService` which writes state changes out to a file. -We will document and provide examples of how to configure a listener to write out to a file. -No new type implementation will be needed, an `os.File` can be used as the underlying `io.Writer` for a `GobWriteListener`. +```go +// FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file +type FileStreamingService struct { + listeners map[sdk.StoreKey][]storeTypes.WriteListener // the listeners that will be initialized with BaseApp + writeDir string + filePrefix string + fileSuffix string + dst *os.File // the current write output file +} + +``` Writing to a file is the simplest approach for streaming the data out to consumers. This approach also provide the advantages of being persistent and durable, and the files can be read directly, or an auxiliary streaming services can tail the files and serve the data remotely. +#### File pruning + Without pruning the file size can grow indefinitely, this will need to be managed by the developer in an application or even module-specific manner (e.g. log rotation). 
-#### Writing to gRPC stream +### Configuration -We will implement and document an `io.Writer` type for exposing our listeners over a gRPC server stream. +We will provide detailed documentation on how to configure the state listeners and the file streaming service from within an app's `AppCreator`, +using the provided `AppOptions`. -Writing to a gRPC stream gRPC will allow us to expose the data over the standard gRPC interface. -This interface can be exposed directly to consumers, or we can implement a message queue or secondary streaming service on top. -Using gRPC will provide us with all the regular advantages of gRPC and protobuf: versioning guarantees, -client side code generation, and interoperability with the many gRPC plugins and auxiliary services. +#### BaseApp registration -Proceeding through a gRPC intermediate will provide additional overhead, in most cases this is not expected to be rate limiting but in -instances where it is the developer can implement a more performant streaming mechanism for state listening. +We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s: -### Configuration +```go +// RegisterStreamingService is used to register a streaming service with the BaseApp +func (app *BaseApp) RegisterStreamingService(s StreamingService) { + // set the listeners for each StoreKey + for key, lis := range s.Listeners() { + app.cms.SetListeners(key, lis) + } + // register the streaming service within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.streamingServices = append(app.streamingServices, serv) +} +``` + +We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass messages and responses to any `StreamingServices` registered +with the `BaseApp`. -We will provide detailed documentation on how to configure the state listeners and their external streaming services from within an app's `AppCreator`, -using the provided `AppOptions`. We will add a new method to the `BaseApp` to enable this configuration: ```go -// SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey -func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.WriteListener) { - app.cms.SetListeners(key, listeners) +func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeginBlock) { + defer telemetry.MeasureSince(time.Now(), "abci", "begin_block") + + if app.cms.TracingEnabled() { + app.cms.SetTracingContext(sdk.TraceContext( + map[string]interface{}{"blockHeight": req.Header.Height}, + )) + } + + if err := app.validateHeight(req); err != nil { + panic(err) + } + + // Update any registered streaming services with the new RequestBeginBlock message + for _, streamingService := range app.streamingServices { + streamingSerice.BeginBlockReq(req) + } + + // Initialize the DeliverTx state. If this is the first block, it should + // already be initialized in InitChain. Otherwise app.deliverState will be + // nil, since it is reset on Commit. + if app.deliverState == nil { + app.setDeliverState(req.Header) + } else { + // In the first block, app.deliverState.ctx will already be initialized + // by InitChain. Context is now updated with Header information. + app.deliverState.ctx = app.deliverState.ctx. + WithBlockHeader(req.Header). 
+ WithBlockHeight(req.Header.Height) + } + + // add block gas meter + var gasMeter sdk.GasMeter + if maxGas := app.getMaximumBlockGas(app.deliverState.ctx); maxGas > 0 { + gasMeter = sdk.NewGasMeter(maxGas) + } else { + gasMeter = sdk.NewInfiniteGasMeter() + } + + app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(gasMeter) + + if app.beginBlocker != nil { + res = app.beginBlocker(app.deliverState.ctx, req) + res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents) + } + // set the signed validators for addition to context in deliverTx + app.voteInfos = req.LastCommitInfo.GetVotes() + + // Update any registered streaming services with the new ResponseBeginBlock message + for _ streamingService := range app.streamingServices { + streamingService.BeginBlockRes(res) + } + + return res } ``` -### TOML Configuration +```go +func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) { + defer telemetry.MeasureSince(time.Now(), "abci", "end_block") -We will provide standard TOML configuration options for configuring the state listeners and their external streaming services. + if app.deliverState.ms.TracingEnabled() { + app.deliverState.ms = app.deliverState.ms.SetTracingContext(nil).(sdk.CacheMultiStore) + } + + // Update any registered streaming services with the new RequestEndBlock message + for _, streamingService := range app.streamingServices { + streamingService.EndBlockReq(req) + } + + if app.endBlocker != nil { + res = app.endBlocker(app.deliverState.ctx, req) + res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents) + } + + if cp := app.GetConsensusParams(app.deliverState.ctx); cp != nil { + res.ConsensusParamUpdates = cp + } + + // Update any registered streaming services with the new RequestEndBlock message + for _, streamingService := range app.streamingServices { + streamingService.EndBlockRes(res) + } + + return res +} +``` + +```go +func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { + defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx") + + gInfo := sdk.GasInfo{} + resultStr := "successful" + + defer func() { + telemetry.IncrCounter(1, "tx", "count") + telemetry.IncrCounter(1, "tx", resultStr) + telemetry.SetGauge(float32(gInfo.GasUsed), "tx", "gas", "used") + telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted") + }() + + // Update any registered streaming services with the new RequestEndBlock message + for _, streamingService := range app.streamingServices { + streamingService.DeliverTxReq(req) + } + + gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx) + if err != nil { + resultStr = "failed" + return sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace) + } + + res := abci.ResponseDeliverTx{ + GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints? + GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints? + Log: result.Log, + Data: result.Data, + Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents), + } + + // Update any registered streaming services with the new RequestEndBlock message + for _, streamingService := range app.streamingServices { + streamingService.DeliverTxRes(res) + } + + return res +} +``` + +#### TOML Configuration + +We will provide a standard TOML configuration options for configuring a `FileStreamingService` for specific `Store`s. Note: the actual namespace is TBD. 
```toml [store] - listeners = [ # if len(listeners) > 0 we are listening + streamers = [ # if len(streamers) > 0 we are streamers "file", - "grpc" ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streamer"] + writeDir = "path to the write directory" + filePrefix = "optional string to prefix the file names with" + fileSuffix = "optional string to suffix the file names with" ``` -We will also provide a mapping of these TOML configuration options to helper functions for setting up the specified +We will also provide a mapping of the TOML `store.streamer` configuration options to helper functions for constructing the specified streaming service. ```go -// StreamingServiceConstructor is used to construct and load a WriteListener onto the provided BaseApp and expose it over a streaming service -type StreamingServiceConstructor func(bApp *BaseApp, opts servertypes.AppOptions, keys []sdk.StoreKey) error +// StreamingServiceConstructor is used to construct a streaming service +type StreamingServiceConstructor func(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) // StreamingServiceType enum for specifying the type of StreamingService type StreamingServiceType int @@ -297,7 +463,7 @@ type StreamingServiceType int const ( Unknown StreamingServiceType = iota File - GRPC + // add more in the future ) // NewStreamingServiceType returns the StreamingServiceType corresponding to the provided name @@ -305,8 +471,6 @@ func NewStreamingServiceType(name string) StreamingServiceType { switch strings.ToLower(name) { case "file", "f": return File - case "grpc": - return GRPC default: return Unknown } @@ -317,8 +481,6 @@ func (sst StreamingServiceType) String() string { switch sst { case File: return "file" - case GRPC: - return "grpc" default: return "" } @@ -327,7 +489,6 @@ func (sst StreamingServiceType) String() string { // StreamingServiceConstructorLookupTable is a mapping of StreamingServiceTypes to StreamingServiceConstructors var StreamingServiceConstructorLookupTable = map[StreamingServiceType]StreamingServiceConstructor{ File: FileStreamingConstructor, - GRPC: GRPCStreamingConstructor, } // NewStreamingServiceConstructor returns the StreamingServiceConstructor corresponding to the provided name @@ -342,18 +503,13 @@ func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, e return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) } -// FileStreamingConstructor is the StreamingServiceConstructor function for writing out to a file -func FileStreamingConstructor(bApp *BaseApp, opts servertypes.AppOptions, keys []sdk.StoreKey) error { - ... -} - -// GRPCStreamingConstructor is the StreamingServiceConstructor function for writing out to a gRPC stream -func GRPCStreamingConstructor(bApp *BaseApp, opts servertypes.AppOptions, keys []sdk.StoreKey) error { +// FileStreamingConstructor is the StreamingServiceConstructor function for creating a FileStreamingService +func FileStreamingConstructor(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) { ... } ``` -### Example configuration +#### Example configuration As a demonstration, we will implement the state watching features as part of SimApp. 
For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function: @@ -374,24 +530,30 @@ func NewSimApp( govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, ) - tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey) - memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey) - // collect the keys for the stores we wish to expose - storeKeys := make([]storeTypes.StoreKey, 0, len(keys)) - for _, key := range keys { - storeKeys = append(storeKeys, key) - } // configure state listening capabilities using AppOptions - listeners := cast.ToStringSlice(appOpts.Get("store.listeners")) + listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) for _, listenerName := range listeners { + // get the store keys allowed to be exposed for this listener + exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName)) + exposeStoreKeys = make([]storeTypes.StoreKey, 0, len(exposeKeyStrs)) + for _, keyStr := range exposeKeyStrs { + if storeKey, ok := keys[keyStr]; ok { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } + // get the constructor for this listener name constructor, err := baseapp.NewStreamingServiceConstructor(listenerName) if err != nil { tmos.Exit(err.Error()) // or continue? } - if err := constructor(bApp, appOpts, storeKeys); err != nil { + // generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose + streamingService, err := constructor(appOpts, exposeStoreKeys) + if err != nil { tmos.Exit(err.Error()) } + // register the streaming service with the BaseApp + bApp.RegisterStreamingService(streamingService) } ... From dd785c44503f289b01bb8eb0b1a9d1c67005a388 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 17 Dec 2020 12:50:14 -0600 Subject: [PATCH 06/10] review updates part 4: additional details and fixes; addressing recent feedback; use binary protobuf encoding for kv pairs in files --- CHANGELOG.md | 2 +- docs/architecture/adr-038-state-listening.md | 333 ++++++++++++------- 2 files changed, 223 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13442e97c191..12a2d94b31dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -398,7 +398,7 @@ sure you are aware of any relevant breaking changes. * (x/auth) [\#5892](https://github.com/cosmos/cosmos-sdk/pull/5892) Add `RegisterKeyTypeCodec` to register new types (eg. keys) to the `auth` module internal amino codec. * (x/bank) [\#6536](https://github.com/cosmos/cosmos-sdk/pull/6536) Fix bug in `WriteGeneratedTxResponse` function used by multiple - REST endpoints. Now it writes a T===x in StdTx format. + REST endpoints. Now it writes a Tx in StdTx format. * (x/genutil) [\#5938](https://github.com/cosmos/cosmos-sdk/pull/5938) Fix `InitializeNodeValidatorFiles` error handling. * (x/gentx) [\#8183](https://github.com/cosmos/cosmos-sdk/pull/8183) change gentx cmd amount to arg from flag * (x/gov) [#7641](https://github.com/cosmos/cosmos-sdk/pull/7641) Fix tally calculation precision error. 
diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 2438a0086955..5df41e481da8 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -14,13 +14,14 @@ This ADR defines a set of changes to enable listening to state changes of indivi ## Context -Currently, KVStore data can be remotely accessed through [Queries](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/messages-and-queries.md#queries) which proceed through Tendermint and the ABCI. +Currently, KVStore data can be remotely accessed through [Queries](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/messages-and-queries.md#queries) +which proceed either through Tendermint and the ABCI, or through the gRPC server. In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time. ## Decision We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores. -We will also introduce the tooling for writing these state changes out to a file. +We will also introduce the tooling for writing these state changes out to files and configuring this service. ### Listening interface @@ -29,98 +30,58 @@ In a new file- `store/types/listening.go`- we will create a `WriteListener` inte ```go // WriteListener interface for streaming data out from a listenkv.Store type WriteListener interface { - // if value is nil then it was deleted - OnWrite(key []byte, value []byte) + // if value is nil then it was deleted + //storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores + OnWrite(storeKey types.StoreKey, key []byte, value []byte) } ``` ### Listener type -We will create two concrete implementation of the `WriteListener` interface in `store/types/listening.go`. -One that writes out length-prefixed key-value pairs to an underlying `io.Writer`: +We will create a concrete implementation of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf +encoded KV pairs to an underlying `io.Writer`. -```go -// PrefixWriteListener is used to configure listening to a KVStore by writing out big endian length-prefixed -// key-value pairs to an io.Writer -type PrefixWriteListener struct { - writer io.Writer - prefixBuf [6]byte -} +This will include defining a simple protobuf type for the KV pairs. In addition to the key and value fields this message +will include the StoreKey for the originating KVStore so that we can write out from separate KVStores to the same stream/file +and determine the source of each KV pair. 
-// NewPrefixWriteListener wraps a PrefixWriteListener around an io.Writer -func NewPrefixWriteListener(w io.Writer) *PrefixWriteListener { - return &PrefixWriteListener{ - writer: w, - } -} - -// OnWrite satisfies the WriteListener interface by writing out big endian length-prefixed key-value pairs -// to an underlying io.Writer -// The first two bytes of the prefix encode the length of the key -// The last four bytes of the prefix encode the length of the value -// This WriteListener makes two assumptions -// 1) The key is no longer than 1<<16 - 1 -// 2) The value is no longer than 1<<32 - 1 -func (swl *PrefixWriteListener) OnWrite(key []byte, value []byte) { - keyLen := len(key) - valLen := len(key) - if keyLen > math.MaxUint16 || valLen > math.MaxUint32 { - return - } - binary.BigEndian.PutUint16(l.prefixBuf[:2], uint16(keyLen)) - binary.BigEndian.PutUint32(l.prefixBuf[2:], uint32(valLen)) - l.writer.Write(l.prefixBuf[:]) - l.writer.Write(key) - l.writer.Write(value) +```protobuf +message StoreKVPair { + optional string store_key = 1; + required bytes key = 2; + required bytes value = 3; } ``` -And one that writes out newline-delineated key-length-prefixed key-value pairs to an underlying io.Writer: - ```go -// NewlineWriteListener is used to configure listening to a KVStore by writing out big endian key-length-prefixed and -// newline delineated key-value pairs to an io.Writer -type NewlineWriteListener struct { - writer io.Writer - keyLenBuf [2]byte +// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed +// protobuf encoded StoreKVPairs to an underlying io.Writer +type StoreKVPairWriteListener struct { + writer io.Writer + marshaler codec.BinaryMarshaler } -// NewNewlineWriteListener wraps a StockWriteListener around an io.Writer -func NewNewlineWriteListener(w io.Writer) *NewlineWriteListener { - return &NewlineWriteListener{ +// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryMarshaler +func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVPairWriteListener { + return &StoreKVPairWriteListener{ writer: w, + marshaler: m, } } -var newline = []byte("\n") - -// OnWrite satisfies WriteListener interface by writing out newline delineated big endian key-length-prefixed key-value -// pairs to the underlying io.Writer -// The first two bytes encode the length of the key -// Separate key-value pairs are newline delineated -// This WriteListener makes three assumptions -// 1) The key is no longer than 1<<16 - 1 -// 2) The value and keys contain no newline characters -func (l *NewlineWriteListener) OnWrite(key []byte, value []byte) { - keyLen := len(key) - if keyLen > math.MaxUint16 { - return +// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs +func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte) { + kvPair := new(types.StoreKVPair) + kvPair.StoreKey = storeKey.Name() + kvPair.Key = key + kvPair.Value = value + if by, err := wl.marshaler.MarshalBinaryLengthPrefixed(kvPair); err == nil { + wl.writer.Write(by) } - binary.BigEndian.PutUint16(l.keyLenBuf[:], uint16(keyLen)) - l.writer.Write(e.keyLenBuf[:]) - l.writer.Write(key) - l.writer.Write(value) - l.writer.Write(newline) } ``` -The former makes no assumptions about the presence of newline characters in keys or values, but values -must be no longer than 1<<32 - 1. 
The latter assumes newlines are not present in keys or values but can support any length -of value. Both assume keys are no longer than 1<<16 - 1. Newline delineation improves readability by enabling a consumer to orient -themselves at the start of a key-value pair at any point in the stream (e.g. tail a file), without character delineation a consumer must start -at the beginning of the stream and not lose track of their position in the stream. - ### ListenKVStore We will create a new `Store` type `listenkv.Store` that the `MultiStore` wraps around a `KVStore` to enable state listening. @@ -133,34 +94,34 @@ We can configure the `Store` with a set of `WriteListener`s which stream the out type Store struct { parent types.KVStore listeners []types.WriteListener + parentStoreKey types.StoreKey } // NewStore returns a reference to a new traceKVStore given a parent // KVStore implementation and a buffered writer. -func NewStore(parent types.KVStore, listeners []types.WriteListener) *Store { - return &Store{parent: parent, listeners: listeners} +func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener) *Store { + return &Store{parent: parent, listeners: listeners, parentStoreKey: psk} } // Set implements the KVStore interface. It traces a write operation and // delegates the Set call to the parent KVStore. -func (tkv *Store) Set(key []byte, value []byte) { +func (s *Store) Set(key []byte, value []byte) { types.AssertValidKey(key) - onWrite(tkv.listeners, key, value) - tkv.parent.Set(key, value) + s.parent.Set(key, value) + s.onWrite(key, value) } // Delete implements the KVStore interface. It traces a write operation and // delegates the Delete call to the parent KVStore. -func (tkv *Store) Delete(key []byte) { - onWrite(tkv.listeners, key, nil) - tkv.parent.Delete(key) +func (s *Store) Delete(key []byte) { + s.parent.Delete(key) + s.onWrite(key, nil) } // onWrite writes a KVStore operation to all of the WriteListeners -// Note: write out in a goroutine -func onWrite(listeners []types.WriteListener, key, value []byte) { - for _, l := range listeners { - l.OnWrite(key, value) +func (s *Store) onWrite(key, value []byte) { + for _, l := range s.listeners { + l.OnWrite(s.parentStoreKey, key, value) } } ``` @@ -188,14 +149,14 @@ type CacheWrap interface { ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []WriteListener) CacheWrap + CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap } type CacheWrapper interface { ... 
// CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []WriteListener) CacheWrap + CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap } ``` @@ -212,7 +173,7 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) } if rs.ListeningEnabled(key) { - store = listenkv.NewStore(store, rs.listeners[key]) + store = listenkv.NewStore(key, store, rs.listeners[key]) } return store @@ -239,45 +200,183 @@ We will introduce a new `StreamingService` interface for exposing `WriteListener ```go // StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI context type StreamingService interface { + Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) // streaming service loop, awaits kv pairs and writes them to some destination stream or file Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register BeginBlockReq(req abci.RequestBeginBlock) // update the streaming service with the latest RequestBeginBlock message - BeginBlockResres abci.ResponseBeginBlock) // update the steaming service with the latest ResponseBeginBlock message + BeginBlockRes(res abci.ResponseBeginBlock) // update the steaming service with the latest ResponseBeginBlock message EndBlockReq(req abci.RequestEndBlock) // update the steaming service with the latest RequestEndBlock message EndBlockRes(res abci.ResponseEndBlock) // update the steaming service with the latest ResponseEndBlock message DeliverTxReq(req abci.RequestDeliverTx) // update the steaming service with the latest RequestDeliverTx message - DeliverTxRes(res abci.ResponseDeliverTx) // update the steaming service with the latest ResponseDeliverTx message + DeliverTxRes(res abci.ResponseDeliverTx) // update the steaming service with the latest ResponseDeliverTx message } ``` #### Writing state changes to files -We will introduce an implementation of `StreamingService` which writes state changes out to a file. +We will introduce an implementation of `StreamingService` which writes state changes out to files as length-prefixed protobuf encoded `StoreKVPair`s. +This service uses the same `StoreKVPairWriteListener` for every KVStore, writing all the KV pairs from every KVStore +out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source KVStore for each pair. + +The file naming schema is as such: +* After every `BeginBlock` request a new file is created with the name `block-{N}-begin`, where N is the block number. All +subsequent state changes are written out to this file until the first `DeliverTx` request is received. At the head of these files, + the length-prefixed protobuf encoded `BeginBlock` request is written, and the response is written at the tail. +* After every `DeliverTx` request a new file is created with the name `block-{N}-tx-{M}` where N is the block number and M +is the tx number in the block (i.e. 0, 1, 2...). All subsequent state changes are written out to this file until the next +`DeliverTx` request is received or an `EndBlock` request is received. At the head of these files, the length-prefixed protobuf + encoded `DeliverTx` request is written, and the response is written at the tail. +* After every `EndBlock` request a new file is created with the name `block-{N}-end`, where N is the block number. 
All +subsequent state changes are written out to this file until the next `BeginBlock` request is received. At the head of these files, + the length-prefixed protobuf encoded `EndBlock` request is written, and the response is written at the tail. ```go // FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file type FileStreamingService struct { listeners map[sdk.StoreKey][]storeTypes.WriteListener // the listeners that will be initialized with BaseApp - writeDir string - filePrefix string - fileSuffix string - dst *os.File // the current write output file + srcChan <-chan []byte // the channel that all of the WriteListeners write their out to + filePrefix string // optional prefix for each of the generated files + writeDir string // directory to write files into + dstFile *os.File // the current write output file + marshaler codec.BinaryMarshaler // marshaler used for re-marshalling the ABCI messages to write them out to the destination files + fileLock *sync.Mutex // mutex to sync access to the dst file since + // NOTE: I suspect this lock is unnecessary since everything above the FileStreamingService occurs synchronously, + // e.g. we dont need to worry about a kv pair being sent to the srcChan at the same time a new file is being generated +} +``` + +This streaming service uses a single instance of a simple intermediate `io.Writer` as the underlying `io.Writer` for the single `StoreKVPairWriteListener`, +collecting the KV pairs from every KVStore synchronously off of the same channel, and then writing +them out to the destination file generated for the current ABCI stage (as outlined above, with optional prefixes to avoid potential naming collisions +across separate instances). + +```go +// intermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener +// everytime we begin writing to a new file +type intermediateWriter struct { + outChan chan <-[]byte +} + +// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel +func NewIntermediateWriter(outChan chan <-[]byte) *intermediateWriter { + return &intermediateWriter{ + outChan: outChan, + } +} + +// Write satisfies io.Writer +func (iw *intermediateWriter) Write(b []byte) (int, error) { + iw.outChan <- b + return len(b), nil } +// NewFileStreamingService creates a new FileStreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewFileStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey, m codec.BinaryMarshaler) (*FileStreamingService, error) { + listenChan := make(chan []byte, 0) + iw := NewIntermediateWriter(listenChan) + listener := listen.NewStoreKVPairWriteListener(iw, m) + listners := make(map[sdk.StoreKey][]storeTypes.WriteListener, len(storeKeys)) + // in this case, we are using the same listener for each Store + for _, key := range storeKeys { + listeners[key] = listener + } + // check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not + // we don't open a dstFile until we receive our first ABCI message + if err := fileutil.IsDirWriteable(writeDir); err != nil { + return nil, err + } + return &FileStreamingService{ + listeners: listener + srcChan: listenChan, + filePrefix: filePrefix, + writeDir: writeDir, + marshaler: m, + fileLock: new(sync.Mutex), + }, nil +} + +// Listeners returns the StreamingService's underlying WriteListeners, use for registering them with the 
BaseApp +func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]storeTypes.WriteListener { + return fss.listeners +} + +func (fss *FileStreamingService) BeginBlockReq(req abci.RequestBeginBlock) { + // lock + // close the file currently at fss.dstFile + // update fss.dstFile with a new file generated using the begin block request info, per the naming schema + // marshall the request and write it at the head of the file + // unlock + // now all kv pair writes go to the new file +} + +func (fss *FileStreamingService) BeginBlockRes(res abci.ResponseBeginBlock) { + // lock + // marshall the response and write it to the tail of the current fss.dstFile + // unlock +} + +func (fss *FileStreamingService) EndBlockReq(req abci.RequestEndBlock) { + // lock + // close the file currently at fss.dstFile + // update fss.dstFile with a new file generated using the end block request info, per the naming schema + // marshall the request and write it at the head of the file + // unlock + // now all kv pair writes go to the new file +} + +func (fss *FileStreamingService) EndBlockRes(res abci.ResponseEndBlock) { + // lock + // marshall the response and write it at the tail of the current fss.dstFile + // unlock +} + +func (fss *FileStreamingService) DeliverTxReq(req abci.RequestDeliverTx) { + // lock + // close the file currently at fss.dstFile + // update fss.dstFile with a new file generated using the deliver tx request info, per the naming schema + // marshall the request and then write it at the head of that file + // unlock + // now all writes go to the new file +} + +func (fss *FileStreamingService) DeliverTxRes(res abci.ResponseDeliverTx) { + // lock + // marshall the response and write it at the tail of the current fss.dstFile + // unlock +} + +// Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs to write out to the +// current destination file or a quit signal to shutdown the service +func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-quitChan: + return + case by := <-fss.srcChan: + fss.fileLock.Wait() + fss.dstFile.Write(by) + } + } + }() +} ``` Writing to a file is the simplest approach for streaming the data out to consumers. This approach also provide the advantages of being persistent and durable, and the files can be read directly, -or an auxiliary streaming services can tail the files and serve the data remotely. +or an auxiliary streaming services can read from the files and serve the data over a remote interface. #### File pruning -Without pruning the file size can grow indefinitely, this will need to be managed by -the developer in an application or even module-specific manner (e.g. log rotation). +Without pruning the number of files can grow indefinitely, this will need to be managed by +the developer in an application or even module-specific manner. +The file naming schema facilitates pruning by block number and/or ABCI message. ### Configuration -We will provide detailed documentation on how to configure the state listeners and the file streaming service from within an app's `AppCreator`, -using the provided `AppOptions`. +We will provide detailed documentation on how to configure a `FileStreamingService` from within an app's `AppCreator`, +using the provided `AppOptions` and TOML configuration fields. 
#### BaseApp registration @@ -292,11 +391,11 @@ func (app *BaseApp) RegisterStreamingService(s StreamingService) { } // register the streaming service within the BaseApp // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context - app.streamingServices = append(app.streamingServices, serv) + app.streamingServices = append(app.streamingServices, serv) } ``` -We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass messages and responses to any `StreamingServices` registered +We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any `StreamingServices` registered with the `BaseApp`. @@ -314,6 +413,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg panic(err) } + // NEW CODE HERE // Update any registered streaming services with the new RequestBeginBlock message for _, streamingService := range app.streamingServices { streamingSerice.BeginBlockReq(req) @@ -349,6 +449,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg // set the signed validators for addition to context in deliverTx app.voteInfos = req.LastCommitInfo.GetVotes() + // NEW CODE HERE // Update any registered streaming services with the new ResponseBeginBlock message for _ streamingService := range app.streamingServices { streamingService.BeginBlockRes(res) @@ -366,7 +467,8 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc app.deliverState.ms = app.deliverState.ms.SetTracingContext(nil).(sdk.CacheMultiStore) } - // Update any registered streaming services with the new RequestEndBlock message + // NEW CODE HERE + // Update any registered streaming services with the new RequestEndBlock message for _, streamingService := range app.streamingServices { streamingService.EndBlockReq(req) } @@ -380,6 +482,7 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc res.ConsensusParamUpdates = cp } + // NEW CODE HERE // Update any registered streaming services with the new RequestEndBlock message for _, streamingService := range app.streamingServices { streamingService.EndBlockRes(res) @@ -403,6 +506,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted") }() + // NEW CODE HERE // Update any registered streaming services with the new RequestEndBlock message for _, streamingService := range app.streamingServices { streamingService.DeliverTxReq(req) @@ -422,6 +526,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents), } + // NEW CODE HERE // Update any registered streaming services with the new RequestEndBlock message for _, streamingService := range app.streamingServices { streamingService.DeliverTxRes(res) @@ -433,12 +538,12 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx #### TOML Configuration -We will provide a standard TOML configuration options for configuring a `FileStreamingService` for specific `Store`s. +We will provide standard TOML configuration options for configuring a `FileStreamingService` for specific `Store`s. Note: the actual namespace is TBD. 
```toml [store] - streamers = [ # if len(streamers) > 0 we are streamers + streamers = [ # if len(streamers) > 0 we are streaming "file", ] @@ -446,12 +551,11 @@ Note: the actual namespace is TBD. [streamers.file] keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streamer"] writeDir = "path to the write directory" - filePrefix = "optional string to prefix the file names with" - fileSuffix = "optional string to suffix the file names with" + prefix = "optional prefix to prepend to the generated file names" ``` -We will also provide a mapping of the TOML `store.streamer` configuration options to helper functions for constructing the specified -streaming service. +We will also provide a mapping of the TOML `store.streamers` "file" configuration option to a helper functions for constructing the specified +streaming service. In the future, as other streaming services are added, their constructors will be added here as well. ```go // StreamingServiceConstructor is used to construct a streaming service @@ -505,7 +609,9 @@ func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, e // FileStreamingConstructor is the StreamingServiceConstructor function for creating a FileStreamingService func FileStreamingConstructor(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) { - ... + filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) + fileDir := cast.ToString(opts.Get("streamers.file.writeDir")) + return streaming.NewFileStreamingService(fileDir, filePrefix, keys), nil } ``` @@ -534,7 +640,7 @@ func NewSimApp( // configure state listening capabilities using AppOptions listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) for _, listenerName := range listeners { - // get the store keys allowed to be exposed for this listener + // get the store keys allowed to be exposed for this streaming service/state listeners exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName)) exposeStoreKeys = make([]storeTypes.StoreKey, 0, len(exposeKeyStrs)) for _, keyStr := range exposeKeyStrs { @@ -554,6 +660,11 @@ func NewSimApp( } // register the streaming service with the BaseApp bApp.RegisterStreamingService(streamingService) + // waitgroup and quit channel for optional shutdown coordination of the streaming service + wg := new(sync.WaitGroup) + quitChan := new(chan struct{})) + // kick off the background streaming service loop + streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead? } ... 
@@ -580,5 +691,5 @@ These changes will provide a means of subscribing to KVStore state changes in re ### Neutral -- Introduces additional- but optional- complexity to configuring and running a cosmos app +- Introduces additional- but optional- complexity to configuring and running a cosmos application - If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application From 0ca6d09736911ea29feca57b1ba156cd57947112 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 8 Jan 2021 13:30:29 -0600 Subject: [PATCH 07/10] review updates part 5: formatting fixes; updated StreamingService/Hook interface --- docs/architecture/adr-038-state-listening.md | 235 ++++++------------- 1 file changed, 70 insertions(+), 165 deletions(-) diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 5df41e481da8..f1438945fa0c 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -25,20 +25,19 @@ We will also introduce the tooling for writing these state changes out to files ### Listening interface -In a new file- `store/types/listening.go`- we will create a `WriteListener` interface for streaming out state changes from a KVStore. +In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a KVStore. ```go // WriteListener interface for streaming data out from a listenkv.Store type WriteListener interface { // if value is nil then it was deleted - //storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores + // storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores OnWrite(storeKey types.StoreKey, key []byte, value []byte) } ``` ### Listener type - We will create a concrete implementation of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf encoded KV pairs to an underlying `io.Writer`. @@ -198,16 +197,18 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers. 
```go -// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI context +// Hook interface used to hook into the ABCI message processing of the BaseApp +type Hook interface { + ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) // update the streaming service with the latest BeginBlock messages + ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) // update the steaming service with the latest EndBlock messages + ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) // update the steaming service with the latest DeliverTx messages +} + +// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks type StreamingService interface { Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) // streaming service loop, awaits kv pairs and writes them to some destination stream or file Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register - BeginBlockReq(req abci.RequestBeginBlock) // update the streaming service with the latest RequestBeginBlock message - BeginBlockRes(res abci.ResponseBeginBlock) // update the steaming service with the latest ResponseBeginBlock message - EndBlockReq(req abci.RequestEndBlock) // update the steaming service with the latest RequestEndBlock message - EndBlockRes(res abci.ResponseEndBlock) // update the steaming service with the latest ResponseEndBlock message - DeliverTxReq(req abci.RequestDeliverTx) // update the steaming service with the latest RequestDeliverTx message - DeliverTxRes(res abci.ResponseDeliverTx) // update the steaming service with the latest ResponseDeliverTx message + Hook } ``` @@ -215,7 +216,7 @@ type StreamingService interface { We will introduce an implementation of `StreamingService` which writes state changes out to files as length-prefixed protobuf encoded `StoreKVPair`s. This service uses the same `StoreKVPairWriteListener` for every KVStore, writing all the KV pairs from every KVStore -out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source KVStore for each pair. +out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source for each pair. The file naming schema is as such: * After every `BeginBlock` request a new file is created with the name `block-{N}-begin`, where N is the block number. All @@ -238,16 +239,14 @@ type FileStreamingService struct { writeDir string // directory to write files into dstFile *os.File // the current write output file marshaler codec.BinaryMarshaler // marshaler used for re-marshalling the ABCI messages to write them out to the destination files - fileLock *sync.Mutex // mutex to sync access to the dst file since - // NOTE: I suspect this lock is unnecessary since everything above the FileStreamingService occurs synchronously, - // e.g. 
we dont need to worry about a kv pair being sent to the srcChan at the same time a new file is being generated + stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received } ``` -This streaming service uses a single instance of a simple intermediate `io.Writer` as the underlying `io.Writer` for the single `StoreKVPairWriteListener`, -collecting the KV pairs from every KVStore synchronously off of the same channel, and then writing -them out to the destination file generated for the current ABCI stage (as outlined above, with optional prefixes to avoid potential naming collisions -across separate instances). +This streaming service uses a single instance of a simple intermediate `io.Writer` as the underlying `io.Writer` for its single `StoreKVPairWriteListener`, +It collects KV pairs from every KVStore synchronously off of the same channel, caching them in the order they are received, and then writing +them out to a file generated in response to an ABCI message hook. Files are named as outlined above, with optional prefixes to avoid potential naming collisions +across separate instances. ```go // intermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener @@ -290,7 +289,8 @@ func NewFileStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreK filePrefix: filePrefix, writeDir: writeDir, marshaler: m, - fileLock: new(sync.Mutex), + stateCache: make([][]byte, 0), + cacheLock: new(sync.Mutex), }, nil } @@ -299,53 +299,35 @@ func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]storeTypes.Write return fss.listeners } -func (fss *FileStreamingService) BeginBlockReq(req abci.RequestBeginBlock) { - // lock - // close the file currently at fss.dstFile - // update fss.dstFile with a new file generated using the begin block request info, per the naming schema - // marshall the request and write it at the head of the file - // unlock - // now all kv pair writes go to the new file -} - -func (fss *FileStreamingService) BeginBlockRes(res abci.ResponseBeginBlock) { - // lock - // marshall the response and write it to the tail of the current fss.dstFile - // unlock +func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) { + // create a new file with the req info according to naming schema + // write req to file + // write all state changes cached for this stage to file + // reset cache + // write res to file + // close file } -func (fss *FileStreamingService) EndBlockReq(req abci.RequestEndBlock) { - // lock - // close the file currently at fss.dstFile - // update fss.dstFile with a new file generated using the end block request info, per the naming schema - // marshall the request and write it at the head of the file - // unlock - // now all kv pair writes go to the new file +func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) { + // create a new file with the req info according to naming schema + // write req to file + // write all state changes cached for this stage to file + // reset cache + // write res to file + // close file } -func (fss *FileStreamingService) EndBlockRes(res abci.ResponseEndBlock) { - // lock - // marshall the response and write it at the tail of the current fss.dstFile - // unlock +func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) { + // create a new 
file with the req info according to naming schema + // NOTE: if the tx failed, handle accordingly + // write req to file + // write all state changes cached for this stage to file + // reset cache + // write res to file + // close file } -func (fss *FileStreamingService) DeliverTxReq(req abci.RequestDeliverTx) { - // lock - // close the file currently at fss.dstFile - // update fss.dstFile with a new file generated using the deliver tx request info, per the naming schema - // marshall the request and then write it at the head of that file - // unlock - // now all writes go to the new file -} - -func (fss *FileStreamingService) DeliverTxRes(res abci.ResponseDeliverTx) { - // lock - // marshall the response and write it at the tail of the current fss.dstFile - // unlock -} - -// Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs to write out to the -// current destination file or a quit signal to shutdown the service +// Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs and caches them in the order they were received func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) { wg.Add(1) go func() { @@ -355,8 +337,7 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru case <-quitChan: return case by := <-fss.srcChan: - fss.fileLock.Wait() - fss.dstFile.Write(by) + append(fss.stateCache, by) } } }() @@ -370,7 +351,7 @@ or an auxiliary streaming services can read from the files and serve the data ov #### File pruning Without pruning the number of files can grow indefinitely, this will need to be managed by -the developer in an application or even module-specific manner. +the developer in an application or even module-specific manner (e.g. log rotation). The file naming schema facilitates pruning by block number and/or ABCI message. ### Configuration @@ -384,75 +365,29 @@ We will add a new method to the `BaseApp` to enable the registration of `Streami ```go // RegisterStreamingService is used to register a streaming service with the BaseApp -func (app *BaseApp) RegisterStreamingService(s StreamingService) { +func (app *BaseApp) RegisterHooks(s StreamingService) { // set the listeners for each StoreKey for key, lis := range s.Listeners() { app.cms.SetListeners(key, lis) - } - // register the streaming service within the BaseApp - // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context - app.streamingServices = append(app.streamingServices, serv) + } + // register the streaming service hooks within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks + app.hooks = append(app.hooks, s) } ``` -We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any `StreamingServices` registered +We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any streaming service hooks registered with the `BaseApp`. 
```go func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeginBlock) { - defer telemetry.MeasureSince(time.Now(), "abci", "begin_block") - - if app.cms.TracingEnabled() { - app.cms.SetTracingContext(sdk.TraceContext( - map[string]interface{}{"blockHeight": req.Header.Height}, - )) - } - - if err := app.validateHeight(req); err != nil { - panic(err) - } - // NEW CODE HERE - // Update any registered streaming services with the new RequestBeginBlock message - for _, streamingService := range app.streamingServices { - streamingSerice.BeginBlockReq(req) - } - - // Initialize the DeliverTx state. If this is the first block, it should - // already be initialized in InitChain. Otherwise app.deliverState will be - // nil, since it is reset on Commit. - if app.deliverState == nil { - app.setDeliverState(req.Header) - } else { - // In the first block, app.deliverState.ctx will already be initialized - // by InitChain. Context is now updated with Header information. - app.deliverState.ctx = app.deliverState.ctx. - WithBlockHeader(req.Header). - WithBlockHeight(req.Header.Height) - } - - // add block gas meter - var gasMeter sdk.GasMeter - if maxGas := app.getMaximumBlockGas(app.deliverState.ctx); maxGas > 0 { - gasMeter = sdk.NewGasMeter(maxGas) - } else { - gasMeter = sdk.NewInfiniteGasMeter() - } - - app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(gasMeter) - - if app.beginBlocker != nil { - res = app.beginBlocker(app.deliverState.ctx, req) - res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents) - } - // set the signed validators for addition to context in deliverTx - app.voteInfos = req.LastCommitInfo.GetVotes() + ... - // NEW CODE HERE - // Update any registered streaming services with the new ResponseBeginBlock message - for _ streamingService := range app.streamingServices { - streamingService.BeginBlockRes(res) + // Call the streaming service hooks with the BeginBlock messages + for _ hook := range app.hooks { + hook.ListenBeginBlock(app.deliverState.ctx, req, res) } return res @@ -461,31 +396,12 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg ```go func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) { - defer telemetry.MeasureSince(time.Now(), "abci", "end_block") - - if app.deliverState.ms.TracingEnabled() { - app.deliverState.ms = app.deliverState.ms.SetTracingContext(nil).(sdk.CacheMultiStore) - } - - // NEW CODE HERE - // Update any registered streaming services with the new RequestEndBlock message - for _, streamingService := range app.streamingServices { - streamingService.EndBlockReq(req) - } - - if app.endBlocker != nil { - res = app.endBlocker(app.deliverState.ctx, req) - res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents) - } - - if cp := app.GetConsensusParams(app.deliverState.ctx); cp != nil { - res.ConsensusParamUpdates = cp - } - // NEW CODE HERE - // Update any registered streaming services with the new RequestEndBlock message - for _, streamingService := range app.streamingServices { - streamingService.EndBlockRes(res) + ... 
+
+    // Call the streaming service hooks with the EndBlock messages
+    for _, hook := range app.hooks {
+        hook.ListenEndBlock(app.deliverState.ctx, req, res)
     }
 
     return res
@@ -494,28 +410,18 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
 ```
 
 ```go
 func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
-    defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
-
-    gInfo := sdk.GasInfo{}
-    resultStr := "successful"
-
-    defer func() {
-        telemetry.IncrCounter(1, "tx", "count")
-        telemetry.IncrCounter(1, "tx", resultStr)
-        telemetry.SetGauge(float32(gInfo.GasUsed), "tx", "gas", "used")
-        telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
-    }()
-    // NEW CODE HERE
-    // Update any registered streaming services with the new RequestEndBlock message
-    for _, streamingService := range app.streamingServices {
-        streamingService.DeliverTxReq(req)
-    }
+    ...
 
     gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx)
     if err != nil {
         resultStr = "failed"
-        return sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
+        res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
+        // If we throw an error, be sure to still call the streaming services' hooks
+        for _, hook := range app.hooks {
+            hook.ListenDeliverTx(app.deliverState.ctx, req, res)
+        }
+        return res
     }
 
     res := abci.ResponseDeliverTx{
@@ -526,10 +432,9 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
         Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
     }
 
-    // NEW CODE HERE
-    // Update any registered streaming services with the new RequestEndBlock message
-    for _, streamingService := range app.streamingServices {
-        streamingService.DeliverTxRes(res)
+    // Call the streaming service hooks with the DeliverTx messages
+    for _, hook := range app.hooks {
+        hook.ListenDeliverTx(app.deliverState.ctx, req, res)
     }
 
     return res
@@ -549,7 +454,7 @@ Note: the actual namespace is TBD.
 
 [streamers]
     [streamers.file]
-        keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streamer"]
+        keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
         writeDir = "path to the write directory"
        prefix = "optional prefix to prepend to the generated file names"
 ```

From 94535a50e43b32986f2cb9205a1be599b860a45a Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Thu, 14 Jan 2021 15:39:37 -0600
Subject: [PATCH 08/10] auxiliary streaming/queue service

---
 docs/architecture/adr-038-state-listening.md | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md
index f1438945fa0c..661ffe13f8c3 100644
--- a/docs/architecture/adr-038-state-listening.md
+++ b/docs/architecture/adr-038-state-listening.md
@@ -348,11 +348,18 @@ Writing to a file is the simplest approach for streaming the data out to consume
 This approach also provide the advantages of being persistent and durable, and the files can be read directly,
 or an auxiliary streaming service can read from the files and serve the data over a remote interface.
 
+#### Auxiliary streaming service
+
+We will create a separate standalone process that reads and internally queues the state as it is written out to these files
+and serves the data over a gRPC API. This API will allow filtering of requested data, e.g.
by block number, block/tx hash, ABCI message type,
+whether a DeliverTx message failed or succeeded, etc.
+
 #### File pruning
 
-Without pruning the number of files can grow indefinitely; this will need to be managed by
+Without pruning the number of files can grow indefinitely; this may need to be managed by
 the developer in an application or even module-specific manner (e.g. log rotation).
 The file naming schema facilitates pruning by block number and/or ABCI message.
+The gRPC auxiliary streaming service introduced above will include an option to remove the files as it consumes their data.
 
 ### Configuration
 

From f82bbfecd3e7ed40f2246eba376872cd6a706739 Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Tue, 26 Jan 2021 13:52:11 -0600
Subject: [PATCH 09/10] review updates part 6: update StoreKVPair to differentiate between Set and Deletes on nil byte values; some minor adjustments

---
 docs/architecture/adr-038-state-listening.md | 41 +++++++++++---------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md
index 661ffe13f8c3..6e255dbce90f 100644
--- a/docs/architecture/adr-038-state-listening.md
+++ b/docs/architecture/adr-038-state-listening.md
@@ -32,7 +32,8 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte
 type WriteListener interface {
     // if value is nil then it was deleted
     // storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
-    OnWrite(storeKey types.StoreKey, key []byte, value []byte)
+    // set bool indicates if it was a set; true: set, false: delete
+    OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte)
 }
 ```
 
@@ -47,9 +48,10 @@ and determine the source of each KV pair.
 
 ```protobuf
 message StoreKVPair {
-  optional string store_key = 1;
-  required bytes key = 2;
-  required bytes value = 3;
+  optional string store_key = 1; // the store key for the KVStore this pair originates from
+  required bool set = 2; // true indicates a set operation, false indicates a delete operation
+  required bytes key = 3;
+  required bytes value = 4;
 }
 ```
 
@@ -58,24 +60,25 @@ message StoreKVPair {
 // protobuf encoded StoreKVPairs to an underlying io.Writer
 type StoreKVPairWriteListener struct {
     writer io.Writer
-    marshaler codec.BinaryMarshaler
+    marshaller codec.BinaryMarshaler
 }
 
 // NewStoreKVPairWriteListener creates a StoreKVPairWriteListener with a provided io.Writer and codec.BinaryMarshaler
 func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVPairWriteListener {
     return &StoreKVPairWriteListener{
         writer: w,
-        marshaler: m,
+        marshaller: m,
     }
 }
 
 // OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
-func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte) {
+func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) {
     kvPair := new(types.StoreKVPair)
     kvPair.StoreKey = storeKey.Name()
+    kvPair.Set = set
     kvPair.Key = key
     kvPair.Value = value
-    if by, err := wl.marshaler.MarshalBinaryLengthPrefixed(kvPair); err == nil {
+    if by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair); err == nil {
         wl.writer.Write(by)
     }
 }
@@ -107,20 +110,20 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteL
 func (s *Store) Set(key []byte, value []byte) {
     types.AssertValidKey(key)
     s.parent.Set(key, value)
-    s.onWrite(key, value)
+    s.onWrite(true, key, value)
 }
 
 // Delete implements the KVStore interface. It listens to a write operation and
 // delegates the Delete call to the parent KVStore.
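+// Passing set=false to onWrite below lets listeners distinguish this deletion from a
+// Set with a nil byte value (see the updated OnWrite signature above).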
 func (s *Store) Delete(key []byte) {
     s.parent.Delete(key)
-    s.onWrite(key, nil)
+    s.onWrite(false, key, nil)
 }
 
 // onWrite writes a KVStore operation to all of the WriteListeners
-func (s *Store) onWrite(key, value []byte) {
+func (s *Store) onWrite(set bool, key, value []byte) {
     for _, l := range s.listeners {
-        l.OnWrite(s.parentStoreKey, key, value)
+        l.OnWrite(s.parentStoreKey, set, key, value)
     }
 }
 ```
 
@@ -234,11 +237,11 @@ subsequent state changes are written out to this file until the next `BeginBlock
 // FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file
 type FileStreamingService struct {
     listeners map[sdk.StoreKey][]storeTypes.WriteListener // the listeners that will be initialized with BaseApp
-    srcChan <-chan []byte // the channel that all of the WriteListeners write their out to
+    srcChan <-chan []byte // the channel that all of the WriteListeners write their data out to
     filePrefix string // optional prefix for each of the generated files
     writeDir string // directory to write files into
     dstFile *os.File // the current write output file
-    marshaler codec.BinaryMarshaler // marshaler used for re-marshalling the ABCI messages to write them out to the destination files
+    marshaller codec.BinaryMarshaler // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
     stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received
 }
 ```
 
@@ -284,13 +287,12 @@ func NewFileStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreK
         return nil, err
     }
     return &FileStreamingService{
-        listeners: listener
+        listeners: listeners,
         srcChan: listenChan,
         filePrefix: filePrefix,
         writeDir: writeDir,
-        marshaler: m,
+        marshaller: m,
         stateCache: make([][]byte, 0),
-        cacheLock: new(sync.Mutex),
     }, nil
 }
 
@@ -300,6 +302,7 @@ func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]storeTypes.Write
 }
 
 func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
+    // NOTE: this could either be done synchronously or asynchronously
     // create a new file with the req info according to naming schema
     // write req to file
@@ -309,6 +312,7 @@ func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.Requ
 }
 
 func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) {
+    // NOTE: this could either be done synchronously or asynchronously
     // create a new file with the req info according to naming schema
     // write req to file
     // write all state changes cached for this stage to file
@@ -318,6 +322,7 @@ func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.Reques
 }
 
 func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) {
+    // NOTE: this could either be done synchronously or asynchronously
     // create a new file with the req info according to naming schema
     // NOTE: if the tx failed, handle accordingly
     // write req to file
@@ -352,7 +357,7 @@
 We will create a separate standalone process that reads and internally queues the state as it is written out to these files
 and serves the data over a gRPC API. This API will allow filtering of requested data, e.g.
by block number, block/tx hash, ABCI message type,
-whether a DeliverTx message failed or succeeded, etc.
+whether a DeliverTx message failed or succeeded, etc. In addition to unary RPC endpoints, this service will expose `stream` RPC endpoints for real-time subscriptions.
 
 #### File pruning
 

From 27a462c5b4a1a68f6240c535c03373a17beb430e Mon Sep 17 00:00:00 2001
From: Ian Norden
Date: Thu, 4 Feb 2021 12:24:50 -0600
Subject: [PATCH 10/10] typo fix

---
 docs/architecture/adr-038-state-listening.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md
index 6e255dbce90f..cd78e72e2caa 100644
--- a/docs/architecture/adr-038-state-listening.md
+++ b/docs/architecture/adr-038-state-listening.md
@@ -350,7 +350,7 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru
 ```
 
 Writing to a file is the simplest approach for streaming the data out to consumers.
-This approach also provide the advantages of being persistent and durable, and the files can be read directly,
+This approach also provides the advantages of being persistent and durable, and the files can be read directly,
 or an auxiliary streaming service can read from the files and serve the data over a remote interface.
 
 #### Auxiliary streaming service
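+
+For illustration, a consumer such as this auxiliary service might decode one of the generated files with something like the following minimal sketch (a hypothetical helper: it assumes a buffer holding only the cached length-prefixed `StoreKVPair`s, and ignores the delimiting ABCI request/response messages described above):
+
+```go
+// ReadStateChanges decodes the length-prefixed, protobuf encoded StoreKVPairs
+// that the FileStreamingService wrote out with MarshalBinaryLengthPrefixed
+func ReadStateChanges(bz []byte, m codec.BinaryMarshaler) ([]types.StoreKVPair, error) {
+    var pairs []types.StoreKVPair
+    for len(bz) > 0 {
+        // read the uvarint length prefix for the next pair
+        size, n := binary.Uvarint(bz)
+        if n <= 0 || uint64(len(bz)-n) < size {
+            return nil, fmt.Errorf("malformed length prefix")
+        }
+        var pair types.StoreKVPair
+        if err := m.UnmarshalBinaryBare(bz[n:n+int(size)], &pair); err != nil {
+            return nil, err
+        }
+        pairs = append(pairs, pair)
+        bz = bz[n+int(size):]
+    }
+    return pairs, nil
+}
+```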