Skip to content

Commit

Permalink
Add UDP rate limit
Browse files Browse the repository at this point in the history
Closes #11
  • Loading branch information
andrewkroh committed Feb 26, 2021
1 parent 158e02a commit 6d4c256
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

- Added `--rate-limit` flag to control rate (in bytes/sec) of UDP streaming. [#12](https://github.com/andrewkroh/stream/pull/12)

## [0.2.0]

- Added `--insecure` to disable TLS verification for the TLS and webhook outputs.
Expand Down
1 change: 1 addition & 0 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().IntVar(&opts.Retries, "retry", 10, "connection retry attempts for tcp based protocols")
rootCmd.PersistentFlags().StringVarP(&opts.StartSignal, "start-signal", "s", "", "wait for start signal")
rootCmd.PersistentFlags().BoolVar(&opts.InsecureTLS, "insecure", false, "disable tls verification")
rootCmd.PersistentFlags().IntVar(&opts.RateLimit, "rate-limit", 500*1024, "bytes per second rate limit for UDP output")

// Webhook output flags.
rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.ContentType, "webhook-content-type", "application/json", "webhook Content-Type")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ require (
go.uber.org/multierr v1.1.0
go.uber.org/zap v1.10.0
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
1 change: 1 addition & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Options struct {
Retries int // Number of connection retries for tcp based protocols.
StartSignal string // OS signal to wait on before starting.
InsecureTLS bool // Disable TLS verification checks.
RateLimit int // UDP rate limit in bytes.

WebhookOptions
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/output/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@ import (
"context"
"net"

"golang.org/x/time/rate"

"github.com/andrewkroh/stream/pkg/output"
)

const burst = 1024 * 1024

func init() {
output.Register("udp", New)
}

type Output struct {
opts *output.Options
conn *net.UDPConn
opts *output.Options
conn *net.UDPConn
ctx context.Context
limit *rate.Limiter
}

func New(opts *output.Options) (output.Output, error) {
return &Output{opts: opts}, nil
return &Output{
opts: opts,
limit: rate.NewLimiter(rate.Limit(opts.RateLimit), burst),
}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
Expand All @@ -36,6 +45,7 @@ func (o *Output) DialContext(ctx context.Context) error {
}

o.conn = conn
o.ctx = ctx
return nil
}

Expand All @@ -44,5 +54,8 @@ func (o *Output) Close() error {
}

func (o *Output) Write(b []byte) (int, error) {
if err := o.limit.WaitN(o.ctx, len(b)); err != nil {
return 0, err
}
return o.conn.Write(b)
}

0 comments on commit 6d4c256

Please sign in to comment.