From e68a6d68fbd41a65720d728a20da0e926dab0cb9 Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 16 Aug 2023 14:01:48 +0300 Subject: [PATCH 1/3] remove proofs leaked ctx --- share/eds/eds.go | 8 +++++--- share/ipld/nmt_adder.go | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/share/eds/eds.go b/share/eds/eds.go index f1efd3b433..dad522e004 100644 --- a/share/eds/eds.go +++ b/share/eds/eds.go @@ -108,9 +108,9 @@ func writeProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare, w io.Write func getProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare) (map[cid.Cid][]byte, error) { // check if there are proofs collected by ipld.ProofsAdder in previous reconstruction of eds - proofs := ipld.ProofsAdderFromCtx(ctx).Proofs() - if proofs != nil { - return proofs, nil + if adder := ipld.ProofsAdderFromCtx(ctx); adder != nil { + defer adder.Purge() + return adder.Proofs(), nil } // recompute proofs from eds @@ -124,6 +124,8 @@ func getProofs(ctx context.Context, eds *rsmt2d.ExtendedDataSquare) (map[cid.Cid // this adder ignores leaves, so that they are not added to the store we iterate through in // writeProofs adder := ipld.NewProofsAdder(odsWidth * 2) + defer adder.Purge() + eds, err := rsmt2d.ImportExtendedDataSquare( shares, share.DefaultRSMT2DCodec(), diff --git a/share/ipld/nmt_adder.go b/share/ipld/nmt_adder.go index 0b4c521a80..f8f5452b2b 100644 --- a/share/ipld/nmt_adder.go +++ b/share/ipld/nmt_adder.go @@ -101,11 +101,13 @@ func BatchSize(squareSize int) int { return (squareSize*2-1)*squareSize*2 - (squareSize * squareSize) } +// ProofsAdder is used to collect proof nodes, while traversing merkle tree type ProofsAdder struct { lock sync.RWMutex proofs map[cid.Cid][]byte } +// NewProofsAdder creates new instance of ProofsAdder. func NewProofsAdder(squareSize int) *ProofsAdder { return &ProofsAdder{ // preallocate map to fit all inner nodes for given square size @@ -113,10 +115,14 @@ func NewProofsAdder(squareSize int) *ProofsAdder { } } +// CtxWithProofsAdder creates context, that will contain ProofsAdder. If context is leaked to +// another go-routine, proofs will be not collected by gc. To prevent it, use Purge after Proofs +// are collected from adder, to preemptively release memory allocated for proofs. func CtxWithProofsAdder(ctx context.Context, adder *ProofsAdder) context.Context { return context.WithValue(ctx, proofsAdderKey, adder) } +// ProofsAdderFromCtx extracts ProofsAdder from context func ProofsAdderFromCtx(ctx context.Context) *ProofsAdder { val := ctx.Value(proofsAdderKey) adder, ok := val.(*ProofsAdder) @@ -126,6 +132,7 @@ func ProofsAdderFromCtx(ctx context.Context) *ProofsAdder { return adder } +// Proofs returns proofs collected by ProofsAdder func (a *ProofsAdder) Proofs() map[cid.Cid][]byte { if a == nil { return nil @@ -136,6 +143,7 @@ func (a *ProofsAdder) Proofs() map[cid.Cid][]byte { return a.proofs } +// VisitFn returns NodeVisitorFn, that will collect proof nodes while traversing merkle tree. func (a *ProofsAdder) VisitFn() nmt.NodeVisitorFn { if a == nil { return nil @@ -151,6 +159,18 @@ func (a *ProofsAdder) VisitFn() nmt.NodeVisitorFn { return a.visitInnerNodes } +// Purge removed proofs from ProofsAdder allowing GC to collect the memory +func (a *ProofsAdder) Purge() { + if a == nil { + return + } + + a.lock.RLock() + defer a.lock.RUnlock() + + a.proofs = nil +} + func (a *ProofsAdder) visitInnerNodes(hash []byte, children ...[]byte) { switch len(children) { case 1: From a0d4a7e37c3626253c690b7d046351e7f0743d58 Mon Sep 17 00:00:00 2001 From: Vlad Date: Thu, 17 Aug 2023 13:43:57 +0300 Subject: [PATCH 2/3] cleanup proofadder cache on errors --- core/exchange.go | 4 ++++ core/listener.go | 2 ++ share/getters/tee.go | 5 ++++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/core/exchange.go b/core/exchange.go index a9b3a532d5..bed2404195 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -108,6 +108,8 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende // extend block data adder := ipld.NewProofsAdder(int(block.Data.SquareSize)) + defer adder.Purge() + eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn())) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err) @@ -148,6 +150,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 // extend block data adder := ipld.NewProofsAdder(int(b.Data.SquareSize)) + defer adder.Purge() + eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn())) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err) diff --git a/core/listener.go b/core/listener.go index 17be12a780..565fc62032 100644 --- a/core/listener.go +++ b/core/listener.go @@ -153,6 +153,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS ) // extend block data adder := ipld.NewProofsAdder(int(b.Data.SquareSize)) + defer adder.Purge() + eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn())) if err != nil { return fmt.Errorf("extending block data: %w", err) diff --git a/share/getters/tee.go b/share/getters/tee.go index 8ee1e91390..9c89b2dec5 100644 --- a/share/getters/tee.go +++ b/share/getters/tee.go @@ -52,7 +52,10 @@ func (tg *TeeGetter) GetEDS(ctx context.Context, root *share.Root) (eds *rsmt2d. utils.SetStatusAndEnd(span, err) }() - ctx = ipld.CtxWithProofsAdder(ctx, ipld.NewProofsAdder(len(root.RowRoots))) + adder := ipld.NewProofsAdder(len(root.RowRoots)) + ctx = ipld.CtxWithProofsAdder(ctx, adder) + defer adder.Purge() + eds, err = tg.getter.GetEDS(ctx, root) if err != nil { return nil, err From 4a6ed251f1b1b806dc3c187190a7e459bce43253 Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 18 Aug 2023 17:09:07 +0300 Subject: [PATCH 3/3] fix rlock --- share/ipld/nmt_adder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/share/ipld/nmt_adder.go b/share/ipld/nmt_adder.go index f8f5452b2b..d090c679d9 100644 --- a/share/ipld/nmt_adder.go +++ b/share/ipld/nmt_adder.go @@ -165,8 +165,8 @@ func (a *ProofsAdder) Purge() { return } - a.lock.RLock() - defer a.lock.RUnlock() + a.lock.Lock() + defer a.lock.Unlock() a.proofs = nil }