Skip to content

Commit

Permalink
diagnostics: save stats on disk (#10421)
Browse files Browse the repository at this point in the history
Added a diagnostics DB, as it is required to persist the latest state to be
displayed by the diagnostics command.
  • Loading branch information
dvovk authored May 24, 2024
1 parent 89fc131 commit 6d6c12e
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 53 deletions.
12 changes: 7 additions & 5 deletions diagnostics/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func Setup(ctx *cli.Context, node *node.ErigonNode, metricsMux *http.ServeMux, p
}

speedTest := ctx.Bool(diagnoticsSpeedTestFlag)

diagnostic := diaglib.NewDiagnosticClient(diagMux, node.Backend().DataDir(), speedTest)
diagnostic.Setup()

SetupEndpoints(ctx, node, diagMux, diagnostic)
diagnostic, err := diaglib.NewDiagnosticClient(ctx.Context, diagMux, node.Backend().DataDir(), speedTest)
if err == nil {
diagnostic.Setup()
SetupEndpoints(ctx, node, diagMux, diagnostic)
} else {
log.Error("[Diagnostics] Failure in setting up diagnostics", "err", err)
}
}

func SetupDiagnosticsEndpoint(metricsMux *http.ServeMux, addres string) *http.ServeMux {
Expand Down
55 changes: 49 additions & 6 deletions erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package diagnostics

import (
"context"
"net/http"
"path/filepath"
"sync"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
)

type DiagnosticClient struct {
ctx context.Context
db kv.RwDB
metricsMux *http.ServeMux
dataDirPath string
speedTest bool
Expand All @@ -27,20 +36,54 @@ type DiagnosticClient struct {
networkSpeedMutex sync.Mutex
}

func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string, speedTest bool) *DiagnosticClient {
// NewDiagnosticClient opens the diagnostics database located in the
// "diagnostics" sub-directory of dataDirPath and returns a client whose
// hardware info and sync statistics are seeded from the values read back
// from that database.
//
// An error is returned (with a nil client) when the database cannot be opened.
func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDirPath string, speedTest bool) (*DiagnosticClient, error) {
dirPath := filepath.Join(dataDirPath, "diagnostics")
db, err := createDb(ctx, dirPath)
if err != nil {
return nil, err
}

// Load whatever state is currently stored in the diagnostics DB so the
// client starts from it instead of from zero values.
hInfo := ReadSysInfo(db)
ss := ReadSyncStagesInfo(db)
snpdwl := ReadSnapshotDownloadInfo(db)
snpidx := ReadSnapshotIndexingInfo(db)

return &DiagnosticClient{
metricsMux: metricsMux,
dataDirPath: dataDirPath,
speedTest: speedTest,
syncStats: SyncStatistics{},
hardwareInfo: HardwareInfo{},
// ctx is kept on the client; the listeners pass it to db.Update.
ctx: ctx,
db: db,
metricsMux: metricsMux,
dataDirPath: dataDirPath,
speedTest: speedTest,
syncStats: SyncStatistics{
SyncStages: ss,
SnapshotDownload: snpdwl,
SnapshotIndexing: snpidx,
},
hardwareInfo: hInfo,
snapshotFileList: SnapshoFilesList{},
bodies: BodiesInfo{},
resourcesUsage: ResourcesUsage{
MemoryUsage: []MemoryStats{},
},
peersStats: NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag
}, nil
}

// createDb opens the MDBX database stored at dbDir. The database is labelled
// kv.DiagnosticsDB and uses kv.DiagnosticsTablesCfg as its table
// configuration, regardless of the default buckets offered by the builder.
//
// Sizing: 4 MB growth step, 16 GB map size, 4 KB pages; read transactions are
// limited through a weighted semaphore of 9000.
//
// On failure the returned db is nil and err carries the error from Open.
func createDb(ctx context.Context, dbDir string) (db kv.RwDB, err error) {
	tableCfg := func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DiagnosticsTablesCfg }
	roTxLimiter := semaphore.NewWeighted(9_000)

	opts := mdbx.NewMDBX(log.New()).
		Label(kv.DiagnosticsDB).
		WithTableCfg(tableCfg).
		GrowthStep(4 * datasize.MB).
		MapSize(16 * datasize.GB).
		PageSize(uint64(4 * datasize.KB)).
		RoTxsLimiter(roTxLimiter).
		Path(dbDir)

	if db, err = opts.Open(ctx); err != nil {
		return nil, err
	}

	return db, nil
}

func (d *DiagnosticClient) Setup() {
Expand Down
58 changes: 57 additions & 1 deletion erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package diagnostics

import (
"context"
"encoding/json"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
)

// Keys under which the snapshot statistics are stored in the
// kv.DiagSyncStages table.
var (
// SnapshotDownloadStatisticsKey is the key used by ReadSnapshotDownloadInfo
// and SnapshotDownloadUpdater.
SnapshotDownloadStatisticsKey = []byte("diagSnapshotDownloadStatistics")
// SnapshotIndexingStatisticsKey is the key used by ReadSnapshotIndexingInfo
// and SnapshotIndexingUpdater.
SnapshotIndexingStatisticsKey = []byte("diagSnapshotIndexingStatistics")
)

func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) {
d.runSnapshotListener(rootCtx)
d.runSegmentDownloadingListener(rootCtx)
Expand All @@ -26,6 +33,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:

d.mu.Lock()
d.syncStats.SnapshotDownload.Downloaded = info.Downloaded
d.syncStats.SnapshotDownload.Total = info.Total
Expand All @@ -39,6 +47,11 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
d.syncStats.SnapshotDownload.Sys = info.Sys
d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished
d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()

if info.DownloadFinished {
Expand Down Expand Up @@ -77,6 +90,10 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()
}
}
Expand All @@ -95,6 +112,9 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) {
return
case info := <-ch:
d.addOrUpdateSegmentIndexingState(info)
if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}
}
}
}()
Expand Down Expand Up @@ -128,6 +148,11 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co
Sys: 0,
})
}

if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}

d.mu.Unlock()
}
}
Expand Down Expand Up @@ -264,7 +289,6 @@ func (d *DiagnosticClient) UpdateFileDownloadedStatistics(downloadedInfo *FileDo
d.syncStats.SnapshotDownload.SegmentsDownloading[downloadingInfo.Name] = *downloadingInfo
}
}

}

func (d *DiagnosticClient) SyncStatistics() SyncStatistics {
Expand All @@ -274,3 +298,35 @@ func (d *DiagnosticClient) SyncStatistics() SyncStatistics {
func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList {
return d.snapshotFileList
}

// ReadSnapshotDownloadInfo loads the snapshot download statistics stored in
// the kv.DiagSyncStages table under SnapshotDownloadStatisticsKey.
//
// The zero value is returned when nothing is stored under the key, and also
// (after logging an error) when the stored bytes cannot be decoded as JSON.
func ReadSnapshotDownloadInfo(db kv.RoDB) (info SnapshotDownloadStatistics) {
	data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotDownloadStatisticsKey)
	if len(data) == 0 {
		// Nothing persisted yet (presumably the case for a freshly created
		// diagnostics DB). json.Unmarshal rejects empty input, so bail out
		// here instead of logging a misleading error.
		return SnapshotDownloadStatistics{}
	}

	if err := json.Unmarshal(data, &info); err != nil {
		log.Error("[Diagnostics] Failed to read snapshot download info", "err", err)
		// Do not hand back a partially decoded value.
		return SnapshotDownloadStatistics{}
	}

	return info
}

// ReadSnapshotIndexingInfo loads the snapshot indexing statistics stored in
// the kv.DiagSyncStages table under SnapshotIndexingStatisticsKey.
//
// The zero value is returned when nothing is stored under the key, and also
// (after logging an error) when the stored bytes cannot be decoded as JSON.
func ReadSnapshotIndexingInfo(db kv.RoDB) (info SnapshotIndexingStatistics) {
	data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotIndexingStatisticsKey)
	if len(data) == 0 {
		// Nothing persisted yet (presumably the case for a freshly created
		// diagnostics DB). json.Unmarshal rejects empty input, so bail out
		// here instead of logging a misleading error.
		return SnapshotIndexingStatistics{}
	}

	if err := json.Unmarshal(data, &info); err != nil {
		log.Error("[Diagnostics] Failed to read snapshot indexing info", "err", err)
		// Do not hand back a partially decoded value.
		return SnapshotIndexingStatistics{}
	}

	return info
}

// SnapshotDownloadUpdater builds the write-transaction callback that stores
// info in the kv.DiagSyncStages table under SnapshotDownloadStatisticsKey.
// The callback itself comes from PutDataToTable and is meant to be handed to
// db.Update.
func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) error {
	updater := PutDataToTable(kv.DiagSyncStages, SnapshotDownloadStatisticsKey, info)

	return updater
}

// SnapshotIndexingUpdater builds the write-transaction callback that stores
// info in the kv.DiagSyncStages table under SnapshotIndexingStatisticsKey.
// The callback itself comes from PutDataToTable and is meant to be handed to
// db.Update.
func SnapshotIndexingUpdater(info SnapshotIndexingStatistics) func(tx kv.RwTx) error {
	updater := PutDataToTable(kv.DiagSyncStages, SnapshotIndexingStatisticsKey, info)

	return updater
}
37 changes: 3 additions & 34 deletions erigon-lib/diagnostics/snapshots_test.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,17 @@
package diagnostics_test

import (
"context"
"testing"

"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/stretchr/testify/require"
)

func TestUpdateFileDownloadingStats(t *testing.T) {
d := diagnostics.NewDiagnosticClient(nil, "test", false)
d, err := diagnostics.NewDiagnosticClient(context.TODO(), nil, "test", true)

d.UpdateFileDownloadedStatistics(nil, &segmentDownloadStatsMock)

sd := d.SyncStatistics().SnapshotDownload.SegmentsDownloading
require.NotNil(t, sd)
require.NotEqual(t, len(sd), 0)

require.Equal(t, sd["test"], segmentDownloadStatsMock)
}

func TestUpdateFileDownloadedStats(t *testing.T) {
d := diagnostics.NewDiagnosticClient(nil, "test", false)

d.UpdateFileDownloadedStatistics(&fileDownloadedUpdMock, nil)

sd := d.SyncStatistics().SnapshotDownload.SegmentsDownloading
require.NotNil(t, sd)
require.NotEqual(t, len(sd), 0)

require.Equal(t, sd["test"], diagnostics.SegmentDownloadStatistics{
Name: "test",
TotalBytes: 0,
DownloadedBytes: 0,
Webseeds: make([]diagnostics.SegmentPeer, 0),
Peers: make([]diagnostics.SegmentPeer, 0),
DownloadedStats: diagnostics.FileDownloadedStatistics{
TimeTook: 1.0,
AverageRate: 1,
},
})
}

func TestUpdateFileFullStatsUpdate(t *testing.T) {
d := diagnostics.NewDiagnosticClient(nil, "test", false)
require.NoError(t, err)

d.UpdateFileDownloadedStatistics(nil, &segmentDownloadStatsMock)

Expand Down
1 change: 1 addition & 0 deletions erigon-lib/diagnostics/speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (d *DiagnosticClient) runSpeedTest(rootCtx context.Context) NetworkSpeedTes
}

ctx, cancel := context.WithTimeout(rootCtx, time.Second*15)

defer cancel()
_ = analyzer.RunWithContext(ctx, s.Host, func(pl *transport.PLoss) {
packetLoss = pl.Loss()
Expand Down
59 changes: 59 additions & 0 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package diagnostics

import (
"context"
"encoding/json"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
)

// Keys for sync-stage data stored in the kv.DiagSyncStages table.
var (
// StagesListKey is the key under which the list of stage names is stored
// (see ReadStagesList and StagesListUpdater).
StagesListKey = []byte("diagStagesList")
// CurrentStageKey is the key intended for the index of the current stage.
CurrentStageKey = []byte("diagCurrentStage")
)

func (d *DiagnosticClient) setupStagesDiagnostics(rootCtx context.Context) {
d.runCurrentSyncStageListener(rootCtx)
d.runSyncStagesListListener(rootCtx)
Expand All @@ -22,6 +29,10 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
if err := d.db.Update(d.ctx, StagesListUpdater(info.Stages)); err != nil {
log.Error("[Diagnostics] Failed to update stages list", "err", err)
}

d.mu.Lock()
d.syncStats.SyncStages.StagesList = info.Stages
d.mu.Unlock()
Expand All @@ -42,6 +53,10 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context)
case <-rootCtx.Done():
return
case info := <-ch:
if err := d.db.Update(d.ctx, CurrentStageUpdater(info.Stage)); err != nil {
log.Error("[Diagnostics] Failed to update current stage", "err", err)
}

d.mu.Lock()
d.syncStats.SyncStages.CurrentStage = info.Stage
if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) {
Expand All @@ -52,3 +67,47 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context)
}
}()
}

// ReadSyncStagesInfo assembles a SyncStages value from the database: the
// stages list comes from ReadStagesList and the current stage from
// ReadCurrentStage, queried in that order.
func ReadSyncStagesInfo(db kv.RoDB) (info SyncStages) {
	return SyncStages{
		StagesList:   ReadStagesList(db),
		CurrentStage: ReadCurrentStage(db),
	}
}

// ReadStagesList loads the list of stage names stored in the
// kv.DiagSyncStages table under StagesListKey.
//
// An empty, non-nil slice is returned when nothing is stored under the key,
// and also (after logging an error) when the stored bytes cannot be decoded
// as a JSON array of strings.
func ReadStagesList(db kv.RoDB) []string {
	data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey)
	if len(data) == 0 {
		// Nothing persisted yet (presumably the case for a freshly created
		// diagnostics DB). json.Unmarshal rejects empty input, so bail out
		// here instead of logging a misleading error.
		return []string{}
	}

	var info []string
	if err := json.Unmarshal(data, &info); err != nil {
		log.Error("[Diagnostics] Failed to read stages list", "err", err)
		return []string{}
	}

	return info
}

// ReadCurrentStage loads the index of the current sync stage stored in the
// kv.DiagSyncStages table under CurrentStageKey.
//
// The value is read from CurrentStageKey, not StagesListKey: the stages list
// lives under StagesListKey, and sharing one key made the two records
// overwrite each other.
//
// Zero is returned when nothing is stored under the key, and also (after
// logging an error) when the stored bytes cannot be decoded as an unsigned
// integer.
func ReadCurrentStage(db kv.RoDB) uint {
	data := ReadDataFromTable(db, kv.DiagSyncStages, CurrentStageKey)
	if len(data) == 0 {
		// Nothing persisted yet; json.Unmarshal rejects empty input, so bail
		// out here instead of logging a misleading error.
		return 0
	}

	var info uint
	if err := json.Unmarshal(data, &info); err != nil {
		log.Error("[Diagnostics] Failed to read current stage", "err", err)
		return 0
	}

	return info
}

// StagesListUpdater builds the write-transaction callback that stores the
// list of stage names in the kv.DiagSyncStages table under StagesListKey.
func StagesListUpdater(info []string) func(tx kv.RwTx) error {
	return PutDataToTable(kv.DiagSyncStages, StagesListKey, info)
}

// CurrentStageUpdater builds the write-transaction callback that stores the
// index of the current stage in the kv.DiagSyncStages table under
// CurrentStageKey, the same key ReadCurrentStage reads from.
func CurrentStageUpdater(info uint) func(tx kv.RwTx) error {
	return PutDataToTable(kv.DiagSyncStages, CurrentStageKey, info)
}
Loading

0 comments on commit 6d6c12e

Please sign in to comment.