diff --git a/diagnostics/setup.go b/diagnostics/setup.go index 6534ecac245..7eb9363a3c6 100644 --- a/diagnostics/setup.go +++ b/diagnostics/setup.go @@ -50,11 +50,13 @@ func Setup(ctx *cli.Context, node *node.ErigonNode, metricsMux *http.ServeMux, p } speedTest := ctx.Bool(diagnoticsSpeedTestFlag) - - diagnostic := diaglib.NewDiagnosticClient(diagMux, node.Backend().DataDir(), speedTest) - diagnostic.Setup() - - SetupEndpoints(ctx, node, diagMux, diagnostic) + diagnostic, err := diaglib.NewDiagnosticClient(ctx.Context, diagMux, node.Backend().DataDir(), speedTest) + if err == nil { + diagnostic.Setup() + SetupEndpoints(ctx, node, diagMux, diagnostic) + } else { + log.Error("[Diagnostics] Failure in setting up diagnostics", "err", err) + } } func SetupDiagnosticsEndpoint(metricsMux *http.ServeMux, addres string) *http.ServeMux { diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index df5b04a76fc..0c8197ea48b 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -1,13 +1,22 @@ package diagnostics import ( + "context" "net/http" + "path/filepath" "sync" + "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/semaphore" ) type DiagnosticClient struct { + ctx context.Context + db kv.RwDB metricsMux *http.ServeMux dataDirPath string speedTest bool @@ -27,20 +36,54 @@ type DiagnosticClient struct { networkSpeedMutex sync.Mutex } -func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string, speedTest bool) *DiagnosticClient { +func NewDiagnosticClient(ctx context.Context, metricsMux *http.ServeMux, dataDirPath string, speedTest bool) (*DiagnosticClient, error) { + dirPath := filepath.Join(dataDirPath, "diagnostics") + db, err := createDb(ctx, dirPath) + if err != nil { + return nil, err + } + + hInfo := ReadSysInfo(db) + ss := ReadSyncStagesInfo(db) + snpdwl := ReadSnapshotDownloadInfo(db) + snpidx := ReadSnapshotIndexingInfo(db) + return &DiagnosticClient{ - metricsMux: metricsMux, - dataDirPath: dataDirPath, - speedTest: speedTest, - syncStats: SyncStatistics{}, - hardwareInfo: HardwareInfo{}, + ctx: ctx, + db: db, + metricsMux: metricsMux, + dataDirPath: dataDirPath, + speedTest: speedTest, + syncStats: SyncStatistics{ + SyncStages: ss, + SnapshotDownload: snpdwl, + SnapshotIndexing: snpidx, + }, + hardwareInfo: hInfo, snapshotFileList: SnapshoFilesList{}, bodies: BodiesInfo{}, resourcesUsage: ResourcesUsage{ MemoryUsage: []MemoryStats{}, }, peersStats: NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag + }, nil +} + +func createDb(ctx context.Context, dbDir string) (db kv.RwDB, err error) { + db, err = mdbx.NewMDBX(log.New()). + Label(kv.DiagnosticsDB). + WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DiagnosticsTablesCfg }). + GrowthStep(4 * datasize.MB). + MapSize(16 * datasize.GB). + PageSize(uint64(4 * datasize.KB)). + RoTxsLimiter(semaphore.NewWeighted(9_000)). + Path(dbDir). + Open(ctx) + if err != nil { + return nil, err } + + return db, nil } func (d *DiagnosticClient) Setup() { diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 97f0941083e..f6080fc842b 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -2,10 +2,17 @@ package diagnostics import ( "context" + "encoding/json" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" ) +var ( + SnapshotDownloadStatisticsKey = []byte("diagSnapshotDownloadStatistics") + SnapshotIndexingStatisticsKey = []byte("diagSnapshotIndexingStatistics") +) + func (d *DiagnosticClient) setupSnapshotDiagnostics(rootCtx context.Context) { d.runSnapshotListener(rootCtx) d.runSegmentDownloadingListener(rootCtx) @@ -26,6 +33,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: + d.mu.Lock() d.syncStats.SnapshotDownload.Downloaded = info.Downloaded d.syncStats.SnapshotDownload.Total = info.Total @@ -39,6 +47,11 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { d.syncStats.SnapshotDownload.Sys = info.Sys d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady + + if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil { + log.Error("[Diagnostics] Failed to update snapshot download info", "err", err) + } + d.mu.Unlock() if info.DownloadFinished { @@ -77,6 +90,10 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info } + if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil { + log.Error("[Diagnostics] Failed to update snapshot download info", "err", err) + } + d.mu.Unlock() } } @@ -95,6 +112,9 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) { return case info := <-ch: d.addOrUpdateSegmentIndexingState(info) + if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil { + log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err) + } } } }() @@ -128,6 +148,11 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co Sys: 0, }) } + + if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil { + log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err) + } + d.mu.Unlock() } } @@ -264,7 +289,6 @@ func (d *DiagnosticClient) UpdateFileDownloadedStatistics(downloadedInfo *FileDo d.syncStats.SnapshotDownload.SegmentsDownloading[downloadingInfo.Name] = *downloadingInfo } } - } func (d *DiagnosticClient) SyncStatistics() SyncStatistics { @@ -274,3 +298,35 @@ func (d *DiagnosticClient) SyncStatistics() SyncStatistics { func (d *DiagnosticClient) SnapshotFilesList() SnapshoFilesList { return d.snapshotFileList } + +func ReadSnapshotDownloadInfo(db kv.RoDB) (info SnapshotDownloadStatistics) { + data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotDownloadStatisticsKey) + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read snapshot download info", "err", err) + return SnapshotDownloadStatistics{} + } else { + return info + } +} + +func ReadSnapshotIndexingInfo(db kv.RoDB) (info SnapshotIndexingStatistics) { + data := ReadDataFromTable(db, kv.DiagSyncStages, SnapshotIndexingStatisticsKey) + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read snapshot indexing info", "err", err) + return SnapshotIndexingStatistics{} + } else { + return info + } +} + +func SnapshotDownloadUpdater(info SnapshotDownloadStatistics) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSyncStages, SnapshotDownloadStatisticsKey, info) +} + +func SnapshotIndexingUpdater(info SnapshotIndexingStatistics) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSyncStages, SnapshotIndexingStatisticsKey, info) +} diff --git a/erigon-lib/diagnostics/snapshots_test.go b/erigon-lib/diagnostics/snapshots_test.go index 9698892d412..cb1c13ec30a 100644 --- a/erigon-lib/diagnostics/snapshots_test.go +++ b/erigon-lib/diagnostics/snapshots_test.go @@ -1,6 +1,7 @@ package diagnostics_test import ( + "context" "testing" "github.com/ledgerwatch/erigon-lib/diagnostics" @@ -8,41 +9,9 @@ import ( ) func TestUpdateFileDownloadingStats(t *testing.T) { - d := diagnostics.NewDiagnosticClient(nil, "test", false) + d, err := diagnostics.NewDiagnosticClient(context.TODO(), nil, "test", true) - d.UpdateFileDownloadedStatistics(nil, &segmentDownloadStatsMock) - - sd := d.SyncStatistics().SnapshotDownload.SegmentsDownloading - require.NotNil(t, sd) - require.NotEqual(t, len(sd), 0) - - require.Equal(t, sd["test"], segmentDownloadStatsMock) -} - -func TestUpdateFileDownloadedStats(t *testing.T) { - d := diagnostics.NewDiagnosticClient(nil, "test", false) - - d.UpdateFileDownloadedStatistics(&fileDownloadedUpdMock, nil) - - sd := d.SyncStatistics().SnapshotDownload.SegmentsDownloading - require.NotNil(t, sd) - require.NotEqual(t, len(sd), 0) - - require.Equal(t, sd["test"], diagnostics.SegmentDownloadStatistics{ - Name: "test", - TotalBytes: 0, - DownloadedBytes: 0, - Webseeds: make([]diagnostics.SegmentPeer, 0), - Peers: make([]diagnostics.SegmentPeer, 0), - DownloadedStats: diagnostics.FileDownloadedStatistics{ - TimeTook: 1.0, - AverageRate: 1, - }, - }) -} - -func TestUpdateFileFullStatsUpdate(t *testing.T) { - d := diagnostics.NewDiagnosticClient(nil, "test", false) + require.NoError(t, err) d.UpdateFileDownloadedStatistics(nil, &segmentDownloadStatsMock) diff --git a/erigon-lib/diagnostics/speedtest.go b/erigon-lib/diagnostics/speedtest.go index ab5856c3e3a..54fad6f161d 100644 --- a/erigon-lib/diagnostics/speedtest.go +++ b/erigon-lib/diagnostics/speedtest.go @@ -55,6 +55,7 @@ func (d *DiagnosticClient) runSpeedTest(rootCtx context.Context) NetworkSpeedTes } ctx, cancel := context.WithTimeout(rootCtx, time.Second*15) + defer cancel() _ = analyzer.RunWithContext(ctx, s.Host, func(pl *transport.PLoss) { packetLoss = pl.Loss() diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index 7687dff135b..7754886e1d0 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -2,10 +2,17 @@ package diagnostics import ( "context" + "encoding/json" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" ) +var ( + StagesListKey = []byte("diagStagesList") + CurrentStageKey = []byte("diagCurrentStage") +) + func (d *DiagnosticClient) setupStagesDiagnostics(rootCtx context.Context) { d.runCurrentSyncStageListener(rootCtx) d.runSyncStagesListListener(rootCtx) @@ -22,6 +29,10 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: + if err := d.db.Update(d.ctx, StagesListUpdater(info.Stages)); err != nil { + log.Error("[Diagnostics] Failed to update stages list", "err", err) + } + d.mu.Lock() d.syncStats.SyncStages.StagesList = info.Stages d.mu.Unlock() @@ -42,6 +53,10 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context) case <-rootCtx.Done(): return case info := <-ch: + if err := d.db.Update(d.ctx, CurrentStageUpdater(info.Stage)); err != nil { + log.Error("[Diagnostics] Failed to update current stage", "err", err) + } + d.mu.Lock() d.syncStats.SyncStages.CurrentStage = info.Stage if int(d.syncStats.SyncStages.CurrentStage) >= len(d.syncStats.SyncStages.StagesList) { @@ -52,3 +67,47 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context) } }() } + +func ReadSyncStagesInfo(db kv.RoDB) (info SyncStages) { + stageesList := ReadStagesList(db) + currentStage := ReadCurrentStage(db) + + return SyncStages{ + StagesList: stageesList, + CurrentStage: currentStage, + } +} + +func ReadStagesList(db kv.RoDB) []string { + data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey) + var info []string + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read stages list", "err", err) + return []string{} + } else { + return info + } +} + +func ReadCurrentStage(db kv.RoDB) uint { + data := ReadDataFromTable(db, kv.DiagSyncStages, StagesListKey) + var info uint + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read current stage", "err", err) + return 0 + } else { + return info + } +} + +func StagesListUpdater(info []string) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSyncStages, StagesListKey, info) +} + +func CurrentStageUpdater(info uint) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSyncStages, StagesListKey, info) +} diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index 6b12ce1b80b..92f295d931a 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -1,14 +1,35 @@ package diagnostics import ( + "encoding/json" + "github.com/ledgerwatch/erigon-lib/diskutils" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/log/v3" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" ) +var ( + SystemRamInfoKey = []byte("diagSystemRamInfo") + SystemCpuInfoKey = []byte("diagSystemCpuInfo") + SystemDiskInfoKey = []byte("diagSystemDiskInfo") +) + func (d *DiagnosticClient) setupSysInfoDiagnostics() { sysInfo := GetSysInfo(d.dataDirPath) + if err := d.db.Update(d.ctx, RAMInfoUpdater(sysInfo.RAM)); err != nil { + log.Error("[Diagnostics] Failed to update RAM info", "err", err) + } + + if err := d.db.Update(d.ctx, CPUInfoUpdater(sysInfo.CPU)); err != nil { + log.Error("[Diagnostics] Failed to update CPU info", "err", err) + } + + if err := d.db.Update(d.ctx, DiskInfoUpdater(sysInfo.Disk)); err != nil { + log.Error("[Diagnostics] Failed to update Disk info", "err", err) + } d.mu.Lock() d.hardwareInfo = sysInfo @@ -106,3 +127,66 @@ func GetCPUInfo() CPUInfo { Mhz: mhz, } } + +func ReadSysInfo(db kv.RoDB) (info HardwareInfo) { + ram := ReadRAMInfo(db) + cpu := ReadCPUInfo(db) + disk := ReadDickInfo(db) + + return HardwareInfo{ + RAM: ram, + CPU: cpu, + Disk: disk, + } +} + +func ReadRAMInfo(db kv.RoDB) RAMInfo { + data := ReadDataFromTable(db, kv.DiagSystemInfo, SystemRamInfoKey) + var info RAMInfo + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read RAM info", "err", err) + return RAMInfo{} + } else { + return info + } +} + +func ReadCPUInfo(db kv.RoDB) CPUInfo { + data := ReadDataFromTable(db, kv.DiagSystemInfo, SystemCpuInfoKey) + var info CPUInfo + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read CPU info", "err", err) + return CPUInfo{} + } else { + return info + } +} + +func ReadDickInfo(db kv.RoDB) DiskInfo { + data := ReadDataFromTable(db, kv.DiagSystemInfo, SystemDiskInfoKey) + var info DiskInfo + err := json.Unmarshal(data, &info) + + if err != nil { + log.Error("[Diagnostics] Failed to read Disk info", "err", err) + return DiskInfo{} + } else { + return info + } +} + +func RAMInfoUpdater(info RAMInfo) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSystemInfo, SystemRamInfoKey, info) +} + +func CPUInfoUpdater(info CPUInfo) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSystemInfo, SystemCpuInfoKey, info) +} + +func DiskInfoUpdater(info DiskInfo) func(tx kv.RwTx) error { + return PutDataToTable(kv.DiagSystemInfo, SystemDiskInfoKey, info) +} diff --git a/erigon-lib/diagnostics/utils.go b/erigon-lib/diagnostics/utils.go new file mode 100644 index 00000000000..de01bebe506 --- /dev/null +++ b/erigon-lib/diagnostics/utils.go @@ -0,0 +1,37 @@ +package diagnostics + +import ( + "context" + "encoding/json" + + "github.com/ledgerwatch/erigon-lib/kv" +) + +func ReadDataFromTable(db kv.RoDB, table string, key []byte) (data []byte) { + if err := db.View(context.Background(), func(tx kv.Tx) error { + bytes, err := tx.GetOne(table, key) + + if err != nil { + return err + } + + data = bytes + + return nil + }); err != nil { + return []byte{} + } + return data +} + +func PutDataToTable(table string, key []byte, info any) func(tx kv.RwTx) error { + return func(tx kv.RwTx) error { + infoBytes, err := json.Marshal(info) + + if err != nil { + return err + } + + return tx.Put(table, key, infoBytes) + } +} diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index 6d8eb859333..dd3313b67a3 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -149,13 +149,14 @@ type DBVerbosityLvl int8 type Label uint8 const ( - ChainDB Label = 0 - TxPoolDB Label = 1 - SentryDB Label = 2 - ConsensusDB Label = 3 - DownloaderDB Label = 4 - InMem Label = 5 - HeimdallDB Label = 6 + ChainDB Label = 0 + TxPoolDB Label = 1 + SentryDB Label = 2 + ConsensusDB Label = 3 + DownloaderDB Label = 4 + InMem Label = 5 + HeimdallDB Label = 6 + DiagnosticsDB Label = 7 ) func (l Label) String() string { @@ -174,6 +175,8 @@ func (l Label) String() string { return "inMem" case HeimdallDB: return "heimdall" + case DiagnosticsDB: + return "diagnostics" default: return "unknown" } @@ -194,6 +197,8 @@ func UnmarshalLabel(s string) Label { return InMem case "heimdall": return HeimdallDB + case "diagnostics": + return DiagnosticsDB default: panic(fmt.Sprintf("unexpected label: %s", s)) } diff --git a/erigon-lib/kv/tables.go b/erigon-lib/kv/tables.go index c38e77f33a3..1634583c92d 100644 --- a/erigon-lib/kv/tables.go +++ b/erigon-lib/kv/tables.go @@ -510,6 +510,10 @@ const ( Proposers = "BlockProposers" // epoch => proposers indicies StatesProcessingProgress = "StatesProcessingProgress" + + //Diagnostics tables + DiagSystemInfo = "DiagSystemInfo" + DiagSyncStages = "DiagSyncStages" ) // Keys @@ -750,6 +754,12 @@ var ChaindataDeprecatedTables = []string{ TransitionBlockKey, } +// Diagnostics tables +var DiagnosticsTables = []string{ + DiagSystemInfo, + DiagSyncStages, +} + type CmpFunc func(k1, k2, v1, v2 []byte) int type TableCfg map[string]TableCfgItem @@ -854,6 +864,7 @@ var BorTablesCfg = TableCfg{ var TxpoolTablesCfg = TableCfg{} var SentryTablesCfg = TableCfg{} var DownloaderTablesCfg = TableCfg{} +var DiagnosticsTablesCfg = TableCfg{} var ReconTablesCfg = TableCfg{ PlainStateD: {Flags: DupSort}, CodeD: {Flags: DupSort}, @@ -870,6 +881,8 @@ func TablesCfgByLabel(label Label) TableCfg { return SentryTablesCfg case DownloaderDB: return DownloaderTablesCfg + case DiagnosticsDB: + return DiagnosticsTablesCfg default: panic(fmt.Sprintf("unexpected label: %s", label)) } @@ -931,6 +944,13 @@ func reinit() { ReconTablesCfg[name] = TableCfgItem{} } } + + for _, name := range DiagnosticsTables { + _, ok := DiagnosticsTablesCfg[name] + if !ok { + DiagnosticsTablesCfg[name] = TableCfgItem{} + } + } } // Temporal