Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(share/ipld): Remove proofs from leaked ctx #2574

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende

// extend block data
adder := ipld.NewProofsAdder(int(block.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
Expand Down Expand Up @@ -148,6 +150,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64

// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
Expand Down
2 changes: 2 additions & 0 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
)
// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return fmt.Errorf("extending block data: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions share/eds/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func writeProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare, w io.Write

func getProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare) (map[cid.Cid][]byte, error) {
// check if there are proofs collected by ipld.ProofsAdder in previous reconstruction of eds
proofs := ipld.ProofsAdderFromCtx(ctx).Proofs()
if proofs != nil {
return proofs, nil
if adder := ipld.ProofsAdderFromCtx(ctx); adder != nil {
defer adder.Purge()
return adder.Proofs(), nil
}

// recompute proofs from eds
Expand All @@ -124,6 +124,8 @@ func getProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare) (map[cid.Cid
// this adder ignores leaves, so that they are not added to the store we iterate through in
// writeProofs
adder := ipld.NewProofsAdder(odsWidth * 2)
defer adder.Purge()

eds, err := rsmt2d.ImportExtendedDataSquare(
shares,
share.DefaultRSMT2DCodec(),
Expand Down
5 changes: 4 additions & 1 deletion share/getters/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func (tg *TeeGetter) GetEDS(ctx context.Context, root *share.Root) (eds *rsmt2d.
utils.SetStatusAndEnd(span, err)
}()

ctx = ipld.CtxWithProofsAdder(ctx, ipld.NewProofsAdder(len(root.RowRoots)))
adder := ipld.NewProofsAdder(len(root.RowRoots))
ctx = ipld.CtxWithProofsAdder(ctx, adder)
defer adder.Purge()

eds, err = tg.getter.GetEDS(ctx, root)
if err != nil {
return nil, err
Expand Down
20 changes: 20 additions & 0 deletions share/ipld/nmt_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,28 @@ func BatchSize(squareSize int) int {
return (squareSize*2-1)*squareSize*2 - (squareSize * squareSize)
}

// ProofsAdder is used to collect proof nodes, while traversing merkle tree
type ProofsAdder struct {
lock sync.RWMutex
proofs map[cid.Cid][]byte
}

// NewProofsAdder creates new instance of ProofsAdder.
func NewProofsAdder(squareSize int) *ProofsAdder {
return &ProofsAdder{
// preallocate map to fit all inner nodes for given square size
proofs: make(map[cid.Cid][]byte, innerNodesAmount(squareSize)),
}
}

// CtxWithProofsAdder creates context, that will contain ProofsAdder. If context is leaked to
// another go-routine, proofs will be not collected by gc. To prevent it, use Purge after Proofs
// are collected from adder, to preemptively release memory allocated for proofs.
func CtxWithProofsAdder(ctx context.Context, adder *ProofsAdder) context.Context {
return context.WithValue(ctx, proofsAdderKey, adder)
}

// ProofsAdderFromCtx extracts ProofsAdder from context
func ProofsAdderFromCtx(ctx context.Context) *ProofsAdder {
val := ctx.Value(proofsAdderKey)
adder, ok := val.(*ProofsAdder)
Expand All @@ -126,6 +132,7 @@ func ProofsAdderFromCtx(ctx context.Context) *ProofsAdder {
return adder
}

// Proofs returns proofs collected by ProofsAdder
func (a *ProofsAdder) Proofs() map[cid.Cid][]byte {
if a == nil {
return nil
Expand All @@ -136,6 +143,7 @@ func (a *ProofsAdder) Proofs() map[cid.Cid][]byte {
return a.proofs
}

// VisitFn returns NodeVisitorFn, that will collect proof nodes while traversing merkle tree.
func (a *ProofsAdder) VisitFn() nmt.NodeVisitorFn {
if a == nil {
return nil
Expand All @@ -151,6 +159,18 @@ func (a *ProofsAdder) VisitFn() nmt.NodeVisitorFn {
return a.visitInnerNodes
}

// Purge removed proofs from ProofsAdder allowing GC to collect the memory
func (a *ProofsAdder) Purge() {
if a == nil {
return
}

a.lock.Lock()
defer a.lock.Unlock()

a.proofs = nil
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *ProofsAdder) visitInnerNodes(hash []byte, children ...[]byte) {
switch len(children) {
case 1:
Expand Down
Loading