Skip to content

Commit

Permalink
fix (dot/telemetry): NoTelemetry flag stops telemetry (ChainSafe#1660)
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack authored and timwu20 committed Dec 6, 2021
1 parent fd43774 commit dd9d365
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 5 deletions.
4 changes: 1 addition & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}
telemetry.GetInstance().Initialise(!cfg.Global.NoTelemetry)

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
genesisHash := stateSrvc.Block.GenesisHash()
Expand Down
44 changes: 42 additions & 2 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,26 @@ type Handler struct {
sendMessageTimeout time.Duration
}

// Instance interface that telemetry handler instance needs to implement
type Instance interface {
AddConnections(conns []*genesis.TelemetryEndpoint)
SendMessage(msg Message) error
startListening()
Initialise(enabled bool)
}

var (
once sync.Once
handlerInstance *Handler
handlerInstance Instance

enabled = true // enabled by default
initilised sync.Once
)

const defaultMessageTimeout = time.Second

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler { //nolint
func GetInstance() Instance {
if handlerInstance == nil {
once.Do(
func() {
Expand All @@ -65,9 +76,21 @@ func GetInstance() *Handler { //nolint
go handlerInstance.startListening()
})
}
if !enabled {
return &NoopHandler{}
}

return handlerInstance
}

// Initialise function to set if telemetry is enabled
func (h *Handler) Initialise(e bool) {
initilised.Do(
func() {
enabled = e
})
}

// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
Expand Down Expand Up @@ -290,3 +313,20 @@ func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *Netwo
func (tm *NetworkStateTM) messageType() string {
return tm.Msg
}

// NoopHandler struct no op handling (ignoring) telemetry messages
type NoopHandler struct {
}

// Initialise function to set if telemetry is enabled
func (h *NoopHandler) Initialise(enabled bool) {}

func (h *NoopHandler) startListening() {}

// SendMessage no op for telemetry send message function
func (h *NoopHandler) SendMessage(msg Message) error {
return nil
}

// AddConnections no op for telemetry add connections function
func (h *NoopHandler) AddConnections(conns []*genesis.TelemetryEndpoint) {}
33 changes: 33 additions & 0 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,39 @@ func TestListenerConcurrency(t *testing.T) {
}
}

func TestDisableInstance(t *testing.T) {
const qty = 1000
var wg sync.WaitGroup
wg.Add(qty)

resultCh = make(chan []byte)
for i := 0; i < qty; i++ {
if i == qty/2 {
GetInstance().Initialise(false)
}
go func() {
bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync"))
wg.Done()
}()
}
wg.Wait()
counter := 0
tk := time.NewTicker(time.Second * 2)
main:
for {
select {
case <-tk.C:
break main
case <-resultCh:
counter++
}
}
tk.Stop()

require.LessOrEqual(t, counter, qty/2)
}

// TestInfiniteListener starts loop that print out data received on websocket ws://localhost:8001/
// this can be useful to see what data is sent to telemetry server
func TestInfiniteListener(t *testing.T) {
Expand Down

0 comments on commit dd9d365

Please sign in to comment.