Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema/appdata)!: efficiency & data model improvements aligned with server/v2 #21305

Merged
merged 7 commits into from
Aug 20, 2024
Merged
2 changes: 1 addition & 1 deletion core/store/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Changeset struct {
}

// StateChanges represents a set of changes to the state of an actor in storage.
type StateChanges struct {
type StateChanges = struct {
Actor []byte // actor represents the space in storage where state is stored, previously this was called a "storekey"
StateChanges KVPairs // StateChanges is a list of key-value pairs representing the changes to the state.
}
Expand Down
13 changes: 8 additions & 5 deletions schema/appdata/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,14 @@ func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener
}
}

if listener.Commit != nil {
res.Commit = func(data CommitData) error {
packetChan <- data
return nil
}
res.Commit = func(data CommitData) error {
packetChan <- data
return nil
}

res.onBatch = func(batch PacketBatch) error {
packetChan <- batch
return nil
}

return res
Expand Down
41 changes: 41 additions & 0 deletions schema/appdata/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package appdata

// BatchablePacket is the interface that packet types which can be batched implement.
// All types that implement Packet except CommitData also implement BatchablePacket.
// CommitData should not be batched because it forces synchronization of asynchronous listeners.
type BatchablePacket interface {
// Packet is embedded so that every batchable packet can also be applied to a
// Listener individually.
Packet
// isBatchablePacket is an unexported marker method with no behavior of its own.
// Because it is unexported, only types declared in this package can satisfy
// BatchablePacket.
isBatchablePacket()
}

// PacketBatch groups batchable packets so they can be handed to a listener together.
// When listener processing is asynchronous, the whole batch is sent in a single
// operation, which can be more efficient than sending each packet individually.
type PacketBatch []BatchablePacket

// apply delivers the batch to l. If l has an onBatch hook, the entire batch is
// forwarded to that hook in one call and its result is returned. Otherwise each
// packet is applied to l in order, stopping at and returning the first error.
func (p PacketBatch) apply(l *Listener) error {
	if forward := l.onBatch; forward != nil {
		return forward(p)
	}

	for i := range p {
		if err := p[i].apply(l); err != nil {
			return err
		}
	}

	return nil
}

// isBatchablePacket marks ModuleInitializationData as a BatchablePacket.
func (ModuleInitializationData) isBatchablePacket() {}

// isBatchablePacket marks StartBlockData as a BatchablePacket.
func (StartBlockData) isBatchablePacket() {}

// isBatchablePacket marks TxData as a BatchablePacket.
func (TxData) isBatchablePacket() {}

// isBatchablePacket marks EventData as a BatchablePacket.
func (EventData) isBatchablePacket() {}

// isBatchablePacket marks KVPairData as a BatchablePacket.
func (KVPairData) isBatchablePacket() {}

// isBatchablePacket marks ObjectUpdateData as a BatchablePacket.
func (ObjectUpdateData) isBatchablePacket() {}
81 changes: 81 additions & 0 deletions schema/appdata/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package appdata

import (
"context"
"reflect"
"testing"
)

// TestBatch checks that sending a PacketBatch to a synchronous listener invokes
// the per-packet callbacks once for each packet, in batch order.
func TestBatch(t *testing.T) {
	listener, received := batchListener()

	err := listener.SendPacket(testBatch)
	if err != nil {
		t.Error(err)
	}

	if actual := *received; !reflect.DeepEqual(actual, testBatch) {
		t.Errorf("got %v, expected %v", actual, testBatch)
	}
}

// testBatch is the fixture shared by the batch tests. It holds one zero-valued
// packet of each of six packet types, in the order the tests expect them to be
// delivered.
var testBatch = PacketBatch{
ModuleInitializationData{},
StartBlockData{},
TxData{},
EventData{},
KVPairData{},
ObjectUpdateData{},
}

// batchListener builds a Listener whose packet callbacks each append the packet
// they receive to a shared PacketBatch. It returns the listener together with a
// pointer to that batch so tests can inspect what was delivered and in what order.
func batchListener() (Listener, *PacketBatch) {
	// Start from a nil batch, exactly as new(PacketBatch) would.
	var recorded PacketBatch
	got := &recorded

	// record appends one packet to the shared batch; it never fails.
	record := func(p BatchablePacket) error {
		*got = append(*got, p)
		return nil
	}

	return Listener{
		InitializeModuleData: func(m ModuleInitializationData) error { return record(m) },
		StartBlock:           func(b StartBlockData) error { return record(b) },
		OnTx:                 func(tx TxData) error { return record(tx) },
		OnEvent:              func(e EventData) error { return record(e) },
		OnKVPair:             func(kv KVPairData) error { return record(kv) },
		OnObjectUpdate:       func(o ObjectUpdateData) error { return record(o) },
	}, got
}

// TestBatchAsync checks that a PacketBatch sent through a listener wrapped by
// AsyncListenerMux reaches the underlying callbacks intact and in order.
func TestBatchAsync(t *testing.T) {
	inner, received := batchListener()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	async := AsyncListenerMux(AsyncListenerOptions{Context: ctx}, inner)

	err := async.SendPacket(testBatch)
	if err != nil {
		t.Error(err)
	}

	// Commit serves as the synchronization point before the recorded packets are inspected.
	err = async.Commit(CommitData{})
	if err != nil {
		t.Error(err)
	}

	if actual := *received; !reflect.DeepEqual(actual, testBatch) {
		t.Errorf("got %v, expected %v", actual, testBatch)
	}
}
38 changes: 27 additions & 11 deletions schema/appdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ type TxData struct {
JSON ToJSON
}

// EventData represents event data that is passed to a listener.
// EventData represents event data that is passed to a listener when events are received.
type EventData struct {
// Events are the events that are received.
Events []Event
}

// Event represents the data for a single event.
type Event struct {
// TxIndex is the index of the transaction in the block to which this event is associated.
// It should be set to a negative number if the event is not associated with a transaction.
// Canonically -1 should be used to represent begin block processing and -2 should be used to
Expand All @@ -52,16 +58,23 @@ type EventData struct {
// MsgIndex is the index of the message in the transaction to which this event is associated.
// If TxIndex is negative, this index could correspond to the index of the message in
// begin or end block processing if such indexes exist, or it can be set to zero.
MsgIndex uint32
MsgIndex int32

// EventIndex is the index of the event in the message to which this event is associated.
EventIndex uint32
EventIndex int32

// Type is the type of the event.
Type string

// Data is the JSON representation of the event data. It should generally be a JSON object.
// Data lazily returns the JSON representation of the event.
Data ToJSON

// Attributes lazily returns the key-value attribute representation of the event.
Attributes ToEventAttributes
}
Copy link
Member Author

@aaronc aaronc Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would propose we use the above Event structure in STF and app manager instead of what we have now. It can support either key-value events or JSON events lazily, and it tracks tx, msg, and event indexes directly (rather than having them pushed down into string-formatted attributes). This could be mirrored in core/app.


// EventAttribute is a single key-value attribute of an event.
type EventAttribute = struct {
// Key and Value hold the attribute's key and its value as strings.
Key, Value string
}

// ToBytes is a function that lazily returns the raw byte representation of data.
Expand All @@ -70,18 +83,21 @@ type ToBytes = func() ([]byte, error)
// ToJSON is a function that lazily returns the JSON representation of data.
type ToJSON = func() (json.RawMessage, error)

// ToEventAttributes is a function that lazily returns the key-value attribute representation of an event.
type ToEventAttributes = func() ([]EventAttribute, error)

// KVPairData represents a batch of key-value pair data that is passed to a listener.
type KVPairData struct {
Updates []ModuleKVPairUpdate
Updates []ActorKVPairUpdate
}

// ModuleKVPairUpdate represents a key-value pair update for a specific module.
type ModuleKVPairUpdate struct {
// ModuleName is the name of the module that the key-value pair belongs to.
ModuleName string
// ActorKVPairUpdate represents a key-value pair update for a specific module or account.
type ActorKVPairUpdate = struct {
// Actor is the byte representation of the module or account that is updating the key-value pair.
Actor []byte

// Update is the key-value pair update.
Update schema.KVPairUpdate
// StateChanges are key-value pair updates.
StateChanges []schema.KVPairUpdate
}

// ObjectUpdateData represents object update data that is passed to a listener.
Expand Down
9 changes: 9 additions & 0 deletions schema/appdata/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,14 @@ type Listener struct {
// indexers should commit their data when this is called and return an error if
// they are unable to commit. Data sources MUST call Commit when data is committed,
// otherwise it should be assumed that indexers have not persisted their state.
// When listener processing is pushed into background go routines using AsyncListener
// or AsyncListenerMux, Commit will synchronize the processing of all listeners and
// is a blocking call. Producers that do not want to block on Commit in a given block
// can delay calling Commit until the start of the next block to give listener
// processing time to complete.
Commit func(CommitData) error

// onBatch can be used internally to efficiently forward packet batches to
// async listeners.
onBatch func(PacketBatch) error
}
10 changes: 10 additions & 0 deletions schema/appdata/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,15 @@ func ListenerMux(listeners ...Listener) Listener {
}
}

mux.onBatch = func(batch PacketBatch) error {
for _, listener := range listeners {
err := batch.apply(&listener)
if err != nil {
return err
}
}
return nil
}

return mux
}
2 changes: 2 additions & 0 deletions schema/appdata/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package appdata

// Packet is the interface that all listener data structures implement so that this data can be "packetized"
// and processed in a stream, possibly asynchronously.
// Valid implementations are ModuleInitializationData, StartBlockData, TxData, EventData, KVPairData, ObjectUpdateData,
// and CommitData.
type Packet interface {
apply(*Listener) error
}
Expand Down
12 changes: 7 additions & 5 deletions schema/decoding/decoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,14 @@ func (t testStore) GetUInt64(key []byte) uint64 {

func (t testStore) Set(key, value []byte) {
if t.listener.OnKVPair != nil {
err := t.listener.OnKVPair(appdata.KVPairData{Updates: []appdata.ModuleKVPairUpdate{
err := t.listener.OnKVPair(appdata.KVPairData{Updates: []appdata.ActorKVPairUpdate{
{
ModuleName: t.modName,
Update: schema.KVPairUpdate{
Key: key,
Value: value,
Actor: []byte(t.modName),
StateChanges: []schema.KVPairUpdate{
{
Key: key,
Value: value,
},
},
},
}})
Expand Down
56 changes: 35 additions & 21 deletions schema/decoding/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
onKVPair := target.OnKVPair

moduleCodecs := map[string]*schema.ModuleCodec{}
moduleNames := map[string]string{}

target.OnKVPair = func(data appdata.KVPairData) error {
// first forward kv pair updates
Expand All @@ -34,34 +35,45 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
}

for _, kvUpdate := range data.Updates {
moduleName, ok := moduleNames[string(kvUpdate.Actor)]
if !ok {
var err error
moduleName, err = resolver.DecodeModuleName(kvUpdate.Actor)
if err != nil {
return err
}

moduleNames[string(kvUpdate.Actor)] = moduleName
}

// look for an existing codec
pcdc, ok := moduleCodecs[kvUpdate.ModuleName]
pcdc, ok := moduleCodecs[moduleName]
if !ok {
if opts.ModuleFilter != nil && !opts.ModuleFilter(kvUpdate.ModuleName) {
if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) {
// we don't care about this module so store nil and continue
moduleCodecs[kvUpdate.ModuleName] = nil
moduleCodecs[moduleName] = nil
continue
}

// look for a new codec
cdc, found, err := resolver.LookupDecoder(kvUpdate.ModuleName)
cdc, found, err := resolver.LookupDecoder(moduleName)
if err != nil {
return err
}

if !found {
// store nil to indicate we've seen this module and don't have a codec
// and keep processing the kv updates
moduleCodecs[kvUpdate.ModuleName] = nil
moduleCodecs[moduleName] = nil
continue
}

pcdc = &cdc
moduleCodecs[kvUpdate.ModuleName] = pcdc
moduleCodecs[moduleName] = pcdc

if initializeModuleData != nil {
err = initializeModuleData(appdata.ModuleInitializationData{
ModuleName: kvUpdate.ModuleName,
ModuleName: moduleName,
Schema: cdc.Schema,
})
if err != nil {
Expand All @@ -80,22 +92,24 @@ func Middleware(target appdata.Listener, resolver DecoderResolver, opts Middlewa
continue
}

updates, err := pcdc.KVDecoder(kvUpdate.Update)
if err != nil {
return err
}
for _, u := range kvUpdate.StateChanges {
updates, err := pcdc.KVDecoder(u)
if err != nil {
return err
}

if len(updates) == 0 {
// no updates
continue
}
if len(updates) == 0 {
// no updates
continue
}

err = target.OnObjectUpdate(appdata.ObjectUpdateData{
ModuleName: kvUpdate.ModuleName,
Updates: updates,
})
if err != nil {
return err
err = target.OnObjectUpdate(appdata.ObjectUpdateData{
ModuleName: moduleName,
Updates: updates,
})
if err != nil {
return err
}
}
}

Expand Down
Loading
Loading