Skip to content

Commit

Permalink
Latency measurement (#43)
Browse files Browse the repository at this point in the history
* Add simple sink system for latency metrics

* Ignore error explicitly

* Add test for stdout sink
  • Loading branch information
Evertras authored Oct 9, 2023
1 parent 0ec09e8 commit 903e5d1
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 6 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ binary for native level testing.
Optional HTTP server that provides a simple landing page to show current
status and configuration of the given `cyn` instance.

Measure latency from senders to receivers over time. Note that a small delay
exists (100-200μs) with UDP measurements due to how connections may not be
efficiently routed. TCP-based pings will be lower and more accurate for network
latency at lower values. This cannot be easily fixed since we support broadcast
with UDP, making two way communication more complicated, but may be addressed
in the future.

### Future

These will be converted to issues in the future, but some other ideas...
Expand Down
23 changes: 20 additions & 3 deletions cmd/cyn/cmds/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/evertras/cynomys/pkg/cyn"
"github.com/evertras/cynomys/pkg/listener"
"github.com/evertras/cynomys/pkg/metrics"
"github.com/evertras/cynomys/pkg/sender"
)

Expand All @@ -24,6 +25,12 @@ var config struct {
HTTPServer struct {
Address string `mapstructure:"address"`
} `mapstructure:"http"`

Sinks struct {
SinkStdout struct {
Enabled bool `mapstructure:"enabled"`
} `mapstructure:"stdout"`
} `mapstructure:"sinks"`
}

var (
Expand All @@ -44,6 +51,7 @@ func init() {
flags.StringSliceP("send-tcp", "T", nil, "An IP:port address to send to (TCP). Can be specified multiple times.")
flags.DurationP("send-interval", "i", time.Second, "How long to wait between attempting to send data")
flags.String("http.address", "", "An address:port to host an HTTP server on for realtime data, such as '127.0.0.1:8080'")
flags.Bool("sinks.stdout.enabled", false, "Whether to enable the stdout metrics sink")

err := viper.BindPFlags(flags)

Expand All @@ -61,7 +69,8 @@ func initConfig() {
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
viper.AutomaticEnv()

viper.ReadInConfig()
// Ignore errors here because we don't necessarily need a config file
_ = viper.ReadInConfig()

err := viper.Unmarshal(&config)

Expand All @@ -72,6 +81,14 @@ func initConfig() {

var rootCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
sinks := []metrics.Sink{}

if config.Sinks.SinkStdout.Enabled {
sinks = append(sinks, metrics.NewSinkStdout())
}

sink := metrics.NewMultiSink(sinks...)

instance := cyn.New()

if config.HTTPServer.Address != "" {
Expand Down Expand Up @@ -107,7 +124,7 @@ var rootCmd = &cobra.Command{
return fmt.Errorf("net.ResolveUDPAddr for %q: %w", sendUDPTo, err)
}

instance.AddUDPSender(sender.NewUDPSender(*addr, config.SendInterval))
instance.AddUDPSender(sender.NewUDPSender(*addr, config.SendInterval, sink))
}

// We could probably generalize this a bit better, but it's short enough
Expand All @@ -119,7 +136,7 @@ var rootCmd = &cobra.Command{
return fmt.Errorf("net.ResolveTCPAddr for %q: %w", sendTCPTo, err)
}

instance.AddTCPSender(sender.NewTCPSender(*addr, config.SendInterval))
instance.AddTCPSender(sender.NewTCPSender(*addr, config.SendInterval, sink))
}

return instance.Run()
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package constants

const (
PingReply = "K"
)
10 changes: 10 additions & 0 deletions pkg/listener/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net"
"strings"
"sync"

"github.com/evertras/cynomys/pkg/constants"
)

type TCPListener struct {
Expand Down Expand Up @@ -65,6 +67,14 @@ func (l *TCPListener) Listen() error {
break
}

_, err = conn.Write([]byte(constants.PingReply))

if err != nil {
log.Printf("failed to reply to ping: %s", err.Error())
_ = conn.Close()
break
}

log.Printf("Read %d bytes from %v", rlen, remote)
log.Printf("Received: %s", strings.ReplaceAll(string(buf), "\n", "\\n"))
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/listener/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net"
"strings"
"sync"

"github.com/evertras/cynomys/pkg/constants"
)

type UDPListener struct {
Expand Down Expand Up @@ -44,6 +46,14 @@ func (l *UDPListener) Listen() error {
return fmt.Errorf("conn.ReadFromUDP: %w", err)
}

// Write back before doing anything else to minimize latency,
// every nanosecond counts!
_, err = conn.WriteToUDP([]byte(constants.PingReply), remote)

if err != nil {
return fmt.Errorf("conn.WriteToUDP: %w", err)
}

log.Printf("Read %d bytes from %v", rlen, remote)
log.Printf("Received: %s", strings.ReplaceAll(string(buf), "\n", "\\n"))
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/metrics/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package metrics

import "time"

// Sink is an interface for sending latency measurements.
type Sink interface {
SendLatencyMeasurement(fromAddr, toAddr string, measurement time.Duration) error
}
35 changes: 35 additions & 0 deletions pkg/metrics/sink_multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package metrics

import (
"errors"
"time"
)

// MultiSink sends latency measurements to multiple sinks.
// Simplifies the process of sending to multiple sinks.
// If no sinks are given, it will safely do nothing.
type MultiSink struct {
sinks []Sink
}

// NewMultiSink creates a new MultiSink ready to send to the given sinks.
// If no sinks are given, it will safely do nothing.
func NewMultiSink(sinks ...Sink) *MultiSink {
return &MultiSink{
sinks: sinks,
}
}

// SendLatencyMeasurement sends a latency measurement to all sinks.
// Any errors are returned as a single joined error.
func (s *MultiSink) SendLatencyMeasurement(fromAddr, toAddr string, measurement time.Duration) error {
errs := make([]error, 0, len(s.sinks))

for _, sink := range s.sinks {
if err := sink.SendLatencyMeasurement(fromAddr, toAddr, measurement); err != nil {
errs = append(errs, err)
}
}

return errors.Join(errs...)

Check failure on line 34 in pkg/metrics/sink_multi.go

View workflow job for this annotation

GitHub Actions / goreleaser

undefined: errors.Join
}
21 changes: 21 additions & 0 deletions pkg/metrics/sink_stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package metrics

import (
"log"
"time"
)

// SinkStdout sends latency measurements to stdout. Useful for ad hoc checks.
type SinkStdout struct{}

// NewSinkStdout creates a new SinkStdout ready to print to stdout.
func NewSinkStdout() *SinkStdout {
return &SinkStdout{}
}

// SendLatencyMeasurement sends a latency measurement to stdout.
func (s *SinkStdout) SendLatencyMeasurement(fromAddr, toAddr string, measurement time.Duration) error {
log.Printf("%v -> %v latency: %v", fromAddr, toAddr, measurement)

return nil
}
15 changes: 14 additions & 1 deletion pkg/sender/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net"
"sync"
"time"

"github.com/evertras/cynomys/pkg/metrics"
)

type TCPSender struct {
Expand All @@ -14,12 +16,15 @@ type TCPSender struct {
broadcastAddr net.TCPAddr
conn *net.TCPConn
sendInterval time.Duration

sink metrics.Sink
}

func NewTCPSender(addr net.TCPAddr, sendInterval time.Duration) *TCPSender {
func NewTCPSender(addr net.TCPAddr, sendInterval time.Duration, sink metrics.Sink) *TCPSender {
return &TCPSender{
broadcastAddr: addr,
sendInterval: sendInterval,
sink: sink,
}
}

Expand Down Expand Up @@ -48,6 +53,8 @@ func (s *TCPSender) Send(data []byte) error {
s.conn = c
}

sent := time.Now()

_, err := s.conn.Write(data)

if err != nil {
Expand All @@ -56,6 +63,12 @@ func (s *TCPSender) Send(data []byte) error {
return fmt.Errorf("s.conn.Write: %w", err)
}

latency := time.Since(sent)

if err := s.sink.SendLatencyMeasurement(s.conn.LocalAddr().String(), s.conn.RemoteAddr().String(), latency); err != nil {
log.Printf("Failed to send latency measurement: %v", err)
}

return nil
}

Expand Down
32 changes: 31 additions & 1 deletion pkg/sender/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"net"
"sync"
"time"

"github.com/evertras/cynomys/pkg/constants"
"github.com/evertras/cynomys/pkg/metrics"
)

type UDPSender struct {
Expand All @@ -14,12 +17,18 @@ type UDPSender struct {
broadcastAddr net.UDPAddr
conn *net.UDPConn
sendInterval time.Duration

fromAddr string
toAddr string

sink metrics.Sink
}

func NewUDPSender(addr net.UDPAddr, sendInterval time.Duration) *UDPSender {
func NewUDPSender(addr net.UDPAddr, sendInterval time.Duration, sink metrics.Sink) *UDPSender {
return &UDPSender{
broadcastAddr: addr,
sendInterval: sendInterval,
sink: sink,
}
}

Expand All @@ -34,12 +43,33 @@ func (s *UDPSender) Send(data []byte) error {
s.conn = c
}

reply := make([]byte, 16)
sent := time.Now()

_, err := s.conn.Write(data)

if err != nil {
return fmt.Errorf("s.conn.Write: %w", err)
}

_, _, err = s.conn.ReadFromUDP(reply)

if err != nil {
return fmt.Errorf("waiting for ping reply s.conn.ReadFromUDP: %w", err)
}

if reply[0] != constants.PingReply[0] {
return fmt.Errorf("ping reply was not %q: %q", constants.PingReply, string(reply))
}

latency := time.Since(sent)

err = s.sink.SendLatencyMeasurement(s.conn.LocalAddr().String(), s.conn.RemoteAddr().String(), latency)

if err != nil {
log.Printf("Failed to send latency measurement: %v", err)
}

return nil
}

Expand Down
15 changes: 15 additions & 0 deletions tests/features/sink-stdout.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Feature: watch latency in stdout
In order to check latency in an ad hoc fashion
Cyn should send the latency to stdout

Scenario: no latency in stdout by default
Given I run cyn -u 127.0.0.1:14567
And I run cyn -U 127.0.0.1:14567 --send-interval 200ms
When I wait 1 second
Then the stdout does not contain "latency"

Scenario: latency is displayed in stdout when asked for
Given I run cyn -u 127.0.0.1:14567
And I run cyn -U 127.0.0.1:14567 --send-interval 200ms --sinks.stdout.enabled
When I wait 1 second
Then the stdout contains "latency"
2 changes: 1 addition & 1 deletion tests/steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func InitializeScenario(sc *godog.ScenarioContext) {
sc.AfterScenario(func(sc *godog.Scenario, err error) {
if err != nil {
for i, cmd := range t.cmds {
fmt.Printf("Process #%d", i)
fmt.Printf("Process #%d\n", i)
fmt.Println("vvvv STDOUT DUMP vvvv")
fmt.Println(cmd.Stdout())
fmt.Println("^^^^ STDOUT DUMP ^^^^")
Expand Down

0 comments on commit 903e5d1

Please sign in to comment.