From be9f1216175ccc62cfa974e8857b68131d48ce40 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 20 Jul 2023 09:47:47 +0900 Subject: [PATCH] perf(metarepos): add a pool for *mrpb.RaftEntry This change adds the pool for *mrpb.RaftEntry to reuse it. --- .../metarepos/raft_metadata_repository.go | 103 ++++++++++-------- proto/mrpb/raft_entry.go | 18 +++ 2 files changed, 78 insertions(+), 43 deletions(-) diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index c33068575..0775fa9bd 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -302,15 +302,16 @@ Loop: b, err := e.Marshal() if err != nil { mr.logger.Error(err.Error()) + e.Release() continue } - // Release *mrpb.Reports only if e contains Reports. - e.GetRequest().Report.Release() + nodeIndex, requestIndex := e.NodeIndex, e.RequestIndex + e.Release() select { case mr.rnProposeC <- b: case <-ctx.Done(): - mr.sendAck(e.NodeIndex, e.RequestIndex, ctx.Err()) + mr.sendAck(nodeIndex, requestIndex, ctx.Err()) } case <-ctx.Done(): break Loop @@ -1150,53 +1151,69 @@ func (mr *RaftMetadataRepository) proposeReport(snID types.StorageNodeID, ur []s return nil } -func (mr *RaftMetadataRepository) propose(ctx context.Context, r interface{}, guarantee bool) error { - e := &mrpb.RaftEntry{} - e.Request.SetValue(r) - e.NodeIndex = uint64(mr.nodeID) - e.RequestIndex = UnusedRequestIndex +func (mr *RaftMetadataRepository) leaseRaftEntry(request any, requestIndex uint64) *mrpb.RaftEntry { + re := mrpb.NewRaftEntry() + re.Request.SetValue(request) + re.NodeIndex = uint64(mr.nodeID) + re.RequestIndex = requestIndex + return re +} - if guarantee { - c := make(chan error, 1) - rIdx := mr.requestNum.Add(1) +func (mr *RaftMetadataRepository) proposeWithoutGuarantee(ctx context.Context, request any) (err error) { + re := mr.leaseRaftEntry(request, UnusedRequestIndex) + select { + case mr.proposeC <- re: + return nil + case <-ctx.Done(): + err = ctx.Err() + default: + err = verrors.ErrIgnore + } + if err != nil { + re.Release() + } + return err +} - e.RequestIndex = rIdx - mr.requestMap.Store(rIdx, c) - defer mr.requestMap.Delete(rIdx) +func (mr *RaftMetadataRepository) proposeWithGuarantee(ctx context.Context, request any) error { + rIdx := mr.requestNum.Add(1) + c := make(chan error, 1) + mr.requestMap.Store(rIdx, c) + defer mr.requestMap.Delete(rIdx) - t := time.NewTimer(mr.raftProposeTimeout) - defer t.Stop() + t := time.NewTimer(mr.raftProposeTimeout) + defer t.Stop() - PROPOSE: - select { - case mr.proposeC <- e: - case <-t.C: - t.Reset(mr.raftProposeTimeout) - goto PROPOSE - case <-ctx.Done(): - return ctx.Err() - } + re := mr.leaseRaftEntry(request, rIdx) - select { - case err := <-c: - return err - case <-t.C: - t.Reset(mr.raftProposeTimeout) - goto PROPOSE - case <-ctx.Done(): - return ctx.Err() - } - } else { - select { - case <-ctx.Done(): - return ctx.Err() - case mr.proposeC <- e: - default: - return verrors.ErrIgnore - } +Propose: + select { + case mr.proposeC <- re: + case <-t.C: + t.Reset(mr.raftProposeTimeout) + goto Propose + case <-ctx.Done(): + re.Release() + return ctx.Err() } - return nil + select { + case err := <-c: + return err + case <-t.C: + t.Reset(mr.raftProposeTimeout) + re = mr.leaseRaftEntry(request, rIdx) + goto Propose + case <-ctx.Done(): + return ctx.Err() + } +} + +func (mr *RaftMetadataRepository) propose(ctx context.Context, request interface{}, guarantee bool) error { + if !guarantee { + return mr.proposeWithoutGuarantee(ctx, request) + } + return mr.proposeWithGuarantee(ctx, request) } func (mr *RaftMetadataRepository) proposeConfChange(ctx context.Context, r raftpb.ConfChange) error { diff --git a/proto/mrpb/raft_entry.go b/proto/mrpb/raft_entry.go index caecc30e7..036843c28 100644 --- a/proto/mrpb/raft_entry.go +++ b/proto/mrpb/raft_entry.go @@ -53,3 +53,21 @@ func (rq *ReportQueue) Release() { reportQueuePool.Put(rq) } } + +var raftEntryPool = sync.Pool{ + New: func() any { + return &RaftEntry{} + }, +} + +func NewRaftEntry() *RaftEntry { + return raftEntryPool.Get().(*RaftEntry) +} + +func (re *RaftEntry) Release() { + if re != nil { + re.Request.Report.Release() + re.Reset() + raftEntryPool.Put(re) + } +}