move log config up to getcall
we have WithLogger already; we just need to use it. this moves configuration of
the logger itself up into a call option, which makes GetCall less brittle: there
is no longer a path in GetCall where the logger might get accidentally turned on.

removes the old buffer and io.Reader on stderr that existed so we could upload
logs; we only need the logrus logger for now, and the old setup was a bit of a
tangled mess.

TODO: add a config bit to the agent config that sets the default level to 'info'
so that we get logs out of these calls (the pure runner is left alone, defaulting
to off), with the ability to change the level to debug or turn logging off
altogether.

NOTE: this fixes #1328 by putting a write guard after close and by no longer
using one line writer for different underlying writers.
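
A minimal sketch of the write-guard idea, with generic names rather than the exact
types touched in this diff: the writer atomically remembers that it has been closed
and turns later writes into harmless no-ops, so a late flush from the container
can't reach a buffer that has already been recycled.

package agent

import (
	"io"
	"sync/atomic"
)

// guardedWriter is illustrative only (not the exact type in this diff): it
// ignores writes after Close so a late write from a still-draining container
// cannot reach a buffer that has already been returned to a pool.
type guardedWriter struct {
	w      io.Writer
	closed uint32 // set to 1 by Close, read atomically in Write
}

func (g *guardedWriter) Write(b []byte) (int, error) {
	if atomic.LoadUint32(&g.closed) == 1 {
		// pretend success; returning 0 or an error could shut the container down
		return len(b), nil
	}
	return g.w.Write(b)
}

func (g *guardedWriter) Close() error {
	atomic.StoreUint32(&g.closed, 1)
	return nil
}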
rdallman committed May 10, 2019
1 parent 59188c5 commit b42b80b
Showing 4 changed files with 72 additions and 140 deletions.
9 changes: 6 additions & 3 deletions api/agent/agent.go
@@ -645,8 +645,8 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) error {
defer span.End()

// TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still
// TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out
swapBack := s.container.swap(call.stderr, &call.Stats)
defer call.stderr.Close()
defer swapBack()

req := createUDSRequest(ctx, call)
@@ -1204,9 +1204,11 @@ func newHotContainer(ctx context.Context, evictor Evictor, call *call, cfg *Conf
if _, ok := stderr.(common.NoopReadWriteCloser); !ok {
gw := common.NewGhostWriter()
buf1 := bufPool.Get().(*bytes.Buffer)
sec := &nopCloser{&logWriter{
// TODO(reed): this logger may have garbage in it between calls, easy fix
// is to make a new one each swap, it's cheap enough to be doable.
sec := &nopCloser{newLogWriter(
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
}}
)}
gw.Swap(newLineWriterWithBuffer(buf1, sec))
stderr = gw
bufs = append(bufs, buf1)
@@ -1336,6 +1338,7 @@ func (c *container) DisableNet() bool { return c.disableNet }
func (c *container) WriteStat(ctx context.Context, stat driver_stats.Stat) {
for key, value := range stat.Metrics {
if m, ok := dockerMeasures[key]; ok {
common.Logger(ctx).WithField(key, value).Info("container stats")
stats.Record(ctx, m.M(int64(value)))
}
}
25 changes: 14 additions & 11 deletions api/agent/call.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"path/filepath"
"strings"
@@ -212,6 +213,14 @@ func WithDockerAuth(auth docker.Auther) CallOpt {
}
}

func (a *agent) WithStderrLogger() CallOpt {
return func(c *call) error {
// XXX(reed): allow configuring the level / turning off
c.stderr = setupLogger(c.Call)
return nil
}
}

// GetCall builds a Call that can be used to submit jobs to the agent.
func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
var c call
@@ -253,20 +262,17 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
}
c.Call.Config["FN_LISTENER"] = "unix:" + filepath.Join(iofsDockerMountDest, udsFilename)
c.Call.Config["FN_FORMAT"] = "http-stream" // TODO: remove this after fdk's forget what it means
// TODO we could set type here too, for now, or anything else not based in fn/app/trigger config

setupCtx(&c)

c.ct = a
if c.stderr == nil {
// TODO(reed): is line writer is vulnerable to attack?
// XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider.
c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call)
// this disables logs in driver (at container level)
c.stderr = common.NoopReadWriteCloser{}
}
if c.respWriter == nil {
// send function output to logs if no writer given (TODO no longer need w/o async?)
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
c.respWriter = c.stderr
// TODO: we could make this an error, up to us
c.respWriter = ioutil.Discard
}

return &c, nil
@@ -283,7 +289,7 @@ type call struct {

respWriter io.Writer
req *http.Request
stderr io.ReadWriteCloser
stderr io.WriteCloser
ct callTrigger
slots *slotQueue
requestState RequestState
@@ -373,9 +379,6 @@ func (c *call) End(ctx context.Context, errIn error) error {
// ensure stats histogram is reasonably bounded
c.Call.Stats = stats.Decimate(240, c.Call.Stats)

// NOTE call this after InsertLog or the buffer will get reset
c.stderr.Close()

if err := c.ct.fireAfterCall(ctx, c.Model()); err != nil {
return err
}
153 changes: 41 additions & 112 deletions api/agent/func_logger.go
@@ -2,129 +2,90 @@ package agent

import (
"bytes"
"context"
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/sirupsen/logrus"
)

var (
bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
logPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
)

// setupLogger returns a ReadWriteCloser that may have:
// * [always] writes bytes to a size limited buffer, that can be read from using io.Reader
// * [always] writes bytes per line to stderr as DEBUG
//
// To prevent write failures from failing the call or any other writes,
// multiWriteCloser ignores errors. Close will flush the line writers
// appropriately. The returned io.ReadWriteCloser is not safe for use after
// calling Close.
func setupLogger(ctx context.Context, maxSize uint64, debug bool, c *models.Call) io.ReadWriteCloser {
func setupLogger(c *models.Call) io.WriteCloser {
lbuf := bufPool.Get().(*bytes.Buffer)
dbuf := logPool.Get().(*bytes.Buffer)

close := func() error {
close := func() {
// TODO we may want to toss out buffers that grow to grotesque size but meh they will prob get GC'd
lbuf.Reset()
dbuf.Reset()
bufPool.Put(lbuf)
logPool.Put(dbuf)
return nil
}

// we don't need to log per line to db, but we do need to limit it
limitw := &nopCloser{newLimitWriter(int(maxSize), dbuf)}

// order matters, in that closer should be last and limit should be next to last
mw := make(multiWriteCloser, 0, 3)

if debug {
// accumulate all line writers, wrap in same line writer (to re-use buffer)
stderrLogger := common.Logger(ctx).WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "fn_id": c.FnID, "image": c.Image, "call_id": c.ID})
loggo := &nopCloser{&logWriter{stderrLogger}}

// we don't need to limit the log writer(s), but we do need it to dispense lines
linew := newLineWriterWithBuffer(lbuf, loggo)
mw = append(mw, linew)
stderrLogger := logrus.WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "fn_id": c.FnID, "image": c.Image, "call_id": c.ID})
loggo := newLogWriter(stderrLogger)
linew := newLineWriterWithBuffer(lbuf, loggo)
linew = &fCloser{
close: func() error {
err := linew.Close()
close()
return err
},
}

mw = append(mw, limitw, &fCloser{close})
return &rwc{mw, dbuf}
return linew
}

// implements io.ReadWriteCloser, fmt.Stringer and Bytes()
// TODO WriteString and ReadFrom would be handy to implement,
// ReadFrom is a little involved.
type rwc struct {
io.WriteCloser

// buffer is not embedded since it would bypass calls to WriteCloser.Write
// in cases such as WriteString and ReadFrom
b *bytes.Buffer
}

func (r *rwc) Read(b []byte) (int, error) { return r.b.Read(b) }
func (r *rwc) String() string { return r.b.String() }
func (r *rwc) Bytes() []byte { return r.b.Bytes() }

// implements passthrough Write & closure call in Close
// implements passthrough WriteCloser with overwritable Close
type fCloser struct {
io.Writer
close func() error
}

func (f *fCloser) Write(b []byte) (int, error) { return len(b), nil }
func (f *fCloser) Close() error { return f.close() }
func (f *fCloser) Close() error { return f.close() }

type nopCloser struct {
io.Writer
}

func (n *nopCloser) Close() error { return nil }

// multiWriteCloser ignores all errors from inner writers. you say, oh, this is a bad idea?
// yes, well, we were going to silence them all individually anyway, so let's not be shy about it.
// the main thing we need to ensure is that every close is called, even if another errors.
// XXX(reed): maybe we should log it (for syslog, it may help debug, maybe we just log that one)
type multiWriteCloser []io.WriteCloser

func (m multiWriteCloser) Write(b []byte) (n int, err error) {
for _, mw := range m {
mw.Write(b)
}
return len(b), nil
}

func (m multiWriteCloser) Close() (err error) {
for _, mw := range m {
mw.Close()
}
return nil
}

// logWriter will log (to real stderr) every call to Write as a line. it should
// be wrapped with a lineWriter so that the output makes sense.
type logWriter struct {
// level string // XXX(reed):
logrus.FieldLogger
closed uint32
}

func newLogWriter(logger logrus.FieldLogger) io.WriteCloser {
return &logWriter{FieldLogger: logger}
}

func (l *logWriter) Write(b []byte) (int, error) {
if atomic.LoadUint32(&l.closed) == 1 {
// we don't want to return 0/error or the container will get shut down
return len(b), nil
}
l.Debug(string(b))
return len(b), nil
}

func (l *logWriter) Close() error {
atomic.StoreUint32(&l.closed, 1)
return nil
}

// lineWriter buffers all calls to Write and will call Write
// on the underlying writer once per new line. Close must
// be called to ensure that the buffer is flushed, and a newline
// will be appended in Close if none is present.
// TODO(reed): is line writer is vulnerable to attack?
type lineWriter struct {
b *bytes.Buffer
w io.WriteCloser
b *bytes.Buffer
w io.WriteCloser
closed uint32
}

func newLineWriter(w io.WriteCloser) io.WriteCloser {
@@ -136,6 +97,10 @@ func newLineWriterWithBuffer(b *bytes.Buffer, w io.WriteCloser) io.WriteCloser {
}

func (li *lineWriter) Write(ogb []byte) (int, error) {
if atomic.LoadUint32(&li.closed) == 1 {
// we don't want to return 0/error or the container will shut down
return len(ogb), nil
}
li.b.Write(ogb) // bytes.Buffer is guaranteed, read it!

for {
@@ -159,6 +124,8 @@ func (li *lineWriter) Write(ogb []byte) (int, error) {
}

func (li *lineWriter) Close() error {
atomic.StoreUint32(&li.closed, 1)

defer li.w.Close() // MUST close this (after writing last line)

// flush the remaining bytes in the buffer to underlying writer, adding a
@@ -174,41 +141,3 @@ func (li *lineWriter) Close() error {
_, err := li.w.Write(b)
return err
}

// io.Writer that allows limiting bytes written to w
// TODO change to use clamp writer, this is dupe code
type limitDiscardWriter struct {
n, max int
io.Writer
}

func newLimitWriter(max int, w io.Writer) io.Writer {
return &limitDiscardWriter{max: max, Writer: w}
}

func (l *limitDiscardWriter) Write(b []byte) (int, error) {
inpLen := len(b)
if l.n >= l.max {
return inpLen, nil
}

if l.n+inpLen >= l.max {
// cut off to prevent gigantic line attack
b = b[:l.max-l.n]
}

n, err := l.Writer.Write(b)
l.n += n

if l.n >= l.max {
// write in truncation message to log once
l.Writer.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n", l.max)))
} else if n != len(b) {
// Is this truly a partial write? We'll be honest if that's the case.
return n, err
}

// yes, we lie... this is to prevent callers to blow up, we always pretend
// that we were able to write the entire buffer.
return inpLen, err
}
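
To illustrate the per-line behavior that setupLogger and the hot-container plumbing
rely on, here is a small sketch written as if it lived inside the agent package
(newLineWriter is unexported); printCloser is a made-up stand-in for the
logrus-backed logWriter:

package agent

import "fmt"

// printCloser is an illustrative stand-in for the logrus-backed logWriter:
// it receives roughly one Write per completed line, plus a final Close.
type printCloser struct{}

func (printCloser) Write(b []byte) (int, error) {
	fmt.Printf("line: %q\n", string(b))
	return len(b), nil
}

func (printCloser) Close() error {
	fmt.Println("closed")
	return nil
}

func demoLineWriter() {
	w := newLineWriter(printCloser{})
	w.Write([]byte("hel"))     // buffered; no complete line yet
	w.Write([]byte("lo\nwor")) // the "hello" line is flushed to printCloser here
	w.Close()                  // flushes the remaining "wor" (a newline is appended) and closes
}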
25 changes: 11 additions & 14 deletions api/server/runner_fninvoke.go
@@ -92,19 +92,28 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode
// buffer the response before writing it out to client to prevent partials from trying to stream
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
var writer ResponseBuffer

var opts []agent.CallOpt
opts = append(opts, agent.FromHTTPFnRequest(app, fn, req))

var writer ResponseBuffer
isDetached := req.Header.Get("Fn-Invoke-Type") == models.TypeDetached
if isDetached {
writer = agent.NewDetachedResponseWriter(202)
opts = append(opts, agent.InvokeDetached())
} else {
writer = &syncResponseWriter{
headers: resp.Header(),
status: 200,
Buffer: buf,
}
}
opts := getCallOptions(req, app, fn, trig, writer)

opts = append(opts, agent.WithWriter(rw))
opts = append(opts, agent.WithStderrLogger())
if trig != nil {
opts = append(opts, agent.WithTrigger(trig))
}

call, err := s.agent.GetCall(opts...)
if err != nil {
@@ -137,16 +146,4 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode
}

func getCallOptions(req *http.Request, app *models.App, fn *models.Fn, trig *models.Trigger, rw http.ResponseWriter) []agent.CallOpt {
var opts []agent.CallOpt
opts = append(opts, agent.WithWriter(rw)) // XXX (reed): order matters [for now]
opts = append(opts, agent.FromHTTPFnRequest(app, fn, req))

if req.Header.Get("Fn-Invoke-Type") == models.TypeDetached {
opts = append(opts, agent.InvokeDetached())
}

if trig != nil {
opts = append(opts, agent.WithTrigger(trig))
}
return opts
}
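
Putting it together, a rough sketch of what an invoke-style caller looks like after
this change, mirroring the runner_fninvoke.go hunk above. The wrapper function,
package name, and the Submit call are illustrative; WithStderrLogger is shown as a
package-level option since that is how the hunk above calls it (call.go defines it
as a method on the agent).

package example

import (
	"net/http"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/api/models"
)

// invoke sketches how a caller opts into stderr logging now that it is a
// call option instead of a default inside GetCall.
func invoke(a agent.Agent, w http.ResponseWriter, req *http.Request,
	app *models.App, fn *models.Fn) error {

	opts := []agent.CallOpt{
		agent.FromHTTPFnRequest(app, fn, req), // build the call from the HTTP request
		agent.WithWriter(w),                   // where the function's response is written
		agent.WithStderrLogger(),              // opt in to per-call stderr -> logrus logging
	}

	call, err := a.GetCall(opts...)
	if err != nil {
		return err
	}
	return a.Submit(call)
}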
