Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for log lines of any size and glob patterns #22

Merged
merged 2 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

- Added option to set up custom buffer size for the log reader. [#22](https://github.com/elastic/stream/pull/22)
- Added support for glob patterns. [#22](https://github.com/elastic/stream/pull/22)

## [0.4.0]

- Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19)
Expand Down
9 changes: 8 additions & 1 deletion command/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
return r.cmd
}

func (r *logRunner) Run(files []string) error {
func (r *logRunner) Run(args []string) error {
out, err := output.Initialize(r.out, r.logger, r.cmd.Context())
if err != nil {
return err
}
defer out.Close()

files, err := cmdutil.ExpandGlobPatternsFromArgs(args)
if err != nil {
return err
}

for _, f := range files {
if err := r.sendLog(f, out); err != nil {
return err
Expand All @@ -66,6 +71,8 @@ func (r *logRunner) sendLog(path string, out output.Output) error {

var totalBytes, totalLines int
s := bufio.NewScanner(bufio.NewReader(f))
buf := make([]byte, r.out.LogReaderBuffer)
s.Buffer(buf, r.out.LogReaderBuffer)
for s.Scan() {
if r.cmd.Context().Err() != nil {
break
Expand Down
1 change: 1 addition & 0 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVarP(&opts.StartSignal, "start-signal", "s", "", "wait for start signal")
rootCmd.PersistentFlags().BoolVar(&opts.InsecureTLS, "insecure", false, "disable tls verification")
rootCmd.PersistentFlags().IntVar(&opts.RateLimit, "rate-limit", 500*1024, "bytes per second rate limit for UDP output")
rootCmd.PersistentFlags().IntVar(&opts.LogReaderBuffer, "log-reader-buf", 500*1024, "buffer size in bytes of the log reader")
Copy link
Member

@andrewkroh andrewkroh Jun 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the "maximum size of buffer that may be allocated" during log scanning, correct?

Is this needed to allow scanning lines longer than the default MaxScanTokenSize = 64 * 1024?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, until we have support for multiline I encountered cases where this is needed. Maybe defaulting to 64*1024 is enough though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with making it bigger. Can you change the description to indicate this is the max.


// Webhook output flags.
rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.ContentType, "webhook-content-type", "application/json", "webhook Content-Type")
Expand Down
19 changes: 18 additions & 1 deletion pkg/cmdutil/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package cmdutil
import (
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
)
Expand All @@ -25,7 +26,11 @@ func ValidateArgs(validators ...cobra.PositionalArgs) cobra.PositionalArgs {

// RegularFiles validates that each arg is a regular file.
func RegularFiles(_ *cobra.Command, args []string) error {
for _, f := range args {
paths, err := ExpandGlobPatternsFromArgs(args)
if err != nil {
return err
}
for _, f := range paths {
info, err := os.Stat(f)
if err != nil {
return fmt.Errorf("arg %q is not a valid file: %w", f, err)
Expand All @@ -36,3 +41,15 @@ func RegularFiles(_ *cobra.Command, args []string) error {
}
return nil
}

func ExpandGlobPatternsFromArgs(args []string) ([]string, error) {
var paths []string
for _, pat := range args {
matches, err := filepath.Glob(pat)
if err != nil {
return nil, fmt.Errorf("invalid glob pattern %q: %w", pat, err)
}
paths = append(paths, matches...)
}
return paths, nil
}
15 changes: 8 additions & 7 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package output
import "time"

type Options struct {
Addr string // Destination address (host:port).
Delay time.Duration // Delay start after start signal.
Protocol string // Protocol (udp/tcp/tls).
Retries int // Number of connection retries for tcp based protocols.
StartSignal string // OS signal to wait on before starting.
InsecureTLS bool // Disable TLS verification checks.
RateLimit int // UDP rate limit in bytes.
Addr string // Destination address (host:port).
Delay time.Duration // Delay start after start signal.
Protocol string // Protocol (udp/tcp/tls).
Retries int // Number of connection retries for tcp based protocols.
StartSignal string // OS signal to wait on before starting.
InsecureTLS bool // Disable TLS verification checks.
RateLimit int // UDP rate limit in bytes.
LogReaderBuffer int // Log reader buffer size in bytes.

WebhookOptions
GCPPubsubOptions
Expand Down