Skip to content

Commit

Permalink
go/consensus/tendermint: Make sure DBs are only closed during cleanup
Browse files Browse the repository at this point in the history
Tendermint Core closes the block/state store DBs during Stop, which
can cause certain in-flight queries to panic because the database is
already closed.

This commit introduces a DB wrapper that enables us to defer closing
the DB until Cleanup is called (as at that point all of the services
have already been stopped).
  • Loading branch information
kostko committed May 25, 2022
1 parent 62bdd12 commit 80e4eee
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 35 deletions.
48 changes: 48 additions & 0 deletions go/consensus/tendermint/db/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package db

import (
"sync"

dbm "github.com/tendermint/tm-db"
)

// Closer manages closing of multiple Tendermint Core databases.
//
// Databases are registered through WithCloser and are closed together when Close is called.
type Closer struct {
// l guards dbs; both Close and WithCloser take it before touching the slice.
l sync.Mutex
// dbs holds the underlying (unwrapped) databases that were registered via WithCloser.
dbs []dbm.DB
}

// Close closes all the managed databases.
//
// Every registered database is closed exactly once: after Close returns the closer no longer
// tracks any database, so calling Close again is a no-op unless new databases have been registered
// in the meantime. Errors returned by the individual databases are discarded.
func (c *Closer) Close() {
	c.l.Lock()
	defer c.l.Unlock()

	for _, db := range c.dbs {
		// Best-effort: Close has no error return and a failure to close one database must not
		// prevent the remaining ones from being closed.
		_ = db.Close()
	}

	// Drop the references so that a repeated Close never closes the same database twice.
	c.dbs = nil
}

// NewCloser returns a fresh closer that does not manage any databases yet.
func NewCloser() *Closer {
	return new(Closer)
}

// dbWithCloser wraps a database and turns direct Close calls into no-ops.
type dbWithCloser struct {
// Embedded so that every method other than Close is promoted from the wrapped database.
dbm.DB
}

// Close intentionally leaves the wrapped database open and always reports success; the
// database is only closed through the closer it was registered with.
func (*dbWithCloser) Close() error {
	return nil
}

// WithCloser registers db with the given closer and returns a wrapper around it. Calling Close on
// the returned database is ignored; the underlying database is only closed when the closer itself
// is closed.
func WithCloser(db dbm.DB, closer *Closer) dbm.DB {
	wrapped := &dbWithCloser{DB: db}

	// Register the raw database (not the wrapper) so the closer can actually close it.
	closer.l.Lock()
	closer.dbs = append(closer.dbs, db)
	closer.l.Unlock()

	return wrapped
}
13 changes: 2 additions & 11 deletions go/consensus/tendermint/full/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,6 @@ func (srv *archiveService) Stop() {
})
}

// Cleanup runs the common node cleanup and then closes the block store database and the state
// store directly. Close failures are logged rather than returned.
func (srv *archiveService) Cleanup() {
srv.commonNode.Cleanup()

// Both stores are closed even if closing the first one fails.
if err := srv.blockStoreDB.Close(); err != nil {
srv.Logger.Error("error on closing block store", "err", err)
}
if err := srv.stateStore.Close(); err != nil {
srv.Logger.Error("error on closing state store", "err", err)
}
}

// Implements consensusAPI.Backend.
func (srv *archiveService) Quit() <-chan struct{} {
return srv.quitCh
Expand Down Expand Up @@ -207,6 +196,7 @@ func NewArchive(
if err != nil {
return nil, err
}
srv.blockStoreDB = db.WithCloser(srv.blockStoreDB, srv.dbCloser)

// NOTE: DBContext uses a full tendermint config but the only thing that is actually used
// is the data dir field.
Expand All @@ -215,6 +205,7 @@ func NewArchive(
if err != nil {
return nil, err
}
stateDB = db.WithCloser(stateDB, srv.dbCloser)
srv.stateStore = state.NewStore(stateDB)

tmGenDoc, err := api.GetTendermintGenesisDocument(genesisProvider)
Expand Down
51 changes: 28 additions & 23 deletions go/consensus/tendermint/full/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/spf13/viper"
tmcore "github.com/tendermint/tendermint/rpc/core"
Expand Down Expand Up @@ -31,6 +32,7 @@ import (
tmbeacon "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/beacon"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/common"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/crypto"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/db"
tmgovernance "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/governance"
tmkeymanager "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/keymanager"
tmregistry "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/registry"
Expand Down Expand Up @@ -82,27 +84,28 @@ type commonNode struct {
// These stores must be populated by the parent before the node is deemed ready.
blockStoreDB tmdb.DB
stateStore state.Store
dbCloser *db.Closer

// Guarded by the lock.
isStarted, isInitialized bool

state uint32
startedCh chan struct{}

parentNode api.Backend
}

func (n *commonNode) initialized() bool {
n.Lock()
defer n.Unlock()
// Possible internal node states.
const (
stateNotReady = 0
stateInitialized = 1
stateStarted = 2
stateStopping = 3
)

return n.isInitialized
func (n *commonNode) initialized() bool {
return atomic.LoadUint32(&n.state) >= stateInitialized
}

func (n *commonNode) started() bool {
n.Lock()
defer n.Unlock()

return n.isStarted
return atomic.LoadUint32(&n.state) >= stateStarted
}

func (n *commonNode) ensureStarted(ctx context.Context) error {
Expand All @@ -115,6 +118,10 @@ func (n *commonNode) ensureStarted(ctx context.Context) error {
return ctx.Err()
}

if atomic.LoadUint32(&n.state) >= stateStopping {
return fmt.Errorf("node is shutting down")
}

return nil
}

Expand All @@ -127,12 +134,8 @@ func (n *commonNode) start() error {
n.Lock()
defer n.Unlock()

if n.isStarted {
return fmt.Errorf("tendermint/common_node: already started")
}

if !n.isInitialized {
return fmt.Errorf("tendermint/common_node: not initialized")
if atomic.LoadUint32(&n.state) != stateInitialized {
return fmt.Errorf("tendermint/common_node: not in initialized state")
}

if err := n.mux.Start(); err != nil {
Expand All @@ -143,29 +146,29 @@ func (n *commonNode) start() error {
}

func (n *commonNode) finishStart() {
n.Lock()
n.isStarted = true
n.Unlock()
atomic.StoreUint32(&n.state, stateStarted)
close(n.startedCh)
}

func (n *commonNode) stop() {
n.Lock()
defer n.Unlock()

if !n.isStarted || !n.isInitialized {
if !n.started() {
return
}

n.svcMgr.Stop()
n.mux.Stop()

atomic.StoreUint32(&n.state, stateStopping)
}

func (n *commonNode) initialize() error {
n.Lock()
defer n.Unlock()

if n.isInitialized {
if atomic.LoadUint32(&n.state) != stateNotReady {
return nil
}

Expand Down Expand Up @@ -275,7 +278,7 @@ func (n *commonNode) initialize() error {
}
}

n.isInitialized = true
atomic.StoreUint32(&n.state, stateInitialized)

return nil
}
Expand All @@ -289,6 +292,7 @@ func (n *commonNode) Started() <-chan struct{} {
// Cleanup waits for the service clients, cleans up the managed services and finally closes the
// databases registered with the database closer.
func (n *commonNode) Cleanup() {
n.serviceClientsWg.Wait()
n.svcMgr.Cleanup()
// Databases are closed last, after the service clients and the service manager are done.
n.dbCloser.Close()
}

// Implements consensusAPI.Backend.
Expand Down Expand Up @@ -803,6 +807,7 @@ func newCommonNode(
genesis: genesisDoc,
dataDir: dataDir,
svcMgr: cmbackground.NewServiceManager(logging.GetLogger("tendermint/servicemanager")),
dbCloser: db.NewCloser(),
startedCh: make(chan struct{}),
}, nil
}
3 changes: 2 additions & 1 deletion go/consensus/tendermint/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,11 @@ func (t *fullService) lazyInit() error {
// Tendermint does not expose a way to access the state database and we need it to bypass some
// stupid things like pagination on the in-process "client".
wrapDbProvider := func(dbCtx *tmnode.DBContext) (tmdb.DB, error) {
db, derr := dbProvider(dbCtx)
rawDB, derr := dbProvider(dbCtx)
if derr != nil {
return nil, derr
}
db := db.WithCloser(rawDB, t.dbCloser)

switch dbCtx.ID {
case "state":
Expand Down
4 changes: 4 additions & 0 deletions go/consensus/tendermint/full/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (n *commonNode) GetLightBlock(ctx context.Context, height int64) (*consensu

// Implements LightClientBackend.
func (n *commonNode) GetParameters(ctx context.Context, height int64) (*consensusAPI.Parameters, error) {
if err := n.ensureStarted(ctx); err != nil {
return nil, err
}

tmHeight, err := n.heightToTendermintHeight(height)
if err != nil {
return nil, err
Expand Down

0 comments on commit 80e4eee

Please sign in to comment.