From 83b9979aa00917e34e4d9b3f2a0db1e0988ad313 Mon Sep 17 00:00:00 2001 From: jchappelow <140431406+jchappelow@users.noreply.github.com> Date: Wed, 23 Aug 2023 11:01:39 -0500 Subject: [PATCH] abci: use a semaphore for commit/apply/beginblock (#196) Consensus method requests from cometbft are synchronous, but a portion of the work of Commit is launched in a goroutine, so we block a subsequent BeginBlock from starting new changes. We do this by acquiring a semaphore with max concurrency of 1 at the start of BeginBlock, and releasing it when the changes from Commit have finished applying. A mutex is rarely held for longer than the duration of a local function, while a waitgroup does not provide atomic Wait/Add semantics that fit here. Co-authored-by: Brennan Lamey <66885902+brennanjl@users.noreply.github.com> --- pkg/abci/abci.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/pkg/abci/abci.go b/pkg/abci/abci.go index dc966b7a8..0b2973a11 100644 --- a/pkg/abci/abci.go +++ b/pkg/abci/abci.go @@ -7,7 +7,6 @@ import ( "encoding/hex" "errors" "fmt" - "sync" abciTypes "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/crypto/ed25519" @@ -78,7 +77,7 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com log: log.NewNoOp(), - commitWaiter: sync.WaitGroup{}, + commitSemaphore: make(chan struct{}, 1), // max concurrency for a BeginBlock->Commit+Apply sequence is 1 // state: appState{height, ...}, // TODO } @@ -126,11 +125,14 @@ type AbciApp struct { log log.Logger - // commitWaiter is a waitgroup that waits for the commit to finish - // when a block is begun, the commitWaiter waits until the previous commit is finished - // it then increments and starts "begin block" - // when a commit is finished, the commitWaiter is decremented - commitWaiter sync.WaitGroup + // Consensus method requests from cometbft are synchronous, but a portion of + // the work of Commit is launched in a goroutine, so we block a subsequent + // BeginBlock from starting new changes. We do this by acquiring a semaphore + // with max concurrency of 1 at the start of BeginBlock, and releasing it + // when the changes from Commit have finished applying. A mutex is rarely + // held for longer than the duration of a local function, while a waitgroup + // does not provide atomic Wait/Add semantics that fit here. + commitSemaphore chan struct{} // Expected AppState after bootstrapping the node with a given snapshot, // state gets updated with the bootupState after bootstrapping @@ -144,11 +146,7 @@ var _ abciTypes.Application = &AbciApp{} // BeginBlock begins a block. // If the previous commit is not finished, it will wait for the previous commit to finish. func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.ResponseBeginBlock { - // TODO: replace this waitGroup with something else. It's not a queue and - // Wait/Add is not atomic. Fortunately all consensus connections are - // synchronous so there won't be more than one BeginBlock waiting. - a.commitWaiter.Wait() - a.commitWaiter.Add(1) + a.commitSemaphore <- struct{}{} // peg in (until Commit is applied), there will only be at most one waiter err := a.committer.Begin(context.Background()) if err != nil { @@ -389,8 +387,7 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit { if err != nil { panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err))) } - - a.commitWaiter.Done() + <-a.commitSemaphore // peg out (from BeginBlock) }) if err != nil { panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))