Skip to content

Commit

Permalink
fix(share/ipld): Remove proofs from leaked ctx (celestiaorg#2574)
Browse files Browse the repository at this point in the history
(cherry picked from commit 04e2928)
  • Loading branch information
walldiss committed Sep 25, 2023
1 parent 13c05e8 commit b365ee5
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 4 deletions.
4 changes: 4 additions & 0 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende

// extend block data
adder := ipld.NewProofsAdder(int(block.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
Expand Down Expand Up @@ -148,6 +150,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64

// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
Expand Down
2 changes: 2 additions & 0 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
)
// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return fmt.Errorf("extending block data: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions share/eds/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func writeProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare, w io.Write

func getProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare) (map[cid.Cid][]byte, error) {
// check if there are proofs collected by ipld.ProofsAdder in previous reconstruction of eds
proofs := ipld.ProofsAdderFromCtx(ctx).Proofs()
if proofs != nil {
return proofs, nil
if adder := ipld.ProofsAdderFromCtx(ctx); adder != nil {
defer adder.Purge()
return adder.Proofs(), nil
}

// recompute proofs from eds
Expand All @@ -124,6 +124,8 @@ func getProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare) (map[cid.Cid
// this adder ignores leaves, so that they are not added to the store we iterate through in
// writeProofs
adder := ipld.NewProofsAdder(odsWidth * 2)
defer adder.Purge()

eds, err := rsmt2d.ImportExtendedDataSquare(
shares,
share.DefaultRSMT2DCodec(),
Expand Down
5 changes: 4 additions & 1 deletion share/getters/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func (tg *TeeGetter) GetEDS(ctx context.Context, root *share.Root) (eds *rsmt2d.
utils.SetStatusAndEnd(span, err)
}()

ctx = ipld.CtxWithProofsAdder(ctx, ipld.NewProofsAdder(len(root.RowRoots)))
adder := ipld.NewProofsAdder(len(root.RowRoots))
ctx = ipld.CtxWithProofsAdder(ctx, adder)
defer adder.Purge()

eds, err = tg.getter.GetEDS(ctx, root)
if err != nil {
return nil, err
Expand Down
20 changes: 20 additions & 0 deletions share/ipld/nmt_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,28 @@ func BatchSize(squareSize int) int {
return (squareSize*2-1)*squareSize*2 - (squareSize * squareSize)
}

// ProofsAdder is used to collect proof nodes, while traversing merkle tree
type ProofsAdder struct {
lock sync.RWMutex
proofs map[cid.Cid][]byte
}

// NewProofsAdder creates new instance of ProofsAdder.
func NewProofsAdder(squareSize int) *ProofsAdder {
return &ProofsAdder{
// preallocate map to fit all inner nodes for given square size
proofs: make(map[cid.Cid][]byte, innerNodesAmount(squareSize)),
}
}

// CtxWithProofsAdder creates context, that will contain ProofsAdder. If context is leaked to
// another go-routine, proofs will be not collected by gc. To prevent it, use Purge after Proofs
// are collected from adder, to preemptively release memory allocated for proofs.
func CtxWithProofsAdder(ctx context.Context, adder *ProofsAdder) context.Context {
return context.WithValue(ctx, proofsAdderKey, adder)
}

// ProofsAdderFromCtx extracts ProofsAdder from context
func ProofsAdderFromCtx(ctx context.Context) *ProofsAdder {
val := ctx.Value(proofsAdderKey)
adder, ok := val.(*ProofsAdder)
Expand All @@ -126,6 +132,7 @@ func ProofsAdderFromCtx(ctx context.Context) *ProofsAdder {
return adder
}

// Proofs returns proofs collected by ProofsAdder
func (a *ProofsAdder) Proofs() map[cid.Cid][]byte {
if a == nil {
return nil
Expand All @@ -136,6 +143,7 @@ func (a *ProofsAdder) Proofs() map[cid.Cid][]byte {
return a.proofs
}

// VisitFn returns NodeVisitorFn, that will collect proof nodes while traversing merkle tree.
func (a *ProofsAdder) VisitFn() nmt.NodeVisitorFn {
if a == nil {
return nil
Expand All @@ -151,6 +159,18 @@ func (a *ProofsAdder) VisitFn() nmt.NodeVisitorFn {
return a.visitInnerNodes
}

// Purge removed proofs from ProofsAdder allowing GC to collect the memory
func (a *ProofsAdder) Purge() {
if a == nil {
return
}

a.lock.Lock()
defer a.lock.Unlock()

a.proofs = nil
}

func (a *ProofsAdder) visitInnerNodes(hash []byte, children ...[]byte) {
switch len(children) {
case 1:
Expand Down

0 comments on commit b365ee5

Please sign in to comment.