From 6c10a6ce1209188c89a0017101bb2fa65fcc07ab Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Wed, 14 Aug 2024 14:10:02 -0400 Subject: [PATCH 1/3] feat(schema/appdata)!: efficiency improvement aligned with server/v2 --- core/store/changeset.go | 2 +- schema/appdata/async.go | 13 +++-- schema/appdata/batch.go | 41 ++++++++++++++++ schema/appdata/batch_test.go | 81 ++++++++++++++++++++++++++++++++ schema/appdata/data.go | 38 ++++++++++----- schema/appdata/listener.go | 9 ++++ schema/appdata/mux.go | 10 ++++ schema/appdata/packet.go | 2 + schema/decoding/decoding_test.go | 12 +++-- schema/decoding/middleware.go | 56 +++++++++++++--------- schema/decoding/resolver.go | 21 +++++++++ 11 files changed, 242 insertions(+), 43 deletions(-) create mode 100644 schema/appdata/batch.go create mode 100644 schema/appdata/batch_test.go diff --git a/core/store/changeset.go b/core/store/changeset.go index b1233e8abfaa..35837059c627 100644 --- a/core/store/changeset.go +++ b/core/store/changeset.go @@ -10,7 +10,7 @@ type Changeset struct { } // StateChanges represents a set of changes to the state of an actor in storage. -type StateChanges struct { +type StateChanges = struct { Actor []byte // actor represents the space in storage where state is stored, previously this was called a "storekey" StateChanges KVPairs // StateChanges is a list of key-value pairs representing the changes to the state. } diff --git a/schema/appdata/async.go b/schema/appdata/async.go index 4112ae839fe9..e27b57a22804 100644 --- a/schema/appdata/async.go +++ b/schema/appdata/async.go @@ -151,11 +151,14 @@ func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener } } - if listener.Commit != nil { - res.Commit = func(data CommitData) error { - packetChan <- data - return nil - } + res.Commit = func(data CommitData) error { + packetChan <- data + return nil + } + + res.onBatch = func(batch PacketBatch) error { + packetChan <- batch + return nil } return res diff --git a/schema/appdata/batch.go b/schema/appdata/batch.go new file mode 100644 index 000000000000..1d4802092bea --- /dev/null +++ b/schema/appdata/batch.go @@ -0,0 +1,41 @@ +package appdata + +// BatchablePacket is the interface that packet types which can be batched implement. +// All types that implement Packet except CommitData also implement BatchablePacket. +// CommitData should not be batched because it forces synchronization of asynchronous listeners. +type BatchablePacket interface { + Packet + isBatchablePacket() +} + +// PacketBatch is a batch of packets that can be sent to a listener. +// If listener processing is asynchronous, the batch of packets will be sent +// all at once in a single operation which can be more efficient than sending +// each packet individually. +type PacketBatch []BatchablePacket + +func (p PacketBatch) apply(l *Listener) error { + if l.onBatch != nil { + return l.onBatch(p) + } + + for _, packet := range p { + if err := packet.apply(l); err != nil { + return err + } + } + + return nil +} + +func (ModuleInitializationData) isBatchablePacket() {} + +func (StartBlockData) isBatchablePacket() {} + +func (TxData) isBatchablePacket() {} + +func (EventData) isBatchablePacket() {} + +func (KVPairData) isBatchablePacket() {} + +func (ObjectUpdateData) isBatchablePacket() {} diff --git a/schema/appdata/batch_test.go b/schema/appdata/batch_test.go new file mode 100644 index 000000000000..b704547c04cd --- /dev/null +++ b/schema/appdata/batch_test.go @@ -0,0 +1,81 @@ +package appdata + +import ( + "context" + "reflect" + "testing" +) + +func TestBatch(t *testing.T) { + l, got := batchListener() + + if err := l.SendPacket(testBatch); err != nil { + t.Error(err) + } + + if !reflect.DeepEqual(*got, testBatch) { + t.Errorf("got %v, expected %v", *got, testBatch) + } +} + +var testBatch = PacketBatch{ + ModuleInitializationData{}, + StartBlockData{}, + TxData{}, + EventData{}, + KVPairData{}, + ObjectUpdateData{}, +} + +func batchListener() (Listener, *PacketBatch) { + var got = new(PacketBatch) + l := Listener{ + InitializeModuleData: func(m ModuleInitializationData) error { + *got = append(*got, m) + return nil + }, + StartBlock: func(b StartBlockData) error { + *got = append(*got, b) + return nil + }, + OnTx: func(t TxData) error { + *got = append(*got, t) + return nil + }, + OnEvent: func(e EventData) error { + *got = append(*got, e) + return nil + }, + OnKVPair: func(k KVPairData) error { + *got = append(*got, k) + return nil + }, + OnObjectUpdate: func(o ObjectUpdateData) error { + *got = append(*got, o) + return nil + }, + } + + return l, got +} + +func TestBatchAsync(t *testing.T) { + l, got := batchListener() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + l = AsyncListenerMux(AsyncListenerOptions{Context: ctx}, l) + + if err := l.SendPacket(testBatch); err != nil { + t.Error(err) + } + + // commit to synchronize + if err := l.Commit(CommitData{}); err != nil { + t.Error(err) + } + + if !reflect.DeepEqual(*got, testBatch) { + t.Errorf("got %v, expected %v", *got, testBatch) + } +} diff --git a/schema/appdata/data.go b/schema/appdata/data.go index 7e02fbc5db8f..1ef3f012c11e 100644 --- a/schema/appdata/data.go +++ b/schema/appdata/data.go @@ -41,8 +41,14 @@ type TxData struct { JSON ToJSON } -// EventData represents event data that is passed to a listener. +// EventData represents event data that is passed to a listener when events are received. type EventData struct { + // Events are the events that are received. + Events []Event +} + +// Event represents the data for a single event. +type Event struct { // TxIndex is the index of the transaction in the block to which this event is associated. // It should be set to a negative number if the event is not associated with a transaction. // Canonically -1 should be used to represent begin block processing and -2 should be used to @@ -52,16 +58,23 @@ type EventData struct { // MsgIndex is the index of the message in the transaction to which this event is associated. // If TxIndex is negative, this index could correspond to the index of the message in // begin or end block processing if such indexes exist, or it can be set to zero. - MsgIndex uint32 + MsgIndex int32 // EventIndex is the index of the event in the message to which this event is associated. - EventIndex uint32 + EventIndex int32 // Type is the type of the event. Type string - // Data is the JSON representation of the event data. It should generally be a JSON object. + // Data lazily returns the JSON representation of the event. Data ToJSON + + // Attributes lazily returns the key-value attribute representation of the event. + Attributes ToEventAttributes +} + +type EventAttribute = struct { + Key, Value string } // ToBytes is a function that lazily returns the raw byte representation of data. @@ -70,18 +83,21 @@ type ToBytes = func() ([]byte, error) // ToJSON is a function that lazily returns the JSON representation of data. type ToJSON = func() (json.RawMessage, error) +// ToEventAttributes is a function that lazily returns the key-value attribute representation of an event. +type ToEventAttributes = func() ([]EventAttribute, error) + // KVPairData represents a batch of key-value pair data that is passed to a listener. type KVPairData struct { - Updates []ModuleKVPairUpdate + Updates []ActorKVPairUpdate } -// ModuleKVPairUpdate represents a key-value pair update for a specific module. -type ModuleKVPairUpdate struct { - // ModuleName is the name of the module that the key-value pair belongs to. - ModuleName string +// ActorKVPairUpdate represents a key-value pair update for a specific module or account. +type ActorKVPairUpdate = struct { + // Actor is the byte representation of the module or account that is updating the key-value pair. + Actor []byte - // Update is the key-value pair update. - Update schema.KVPairUpdate + // StateChanges are key-value pair updates. + StateChanges []schema.KVPairUpdate } // ObjectUpdateData represents object update data that is passed to a listener. diff --git a/schema/appdata/listener.go b/schema/appdata/listener.go index d4786cb02564..9d6903f2418f 100644 --- a/schema/appdata/listener.go +++ b/schema/appdata/listener.go @@ -37,5 +37,14 @@ type Listener struct { // indexers should commit their data when this is called and return an error if // they are unable to commit. Data sources MUST call Commit when data is committed, // otherwise it should be assumed that indexers have not persisted their state. + // When listener processing is pushed into background go routines using AsyncListener + // or AsyncListenerMux, Commit will synchronize the processing of all listeners and + // is a blocking call. Producers that do not want to block on Commit in a given block + // can delay calling Commit until the start of the next block to give listener + // processing time to complete. Commit func(CommitData) error + + // onBatch can be used internally to efficiently forward packet batches to + // async listeners. + onBatch func(PacketBatch) error } diff --git a/schema/appdata/mux.go b/schema/appdata/mux.go index 8e6b886577d2..6805298e6a8a 100644 --- a/schema/appdata/mux.go +++ b/schema/appdata/mux.go @@ -124,5 +124,15 @@ func ListenerMux(listeners ...Listener) Listener { } } + mux.onBatch = func(batch PacketBatch) error { + for _, listener := range listeners { + err := batch.apply(&listener) + if err != nil { + return err + } + } + return nil + } + return mux } diff --git a/schema/appdata/packet.go b/schema/appdata/packet.go index e5fe6be966b7..9833c7c87c4f 100644 --- a/schema/appdata/packet.go +++ b/schema/appdata/packet.go @@ -2,6 +2,8 @@ package appdata // Packet is the interface that all listener data structures implement so that this data can be "packetized" // and processed in a stream, possibly asynchronously. +// Valid implementations are ModuleInitializationData, StartBlockData, TxData, EventData, KVPairData, ObjectUpdateData, +// and CommitData. type Packet interface { apply(*Listener) error } diff --git a/schema/decoding/decoding_test.go b/schema/decoding/decoding_test.go index a671d2f4c35b..c8f71ae94b83 100644 --- a/schema/decoding/decoding_test.go +++ b/schema/decoding/decoding_test.go @@ -297,12 +297,14 @@ func (t testStore) GetUInt64(key []byte) uint64 { func (t testStore) Set(key, value []byte) { if t.listener.OnKVPair != nil { - err := t.listener.OnKVPair(appdata.KVPairData{Updates: []appdata.ModuleKVPairUpdate{ + err := t.listener.OnKVPair(appdata.KVPairData{Updates: []appdata.ActorKVPairUpdate{ { - ModuleName: t.modName, - Update: schema.KVPairUpdate{ - Key: key, - Value: value, + Actor: []byte(t.modName), + StateChanges: []schema.KVPairUpdate{ + { + Key: key, + Value: value, + }, }, }, }}) diff --git a/schema/decoding/middleware.go b/schema/decoding/middleware.go index 57c0783c6281..2c269dcab417 100644 --- a/schema/decoding/middleware.go +++ b/schema/decoding/middleware.go @@ -23,6 +23,7 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa onKVPair := target.OnKVPair moduleCodecs := map[string]*schema.ModuleCodec{} + moduleNames := map[string]string{} target.OnKVPair = func(data appdata.KVPairData) error { // first forward kv pair updates @@ -34,17 +35,28 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa } for _, kvUpdate := range data.Updates { + moduleName, ok := moduleNames[string(kvUpdate.Actor)] + if !ok { + var err error + moduleName, err = resolver.DecodeModuleName(kvUpdate.Actor) + if err != nil { + return err + } + + moduleNames[string(kvUpdate.Actor)] = moduleName + } + // look for an existing codec - pcdc, ok := moduleCodecs[kvUpdate.ModuleName] + pcdc, ok := moduleCodecs[moduleName] if !ok { - if opts.ModuleFilter != nil && !opts.ModuleFilter(kvUpdate.ModuleName) { + if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) { // we don't care about this module so store nil and continue - moduleCodecs[kvUpdate.ModuleName] = nil + moduleCodecs[moduleName] = nil continue } // look for a new codec - cdc, found, err := resolver.LookupDecoder(kvUpdate.ModuleName) + cdc, found, err := resolver.LookupDecoder(moduleName) if err != nil { return err } @@ -52,16 +64,16 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa if !found { // store nil to indicate we've seen this module and don't have a codec // and keep processing the kv updates - moduleCodecs[kvUpdate.ModuleName] = nil + moduleCodecs[moduleName] = nil continue } pcdc = &cdc - moduleCodecs[kvUpdate.ModuleName] = pcdc + moduleCodecs[moduleName] = pcdc if initializeModuleData != nil { err = initializeModuleData(appdata.ModuleInitializationData{ - ModuleName: kvUpdate.ModuleName, + ModuleName: moduleName, Schema: cdc.Schema, }) if err != nil { @@ -80,22 +92,24 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa continue } - updates, err := pcdc.KVDecoder(kvUpdate.Update) - if err != nil { - return err - } + for _, u := range kvUpdate.StateChanges { + updates, err := pcdc.KVDecoder(u) + if err != nil { + return err + } - if len(updates) == 0 { - // no updates - continue - } + if len(updates) == 0 { + // no updates + continue + } - err = target.OnObjectUpdate(appdata.ObjectUpdateData{ - ModuleName: kvUpdate.ModuleName, - Updates: updates, - }) - if err != nil { - return err + err = target.OnObjectUpdate(appdata.ObjectUpdateData{ + ModuleName: moduleName, + Updates: updates, + }) + if err != nil { + return err + } } } diff --git a/schema/decoding/resolver.go b/schema/decoding/resolver.go index db0ec0bb1726..cb022dbb6947 100644 --- a/schema/decoding/resolver.go +++ b/schema/decoding/resolver.go @@ -1,6 +1,7 @@ package decoding import ( + "fmt" "sort" "cosmossdk.io/schema" @@ -8,6 +9,12 @@ import ( // DecoderResolver is an interface that allows indexers to discover and use module decoders. type DecoderResolver interface { + // DecodeModuleName decodes a module name from a byte slice passed as the actor in a KVPairUpdate. + DecodeModuleName([]byte) (string, error) + + // EncodeModuleName encodes a module name into a byte slice that can be used as the actor in a KVPairUpdate. + EncodeModuleName(string) ([]byte, error) + // IterateAll iterates over all available module decoders. IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error @@ -27,6 +34,20 @@ type moduleSetDecoderResolver struct { moduleSet map[string]interface{} } +func (a moduleSetDecoderResolver) DecodeModuleName(bytes []byte) (string, error) { + if _, ok := a.moduleSet[string(bytes)]; ok { + return string(bytes), nil + } + return "", fmt.Errorf("module %s not found", bytes) +} + +func (a moduleSetDecoderResolver) EncodeModuleName(s string) ([]byte, error) { + if _, ok := a.moduleSet[s]; ok { + return []byte(s), nil + } + return nil, fmt.Errorf("module %s not found", s) +} + func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error { keys := make([]string, 0, len(a.moduleSet)) for k := range a.moduleSet { From 318c68105cfac982bf5461777eeaa7914e0a0744 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Wed, 14 Aug 2024 14:24:28 -0400 Subject: [PATCH 2/3] fix test for go 1.12 --- schema/appdata/batch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema/appdata/batch_test.go b/schema/appdata/batch_test.go index b704547c04cd..868abf595e0c 100644 --- a/schema/appdata/batch_test.go +++ b/schema/appdata/batch_test.go @@ -63,7 +63,7 @@ func TestBatchAsync(t *testing.T) { l, got := batchListener() ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + defer cancel() l = AsyncListenerMux(AsyncListenerOptions{Context: ctx}, l) if err := l.SendPacket(testBatch); err != nil { From de5c78afaf6e3ad6017adec9d267412be0fc30ff Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Tue, 20 Aug 2024 10:33:38 -0400 Subject: [PATCH 3/3] fix test --- schema/appdata/batch_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/schema/appdata/batch_test.go b/schema/appdata/batch_test.go index 868abf595e0c..557079e225c4 100644 --- a/schema/appdata/batch_test.go +++ b/schema/appdata/batch_test.go @@ -71,7 +71,11 @@ func TestBatchAsync(t *testing.T) { } // commit to synchronize - if err := l.Commit(CommitData{}); err != nil { + cb, err := l.Commit(CommitData{}) + if err != nil { + t.Error(err) + } + if err := cb(); err != nil { t.Error(err) }