Merge pull request #4 from kayac/feature/add-parser-flag-for-cloudfront-log

add CloudFront Standard Log Parser
fujiwara authored Mar 30, 2021
2 parents 5d79f66 + a7ef95d commit 6ea2c00
Showing 13 changed files with 478 additions and 27 deletions.
47 changes: 47 additions & 0 deletions README.md
@@ -38,6 +38,8 @@ $ s3-object-router \
Usage of s3-object-router:
-bucket string
destination S3 bucket name
-format string
convert the s3 object format. choices are json|none (default "none")
-gzip
compress destination object by gzip (default true)
-keep-original-name
@@ -48,6 +50,8 @@ Usage of s3-object-router:
set time zone to localtime for parsed time
-no-put
do not put to s3
-parser string
object record parser. choices are json|cloudfront (default "json")
-replacer string
wildcard string replacer JSON. e.g. {"foo.bar.*":"foo"}
-time-format string
@@ -128,6 +132,49 @@ The first line will be routed to `path/to/app.normal/`, the second and third lin

`-replacer` takes a definition as a JSON string. Each key defines a matcher (which may include the wildcards `*` and `?`) and each value defines its replacement. Matchers are tried in the order they appear in the JSON. When a matcher matches a string, the string is replaced with the replacement and matching stops (no other matchers are tried).
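
The first-match-wins behavior described above can be sketched as follows. This is only an illustration, not the router's implementation: the `rule` type and `replace` function are hypothetical names, and the standard library's `path.Match` stands in for whatever wildcard matcher the tool actually uses (note that `path.Match` also gives `[` and `\` special meaning, and its `*` does not cross `/`).

```go
package main

import (
	"fmt"
	"path"
)

// rule pairs a wildcard pattern with its replacement.
// Hypothetical type for illustration; a slice keeps the rules ordered.
type rule struct {
	pattern     string
	replacement string
}

// replace returns the replacement of the first rule whose pattern matches s.
// If no rule matches, s is returned unchanged.
func replace(rules []rule, s string) string {
	for _, r := range rules {
		// Assumption: path.Match approximates the * and ? wildcards.
		if ok, err := path.Match(r.pattern, s); err == nil && ok {
			return r.replacement // stop at the first match
		}
	}
	return s
}

func main() {
	rules := []rule{
		{"foo.bar.*", "foo"},
		{"foo.*", "other"},
	}
	fmt.Println(replace(rules, "foo.bar.baz")) // matched by the first rule
	fmt.Println(replace(rules, "foo.qux"))     // matched by the second rule
	fmt.Println(replace(rules, "unmatched"))   // no rule matches
}
```

Because a JSON object's key order matters here, an ordered slice is used in the sketch instead of a Go map, whose iteration order is not defined.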

### record parser

`-parser` specifies the parser for object records. By default, `json` is selected, and each record in the S3 object is parsed as one JSON object.

#### `cloudfront`

If "cloudfront" is selected, the S3 object will be parsed as CloudFront standard logs.
(cf. https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming)

For example,

- time-parse: true
- time-key: `datetime`
- time-format: `2006-01-02T15:04:05Z`
- key-prefix: `path/to/{{ .x_edge_location }}/{{ .datetime.Format "2006-01-02-15" }}`
- parser: `cloudfront`
- Source S3 object
```tsv
#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit SOX4xwn4XV6Q4rgb7XiVGOHms_BGlTAC4KyHmureZmBNrjGdRLiNIQ== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit k6WGMNkEzR5BEM_SaF47gjtX9zBDO2m349OY2an0QPEaUum1ZOLrow== d111111abcdef8.cloudfront.net https 23 0.000 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.000 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit f37nTMVvnKvV2ZSvEsivup_c2kZ7VXzYdjC-GUQZ5qNs-89BlWazbw== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-13 22:36:27 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net /favicon.ico 502 http://www.example.com/ Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 1pkpNfBQ39sYMnjjUQjmH2w1wdJnbHYTbag21o_3OfcQgPzdL2RSSQ== www.example.com http 675 0.102 - - - Error HTTP/1.1 - - 25260 0.102 OriginDnsError text/html 507 - -
2019-12-13 22:36:26 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 3AqrZGCnF_g0-5KOvfA7c9XLcf4YGvMFSeFdIetR1N_2y8jSis8Zxg== www.example.com http 735 0.107 - - - Error HTTP/1.1 - - 3802 0.107 OriginDnsError text/html 507 - -
2019-12-13 22:37:02 SEA19-C2 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - curl/7.55.1 - - Error kBkDzGnceVtWHqSCqBUqtA_cEs2T3tFUBbnBNkB9El_uVRhHgcZfcw== www.example.com http 387 0.103 - - - Error HTTP/1.1 - - 12644 0.103 OriginDnsError text/html 507 - -
```

The third record (not counting the two header lines) will be routed to `path/to/LAX1/2019-12-04-21/`, and the fourth record will be routed to `path/to/SEA19-C1/2019-12-13-22/`.

The `cloudfront` parser reads the two header lines (`#Version` and `#Fields`) at the top of the S3 object.
Each field name listed in `#Fields` is converted according to the following rules.

1. Replace `(`, `)`, and `-` with `_`
1. Convert to lowercase
1. Trim trailing `_`

As a result, any field can be rendered in the key-prefix; for example, `cs(User-Agent)` can be rendered with `cs_user_agent`.
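
The three conversion rules can be tried out in isolation with a small sketch like the one below. `normalizeField` is a hypothetical name chosen for this example; it follows the rules as listed rather than reproducing the parser's code verbatim.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeField converts a CloudFront header field name into a
// template-friendly key by applying the three rules in order.
// Hypothetical helper name for illustration.
func normalizeField(raw string) string {
	// Rule 1: replace "(", ")" and "-" with "_".
	field := strings.NewReplacer("(", "_", ")", "_", "-", "_").Replace(raw)
	// Rule 2: convert to lowercase.
	field = strings.ToLower(field)
	// Rule 3: trim trailing "_".
	return strings.TrimRight(field, "_")
}

func main() {
	for _, raw := range []string{"cs(User-Agent)", "x-edge-location", "cs(Host)", "date"} {
		fmt.Printf("%s -> %s\n", raw, normalizeField(raw))
	}
}
```

For instance, `cs(User-Agent)` first becomes `cs_User_Agent_`, then `cs_user_agent_`, and finally `cs_user_agent`.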


It also provides an RFC3339-formatted `datetime` field that combines the `date` and `time` fields of CloudFront's standard logs. Use it with `-time-parse`, `-time-key`, and `-time-format`.
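
As a rough illustration of how the combined value lines up with `-time-format`, the sketch below joins `date` and `time` as `<date>T<time>Z` and parses the result with the layout `2006-01-02T15:04:05Z` from the example above. `combineDateTime` is a hypothetical helper; the trailing `Z` in this layout is matched literally, and the parsed time is in UTC.

```go
package main

import (
	"fmt"
	"time"
)

// layout matches the -time-format value used in the example above.
const layout = "2006-01-02T15:04:05Z"

// combineDateTime joins the CloudFront date and time columns and parses
// the result. Hypothetical helper for illustration.
func combineDateTime(date, clock string) (time.Time, error) {
	return time.Parse(layout, date+"T"+clock+"Z")
}

func main() {
	ts, err := combineDateTime("2019-12-04", "21:02:31")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// The hour-granular layout used in the key-prefix example.
	fmt.Println(ts.Format("2006-01-02-15")) // 2019-12-04-21
}
```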

To convert the routed S3 object format to JSON, use `-format json`.

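
As a sketch of what one converted record might look like, the example below marshals a record to a single JSON line. `toJSONLine` is a hypothetical helper; the field values are strings because the `cloudfront` parser stores each column as text, and `encoding/json` emits map keys in sorted order.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSONLine encodes one record as JSON followed by a line feed.
// Hypothetical helper for illustration.
func toJSONLine(rec map[string]interface{}) ([]byte, error) {
	b, err := json.Marshal(rec)
	if err != nil {
		return nil, err
	}
	return append(b, '\n'), nil
}

func main() {
	rec := map[string]interface{}{
		"x_edge_location": "LAX1",
		"cs_method":       "GET",
		"sc_status":       "200",
	}
	line, err := toJSONLine(rec)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Print(string(line))
}
```
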
## LICENSE

MIT
10 changes: 7 additions & 3 deletions cmd/s3-object-router/main.go
@@ -50,20 +50,22 @@ func lambdaHandler(r *router.Router) func(context.Context, events.S3Event) error

func setup() (*router.Router, error) {
var (
- bucket, keyPrefix, replacer string
- timeKey, timeFormat string
- gzip, timeParse, localTime, noPut, keep bool
+ bucket, keyPrefix, replacer, parser, objFromat string
+ timeKey, timeFormat string
+ gzip, timeParse, localTime, noPut, keep bool
)
flag.StringVar(&bucket, "bucket", "", "destination S3 bucket name")
flag.StringVar(&keyPrefix, "key-prefix", "", "prefix of S3 key")
flag.BoolVar(&gzip, "gzip", true, "compress destination object by gzip")
flag.StringVar(&replacer, "replacer", "", `wildcard string replacer JSON. e.g. {"foo.bar.*":"foo"}`)
flag.StringVar(&parser, "parser", "json", "object record parser. choices are json|cloudfront")
flag.BoolVar(&timeParse, "time-parse", false, "parse record value as time.Time with -time-format")
flag.StringVar(&timeFormat, "time-format", time.RFC3339Nano, "format of time-parse")
flag.StringVar(&timeKey, "time-key", router.DefaultTimeKey, "record key name for time-parse")
flag.BoolVar(&localTime, "local-time", false, "set time zone to localtime for parsed time")
flag.BoolVar(&noPut, "no-put", false, "do not put to s3")
flag.BoolVar(&keep, "keep-original-name", false, "keep original object base name")
flag.StringVar(&objFromat, "format", "none", `convert the s3 object format. choices are json|none`)
flag.VisitAll(envToFlag)
flag.Parse()

@@ -72,12 +74,14 @@ func setup() (*router.Router, error) {
KeyPrefix: keyPrefix,
Gzip: gzip,
Replacer: replacer,
Parser: parser,
TimeParse: timeParse,
TimeKey: timeKey,
TimeFormat: timeFormat,
LocalTime: localTime,
PutS3: !noPut,
KeepOriginalName: keep,
ObjectFormat: objFromat,
}
log.Printf("[debug] option: %#v", opt)
return router.New(&opt)
59 changes: 59 additions & 0 deletions encoder.go
@@ -0,0 +1,59 @@
package router

import (
"encoding/json"
"log"
)

// LF represents LineFeed \n
var LF = []byte("\n")

type noneEncoder struct {
body buffer
}

func newNoneEncoder(body buffer) encoder {
return &noneEncoder{
body: body,
}
}

func (e *noneEncoder) Encode(_ record, recordBytes []byte) error {
if _, err := e.body.Write(recordBytes); err != nil {
return err
}
_, err := e.body.Write(LF)
return err
}

func (e *noneEncoder) Buffer() buffer {
return e.body
}

type jsonEncoder struct {
body buffer
}

func newJSONEncoder(body buffer) encoder {
return &jsonEncoder{
body: body,
}
}

func (e *jsonEncoder) Encode(rec record, _ []byte) error {

bytes, err := json.Marshal(rec)
if err != nil {
log.Println("[warn] failed to generate json record", err)
return err
}
if _, err := e.body.Write(bytes); err != nil {
return err
}
_, err = e.body.Write(LF)
return err
}

func (e *jsonEncoder) Buffer() buffer {
return e.body
}
49 changes: 37 additions & 12 deletions option.go
@@ -14,25 +14,32 @@ var DefaultTimeKey = "time"

// Option represents option values of router
type Option struct {
- Bucket string
- KeyPrefix string
- TimeParse bool
- TimeKey string
- TimeFormat string
- LocalTime bool
- Gzip bool
- Replacer string
- PutS3 bool
- KeepOriginalName bool
+ Bucket string `json:"bucket,omitempty"`
+ KeyPrefix string `json:"key_prefix,omitempty"`
+ TimeParse bool `json:"time_parse,omitempty"`
+ TimeKey string `json:"time_key,omitempty"`
+ TimeFormat string `json:"time_format,omitempty"`
+ LocalTime bool `json:"local_time,omitempty"`
+ Gzip bool `json:"gzip,omitempty"`
+ Replacer string `json:"replacer,omitempty"`
+ Parser string `json:"parser,omitempty"`
+ PutS3 bool `json:"put_s3,omitempty"`
+ ObjectFormat string `json:"object_format,omitempty"`
+ KeepOriginalName bool `json:"keep_original_name,omitempty"`

- replacer replacer
- timeParser timeParser
+ replacer replacer
+ recordParser recordParser
+ newEncoder func(buffer) encoder
+ timeParser timeParser
}

type replacer interface {
Replace(string) string
}

type recordParser interface {
Parse([]byte, *record) error
}
type timeParser struct {
layout string
loc *time.Location
@@ -76,6 +83,16 @@ func (opt *Option) Init() error {
} else {
opt.replacer = strings.NewReplacer() // nop replacer
}
switch opt.Parser {
case "", "json":
opt.recordParser = recordParserFunc(func(b []byte, r *record) error {
return json.Unmarshal(b, r)
})
case "cloudfront":
opt.recordParser = &cloudfrontParser{}
default:
return errors.New("parser must be string any of json|cloudfront")
}
if opt.TimeParse {
p := timeParser{layout: opt.TimeFormat}
if opt.LocalTime {
@@ -88,5 +105,13 @@ func (opt *Option) Init() error {
if opt.TimeKey == "" {
opt.TimeKey = DefaultTimeKey
}
switch opt.ObjectFormat {
case "", "none":
opt.newEncoder = newNoneEncoder
case "json":
opt.newEncoder = newJSONEncoder
default:
return errors.New("format must be string any of json|none")
}
return nil
}
78 changes: 78 additions & 0 deletions parser.go
@@ -0,0 +1,78 @@
package router

import (
"fmt"
"strings"

"github.com/pkg/errors"
)

const (
defaultCloudFrontNumColumns = 33
)

//recordParser predefined errors
var (
SkipLine = errors.New("Please skip this line.")
)

type recordParserFunc func([]byte, *record) error

func (p recordParserFunc) Parse(bs []byte, r *record) error {
return p(bs, r)
}

type cloudfrontParser struct {
version string
fields []string
}

func (p *cloudfrontParser) Parse(bs []byte, r *record) error {
str := string(bs)
rec := make(record, defaultCloudFrontNumColumns)
*r = rec
// HasPrefix avoids an index-out-of-range panic on an empty line
if strings.HasPrefix(str, "#") {
part := strings.SplitN(str[1:], ":", 2)
if len(part) != 2 {
return SkipLine
}
key := strings.TrimSpace(part[0])
value := strings.TrimSpace(part[1])
switch key {
case "Version":
p.version = value
case "Fields":
rawFields := strings.Split(value, " ")
//convert to snake case
fields := make([]string, 0, len(rawFields))
replaceTargets := []string{"(", ")", "-"}
for _, rawField := range rawFields {
field := rawField
for _, target := range replaceTargets {
field = strings.ReplaceAll(field, target, "_")
}
field = strings.ToLower(field)
field = strings.TrimRight(field, "_")
fields = append(fields, field)
}
p.fields = fields
}
return SkipLine
}
values := strings.Split(str, "\t")
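// require an exact match: fewer values than fields would make values[i] below panic
if len(values) != len(p.fields) {
return fmt.Errorf("this row's number of values does not match the number of fields, num of values = %d, num of fields = %d", len(values), len(p.fields))
}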
var dateValue, timeValue string
for i, field := range p.fields {
rec[field] = values[i]
if field == "date" {
dateValue = values[i]
}
if field == "time" {
timeValue = values[i]
}
}
rec["datetime"] = dateValue + "T" + timeValue + "Z"
return nil
}