Skip to content

Commit

Permalink
abci: use a semaphore for commit/apply/beginblock
Browse files Browse the repository at this point in the history
Consensus method requests from cometbft are synchronous, but a portion
of the work of Commit is launched in a goroutine, so we block a
subsequent BeginBlock from starting new changes. We do this by
acquiring a semaphore with max concurrency of 1 at the start of
BeginBlock, and releasing it when the changes from Commit have finished
applying. A mutex is rarely held for longer than the duration of a
local function, while a waitgroup does not provide atomic Wait/Add
semantics that fit here.
  • Loading branch information
jchappelow committed Aug 22, 2023
1 parent c227100 commit ce20b1a
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"

abciTypes "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/crypto/ed25519"
Expand Down Expand Up @@ -78,7 +77,7 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com

log: log.NewNoOp(),

commitWaiter: sync.WaitGroup{},
commitSemaphore: make(chan struct{}, 1), // max concurrency for a BeginBlock->Commit+Apply sequence is 1

// state: appState{height, ...}, // TODO
}
Expand Down Expand Up @@ -126,11 +125,14 @@ type AbciApp struct {

log log.Logger

// commitWaiter is a waitgroup that waits for the commit to finish
// when a block is begun, the commitWaiter waits until the previous commit is finished
// it then increments and starts "begin block"
// when a commit is finished, the commitWaiter is decremented
commitWaiter sync.WaitGroup
// Consensus method requests from cometbft are synchronous, but a portion of
// the work of Commit is launched in a goroutine, so we block a subsequent
// BeginBlock from starting new changes. We do this by acquiring a semaphore
// with max concurrency of 1 at the start of BeginBlock, and releasing it
// when the changes from Commit have finished applying. A mutex is rarely
// held for longer than the duration of a local function, while a waitgroup
// does not provide atomic Wait/Add semantics that fit here.
commitSemaphore chan struct{}

// Expected AppState after bootstrapping the node with a given snapshot,
// state gets updated with the bootupState after bootstrapping
Expand All @@ -144,11 +146,7 @@ var _ abciTypes.Application = &AbciApp{}
// BeginBlock begins a block.
// If the previous commit is not finished, it will wait for the previous commit to finish.
func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.ResponseBeginBlock {
// TODO: replace this waitGroup with something else. It's not a queue and
// Wait/Add is not atomic. Fortunately all consensus connections are
// synchronous so there won't be more than one BeginBlock waiting.
a.commitWaiter.Wait()
a.commitWaiter.Add(1)
a.commitSemaphore <- struct{}{} // peg in (until Commit is applied), there will only be at most one waiter

err := a.committer.Begin(context.Background())
if err != nil {
Expand Down Expand Up @@ -389,8 +387,7 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}

a.commitWaiter.Done()
<-a.commitSemaphore // peg out (from BeginBlock)
})
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
Expand Down

0 comments on commit ce20b1a

Please sign in to comment.