Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools/alb-replay: Add new features to alb-replay #4384

Merged
merged 2 commits into from
May 19, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions tools/alb-replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"compress/gzip"
"context"
"encoding/csv"
"errors"
Expand All @@ -14,7 +15,8 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
Expand All @@ -32,23 +34,29 @@ type NumberedURL struct {
URL string
}

func isSuccesfulStatusCode(statusCode int) bool {
// consider all 2XX HTTP errors a success
return statusCode/100 == 2
type ALBLogEntryReader struct {
csvReader *csv.Reader
config ALBLogEntryReaderConfig
}

type ALBLogEntryReader csv.Reader
type ALBLogEntryReaderConfig struct {
pathRegexp *regexp.Regexp
statusCodeRegexp *regexp.Regexp
}

func newALBLogEntryReader(input io.Reader) *ALBLogEntryReader {
func newALBLogEntryReader(input io.Reader, config ALBLogEntryReaderConfig) *ALBLogEntryReader {
reader := csv.NewReader(input)
reader.Comma = ' '
reader.FieldsPerRecord = albLogEntryCount
reader.ReuseRecord = true
return (*ALBLogEntryReader)(reader)
return &ALBLogEntryReader{
csvReader: reader,
config: config,
}
}

func (r *ALBLogEntryReader) GetRequestURI() (string, error) {
records, err := (*csv.Reader)(r).Read()
records, err := r.csvReader.Read()
if err != nil {
return "", err
}
Expand All @@ -58,13 +66,9 @@ func (r *ALBLogEntryReader) GetRequestURI() (string, error) {
if statusCodeStr == "-" {
return "", nil
}
statusCode, err := strconv.Atoi(statusCodeStr)
if err != nil {
return "", fmt.Errorf("error parsing target status code %q: %v", statusCodeStr, err)
}

// discard unsuccesful requests
if !isSuccesfulStatusCode(statusCode) {
if !r.config.statusCodeRegexp.MatchString(statusCodeStr) {
// discard url
return "", nil
}

Expand All @@ -86,6 +90,13 @@ func (r *ALBLogEntryReader) GetRequestURI() (string, error) {
return "", fmt.Errorf("error parsing url %q: %v", urlStr, err)
}

if r.config.pathRegexp != nil {
if !r.config.pathRegexp.MatchString(urlStr) {
// discard url
return "", nil
}
}

return parsed.RequestURI(), nil
}

Expand Down Expand Up @@ -121,7 +132,7 @@ func parseURLs(ctx context.Context, startFrom int, baseURL string, logReader *AL
}
}

func queryURLs(ctx context.Context, timeout time.Duration, urlChan chan NumberedURL) {
func queryURLs(ctx context.Context, timeout time.Duration, urlChan chan NumberedURL, quiet bool) {
client := http.Client{
Timeout: timeout,
}
Expand All @@ -146,35 +157,72 @@ func queryURLs(ctx context.Context, timeout time.Duration, urlChan chan Numbered
continue
}
resp.Body.Close()
if !isSuccesfulStatusCode(resp.StatusCode) {
if resp.StatusCode/100 != 2 {
log.Printf("(%d) unexpected status code: %d %q", numURL.Number, resp.StatusCode, numURL.URL)
continue
}
log.Printf("(%d) %s %s", numURL.Number, time.Since(start), numURL.URL)
if !quiet {
log.Printf("(%d) %s %s", numURL.Number, time.Since(start), numURL.URL)
}
}
}

func main() {
workers := flag.Int("workers", 1, "How many parallel workers to use")
startFromURLNum := flag.Int("start-from", 1, "What URL number to start from")
pathRegexp := flag.String("path-filter", "", "Regular expression with which to filter in requests based on their paths")
2opremio marked this conversation as resolved.
Show resolved Hide resolved
statusCodeRegexp := flag.String("status-code-filter", "^2[0-9][0-9]$", "Regular expression with which to filter in request based on their status codes")
timeout := flag.Duration("timeout", time.Second*5, "HTTP request timeout")
quiet := flag.Bool("quiet", false, "Only log failed requests")
flag.Parse()
if *workers < 1 {
log.Fatal("--workers parameter must be > 0")
}
if *startFromURLNum < 1 {
log.Fatal("--start-from must be > 0")
}
var pathRE *regexp.Regexp
if *pathRegexp != "" {
var err error
pathRE, err = regexp.Compile(*pathRegexp)
if err != nil {
log.Fatalf("error parsing --path-filter %q: %v", *pathRegexp, err)
}
}
var statusCodeRE *regexp.Regexp
if *statusCodeRegexp != "" {
var err error
statusCodeRE, err = regexp.Compile(*statusCodeRegexp)
if err != nil {
log.Fatalf("error parsing --status-code-filter parameter %q: %v", *statusCodeRegexp, err)
}
}
if flag.NArg() != 2 {
log.Fatalf("usage: %s <aws_log_file> <target_host_base_url>", os.Args[0])
log.Fatalf("usage: %s <aws_log_file[.gz]> <target_host_base_url>", os.Args[0])
}

file, err := os.Open(flag.Args()[0])
var reader io.ReadCloser
filePath := flag.Args()[0]
reader, err := os.Open(filePath)
if err != nil {
log.Fatalf("error opening file %q: %v", os.Args[1], err)
log.Fatalf("error opening file %q: %v", filePath, err)
}
defer reader.Close()
if filepath.Ext(filePath) == ".gz" {
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
log.Fatalf("error opening file %q: %v", filePath, err)
}
defer reader.Close()
}

baseURL := flag.Args()[1]
logReader := newALBLogEntryReader(file)
logReaderConfig := ALBLogEntryReaderConfig{
pathRegexp: pathRE,
statusCodeRegexp: statusCodeRE,
}
logReader := newALBLogEntryReader(reader, logReaderConfig)
urlChan := make(chan NumberedURL, *workers)
var wg sync.WaitGroup

Expand All @@ -186,7 +234,7 @@ func main() {
for i := 0; i < *workers; i++ {
wg.Add(1)
go func() {
queryURLs(ctx, *timeout, urlChan)
queryURLs(ctx, *timeout, urlChan, *quiet)
wg.Done()
}()
}
Expand Down