Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

diagnostics: save stats on disk #10421

Merged
merged 12 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions diagnostics/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func Setup(ctx *cli.Context, node *node.ErigonNode, metricsMux *http.ServeMux, p
}

speedTest := ctx.Bool(diagnoticsSpeedTestFlag)

diagnostic := diaglib.NewDiagnosticClient(diagMux, node.Backend().DataDir(), speedTest)
diagnostic.Setup()

SetupEndpoints(ctx, node, diagMux, diagnostic)
diagnostic, err := diaglib.NewDiagnosticClient(ctx.Context, diagMux, node.Backend().DataDir(), speedTest)
if err == nil {
diagnostic.Setup()
SetupEndpoints(ctx, node, diagMux, diagnostic)
} else {
log.Error("[Diagnostics] Failure in setting up diagnostics", "err", err)
}
}

func SetupDiagnosticsEndpoint(metricsMux *http.ServeMux, addres string) *http.ServeMux {
Expand Down
55 changes: 49 additions & 6 deletions erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package diagnostics

import (
"context"
"net/http"
"path/filepath"
"sync"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
)

type DiagnosticClient struct {
ctx context.Context
db kv.RwDB
metricsMux *http.ServeMux
dataDirPath string
speedTest bool
Expand All @@ -27,20 +36,54 @@ type DiagnosticClient struct {
networkSpeedMutex sync.Mutex
}

func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string, speedTest bool) *DiagnosticClient {
func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDirPath string, speedTest bool) (*DiagnosticClient, error) {
dirPath := filepath.Join(dataDirPath, "diagnostics")
db, err := createDb(ctx, dirPath)
if err != nil {
return nil, err
}

hInfo := ReadSysInfo(db)
ss := ReadSyncStagesInfo(db)
snpdwl := ReadSnapshotDownloadInfo(db)
snpidx := ReadSnapshotIndexingInfo(db)

return &DiagnosticClient{
metricsMux: metricsMux,
dataDirPath: dataDirPath,
speedTest: speedTest,
syncStats: SyncStatistics{},
hardwareInfo: HardwareInfo{},
ctx: ctx,
db: db,
metricsMux: metricsMux,
dataDirPath: dataDirPath,
speedTest: speedTest,
syncStats: SyncStatistics{
SyncStages: ss,
SnapshotDownload: snpdwl,
SnapshotIndexing: snpidx,
},
hardwareInfo: hInfo,
snapshotFileList: SnapshoFilesList{},
bodies: BodiesInfo{},
resourcesUsage: ResourcesUsage{
MemoryUsage: []MemoryStats{},
},
peersStats: NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag
}, nil
}

func createDb(ctx context.Context, dbDir string) (db kv.RwDB, err error) {
db, err = mdbx.NewMDBX(log.New()).
Label(kv.DiagnosticsDB).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DiagnosticsTablesCfg }).
GrowthStep(4 * datasize.MB).
MapSize(16 * datasize.GB).
PageSize(uint64(4 * datasize.KB)).
RoTxsLimiter(semaphore.NewWeighted(9_000)).
Path(dbDir).
Open(ctx)
if err != nil {
return nil, err
}

return db, nil
}

func (d *DiagnosticClient) Setup() {
Expand Down
58 changes: 57 additions & 1 deletion erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package diagnostics

import (
"context"
"encoding/json"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
)

var (
SnapshotDownloadStatisticsKey = []byte("diagSnapshotDownloadStatistics")
SnapshotIndexingStatisticsKey = []byte("diagSnapshotIndexingStatistics")
)

func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) {
d.runSnapshotListener(rootCtx)
d.runSegmentDownloadingListener(rootCtx)
Expand All @@ -26,6 +33,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:

d.mu.Lock()
d.syncStats.SnapshotDownload.Downloaded = info.Downloaded
d.syncStats.SnapshotDownload.Total = info.Total
Expand All @@ -39,6 +47,11 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()

if info.DownloadFinished {
Expand Down Expand Up @@ -77,6 +90,10 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()
}
}
Expand All @@ -95,6 +112,9 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) {
return
case info := <-ch:
d.addOrUpdateSegmentIndexingState(info)
if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}
}
}
}()
Expand Down Expand Up @@ -128,6 +148,11 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co
Sys: 0,
})
}

if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}

d.mu.Unlock()
}
}
Expand Down Expand Up @@ -264,7 +289,6 @@ func (d *DiagnosticClient) UpdateFileDownloadedStatistics(downloadedInfo *FileDo
d.syncStats.SnapshotDownload.SegmentsDownloading[downloadingInfo.Name] = *downloadingInfo
}
}

}

func (d *DiagnosticClient) SyncStatistics() SyncStatistics {
Expand All @@ -274,3 +298,35 @@ func (d *DiagnosticClient) SyncStatistics() SyncStatistics {
func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList {
return d.snapshotFileList
}

func ReadSnapshotDownloadInfo(db kv.RoDB) (info SnapshotDownloadStatistics) {
data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotDownloadStatisticsKey)
err := json.Unmarshal(data, &info)

if err != nil {
log.Error("[Diagnostics] Failed to read snapshot download info", "err", err)
return SnapshotDownloadStatistics{}
} else {
return info
}
}

func ReadSnapshotIndexingInfo(db kv.RoDB) (info SnapshotIndexingStatistics) {
data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotIndexingStatisticsKey)
err := json.Unmarshal(data, &info)

if err != nil {
log.Error("[Diagnostics] Failed to read snapshot indexing info", "err", err)
return SnapshotIndexingStatistics{}
} else {
return info
}
}

func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, SnapshotDownloadStatisticsKey, info)
}

func SnapshotIndexingUpdater(info SnapshotIndexingStatistics) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, SnapshotIndexingStatisticsKey, info)
}
37 changes: 3 additions & 34 deletions erigon-lib/diagnostics/snapshots_test.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,17 @@
package diagnostics_test

import (
"context"
"testing"

"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/stretchr/testify/require"
)

func TestUpdateFileDownloadingStats(t *testing.T) {
d := diagnostics.NewDiagnosticClient(nil, "test", false)
d, err := diagnostics.NewDiagnosticClient(context.TODO(), nil, "test", true)

d.UpdateFileDownloadedStatistics(nil, &segmentDownloadStatsMock)

sd := d.SyncStatistics().SnapshotDownload.SegmentsDownloading
require.NotNil(t, sd)
require.NotEqual(t, len(sd), 0)

require.Equal(t, sd["test"], segmentDownloadStatsMock)
}

func TestUpdateFileDownloadedStats(t *testing.T) {
d := diagnostics.NewDiagnosticClient(nil, "test", false)

d.UpdateFileDownloadedStatistics(&fileDownloadedUpdMock, nil)

sd := d.SyncStatistics().SnapshotDownload.SegmentsDownloading
require.NotNil(t, sd)
require.NotEqual(t, len(sd), 0)

require.Equal(t, sd["test"], diagnostics.SegmentDownloadStatistics{
Name: "test",
TotalBytes: 0,
DownloadedBytes: 0,
Webseeds: make([]diagnostics.SegmentPeer, 0),
Peers: make([]diagnostics.SegmentPeer, 0),
DownloadedStats: diagnostics.FileDownloadedStatistics{
TimeTook: 1.0,
AverageRate: 1,
},
})
}

func TestUpdateFileFullStatsUpdate(t *testing.T) {
d := diagnostics.NewDiagnosticClient(nil, "test", false)
require.NoError(t, err)

d.UpdateFileDownloadedStatistics(nil, &segmentDownloadStatsMock)

Expand Down
1 change: 1 addition & 0 deletions erigon-lib/diagnostics/speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (d *DiagnosticClient) runSpeedTest(rootCtx context.Context) NetworkSpeedTes
}

ctx, cancel := context.WithTimeout(rootCtx, time.Second*15)

defer cancel()
_ = analyzer.RunWithContext(ctx, s.Host, func(pl *transport.PLoss) {
packetLoss = pl.Loss()
Expand Down
59 changes: 59 additions & 0 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package diagnostics

import (
"context"
"encoding/json"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
)

var (
StagesListKey = []byte("diagStagesList")
CurrentStageKey = []byte("diagCurrentStage")
)

func (d *DiagnosticClient) setupStagesDiagnostics(rootCtx context.Context) {
d.runCurrentSyncStageListener(rootCtx)
d.runSyncStagesListListener(rootCtx)
Expand All @@ -22,6 +29,10 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
if err := d.db.Update(d.ctx, StagesListUpdater(info.Stages)); err != nil {
log.Error("[Diagnostics] Failed to update stages list", "err", err)
}

d.mu.Lock()
d.syncStats.SyncStages.StagesList = info.Stages
d.mu.Unlock()
Expand All @@ -42,6 +53,10 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context)
case <-rootCtx.Done():
return
case info := <-ch:
if err := d.db.Update(d.ctx, CurrentStageUpdater(info.Stage)); err != nil {
log.Error("[Diagnostics] Failed to update current stage", "err", err)
}

d.mu.Lock()
d.syncStats.SyncStages.CurrentStage = info.Stage
if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
Expand All @@ -52,3 +67,47 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context)
}
}()
}

func ReadSyncStagesInfo(db kv.RoDB) (info SyncStages) {
stageesList := ReadStagesList(db)
currentStage := ReadCurrentStage(db)

return SyncStages{
StagesList: stageesList,
CurrentStage: currentStage,
}
}

func ReadStagesList(db kv.RoDB) []string {
data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey)
var info []string
err := json.Unmarshal(data, &info)

if err != nil {
log.Error("[Diagnostics] Failed to read stages list", "err", err)
return []string{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ok to just omit error? data is stored in db but each time it could be just empty string or 0 without knowing something wrong with encoding - so maybe add error log entry when error happens and omitted?

} else {
return info
}
}

func ReadCurrentStage(db kv.RoDB) uint {
data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey)
var info uint
err := json.Unmarshal(data, &info)

if err != nil {
log.Error("[Diagnostics] Failed to read current stage", "err", err)
return 0
} else {
return info
}
}

func StagesListUpdater(info []string) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, StagesListKey, info)
}

func CurrentStageUpdater(info uint) func(tx kv.RwTx) error {
return PutDataToTable(kv.DiagSyncStages, StagesListKey, info)
}
Loading
Loading