Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvovk/limit mem usage #10069

Merged
merged 7 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type DiagnosticClient struct {
mu sync.Mutex
headerMutex sync.Mutex
hardwareInfo HardwareInfo
peersSyncMap sync.Map
peersStats PeerStats
headers Headers
bodies BodiesInfo
bodiesMutex sync.Mutex
Expand All @@ -37,6 +37,7 @@ func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string) *Diagnos
resourcesUsage: ResourcesUsage{
MemoryUsage: []MemoryStats{},
},
peersStats: *NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag
}
}

Expand Down
4 changes: 3 additions & 1 deletion erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package diagnostics

import "time"
import (
"time"
)

type PeerStatisticsGetter interface {
GetPeersStatistics() map[string]*PeerStatistics
Expand Down
223 changes: 165 additions & 58 deletions erigon-lib/diagnostics/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,99 @@ package diagnostics

import (
"context"
"sort"
"sync"
"time"

"github.com/ledgerwatch/log/v3"
)

func (d *DiagnosticClient) setupNetworkDiagnostics(rootCtx context.Context) {
d.runCollectPeersStatistics(rootCtx)
type PeerStats struct {
peersInfo sync.Map
dvovk marked this conversation as resolved.
Show resolved Hide resolved
recordsCount int
lastUpdateMap map[string]time.Time
limit int
}

func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[PeerStatisticMsgUpdate](rootCtx, 1)
defer closeChannel()
func NewPeerStats(peerLimit int) *PeerStats {
return &PeerStats{
peersInfo: sync.Map{},
recordsCount: 0,
lastUpdateMap: make(map[string]time.Time),
limit: peerLimit,
}
}

StartProviders(ctx, TypeOf(PeerStatisticMsgUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
return
case info := <-ch:
if value, ok := d.peersSyncMap.Load(info.PeerID); ok {
if stats, ok := value.(PeerStatistics); ok {
if info.Inbound {
stats.BytesIn += uint64(info.Bytes)
stats.CapBytesIn[info.MsgCap] += uint64(info.Bytes)
stats.TypeBytesIn[info.MsgType] += uint64(info.Bytes)
} else {
stats.BytesOut += uint64(info.Bytes)
stats.CapBytesOut[info.MsgCap] += uint64(info.Bytes)
stats.TypeBytesOut[info.MsgType] += uint64(info.Bytes)
}

d.peersSyncMap.Store(info.PeerID, stats)
} else {
log.Debug("Failed to cast value to PeerStatistics struct", value)
}
} else {
d.peersSyncMap.Store(info.PeerID, PeerStatistics{
PeerType: info.PeerType,
CapBytesIn: make(map[string]uint64),
CapBytesOut: make(map[string]uint64),
TypeBytesIn: make(map[string]uint64),
TypeBytesOut: make(map[string]uint64),
})
}
}
func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
if value, ok := p.peersInfo.Load(peerID); ok {
p.UpdatePeer(peerID, peerInfo, value)
} else {
p.AddPeer(peerID, peerInfo)
if p.GetPeersCount() > p.limit {
p.RemovePeersWhichExceedLimit(p.limit)
}
}()
}
}

func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
stats := make(map[string]*PeerStatistics)
func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
pv := PeerStatisticsFromMsgUpdate(peerInfo, nil)
p.peersInfo.Store(peerID, pv)
p.recordsCount++
p.lastUpdateMap[peerID] = time.Now()
}

func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) {
pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue)

d.peersSyncMap.Range(func(key, value interface{}) bool {
p.peersInfo.Store(peerID, pv)
p.lastUpdateMap[peerID] = time.Now()
}

func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) PeerStatistics {
ps := PeerStatistics{
PeerType: msg.PeerType,
BytesIn: 0,
BytesOut: 0,
CapBytesIn: make(map[string]uint64),
CapBytesOut: make(map[string]uint64),
TypeBytesIn: make(map[string]uint64),
TypeBytesOut: make(map[string]uint64),
}

if stats, ok := prevValue.(PeerStatistics); ok {
if msg.Inbound {
ps.BytesIn = stats.BytesIn + uint64(msg.Bytes)
ps.CapBytesIn[msg.MsgCap] = stats.CapBytesIn[msg.MsgCap] + uint64(msg.Bytes)
ps.TypeBytesIn[msg.MsgType] = stats.TypeBytesIn[msg.MsgType] + uint64(msg.Bytes)
} else {
ps.BytesOut = stats.BytesOut + uint64(msg.Bytes)
ps.CapBytesOut[msg.MsgCap] = stats.CapBytesOut[msg.MsgCap] + uint64(msg.Bytes)
ps.TypeBytesOut[msg.MsgType] = stats.TypeBytesOut[msg.MsgType] + uint64(msg.Bytes)
}
} else {
if msg.Inbound {
ps.BytesIn += uint64(msg.Bytes)
ps.CapBytesIn[msg.MsgCap] += uint64(msg.Bytes)
ps.TypeBytesIn[msg.MsgType] += uint64(msg.Bytes)
} else {
ps.BytesOut += uint64(msg.Bytes)
ps.CapBytesOut[msg.MsgCap] += uint64(msg.Bytes)
ps.TypeBytesOut[msg.MsgType] += uint64(msg.Bytes)
}

}

return ps
}

func (p *PeerStats) GetPeersCount() int {
return p.recordsCount
}

func (p *PeerStats) GetPeers() map[string]*PeerStatistics {
stats := make(map[string]*PeerStatistics)

p.peersInfo.Range(func(key, value interface{}) bool {
if loadedKey, ok := key.(string); ok {
if loadedValue, ok := value.(PeerStatistics); ok {
stats[loadedKey] = &loadedValue
Expand All @@ -69,26 +108,94 @@ func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
return true
})

d.PeerDataResetStatistics()

return stats
}

func (d *DiagnosticClient) PeerDataResetStatistics() {
d.peersSyncMap.Range(func(key, value interface{}) bool {
if stats, ok := value.(PeerStatistics); ok {
stats.BytesIn = 0
stats.BytesOut = 0
stats.CapBytesIn = make(map[string]uint64)
stats.CapBytesOut = make(map[string]uint64)
stats.TypeBytesIn = make(map[string]uint64)
stats.TypeBytesOut = make(map[string]uint64)

d.peersSyncMap.Store(key, stats)
} else {
log.Debug("Failed to cast value to PeerStatistics struct", value)
func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics {
if value, ok := p.peersInfo.Load(peerID); ok {
if peerStats, ok := value.(PeerStatistics); ok {
return peerStats
}
}

return true
return PeerStatistics{}
}

func (p *PeerStats) GetLastUpdate(peerID string) time.Time {
if lastUpdate, ok := p.lastUpdateMap[peerID]; ok {
return lastUpdate
}

return time.Time{}
}

func (p *PeerStats) Reset() {
p.peersInfo = sync.Map{}
p.recordsCount = 0
p.lastUpdateMap = make(map[string]time.Time)
}

func (p *PeerStats) RemovePeer(peerID string) {
p.peersInfo.Delete(peerID)
p.recordsCount--
delete(p.lastUpdateMap, peerID)
}

type PeerUpdTime struct {
PeerID string
Time time.Time
}

func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime {
timeArray := make([]PeerUpdTime, 0, p.GetPeersCount())
for k, v := range p.lastUpdateMap {
timeArray = append(timeArray, PeerUpdTime{k, v})
}

sort.Slice(timeArray, func(i, j int) bool {
return timeArray[i].Time.Before(timeArray[j].Time)
})

if len(timeArray) < size {
return timeArray
} else {
return timeArray[:size]
}
}

func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) {
peersToRemove := p.GetPeersCount() - limit
if peersToRemove > 0 {
peers := p.GetOldestUpdatedPeersWithSize(peersToRemove)
for _, peer := range peers {
p.RemovePeer(peer.PeerID)
}
}
}

func (d *DiagnosticClient) setupNetworkDiagnostics(rootCtx context.Context) {
d.runCollectPeersStatistics(rootCtx)
}

func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) {
go func() {
ctx, ch, closeChannel := Context[PeerStatisticMsgUpdate](rootCtx, 1)
defer closeChannel()

StartProviders(ctx, TypeOf(PeerStatisticMsgUpdate{}), log.Root())
for {
select {
case <-rootCtx.Done():
return
case info := <-ch:
d.peersStats.AddOrUpdatePeer(info.PeerID, info)
}
}
}()
}

func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
peers := d.peersStats.GetPeers()
d.peersStats.Reset()
return peers
}
Loading
Loading