Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration #8664

Merged
merged 54 commits into from
Oct 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
0a88c29
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
875f23e
adjust multistores to fit new MultiStore interface and enable wrappin…
i-norden Feb 8, 2021
5e02e3d
update server mock KVStore and MultiStore
i-norden Feb 9, 2021
c394e5d
fix bug identified in CI
i-norden Feb 9, 2021
c92556a
improve codecov, minor fixes/adjustments
i-norden Feb 10, 2021
18bf622
review fixes
i-norden Feb 24, 2021
d92f1d0
review updates; flip set to delete in KVStorePair, updated proto-docs…
i-norden Mar 5, 2021
72d6a88
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
602867e
review fixes
i-norden Feb 24, 2021
93dbea9
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
441af0e
review fixes
i-norden Feb 24, 2021
f7ecbcb
hook and streaming service interfaces
i-norden Feb 8, 2021
5f74b68
integrate Hooks and StreamingService into BaseApp
i-norden Feb 8, 2021
d3c44a4
begin file streaming service implementation
i-norden Feb 8, 2021
7cc4dbd
update Hook interface to return errors so that they can be logged at …
i-norden Feb 11, 2021
7d2e9d6
finish implementation of the file streaming service
i-norden Feb 11, 2021
8f72e00
streaming service unit tests; minor adjustments
i-norden Feb 18, 2021
613a4c5
streaming service constuctor, constructor unit test, update adr
i-norden Feb 22, 2021
c6ad1ed
example toml configuration
i-norden Feb 22, 2021
7198289
ci/linting fixes
i-norden Feb 22, 2021
2fffbd0
simapp integration
i-norden Mar 1, 2021
d398cbf
update changelog
i-norden Mar 3, 2021
e0a1f32
documentation for configuring and using a StreamingService
i-norden Mar 3, 2021
caf96a0
update to use new KVStorePair type
i-norden Mar 5, 2021
6fdb3c1
fix double cache wrap issue; prefer wrapping with listener vs tracer
i-norden Mar 30, 2021
921f289
review refactor
i-norden Apr 16, 2021
d5bbb0a
fix linting
i-norden Apr 16, 2021
901e62f
review fixes
i-norden Apr 20, 2021
6cf023f
adjustments after rebase
i-norden Jun 4, 2021
dfe63d8
Merge branch 'master' into adr038_streamingservice
i-norden Aug 10, 2021
20087a1
Merge branch 'master' into adr038_streamingservice
i-norden Sep 7, 2021
5af6d7e
Merge branch 'master' into adr038_streamingservice
tac0turtle Sep 16, 2021
3b0f7ef
Merge branch 'master' into adr038_streamingservice
i-norden Sep 20, 2021
d29f880
add state cache mutex to prevent race condition detection error; alth…
i-norden Sep 20, 2021
c72a44d
Merge branch 'master' into adr038_streamingservice
tac0turtle Sep 21, 2021
3fa194d
review updates
i-norden Sep 23, 2021
fe4c30e
Merge branch 'master' into adr038_streamingservice
i-norden Sep 23, 2021
7584a9f
Merge branch 'master' into adr038_streamingservice
i-norden Oct 1, 2021
942a4f4
review fixes
i-norden Oct 1, 2021
5d69858
skip finicky test in CI environment
i-norden Oct 5, 2021
ee0fa3d
Merge branch 'master' into adr038_streamingservice
i-norden Oct 5, 2021
1ff64ee
adjustments after rebase
i-norden Oct 5, 2021
de3c42a
add store.md section for listenkv
i-norden Oct 5, 2021
ba033ac
review adjustments
i-norden Oct 14, 2021
748981b
Merge branch 'master' into adr038_streamingservice
i-norden Oct 14, 2021
51a5485
review fixes
i-norden Oct 18, 2021
64cb321
Merge branch 'master' into adr038_streamingservice
i-norden Oct 18, 2021
35c7ad1
Merge branch 'master' into adr038_streamingservice
i-norden Oct 20, 2021
a8c74f6
not sure why the unit tests are failing to read from the github actio…
i-norden Oct 20, 2021
70451e8
Merge branch 'master' into adr038_streamingservice
i-norden Oct 20, 2021
20da072
Merge branch 'master' into adr038_streamingservice
i-norden Oct 21, 2021
4ab7ca7
Merge branch 'master' into adr038_streamingservice
i-norden Oct 22, 2021
7a4b4f2
Merge branch 'master' into adr038_streamingservice
tac0turtle Oct 24, 2021
ea352a7
Merge branch 'master' into adr038_streamingservice
tac0turtle Oct 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#9776](https://github.com/cosmos/cosmos-sdk/pull/9776) Add flag `staking-bond-denom` to specify the staking bond denomination value when initializing a new chain.
* [\#9533](https://github.com/cosmos/cosmos-sdk/pull/9533) Added a new gRPC method, `DenomOwners`, in `x/bank` to query for all account holders of a specific denomination.
* (bank) [\#9618](https://github.com/cosmos/cosmos-sdk/pull/9618) Update bank.Metadata: add URI and URIHash attributes.
* (store) [\#8664](https://github.com/cosmos/cosmos-sdk/pull/8664) Implementation of ADR-038 file StreamingService
* [\#9837](https://github.com/cosmos/cosmos-sdk/issues/9837) `--generate-only` flag will accept the keyname now.
* [\#10045](https://github.com/cosmos/cosmos-sdk/pull/10045) Revert [#8549](https://github.com/cosmos/cosmos-sdk/pull/8549). Do not route grpc queries through Tendermint.
* [\#10326](https://github.com/cosmos/cosmos-sdk/pull/10326) `x/authz` add query all grants by granter query.
Expand Down
31 changes: 28 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}

return res
}

Expand All @@ -215,6 +223,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}

return res
}

Expand Down Expand Up @@ -262,15 +277,25 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")

var res abci.ResponseDeliverTx
defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}
}()
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
res = sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
return res
}

ctx := app.getContextForTx(runTxModeDeliver, req.Tx)
res, err := app.txHandler.DeliverTx(ctx, tx, req)
res, err = app.txHandler.DeliverTx(ctx, tx, req)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, uint64(res.GasUsed), uint64(res.GasWanted), app.trace)
res = sdkerrors.ResponseDeliverTx(err, uint64(res.GasUsed), uint64(res.GasWanted), app.trace)
return res
}

return res
Expand Down
4 changes: 4 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ type BaseApp struct { // nolint: maligned
// indexEvents defines the set of events in the form {eventType}.{attributeKey},
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
11 changes: 11 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,14 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
app.interfaceRegistry = registry
app.grpcQueryRouter.SetInterfaceRegistry(registry)
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
i-norden marked this conversation as resolved.
Show resolved Hide resolved
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}
33 changes: 33 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package baseapp

import (
"io"
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
i-norden marked this conversation as resolved.
Show resolved Hide resolved
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
Loading