add CloudFront Standard Log Parser #4

Merged
merged 15 commits into from
Mar 30, 2021
47 changes: 47 additions & 0 deletions README.md
@@ -38,6 +38,8 @@ $ s3-object-router \
Usage of s3-object-router:
-bucket string
destination S3 bucket name
-format string
convert the s3 object format. choices are json|none (default "none")
-gzip
compress destination object by gzip (default true)
-keep-original-name
@@ -48,6 +50,8 @@ Usage of s3-object-router:
set time zone to localtime for parsed time
-no-put
do not put to s3
-parser string
object record parser. choices are json|cloudfront (default "json")
-replacer string
wildcard string replacer JSON. e.g. {"foo.bar.*":"foo"}
-time-format string
@@ -128,6 +132,49 @@ The first line will be routed to `path/to/app.normal/`, the second and third lin

`-replacer` takes a definition as a JSON string. Each key defines a matcher (which may include the wildcards `*` and `?`), and each value defines its replacement. Matchers are tried in the order they appear in the JSON. When a matcher matches a string, the router replaces it with the replacement and stops (no further matchers are tried).
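A minimal sketch of the first-match-wins behaviour, assuming a matcher must cover the whole string and that the matched string is replaced in full (neither detail is confirmed by this diff). `rule`, `wildcardMatch`, and `replace` are hypothetical names; the rules are kept in a slice because a Go map would not preserve the JSON order.

```go
package main

import "fmt"

// rule is one hypothetical matcher/replacement pair from the -replacer JSON,
// kept in the order it appeared.
type rule struct {
	pattern     string
	replacement string
}

// wildcardMatch reports whether the whole of s matches pattern, where '*'
// matches any sequence of bytes (including none) and '?' matches exactly one.
func wildcardMatch(pattern, s string) bool {
	if pattern == "" {
		return s == ""
	}
	switch pattern[0] {
	case '*':
		// '*' either matches nothing, or consumes one byte of s and stays.
		return wildcardMatch(pattern[1:], s) || (s != "" && wildcardMatch(pattern, s[1:]))
	case '?':
		return s != "" && wildcardMatch(pattern[1:], s[1:])
	default:
		return s != "" && s[0] == pattern[0] && wildcardMatch(pattern[1:], s[1:])
	}
}

// replace returns the replacement of the first matching rule,
// or s unchanged when no rule matches.
func replace(rules []rule, s string) string {
	for _, r := range rules {
		if wildcardMatch(r.pattern, s) {
			return r.replacement // first match wins; later rules are not tried
		}
	}
	return s
}

func main() {
	rules := []rule{{"foo.bar.*", "foo"}, {"foo.*", "other"}}
	fmt.Println(replace(rules, "foo.bar.baz")) // foo
	fmt.Println(replace(rules, "foo.qux"))     // other
	fmt.Println(replace(rules, "app.normal"))  // app.normal
}
```

Ordering matters here: with the rules swapped, `foo.bar.baz` would be caught by `foo.*` first and become `other`.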

### record parser

`-parser` specifies the parser for object records. By default, `json` is selected, and each record in the S3 object is parsed as one JSON object.
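In `Option.Init`, the default `json` parser is a `recordParserFunc` wrapping `json.Unmarshal`. The sketch below reproduces that adapter pattern in isolation; `record` is assumed to be a string-keyed map because its definition is not part of this diff, and `jsonParser` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// record stands in for the router's record type (assumed: a string-keyed map).
type record map[string]interface{}

// recordParser has the same method set as the interface added in option.go.
type recordParser interface {
	Parse([]byte, *record) error
}

// recordParserFunc lets an ordinary function satisfy recordParser,
// the same adapter idea as net/http's HandlerFunc.
type recordParserFunc func([]byte, *record) error

func (f recordParserFunc) Parse(b []byte, r *record) error { return f(b, r) }

// jsonParser decodes one record as one JSON object.
var jsonParser recordParser = recordParserFunc(func(b []byte, r *record) error {
	return json.Unmarshal(b, r)
})

func main() {
	var rec record
	if err := jsonParser.Parse([]byte(`{"level":"info","msg":"hello"}`), &rec); err != nil {
		panic(err)
	}
	fmt.Println(rec["level"], rec["msg"]) // info hello
}
```

A stateful parser such as `cloudfront` needs a struct instead, since it must remember the `#Fields` header between records.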

#### `cloudfront`

If "cloudfront" is selected, the S3 object will be parsed as CloudFront standard logs.
(cf. https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming)

For example,

- time-parse: true
- time-key: `datetime`
- time-format: `2006-01-02T15:04:05Z`
- key-prefix: `path/to/{{ .x_edge_location }}/{{ .datetime.Format "2006-01-02-15" }}`
- parser: `cloudfront`
- Source S3 object
```tsv
#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit SOX4xwn4XV6Q4rgb7XiVGOHms_BGlTAC4KyHmureZmBNrjGdRLiNIQ== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit k6WGMNkEzR5BEM_SaF47gjtX9zBDO2m349OY2an0QPEaUum1ZOLrow== d111111abcdef8.cloudfront.net https 23 0.000 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.000 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit f37nTMVvnKvV2ZSvEsivup_c2kZ7VXzYdjC-GUQZ5qNs-89BlWazbw== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-13 22:36:27 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net /favicon.ico 502 http://www.example.com/ Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 1pkpNfBQ39sYMnjjUQjmH2w1wdJnbHYTbag21o_3OfcQgPzdL2RSSQ== www.example.com http 675 0.102 - - - Error HTTP/1.1 - - 25260 0.102 OriginDnsError text/html 507 - -
2019-12-13 22:36:26 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 3AqrZGCnF_g0-5KOvfA7c9XLcf4YGvMFSeFdIetR1N_2y8jSis8Zxg== www.example.com http 735 0.107 - - - Error HTTP/1.1 - - 3802 0.107 OriginDnsError text/html 507 - -
2019-12-13 22:37:02 SEA19-C2 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - curl/7.55.1 - - Error kBkDzGnceVtWHqSCqBUqtA_cEs2T3tFUBbnBNkB9El_uVRhHgcZfcw== www.example.com http 387 0.103 - - - Error HTTP/1.1 - - 12644 0.103 OriginDnsError text/html 507 - -
```

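With these options, the 1st to 3rd records (lines 3 to 5 of the object) will be routed to `path/to/LAX1/2019-12-04-21/`, the 4th and 5th records to `path/to/SEA19-C1/2019-12-13-22/`, and the 6th record to `path/to/SEA19-C2/2019-12-13-22/`. The two `#` header lines produce no records.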

The `cloudfront` parser reads the two header lines (`#Version` and `#Fields`) of the S3 object.
Field names from the `#Fields` line are converted according to the following rules:

1. Replace `(`, `)`, and `-` with `_`
1. Convert to lowercase
1. Trim trailing `_`

As a result, any field can be rendered in `-key-prefix`; for example, `cs(User-Agent)` can be rendered with `{{ .cs_user_agent }}`.
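The three renaming rules can be sketched as a standalone function. `normalizeField` is a hypothetical name; the PR's `cloudfrontParser` performs the same steps inline with `strings.ReplaceAll`, `strings.ToLower`, and `strings.TrimRight`, while this sketch uses a single `strings.NewReplacer` for step 1.

```go
package main

import (
	"fmt"
	"strings"
)

// snake replaces '(' ')' '-' with '_' in one pass (rule 1).
var snake = strings.NewReplacer("(", "_", ")", "_", "-", "_")

// normalizeField converts a CloudFront #Fields entry into a record key.
func normalizeField(raw string) string {
	f := snake.Replace(raw)           // rule 1
	f = strings.ToLower(f)            // rule 2
	return strings.TrimRight(f, "_") // rule 3
}

func main() {
	for _, raw := range []string{"x-edge-location", "cs(User-Agent)", "cs(Host)"} {
		fmt.Printf("%s -> %s\n", raw, normalizeField(raw))
	}
	// x-edge-location -> x_edge_location
	// cs(User-Agent) -> cs_user_agent
	// cs(Host) -> cs_host
}
```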


The parser also adds a `datetime` field in RFC 3339 format (e.g. `2019-12-04T21:02:31Z`) that combines the `date` and `time` fields of the CloudFront standard log; CloudFront records these in UTC, hence the `Z` suffix. Use it with `-time-parse`, `-time-key`, and `-time-format`.

To convert the routed S3 objects to JSON (one JSON object per line), use `-format json`.

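With `-format json`, each record goes through `jsonEncoder` (added in `encoder.go`), which writes `json.Marshal(record)` followed by a line feed. The sketch below reproduces that for one record; `encodeJSONLine` is a hypothetical name. Every value is a string because the `cloudfront` parser stores each column as a string.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeJSONLine mirrors what jsonEncoder writes for one record:
// the record marshaled as JSON, then a line feed.
func encodeJSONLine(rec map[string]interface{}) ([]byte, error) {
	b, err := json.Marshal(rec) // map keys are emitted in sorted order
	if err != nil {
		return nil, err
	}
	return append(b, '\n'), nil
}

func main() {
	line, err := encodeJSONLine(map[string]interface{}{
		"x_edge_location": "LAX1",
		"sc_status":       "200",
		"cs_uri_stem":     "/index.html",
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(line))
	// {"cs_uri_stem":"/index.html","sc_status":"200","x_edge_location":"LAX1"}
}
```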
## LICENSE

MIT
10 changes: 7 additions & 3 deletions cmd/s3-object-router/main.go
@@ -50,20 +50,22 @@ func lambdaHandler(r *router.Router) func(context.Context, events.S3Event) error

func setup() (*router.Router, error) {
var (
- bucket, keyPrefix, replacer string
- timeKey, timeFormat string
- gzip, timeParse, localTime, noPut, keep bool
+ bucket, keyPrefix, replacer, parser, objFromat string
+ timeKey, timeFormat string
+ gzip, timeParse, localTime, noPut, keep bool
)
flag.StringVar(&bucket, "bucket", "", "destination S3 bucket name")
flag.StringVar(&keyPrefix, "key-prefix", "", "prefix of S3 key")
flag.BoolVar(&gzip, "gzip", true, "compress destination object by gzip")
flag.StringVar(&replacer, "replacer", "", `wildcard string replacer JSON. e.g. {"foo.bar.*":"foo"}`)
+ flag.StringVar(&parser, "parser", "json", "object record parser. choices are json|cloudfront")
flag.BoolVar(&timeParse, "time-parse", false, "parse record value as time.Time with -time-format")
flag.StringVar(&timeFormat, "time-format", time.RFC3339Nano, "format of time-parse")
flag.StringVar(&timeKey, "time-key", router.DefaultTimeKey, "record key name for time-parse")
flag.BoolVar(&localTime, "local-time", false, "set time zone to localtime for parsed time")
flag.BoolVar(&noPut, "no-put", false, "do not put to s3")
flag.BoolVar(&keep, "keep-original-name", false, "keep original object base name")
+ flag.StringVar(&objFromat, "format", "none", `convert the s3 object format. choices are json|none`)
flag.VisitAll(envToFlag)
flag.Parse()

@@ -72,12 +74,14 @@ func setup() (*router.Router, error) {
KeyPrefix: keyPrefix,
Gzip: gzip,
Replacer: replacer,
+ Parser: parser,
TimeParse: timeParse,
TimeKey: timeKey,
TimeFormat: timeFormat,
LocalTime: localTime,
PutS3: !noPut,
KeepOriginalName: keep,
+ ObjectFormat: objFromat,
}
log.Printf("[debug] option: %#v", opt)
return router.New(&opt)
59 changes: 59 additions & 0 deletions encoder.go
@@ -0,0 +1,59 @@
package router

import (
"encoding/json"
"log"
)

// LF represents LineFeed \n
var LF = []byte("\n")

type noneEncoder struct {
body buffer
}

func newNoneEncoder(body buffer) encoder {
return &noneEncoder{
body: body,
}
}

func (e *noneEncoder) Encode(_ record, recordBytes []byte) error {
if _, err := e.body.Write(recordBytes); err != nil {
return err
}
_, err := e.body.Write(LF)
return err
}

func (e *noneEncoder) Buffer() buffer {
return e.body
}

type jsonEncoder struct {
body buffer
}

func newJSONEncoder(body buffer) encoder {
return &jsonEncoder{
body: body,
}
}

func (e *jsonEncoder) Encode(rec record, _ []byte) error {

bytes, err := json.Marshal(rec)
if err != nil {
log.Println("[warn] failed to generate json record", err)
return err
}
if _, err := e.body.Write(bytes); err != nil {
return err
}
_, err = e.body.Write(LF)
return err
}

func (e *jsonEncoder) Buffer() buffer {
return e.body
}
49 changes: 37 additions & 12 deletions option.go
@@ -14,25 +14,32 @@ var DefaultTimeKey = "time"

// Option represents option values of router
type Option struct {
- Bucket string
- KeyPrefix string
- TimeParse bool
- TimeKey string
- TimeFormat string
- LocalTime bool
- Gzip bool
- Replacer string
- PutS3 bool
- KeepOriginalName bool
+ Bucket string `json:"bucket,omitempty"`
+ KeyPrefix string `json:"key_prefix,omitempty"`
+ TimeParse bool `json:"time_parse,omitempty"`
+ TimeKey string `json:"time_key,omitempty"`
+ TimeFormat string `json:"time_format,omitempty"`
+ LocalTime bool `json:"local_time,omitempty"`
+ Gzip bool `json:"gzip,omitempty"`
+ Replacer string `json:"replacer,omitempty"`
+ Parser string `json:"parser,omitempty"`
+ PutS3 bool `json:"put_s3,omitempty"`
+ ObjectFormat string `json:"object_format,omitempty"`
+ KeepOriginalName bool `json:"keep_original_name,omitempty"`

- replacer replacer
- timeParser timeParser
+ replacer replacer
+ recordParser recordParser
+ newEncoder func(buffer) encoder
+ timeParser timeParser
}

type replacer interface {
Replace(string) string
}

+ type recordParser interface {
+ Parse([]byte, *record) error
+ }
type timeParser struct {
layout string
loc *time.Location
@@ -76,6 +83,16 @@ func (opt *Option) Init() error {
} else {
opt.replacer = strings.NewReplacer() // nop replacer
}
+ switch opt.Parser {
+ case "", "json":
+ opt.recordParser = recordParserFunc(func(b []byte, r *record) error {
+ return json.Unmarshal(b, r)
+ })
+ case "cloudfront":
+ opt.recordParser = &cloudfrontParser{}
+ default:
+ return errors.New("parser must be string any of json|cloudfront")
+ }
if opt.TimeParse {
p := timeParser{layout: opt.TimeFormat}
if opt.LocalTime {
@@ -88,5 +105,13 @@ func (opt *Option) Init() error {
if opt.TimeKey == "" {
opt.TimeKey = DefaultTimeKey
}
+ switch opt.ObjectFormat {
+ case "", "none":
+ opt.newEncoder = newNoneEncoder
+ case "json":
+ opt.newEncoder = newJSONEncoder
+ default:
+ return errors.New("format must be string any of json|none")
+ }
return nil
}
78 changes: 78 additions & 0 deletions parser.go
@@ -0,0 +1,78 @@
package router

import (
"fmt"
"strings"

"github.com/pkg/errors"
)

const (
defaultCloudFrontNumColumns = 33
)

// Predefined errors returned by recordParser implementations.
var (
SkipLine = errors.New("Please skip this line.")
)

type recordParserFunc func([]byte, *record) error

func (p recordParserFunc) Parse(bs []byte, r *record) error {
return p(bs, r)
}

type cloudfrontParser struct {
version string
fields []string
}

func (p *cloudfrontParser) Parse(bs []byte, r *record) error {
str := string(bs)
rec := make(record, defaultCloudFrontNumColumns)
*r = rec
if len(str) == 0 {
return SkipLine // empty line: indexing str[0] would panic
}
if str[0] == '#' {
part := strings.SplitN(str[1:], ":", 2)
if len(part) != 2 {
return SkipLine
}
key := strings.TrimSpace(part[0])
value := strings.TrimSpace(part[1])
switch key {
case "Version":
p.version = value
case "Fields":
rawFields := strings.Split(value, " ")
//convert to snake case
fields := make([]string, 0, len(rawFields))
replaceTargets := []string{"(", ")", "-"}
for _, rawField := range rawFields {
field := rawField
for _, target := range replaceTargets {
field = strings.ReplaceAll(field, target, "_")
}
field = strings.ToLower(field)
field = strings.TrimRight(field, "_")
fields = append(fields, field)
}
p.fields = fields
}
return SkipLine
}
values := strings.Split(str, "\t")
// a row with fewer values than fields would make values[i] below panic
if len(values) != len(p.fields) {
return fmt.Errorf("number of values (%d) does not match number of fields (%d)", len(values), len(p.fields))
}
var dateValue, timeValue string
for i, field := range p.fields {
rec[field] = values[i]
if field == "date" {
dateValue = values[i]
}
if field == "time" {
timeValue = values[i]
}
}
rec["datetime"] = dateValue + "T" + timeValue + "Z"
return nil
}