Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (dot/telemetry): NoTelemetry flag stops telemetry #1660

Merged
merged 16 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}
telemetry.GetInstance().Initialise(!cfg.Global.NoTelemetry)

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
genesisHash := stateSrvc.Block.GenesisHash()
Expand Down
44 changes: 42 additions & 2 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,26 @@ type Handler struct {
sendMessageTimeout time.Duration
}

// Instance interface that telemetry handler instance needs to implement
type Instance interface {
AddConnections(conns []*genesis.TelemetryEndpoint)
SendMessage(msg Message) error
startListening()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the private method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to control who can start the listener, this is called when the instance is created.

Initialise(enabled bool)
}

var (
once sync.Once
handlerInstance *Handler
handlerInstance Instance

enabled = true // enabled by default
initilised sync.Once
)

const defaultMessageTimeout = time.Second

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler { //nolint
func GetInstance() Instance {
if handlerInstance == nil {
once.Do(
func() {
Expand All @@ -65,9 +76,21 @@ func GetInstance() *Handler { //nolint
go handlerInstance.startListening()
})
}
if !enabled {
return &NoopHandler{}
}

return handlerInstance
}

// Initialise function to set if telemetry is enabled
func (h *Handler) Initialise(_enabled bool) {
edwardmack marked this conversation as resolved.
Show resolved Hide resolved
initilised.Do(
func() {
enabled = _enabled
})
}

// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
Expand Down Expand Up @@ -290,3 +313,20 @@ func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *Netwo
func (tm *NetworkStateTM) messageType() string {
return tm.Msg
}

// NoopHandler struct no op handling (ignoring) telemetry messages
type NoopHandler struct {
}

// Initialise function to set if telemetry is enabled
func (h *NoopHandler) Initialise(enabled bool) {}

func (h *NoopHandler) startListening() {}

// SendMessage no op for telemetry send message function
func (h *NoopHandler) SendMessage(msg Message) error {
return nil
}

// AddConnections no op for telemetry add connections function
func (h *NoopHandler) AddConnections(conns []*genesis.TelemetryEndpoint) {}
33 changes: 33 additions & 0 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,39 @@ func TestListenerConcurrency(t *testing.T) {
}
}

func TestDisableInstance(t *testing.T) {
const qty = 1000
var wg sync.WaitGroup
wg.Add(qty)

resultCh = make(chan []byte)
for i := 0; i < qty; i++ {
if i == qty/2 {
GetInstance().Initialise(false)
}
go func() {
bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync"))
wg.Done()
}()
}
wg.Wait()
counter := 0
tk := time.NewTicker(time.Second * 2)
main:
for {
select {
case <-tk.C:
break main
case <-resultCh:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this case will be reached? The resultsCh is created but none data is pass through channel

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestMain starts a web server that has a HandleFunc listen which reads websocket messages and passes them to resultCh at line 191. Added a fmt.Println to the select case confirms this.

counter++
}
}
tk.Stop()

require.LessOrEqual(t, counter, qty/2)
}

// TestInfiniteListener starts loop that print out data received on websocket ws://localhost:8001/
// this can be useful to see what data is sent to telemetry server
func TestInfiniteListener(t *testing.T) {
Expand Down