Skip to content

Commit

Permalink
feat: dot/telemetry: Implement basic telemetry connection (#1497)
Browse files Browse the repository at this point in the history
* implement telemetry connection server

* add telemetry for import block

* setup telemetry to use node name from Global config

* init telemetry connections based on genesis.json values

* fix error message formatting

* add tests for telemetry

* implement no-telemetry cli flag

* fix typos

* cleanup error check

* create TelemetryEndpoint struct

* created connection data struct to hold connection data

* make TelemetryEndpoints a pointer reference

* add tests for interfaceToTelemetryEndpoint

* update test to fix random name
  • Loading branch information
edwardmack authored Apr 7, 2021
1 parent b7df8ed commit fcb4159
Show file tree
Hide file tree
Showing 18 changed files with 409 additions and 49 deletions.
3 changes: 3 additions & 0 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func setDotGlobalConfig(ctx *cli.Context, tomlCfg *ctoml.Config, cfg *dot.Global
cfg.MetricsPort = tomlCfg.Global.MetricsPort
}

// TODO: generate random name if one is not assigned (see issue #1496)
// check --name flag and update node configuration
if name := ctx.GlobalString(NameFlag.Name); name != "" {
cfg.Name = name
Expand Down Expand Up @@ -442,6 +443,8 @@ func setDotGlobalConfig(ctx *cli.Context, tomlCfg *ctoml.Config, cfg *dot.Global
cfg.MetricsPort = uint32(metricsPort)
}

cfg.NoTelemetry = ctx.Bool("no-telemetry")

logger.Debug(
"global configuration",
"name", cfg.Name,
Expand Down
14 changes: 14 additions & 0 deletions cmd/gossamer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ func TestGlobalConfigFromFlags(t *testing.T) {
MetricsPort: uint32(9871),
},
},
{
"Test gossamer --no-telemetry",
[]string{"config", "no-telemetry", "name"},
[]interface{}{testCfgFile.Name(), true, testCfg.Global.Name},
dot.GlobalConfig{
Name: testCfg.Global.Name,
ID: testCfg.Global.ID,
BasePath: testCfg.Global.BasePath,
LogLvl: log.LvlInfo,
PublishMetrics: testCfg.Global.PublishMetrics,
MetricsPort: testCfg.Global.MetricsPort,
NoTelemetry: true,
},
},
}

for _, c := range testcases {
Expand Down
9 changes: 9 additions & 0 deletions cmd/gossamer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ var (
Name: "metrics-port",
Usage: "Set metric listening port ",
}

// NoTelemetryFlag stops publishing telemetry to default defined in genesis.json
NoTelemetryFlag = cli.BoolFlag{
Name: "no-telemetry",
Usage: "Disable connecting to the Substrate telemetry server",
}
)

// Initialization-only flags
Expand Down Expand Up @@ -295,6 +301,9 @@ var (
// metrics flag
PublishMetricsFlag,
MetricsPortFlag,

// telemetry flags
NoTelemetryFlag,
}
)

Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type GlobalConfig struct {
LogLvl log.Lvl
PublishMetrics bool
MetricsPort uint32
NoTelemetry bool
}

// LogConfig represents the log levels for individual packages
Expand Down
30 changes: 27 additions & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ import (
"os/signal"
"path"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"

"github.com/ChainSafe/chaindb"
gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/services"
log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/prometheus"

"github.com/ChainSafe/chaindb"
log "github.com/ChainSafe/log15"
)

var logger = log.New("pkg", "dot")
Expand Down Expand Up @@ -306,6 +308,28 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
publishMetrics(cfg)
}

gd, err := stateSrvc.Storage.GetGenesisData()
if err != nil {
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
data := &telemetry.ConnectionData{
Authority: cfg.Core.GrandpaAuthority,
Chain: sysSrvc.ChainName(),
GenesisHash: stateSrvc.Block.GenesisHash().String(),
SystemName: sysSrvc.SystemName(),
NodeName: cfg.Global.Name,
SystemVersion: sysSrvc.SystemVersion(),
NetworkID: networkSrvc.NetworkState().PeerID,
StartTime: strconv.FormatInt(time.Now().UnixNano(), 10),
}
telemetry.GetInstance().SendConnection(data)

return node, nil
}

Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type RuntimeAPI interface {
type SystemAPI interface {
SystemName() string
SystemVersion() string
NodeName() string
Properties() map[string]interface{}
ChainType() string
ChainName() string
}
4 changes: 2 additions & 2 deletions dot/rpc/modules/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSystemModule(net NetworkAPI, sys SystemAPI, core CoreAPI,

// Chain returns the runtime chain
func (sm *SystemModule) Chain(r *http.Request, req *EmptyRequest, res *string) error {
*res = sm.systemAPI.NodeName()
*res = sm.systemAPI.ChainName()
return nil
}

Expand Down Expand Up @@ -161,7 +161,7 @@ func (sm *SystemModule) NodeRoles(r *http.Request, req *EmptyRequest, res *[]int
// AccountNextIndex Returns the next valid index (aka. nonce) for given account.
func (sm *SystemModule) AccountNextIndex(r *http.Request, req *StringRequest, res *U64Response) error {
if req == nil || len(req.String) == 0 {
return errors.New("Account address must be valid")
return errors.New("account address must be valid")
}
addressPubKey := crypto.PublicAddressToByteArray(common.Address(req.String))

Expand Down
3 changes: 1 addition & 2 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ func (api *mockSystemAPI) SystemVersion() string {
return api.info.SystemVersion
}

func (api *mockSystemAPI) NodeName() string {
func (api *mockSystemAPI) ChainName() string {
return api.genData.Name
}

func (api *mockSystemAPI) Properties() map[string]interface{} {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"os"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"

log "github.com/ChainSafe/log15"
)

Expand Down Expand Up @@ -339,6 +339,7 @@ func (s *Service) handleBlock(block *types.Block) error {
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number)
}

// handle consensus digest for authority changes
Expand Down
4 changes: 2 additions & 2 deletions dot/system/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (s *Service) SystemVersion() string {
return s.systemInfo.SystemVersion
}

// NodeName returns the nodeName (chain name)
func (s *Service) NodeName() string {
// ChainName returns the chain name defined in genesis.json
func (s *Service) ChainName() string {
return s.genesisData.Name
}

Expand Down
4 changes: 2 additions & 2 deletions dot/system/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/stretchr/testify/require"
)

func TestService_NodeName(t *testing.T) {
func TestService_ChainName(t *testing.T) {
svc := newTestService()

name := svc.NodeName()
name := svc.ChainName()
require.Equal(t, "gssmr", name)
}

Expand Down
123 changes: 123 additions & 0 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2021 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
package telemetry

import (
"bytes"
"encoding/json"
"fmt"
"math/big"
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
telemetryLogger *log.Entry
}

// MyJSONFormatter struct for defining JSON Formatter
type MyJSONFormatter struct {
}

// Format function for handling JSON formatting, this overrides default logging formatter to remove
// log level, line number and timestamp
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
serialized, err := json.Marshal(entry.Data)
if err != nil {
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err)
}
return append(serialized, '\n'), nil
}

var (
once sync.Once
handlerInstance *Handler
)

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler {
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
buf: bytes.Buffer{},
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
})
}
return handlerInstance
}

// AddConnections adds connections to telemetry sever
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
fmt.Printf("Error %v\n", err)
return
}
h.wsConn = append(h.wsConn, c)
}
}

// ConnectionData struct to hold connection data
type ConnectionData struct {
Authority bool
Chain string
GenesisHash string
SystemName string
NodeName string
SystemVersion string
NetworkID string
StartTime string
}

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(data *ConnectionData) {
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
}

func (h *Handler) sendTelemtry() {
for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
}
h.buf.Reset()
}
Loading

0 comments on commit fcb4159

Please sign in to comment.