Skip to content

Commit

Permalink
fix: /info api when no decentralized workers (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
pseudoyu authored Oct 22, 2024
1 parent f52fa95 commit 3706d1f
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 87 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ go.work

### Configuration
deploy/config.yaml

first_start_time.txt
7 changes: 3 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var command = cobra.Command{
var settlementCaller *vsl.SettlementCaller

// Apply database migrations for all modules except the broadcaster.
if module != BroadcasterArg && len(config.Component.Decentralized) > 0 {
if module != BroadcasterArg && (len(config.Component.Decentralized) > 0 || len(config.Component.Federated) > 0) {
databaseClient, err = dialer.Dial(cmd.Context(), config.Database)
if err != nil {
return fmt.Errorf("dial database: %w", err)
Expand Down Expand Up @@ -157,19 +157,18 @@ var command = cobra.Command{
}

// set first start time
firstStartTime, err := info.GetFirstStartTime(cmd.Context(), redisClient)
firstStartTime, err := info.GetFirstStartTime()
if err != nil {
return fmt.Errorf("get first start time: %w", err)
}

if firstStartTime == 0 {
// update first start time to current timestamp in seconds
err = info.UpdateFirstStartTime(cmd.Context(), redisClient, time.Now().Unix())
err = info.UpdateFirstStartTime(time.Now().Unix())
if err != nil {
return fmt.Errorf("update first start time: %w", err)
}
}

}

switch module {
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type File struct {
Component *Component `mapstructure:"component" validate:"required"`
Database *Database `mapstructure:"database" validate:"required"`
Stream *Stream `mapstructure:"stream"`
Redis *Redis `mapstructure:"redis" validate:"required"`
Redis *Redis `mapstructure:"redis"`
Observability *Telemetry `mapstructure:"observability"`
}

Expand Down
5 changes: 0 additions & 5 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1146,10 +1146,6 @@
"description": "The total uptime of the node.",
"type": "integer"
},
"parameters": {
"description": "The current network parameters of the node.",
"type": "string"
},
"coverage": {
"description": "The supported worker list of the node.",
"items":{
Expand All @@ -1170,7 +1166,6 @@
"commit": "000000"
},
"uptime": 249979,
"parameters": "{\"network_tolerance\":{\"arbitrum\":1000,\"arweave\":100,\"avax\":100,\"base\":100,\"binance-smart-chain\":100,\"crossbell\":500,\"ethereum\":100,\"farcaster\":3600000,\"gnosis\":100,\"linea\":100,\"optimism\":100,\"polygon\":100,\"savm\":100,\"vsl\":100},\"network_start_block\":{\"arbitrum\":185724972,\"arweave\":1374360,\"avax\":42301570,\"base\":11216527,\"binance-smart-chain\":36563564,\"crossbell\":58846671,\"ethereum\":19334220,\"gnosis\":32695982,\"linea\":2591120,\"optimism\":116811812,\"polygon\":54103805,\"savm\":60741,\"vsl\":14192},\"network_core_worker_disk_space_per_month\":{\"arbitrum\":26,\"arweave\":0,\"avax\":0,\"base\":10,\"binance-smart-chain\":117,\"crossbell\":0,\"ethereum\":51,\"gnosis\":9,\"linea\":31,\"optimism\":25,\"polygon\":153,\"savm\":1,\"vsl\":1,\"farcaster\":50}}",
"coverage": 1,
"records": {
"last_heartbeat": 1722222806,
Expand Down
132 changes: 55 additions & 77 deletions internal/node/component/info/handler_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ package info
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/labstack/echo/v4"
"github.com/redis/rueidis"
"github.com/rss3-network/node/config/parameter"
"github.com/rss3-network/node/internal/constant"
"github.com/rss3-network/node/internal/node/component/decentralized"
"github.com/rss3-network/node/internal/node/component/federated"
"github.com/rss3-network/node/internal/node/component/rss"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"go.uber.org/zap"
Expand Down Expand Up @@ -57,12 +58,11 @@ type GINodeInfoResponse struct {
}

type NodeInfo struct {
Operator common.Address `json:"operator"`
Version Version `json:"version"`
Uptime int64 `json:"uptime"`
Parameters string `json:"parameters"`
Coverage []string `json:"coverage"`
Records Record `json:"records"`
Operator common.Address `json:"operator"`
Version Version `json:"version"`
Uptime int64 `json:"uptime"`
Coverage []string `json:"coverage"`
Records Record `json:"records"`
}

type Record struct {
Expand Down Expand Up @@ -113,16 +113,8 @@ func (c *Component) GetNodeInfo(ctx echo.Context) error {
evmAddress = operator.EvmAddress
}

// Get network params info
params, err := c.getNetworkParams(ctx.Request().Context())
if err != nil {
zap.L().Error("failed to get network params", zap.Error(err))

return err
}

// Get uptime info
uptime, err := c.getNodeUptime(ctx.Request().Context())
uptime, err := c.getNodeUptime()
if err != nil {
zap.L().Error("failed to get node uptime", zap.Error(err))

Expand All @@ -148,16 +140,29 @@ func (c *Component) GetNodeInfo(ctx echo.Context) error {
return err
}

var recentRequests []string

if len(c.config.Component.Decentralized) > 0 {
recentRequests = append(recentRequests, decentralized.GetRecentRequest()...)
}

if len(c.config.Component.Federated) > 0 {
recentRequests = append(recentRequests, federated.GetRecentRequest()...)
}

if c.config.Component.RSS != nil {
recentRequests = append(recentRequests, rss.GetRecentRequest()...)
}

return ctx.JSON(http.StatusOK, NodeInfoResponse{
Data: NodeInfo{
Version: version,
Operator: evmAddress,
Parameters: params,
Uptime: uptime,
Coverage: workerCoverage,
Version: version,
Operator: evmAddress,
Uptime: uptime,
Coverage: workerCoverage,
Records: Record{
LastHeartbeat: lastHeartbeat,
RecentRequests: decentralized.GetRecentRequest(),
RecentRequests: recentRequests,
RecentRewards: rewards,
SlashedTokens: slashedTokens,
},
Expand Down Expand Up @@ -204,11 +209,11 @@ func (c *Component) getNodeWorkerCoverage() []string {
}

// getNodeUptime returns the node uptime.
func (c *Component) getNodeUptime(ctx context.Context) (int64, error) {
func (c *Component) getNodeUptime() (int64, error) {
var uptime int64

// get first start time from redis cache and calculate uptime
firstStartTime, err := GetFirstStartTime(ctx, c.redisClient)
firstStartTime, err := GetFirstStartTime()
if err != nil {
zap.L().Error("failed to get first start time from cache", zap.Error(err))

Expand All @@ -218,7 +223,7 @@ func (c *Component) getNodeUptime(ctx context.Context) (int64, error) {
if firstStartTime == 0 {
uptime = 0

err := UpdateFirstStartTime(ctx, c.redisClient, time.Now().Unix())
err := UpdateFirstStartTime(time.Now().Unix())
if err != nil {
zap.L().Error("failed to update first start time", zap.Error(err))

Expand All @@ -231,26 +236,6 @@ func (c *Component) getNodeUptime(ctx context.Context) (int64, error) {
return uptime, nil
}

// getNetworkParams returns the network parameters.
func (c *Component) getNetworkParams(ctx context.Context) (string, error) {
currentEpoch, err := parameter.GetCurrentEpoch(ctx, c.redisClient)
if err != nil {
zap.L().Error("failed to get current epoch", zap.Error(err))

return "", err
}

// Get parameters from the network
params, err := parameter.PullNetworkParamsFromVSL(c.networkParamsCaller, uint64(currentEpoch))
if err != nil {
zap.L().Error("failed to pull network params from VSL", zap.Error(err))

return "", err
}

return params, nil
}

// getNodeRewards returns the node rewards.
func (c *Component) getNodeRewards(ctx context.Context, address common.Address) ([]Reward, error) {
var resp GIRewardsResponse
Expand Down Expand Up @@ -338,49 +323,42 @@ func (c *Component) sendRequest(ctx context.Context, path string, result any) er
return nil
}

// GetFirstStartTime Get the first start time from redis cache
func GetFirstStartTime(ctx context.Context, redisClient rueidis.Client) (int64, error) {
if redisClient == nil {
return 0, fmt.Errorf("redis client is nil")
}

command := redisClient.B().Get().Key(buildFirstStartTimeCacheKey()).Build()
// GetFirstStartTime Get the first start time from local file
func GetFirstStartTime() (int64, error) {
filePath := "first_start_time.txt"

result := redisClient.Do(ctx, command)
if err := result.Error(); err != nil {
if errors.Is(err, rueidis.Nil) {
// Key doesn't exist, return 0 or a default value
return 0, nil
}
// Check if file exists
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return 0, nil
}

return 0, fmt.Errorf("redis result: %w", err)
// Read the file
content, err := os.ReadFile(filePath)
if err != nil {
return 0, fmt.Errorf("read file: %w", err)
}

firstStartTime, err := result.AsInt64()
// Convert content to int64
firstStartTime, err := strconv.ParseInt(strings.TrimSpace(string(content)), 10, 64)
if err != nil {
return 0, fmt.Errorf("redis result to int64: %w", err)
return 0, fmt.Errorf("parse int64: %w", err)
}

return firstStartTime, nil
}

// UpdateFirstStartTime updates the first start time in redis cache
func UpdateFirstStartTime(ctx context.Context, redisClient rueidis.Client, timestamp int64) error {
if redisClient == nil {
return fmt.Errorf("redis client is nil")
}
// UpdateFirstStartTime updates the first start time in local file
func UpdateFirstStartTime(timestamp int64) error {
filePath := "first_start_time.txt"

command := redisClient.B().Set().Key(buildFirstStartTimeCacheKey()).Value(strconv.FormatInt(timestamp, 10)).Build()
// Convert timestamp to string
content := strconv.FormatInt(timestamp, 10)

result := redisClient.Do(ctx, command)
if err := result.Error(); err != nil {
return fmt.Errorf("redis result: %w", err)
// Write to file
err := os.WriteFile(filePath, []byte(content), 0600)
if err != nil {
return fmt.Errorf("write file: %w", err)
}

return nil
}

// buildFirstStartTimeCacheKey builds the cache key for the first start time
func buildFirstStartTimeCacheKey() string {
return "node:info:first_start_time"
}
36 changes: 36 additions & 0 deletions internal/node/component/rss/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"fmt"
"net/http"
"sync"

cb "github.com/emirpasic/gods/queues/circularbuffer"
"github.com/labstack/echo/v4"
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/internal/constant"
"github.com/rss3-network/node/internal/node/component"
"github.com/rss3-network/node/internal/node/component/middleware"
"github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -35,13 +38,22 @@ type configx struct {

const Name = "rss"

const MaxRecentRequests = 10

var (
RecentRequests *cb.Queue
recentRequestsMutex sync.RWMutex
)

func (h *Component) Name() string {
return Name
}

var _ component.Component = (*Component)(nil)

func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File) component.Component {
RecentRequests = cb.New(MaxRecentRequests)

c := &Component{
config: config,
httpClient: http.DefaultClient,
Expand Down Expand Up @@ -128,3 +140,27 @@ func (h *Component) setAccessKey(config *config.Module) error {

return nil
}

func addRecentRequest(path string) {
recentRequestsMutex.Lock()
defer recentRequestsMutex.Unlock()

RecentRequests.Enqueue(path)
}

// GetRecentRequest returns the filtered recent requests.
func GetRecentRequest() []string {
recentRequestsMutex.RLock()
defer recentRequestsMutex.RUnlock()

// Convert queue to slice
values := RecentRequests.Values()

// Filter out empty strings and convert to []string
filteredRequests := lo.FilterMap(values, func(item interface{}, _ int) (string, bool) {
str, ok := item.(string)
return str, ok && str != ""
})

return filteredRequests
}
2 changes: 2 additions & 0 deletions internal/node/component/rss/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (h *Component) Handler(ctx echo.Context) error {

go h.CollectMetric(ctx.Request().Context(), ctx.Request().RequestURI, path)

addRecentRequest(ctx.Request().RequestURI)

data, err := h.getActivities(ctx.Request().Context(), path, ctx.Request().URL)
if err != nil {
zap.L().Error("getActivities InternalError", zap.Error(err))
Expand Down

0 comments on commit 3706d1f

Please sign in to comment.