Skip to content

Commit

Permalink
Add support for log lines of any size and glob patterns (#22)
Browse files Browse the repository at this point in the history
* Add support for log lines of any size and glob patterns

* Rename buffer parameter to make it more clear
  • Loading branch information
marc-gr authored Jun 4, 2021
1 parent ac0680b commit bb61f49
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

- Added option to set up custom buffer size for the log reader. [#22](https://github.com/elastic/stream/pull/22)
- Added support for glob patterns. [#22](https://github.com/elastic/stream/pull/22)

## [0.4.0]

- Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19)
Expand Down
9 changes: 8 additions & 1 deletion command/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
return r.cmd
}

func (r *logRunner) Run(files []string) error {
func (r *logRunner) Run(args []string) error {
out, err := output.Initialize(r.out, r.logger, r.cmd.Context())
if err != nil {
return err
}
defer out.Close()

files, err := cmdutil.ExpandGlobPatternsFromArgs(args)
if err != nil {
return err
}

for _, f := range files {
if err := r.sendLog(f, out); err != nil {
return err
Expand All @@ -66,6 +71,8 @@ func (r *logRunner) sendLog(path string, out output.Output) error {

var totalBytes, totalLines int
s := bufio.NewScanner(bufio.NewReader(f))
buf := make([]byte, r.out.MaxLogLineSize)
s.Buffer(buf, r.out.MaxLogLineSize)
for s.Scan() {
if r.cmd.Context().Err() != nil {
break
Expand Down
1 change: 1 addition & 0 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVarP(&opts.StartSignal, "start-signal", "s", "", "wait for start signal")
rootCmd.PersistentFlags().BoolVar(&opts.InsecureTLS, "insecure", false, "disable tls verification")
rootCmd.PersistentFlags().IntVar(&opts.RateLimit, "rate-limit", 500*1024, "bytes per second rate limit for UDP output")
rootCmd.PersistentFlags().IntVar(&opts.MaxLogLineSize, "max-log-line-size", 500*1024, "max size of a single log line in bytes")

// Webhook output flags.
rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.ContentType, "webhook-content-type", "application/json", "webhook Content-Type")
Expand Down
19 changes: 18 additions & 1 deletion pkg/cmdutil/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package cmdutil
import (
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
)
Expand All @@ -25,7 +26,11 @@ func ValidateArgs(validators ...cobra.PositionalArgs) cobra.PositionalArgs {

// RegularFiles validates that each arg is a regular file.
func RegularFiles(_ *cobra.Command, args []string) error {
for _, f := range args {
paths, err := ExpandGlobPatternsFromArgs(args)
if err != nil {
return err
}
for _, f := range paths {
info, err := os.Stat(f)
if err != nil {
return fmt.Errorf("arg %q is not a valid file: %w", f, err)
Expand All @@ -36,3 +41,15 @@ func RegularFiles(_ *cobra.Command, args []string) error {
}
return nil
}

func ExpandGlobPatternsFromArgs(args []string) ([]string, error) {
var paths []string
for _, pat := range args {
matches, err := filepath.Glob(pat)
if err != nil {
return nil, fmt.Errorf("invalid glob pattern %q: %w", pat, err)
}
paths = append(paths, matches...)
}
return paths, nil
}
15 changes: 8 additions & 7 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package output
import "time"

type Options struct {
Addr string // Destination address (host:port).
Delay time.Duration // Delay start after start signal.
Protocol string // Protocol (udp/tcp/tls).
Retries int // Number of connection retries for tcp based protocols.
StartSignal string // OS signal to wait on before starting.
InsecureTLS bool // Disable TLS verification checks.
RateLimit int // UDP rate limit in bytes.
Addr string // Destination address (host:port).
Delay time.Duration // Delay start after start signal.
Protocol string // Protocol (udp/tcp/tls).
Retries int // Number of connection retries for tcp based protocols.
StartSignal string // OS signal to wait on before starting.
InsecureTLS bool // Disable TLS verification checks.
RateLimit int // UDP rate limit in bytes.
MaxLogLineSize int // Log reader buffer size in bytes.

WebhookOptions
GCPPubsubOptions
Expand Down

0 comments on commit bb61f49

Please sign in to comment.