Skip to content

Commit

Permalink
feat(schema/appdata)!: make commit async (#21306)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronc authored Aug 20, 2024
1 parent aeb0f27 commit 27d3d48
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 76 deletions.
6 changes: 3 additions & 3 deletions indexer/postgres/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ func StartIndexer(ctx context.Context, logger SqlLogger, config Config) (appdata

return mm.InitializeSchema(ctx, tx)
},
Commit: func(data appdata.CommitData) error {
Commit: func(data appdata.CommitData) (completionCallback func() error, err error) {
err = tx.Commit()
if err != nil {
return err
return nil, err
}

tx, err = db.BeginTx(ctx, nil)
return err
return nil, err
},
}, nil
}
6 changes: 5 additions & 1 deletion indexer/postgres/tests/init_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st
}))

require.NotNil(t, listener.Commit)
require.NoError(t, listener.Commit(appdata.CommitData{}))
cb, err := listener.Commit(appdata.CommitData{})
require.NoError(t, err)
if cb != nil {
require.NoError(t, cb())
}

golden.Assert(t, buf.String(), goldenFileName)
}
Expand Down
52 changes: 13 additions & 39 deletions schema/appdata/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,24 @@ type AsyncListenerOptions struct {
DoneWaitGroup *sync.WaitGroup
}

// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously
// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil
// except for Commit which will return an error or nil once all listeners have processed the commit. The context
// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the
// channels used to send events to the listeners.
// AsyncListenerMux is a convenience function that calls AsyncListener for each listener
// with the provided options and combines them using ListenerMux.
func AsyncListenerMux(opts AsyncListenerOptions, listeners ...Listener) Listener {
asyncListeners := make([]Listener, len(listeners))
commitChans := make([]chan error, len(listeners))
for i, l := range listeners {
commitChan := make(chan error)
commitChans[i] = commitChan
asyncListeners[i] = AsyncListener(opts, commitChan, l)
asyncListeners[i] = AsyncListener(opts, l)
}
mux := ListenerMux(asyncListeners...)
muxCommit := mux.Commit
mux.Commit = func(data CommitData) error {
if muxCommit != nil {
err := muxCommit(data)
if err != nil {
return err
}
}

for _, commitChan := range commitChans {
err := <-commitChan
if err != nil {
return err
}
}
return nil
}

return mux
return ListenerMux(asyncListeners...)
}

// AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously
// in a separate go routine. The listener that is returned will return nil for all methods including Commit and
// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has
// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine
// an error or nil will only be returned when the callback returned by Commit is called.
// Thus Commit() can be used as a synchronization and error checking mechanism. The go routine
// that is being used for listening will exit when context.Done() returns and no more events will be received by the listener.
// bufferSize is the size of the buffer for the channel that is used to send events to the listener.
// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly
// via its Commit callback.
func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener Listener) Listener {
func AsyncListener(opts AsyncListenerOptions, listener Listener) Listener {
commitChan := make(chan error)
packetChan := make(chan Packet, opts.BufferSize)
res := Listener{}
ctx := opts.Context
Expand Down Expand Up @@ -151,11 +125,11 @@ func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener
}
}

if listener.Commit != nil {
res.Commit = func(data CommitData) error {
packetChan <- data
return nil
}
res.Commit = func(data CommitData) (func() error, error) {
packetChan <- data
return func() error {
return <-commitChan
}, nil
}

return res
Expand Down
45 changes: 29 additions & 16 deletions schema/appdata/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func TestAsyncListenerMux(t *testing.T) {
BufferSize: 16, Context: ctx, DoneWaitGroup: wg,
}, listener1, listener2)

callAllCallbacksOnces(t, res)
completeCb := callAllCallbacksOnces(t, res)
if completeCb != nil {
if err := completeCb(); err != nil {
t.Fatal(err)
}
}

expectedCalls := []string{
"InitializeModuleData",
Expand All @@ -72,15 +77,23 @@ func TestAsyncListenerMux(t *testing.T) {
listener1 := callCollector(1, func(name string, _ int, _ Packet) {
calls1 = append(calls1, name)
})
listener1.Commit = func(data CommitData) error {
return fmt.Errorf("error")
listener1.Commit = func(data CommitData) (completionCallback func() error, err error) {
return nil, fmt.Errorf("error")
}
listener2 := callCollector(2, func(name string, _ int, _ Packet) {
calls2 = append(calls2, name)
})
res := AsyncListenerMux(AsyncListenerOptions{}, listener1, listener2)

err := res.Commit(CommitData{})
cb, err := res.Commit(CommitData{})
if err != nil {
t.Fatalf("expected first error to be nil, got %v", err)
}
if cb == nil {
t.Fatalf("expected completion callback")
}

err = cb()
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}
Expand All @@ -89,21 +102,19 @@ func TestAsyncListenerMux(t *testing.T) {

func TestAsyncListener(t *testing.T) {
t.Run("call cancel", func(t *testing.T) {
commitChan := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})
res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg},
commitChan, listener)

callAllCallbacksOnces(t, res)
res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg}, listener)

err := <-commitChan
if err != nil {
t.Fatalf("expected nil, got %v", err)
completeCb := callAllCallbacksOnces(t, res)
if completeCb != nil {
if err := completeCb(); err != nil {
t.Fatal(err)
}
}

checkExpectedCallOrder(t, calls, []string{
Expand All @@ -124,7 +135,6 @@ func TestAsyncListener(t *testing.T) {
})

t.Run("error", func(t *testing.T) {
commitChan := make(chan error)
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
Expand All @@ -134,11 +144,14 @@ func TestAsyncListener(t *testing.T) {
return fmt.Errorf("error")
}

res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, commitChan, listener)
res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, listener)

callAllCallbacksOnces(t, res)
completeCb := callAllCallbacksOnces(t, res)
if completeCb == nil {
t.Fatalf("expected completion callback")
}

err := <-commitChan
err := completeCb()
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion schema/appdata/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func PacketForwarder(f func(Packet) error) Listener {
OnKVPair: func(data KVPairData) error { return f(data) },
OnObjectUpdate: func(data ObjectUpdateData) error { return f(data) },
StartBlock: func(data StartBlockData) error { return f(data) },
Commit: func(data CommitData) error { return f(data) },
Commit: func(data CommitData) (func() error, error) { return nil, f(data) },
}
}
7 changes: 6 additions & 1 deletion schema/appdata/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,10 @@ type Listener struct {
// indexers should commit their data when this is called and return an error if
// they are unable to commit. Data sources MUST call Commit when data is committed,
// otherwise it should be assumed that indexers have not persisted their state.
Commit func(CommitData) error
// Commit is designed to support async processing so that implementations may return
// a completion callback to wait for commit to complete. Callers should first check
// if err is nil and then if it is, check if completionCallback is nil and if not
// call it and check for an error. Commit should be designed to be non-blocking if
// possible, but calling completionCallback should be blocking.
Commit func(CommitData) (completionCallback func() error, err error)
}
25 changes: 19 additions & 6 deletions schema/appdata/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,33 @@ func ListenerMux(listeners ...Listener) Listener {
}
}

commitCbs := make([]func(CommitData) error, 0, len(listeners))
commitCbs := make([]func(CommitData) (func() error, error), 0, len(listeners))
for _, l := range listeners {
if l.Commit != nil {
commitCbs = append(commitCbs, l.Commit)
}
}
if len(commitCbs) > 0 {
mux.Commit = func(data CommitData) error {
n := len(commitCbs)
if n > 0 {
mux.Commit = func(data CommitData) (func() error, error) {
waitCbs := make([]func() error, 0, n)
for _, cb := range commitCbs {
if err := cb(data); err != nil {
return err
wait, err := cb(data)
if err != nil {
return nil, err
}
if wait != nil {
waitCbs = append(waitCbs, wait)
}
}
return nil
return func() error {
for _, cb := range waitCbs {
if err := cb(); err != nil {
return err
}
}
return nil
}, nil
}
}

Expand Down
18 changes: 13 additions & 5 deletions schema/appdata/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ func TestListenerMux(t *testing.T) {

res := ListenerMux(callCollector(1, onCall), callCollector(2, onCall))

callAllCallbacksOnces(t, res)
completeCb := callAllCallbacksOnces(t, res)
if completeCb != nil {
if err := completeCb(); err != nil {
t.Fatal(err)
}
}

checkExpectedCallOrder(t, calls, []string{
"InitializeModuleData 1",
Expand All @@ -61,7 +66,7 @@ func TestListenerMux(t *testing.T) {
})
}

func callAllCallbacksOnces(t *testing.T, listener Listener) {
func callAllCallbacksOnces(t *testing.T, listener Listener) (completeCb func() error) {
t.Helper()
if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil {
t.Error(err)
Expand All @@ -81,9 +86,12 @@ func callAllCallbacksOnces(t *testing.T, listener Listener) {
if err := listener.OnObjectUpdate(ObjectUpdateData{}); err != nil {
t.Error(err)
}
if err := listener.Commit(CommitData{}); err != nil {
var err error
completeCb, err = listener.Commit(CommitData{})
if err != nil {
t.Error(err)
}
return completeCb
}

func callCollector(i int, onCall func(string, int, Packet)) Listener {
Expand Down Expand Up @@ -112,9 +120,9 @@ func callCollector(i int, onCall func(string, int, Packet)) Listener {
onCall("OnObjectUpdate", i, nil)
return nil
},
Commit: func(CommitData) error {
Commit: func(data CommitData) (completionCallback func() error, err error) {
onCall("Commit", i, nil)
return nil
return nil, nil
},
}
}
Expand Down
9 changes: 8 additions & 1 deletion schema/appdata/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,12 @@ func (c CommitData) apply(l *Listener) error {
if l.Commit == nil {
return nil
}
return l.Commit(c)
cb, err := l.Commit(c)
if err != nil {
return err
}
if cb != nil {
return cb()
}
return nil
}
6 changes: 3 additions & 3 deletions schema/testing/appdatasim/app_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func writerListener(w io.Writer) appdata.Listener {
OnTx: nil,
OnEvent: nil,
OnKVPair: nil,
Commit: func(data appdata.CommitData) error {
_, err := fmt.Fprintf(w, "Commit: %v\n", data)
return err
Commit: func(data appdata.CommitData) (completionCallback func() error, err error) {
_, err = fmt.Fprintf(w, "Commit: %v\n", data)
return nil, err
},
InitializeModuleData: func(data appdata.ModuleInitializationData) error {
bz, err := json.Marshal(data)
Expand Down

0 comments on commit 27d3d48

Please sign in to comment.