From f61e2d783dbd7adc2447b5b348b51a5fc0413aff Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 3 Jun 2021 16:47:06 +0200 Subject: [PATCH] Add support for log lines of any size and glob patterns --- CHANGELOG.md | 3 +++ command/log.go | 9 ++++++++- command/root.go | 1 + pkg/cmdutil/validate.go | 19 ++++++++++++++++++- pkg/output/options.go | 15 ++++++++------- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 810565b..9bef24b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +- Added option to set up custom buffer size for the log reader. [#22](https://github.com/elastic/stream/pull/22) +- Added support for glob patterns. [#22](https://github.com/elastic/stream/pull/22) + ## [0.4.0] - Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19) diff --git a/command/log.go b/command/log.go index 49f9efc..cf6f208 100644 --- a/command/log.go +++ b/command/log.go @@ -39,13 +39,18 @@ func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command { return r.cmd } -func (r *logRunner) Run(files []string) error { +func (r *logRunner) Run(args []string) error { out, err := output.Initialize(r.out, r.logger, r.cmd.Context()) if err != nil { return err } defer out.Close() + files, err := cmdutil.ExpandGlobPatternsFromArgs(args) + if err != nil { + return err + } + for _, f := range files { if err := r.sendLog(f, out); err != nil { return err @@ -66,6 +71,8 @@ func (r *logRunner) sendLog(path string, out output.Output) error { var totalBytes, totalLines int s := bufio.NewScanner(bufio.NewReader(f)) + buf := make([]byte, r.out.LogReaderBuffer) + s.Buffer(buf, r.out.LogReaderBuffer) for s.Scan() { if r.cmd.Context().Err() != nil { break diff --git a/command/root.go b/command/root.go index 253b668..0ee800d 100644 --- a/command/root.go +++ b/command/root.go @@ -62,6 +62,7 @@ func ExecuteContext(ctx context.Context) error { rootCmd.PersistentFlags().StringVarP(&opts.StartSignal, "start-signal", "s", "", "wait for start signal") rootCmd.PersistentFlags().BoolVar(&opts.InsecureTLS, "insecure", false, "disable tls verification") rootCmd.PersistentFlags().IntVar(&opts.RateLimit, "rate-limit", 500*1024, "bytes per second rate limit for UDP output") + rootCmd.PersistentFlags().IntVar(&opts.LogReaderBuffer, "log-reader-buf", 500*1024, "buffer size in bytes of the log reader") // Webhook output flags. rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.ContentType, "webhook-content-type", "application/json", "webhook Content-Type") diff --git a/pkg/cmdutil/validate.go b/pkg/cmdutil/validate.go index fef896f..df734e1 100644 --- a/pkg/cmdutil/validate.go +++ b/pkg/cmdutil/validate.go @@ -7,6 +7,7 @@ package cmdutil import ( "fmt" "os" + "path/filepath" "github.com/spf13/cobra" ) @@ -25,7 +26,11 @@ func ValidateArgs(validators ...cobra.PositionalArgs) cobra.PositionalArgs { // RegularFiles validates that each arg is a regular file. func RegularFiles(_ *cobra.Command, args []string) error { - for _, f := range args { + paths, err := ExpandGlobPatternsFromArgs(args) + if err != nil { + return err + } + for _, f := range paths { info, err := os.Stat(f) if err != nil { return fmt.Errorf("arg %q is not a valid file: %w", f, err) @@ -36,3 +41,15 @@ func RegularFiles(_ *cobra.Command, args []string) error { } return nil } + +func ExpandGlobPatternsFromArgs(args []string) ([]string, error) { + var paths []string + for _, pat := range args { + matches, err := filepath.Glob(pat) + if err != nil { + return nil, fmt.Errorf("invalid glob pattern %q: %w", pat, err) + } + paths = append(paths, matches...) + } + return paths, nil +} diff --git a/pkg/output/options.go b/pkg/output/options.go index 53ff185..9129f00 100644 --- a/pkg/output/options.go +++ b/pkg/output/options.go @@ -7,13 +7,14 @@ package output import "time" type Options struct { - Addr string // Destination address (host:port). - Delay time.Duration // Delay start after start signal. - Protocol string // Protocol (udp/tcp/tls). - Retries int // Number of connection retries for tcp based protocols. - StartSignal string // OS signal to wait on before starting. - InsecureTLS bool // Disable TLS verification checks. - RateLimit int // UDP rate limit in bytes. + Addr string // Destination address (host:port). + Delay time.Duration // Delay start after start signal. + Protocol string // Protocol (udp/tcp/tls). + Retries int // Number of connection retries for tcp based protocols. + StartSignal string // OS signal to wait on before starting. + InsecureTLS bool // Disable TLS verification checks. + RateLimit int // UDP rate limit in bytes. + LogReaderBuffer int // Log reader buffer size in bytes. WebhookOptions GCPPubsubOptions