diff --git a/command/root.go b/command/root.go index 299a64c..d4e8efd 100644 --- a/command/root.go +++ b/command/root.go @@ -57,6 +57,7 @@ func ExecuteContext(ctx context.Context) error { rootCmd.PersistentFlags().IntVar(&opts.Retries, "retry", 10, "connection retry attempts for tcp based protocols") rootCmd.PersistentFlags().StringVarP(&opts.StartSignal, "start-signal", "s", "", "wait for start signal") rootCmd.PersistentFlags().BoolVar(&opts.InsecureTLS, "insecure", false, "disable tls verification") + rootCmd.PersistentFlags().IntVar(&opts.RateLimit, "rate-limit", 500*1024, "bytes per second rate limit for UDP output") // Webhook output flags. rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.ContentType, "webhook-content-type", "application/json", "webhook Content-Type") diff --git a/go.mod b/go.mod index 0fb89b9..fb3c5f9 100644 --- a/go.mod +++ b/go.mod @@ -11,4 +11,5 @@ require ( go.uber.org/multierr v1.1.0 go.uber.org/zap v1.10.0 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 ) diff --git a/go.sum b/go.sum index 5ec7eaf..265ab2a 100644 --- a/go.sum +++ b/go.sum @@ -254,6 +254,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/output/options.go b/pkg/output/options.go index ce2ee2f..0ed625e 100644 --- a/pkg/output/options.go +++ b/pkg/output/options.go @@ -13,6 +13,7 @@ type Options struct { Retries int // Number of connection retries for tcp based protocols. StartSignal string // OS signal to wait on before starting. InsecureTLS bool // Disable TLS verification checks. + RateLimit int // UDP rate limit in bytes. WebhookOptions } diff --git a/pkg/output/udp/udp.go b/pkg/output/udp/udp.go index d2eeef8..40dee99 100644 --- a/pkg/output/udp/udp.go +++ b/pkg/output/udp/udp.go @@ -8,20 +8,29 @@ import ( "context" "net" + "golang.org/x/time/rate" + "github.com/andrewkroh/stream/pkg/output" ) +const burst = 1024 * 1024 + func init() { output.Register("udp", New) } type Output struct { - opts *output.Options - conn *net.UDPConn + opts *output.Options + conn *net.UDPConn + ctx context.Context + limit *rate.Limiter } func New(opts *output.Options) (output.Output, error) { - return &Output{opts: opts}, nil + return &Output{ + opts: opts, + limit: rate.NewLimiter(rate.Limit(opts.RateLimit), burst), + }, nil } func (o *Output) DialContext(ctx context.Context) error { @@ -36,6 +45,7 @@ func (o *Output) DialContext(ctx context.Context) error { } o.conn = conn + o.ctx = ctx return nil } @@ -44,5 +54,8 @@ func (o *Output) Close() error { } func (o *Output) Write(b []byte) (int, error) { + if err := o.limit.WaitN(o.ctx, len(b)); err != nil { + return 0, err + } return o.conn.Write(b) }