Skip to content

Commit

Permalink
Merge pull request #6137 from filecoin-project/feat/flush-bs
Browse files Browse the repository at this point in the history
feat: Plumb through a proper Flush() method on all blockstores
  • Loading branch information
simlecode authored Aug 30, 2023
2 parents 31f8390 + d7f14ae commit 62196d3
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 0 deletions.
8 changes: 8 additions & 0 deletions venus-shared/blockstore/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ func (b *BadgerBlockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte
})
}

func (b *BadgerBlockstore) Flush(context.Context) error {
if atomic.LoadInt64(&b.state) != stateOpen {
return ErrBlockstoreClosed
}

return b.DB.Sync()
}

// Has implements blockstore.Has.
func (b *BadgerBlockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
if atomic.LoadInt64(&b.state) != stateOpen {
Expand Down
12 changes: 12 additions & 0 deletions venus-shared/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Blockstore interface {
blockstore.Blockstore
blockstore.Viewer
BatchDeleter
Flusher
}

// Alias so other packages don't have to import go-ipfs-blockstore
Expand All @@ -66,6 +67,10 @@ type BatchDeleter interface {
DeleteMany(ctx context.Context, cids []cid.Cid) error
}

type Flusher interface {
Flush(context.Context) error
}

var (
NewGCLocker = blockstore.NewGCLocker
NewGCBlockstore = blockstore.NewGCBlockstore
Expand Down Expand Up @@ -93,6 +98,13 @@ type adaptedBlockstore struct {

var _ Blockstore = (*adaptedBlockstore)(nil)

func (a *adaptedBlockstore) Flush(ctx context.Context) error {
if flusher, canFlush := a.Blockstore.(Flusher); canFlush {
return flusher.Flush(ctx)
}
return nil
}

func (a *adaptedBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
blk, err := a.Get(ctx, cid)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions venus-shared/blockstore/buf_bstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ var (
_ Viewer = (*BufferedBS)(nil)
)

func (bs *BufferedBS) Flush(ctx context.Context) error { return bs.write.Flush(ctx) }

func (bs *BufferedBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
a, err := bs.read.AllKeysChan(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions venus-shared/blockstore/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func NewMemory() MemBlockstore {
// MemBlockstore is a terminal blockstore that keeps blocks in memory.
type MemBlockstore map[string]blocks.Block

func (m MemBlockstore) Flush(context.Context) error { return nil }

func (m MemBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
delete(m, genKey(c))
return nil
Expand Down
2 changes: 2 additions & 0 deletions venus-shared/blockstore/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ func (n *NetworkStore) HashOnRead(enabled bool) {
// todo
}

func (n *NetworkStore) Flush(context.Context) error { return nil }

func (n *NetworkStore) Stop(ctx context.Context) error {
close(n.closing)

Expand Down
2 changes: 2 additions & 0 deletions venus-shared/blockstore/syncstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type SyncStore struct {
bs MemBlockstore // specifically use a memStore to save indirection overhead.
}

func (m *SyncStore) Flush(context.Context) error { return nil }

func (m *SyncStore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions venus-shared/blockstore/view_blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type TxBlockstore struct {
keyTransform *keytransform.PrefixTransform
}

func (txBlockstore *TxBlockstore) Flush(context.Context) error { return nil }

func (txBlockstore *TxBlockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
return errors.New("readonly blocksgtore")
}
Expand Down

0 comments on commit 62196d3

Please sign in to comment.