Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

default user logs to send to fn stderr at INFO level #1479

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions api/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,8 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) error {
ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream")
defer span.End()

// TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still
// TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it
swapBack := s.container.swap(call.stderr, &call.Stats)
defer call.stderr.Close()
defer swapBack()

req := createUDSRequest(ctx, call)
Expand Down Expand Up @@ -1191,22 +1190,25 @@ func newHotContainer(ctx context.Context, evictor Evictor, call *call, cfg *Conf
// have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue
// reading from the other one without risking head-of-line blocking.

// TODO(reed): we should let the syslog driver pick this up really but our
// default story sucks there

// disable container logs if they're disabled on the call (pure_runner) -
// users may use syslog to get container logs, unrelated to this writer.
// otherwise, make a line writer and allow logrus DEBUG logs to host stderr
// between function invocations from the container.

var bufs []*bytes.Buffer
var stderr io.WriteCloser = call.stderr
if _, ok := stderr.(common.NoopReadWriteCloser); !ok {
if _, ok := stderr.(common.NoopReadWriteCloser); !ok && !cfg.DisableDebugUserLogs {
gw := common.NewGhostWriter()
buf1 := bufPool.Get().(*bytes.Buffer)
sec := &nopCloser{&logWriter{
// TODO(reed): this logger may have garbage in it between calls (without
// calling Close() to flush newlines, since we're swapping we can't), easy
// fix is to make a new one each swap, it's cheap enough to be doable.
// TODO(reed): we should only do this if they configure to log stderr, not if they use WithLogger(),
// for now use explicit disable with DisableDebugUserLogs
sec := newLogWriter(
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
}}
cfg.UserLogLevel,
)
gw.Swap(newLineWriterWithBuffer(buf1, sec))
stderr = gw
bufs = append(bufs, buf1)
Expand Down
59 changes: 0 additions & 59 deletions api/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,65 +275,6 @@ func TestGetCallFromModelRoundTripACall(t *testing.T) {
}
}

func TestLoggerIsStringerAndWorks(t *testing.T) {
// TODO test limit writer, logrus writer, etc etc

var call models.Call
logger := setupLogger(context.Background(), 1*1024*1024, true, &call)

if _, ok := logger.(fmt.Stringer); !ok {
// NOTE: if you are reading, maybe what you've done is ok, but be aware we were relying on this for optimization...
t.Fatal("you turned the logger into something inefficient and possibly better all at the same time, how dare ye!")
}

str := "0 line\n1 line\n2 line\n\n4 line"
logger.Write([]byte(str))

strGot := logger.(fmt.Stringer).String()

if strGot != str {
t.Fatal("logs did not match expectations, like being an adult", strGot, str)
}

logger.Close() // idk maybe this would panic might as well ca this

// TODO we could check for the toilet to flush here to logrus
}

func TestLoggerTooBig(t *testing.T) {

var call models.Call
logger := setupLogger(context.Background(), 10, true, &call)

str := fmt.Sprintf("0 line\n1 l\n-----max log size 10 bytes exceeded, truncating log-----\n")

n, err := logger.Write([]byte(str))
if err != nil {
t.Fatalf("err returned, but should not fail err=%v n=%d", err, n)
}
if n != len(str) {
t.Fatalf("n should be %d, but got=%d", len(str), n)
}

// oneeeeee moreeee time... (cue in Daft Punk), the results appear as if we wrote
// again... But only "limit" bytes should succeed, ignoring the subsequent writes...
n, err = logger.Write([]byte(str))
if err != nil {
t.Fatalf("err returned, but should not fail err=%v n=%d", err, n)
}
if n != len(str) {
t.Fatalf("n should be %d, but got=%d", len(str), n)
}

strGot := logger.(fmt.Stringer).String()

if strGot != str {
t.Fatalf("logs did not match expectations, like being an adult got=\n%v\nexpected=\n%v\n", strGot, str)
}

logger.Close()
}

type testListener struct {
afterCall func(context.Context, *models.Call) error
}
Expand Down
59 changes: 32 additions & 27 deletions api/agent/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"path/filepath"
"strings"
Expand Down Expand Up @@ -44,11 +45,11 @@ type CallOverrider func(*http.Request, *models.Call, map[string]string) (map[str
// CallOpt allows configuring a call before execution
// TODO(reed): consider the interface here, all options must be defined in agent and flexible
// enough for usage by extenders of fn, this straddling is painful. consider models.Call?
type CallOpt func(c *call) error
type CallOpt func(Config, *call) error

// FromHTTPFnRequest Sets up a call from an http trigger request
func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
id := id.New().String()

var syslogURL string
Expand Down Expand Up @@ -120,7 +121,7 @@ func reqURL(req *http.Request) string {

// FromModel creates a call object from an existing stored call model object, reading the body from the stored call payload
func FromModel(mCall *models.Call) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.Call = mCall

req, err := http.NewRequest(c.Method, c.URL, strings.NewReader(c.Payload))
Expand All @@ -137,7 +138,7 @@ func FromModel(mCall *models.Call) CallOpt {

// FromModelAndInput creates a call object from an existing stored call model object , reading the body from a provided stream
func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.Call = mCall

req, err := http.NewRequest(c.Method, c.URL, in)
Expand All @@ -155,7 +156,7 @@ func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt {
// WithTrigger adds trigger specific bits to a call.
// TODO consider removal, this is from a shuffle
func WithTrigger(t *models.Trigger) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
// right now just set the trigger id
c.TriggerID = t.ID
return nil
Expand All @@ -165,31 +166,31 @@ func WithTrigger(t *models.Trigger) CallOpt {
// WithWriter sets the writer that the call uses to send its output message to
// TODO this should be required
func WithWriter(w io.Writer) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.respWriter = w
return nil
}
}

// WithLogger sets stderr to the provided one
func WithLogger(w io.ReadWriteCloser) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.stderr = w
return nil
}
}

// InvokeDetached mark a call to be a detached call
func InvokeDetached() CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.Model().Type = models.TypeDetached
return nil
}
}

// WithContext overrides the context on the call
func WithContext(ctx context.Context) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.req = c.req.WithContext(ctx)
return nil
}
Expand All @@ -198,20 +199,34 @@ func WithContext(ctx context.Context) CallOpt {
// WithExtensions adds internal attributes to the call that can be interpreted by extensions in the agent
// Pure runner can use this to pass an extension to the call
func WithExtensions(extensions map[string]string) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.extensions = extensions
return nil
}
}

// WithDockerAuth configures a call to retrieve credentials for an image pull
func WithDockerAuth(auth docker.Auther) CallOpt {
return func(c *call) error {
return func(cfg Config, c *call) error {
c.dockerAuth = auth
return nil
}
}

// WithStderrLogger configures a call to have its container logs logged by fn.
// Configure UserLogLevel or DisableDebugUserLogs on agent to change behavior.
func WithStderrLogger() CallOpt {
// TODO(reed): we could take a context here which would allow request level logging vars on ctx to be used here too
return func(cfg Config, c *call) error {
if cfg.DisableDebugUserLogs {
return nil
}

c.stderr = setupLogger(c.Call, cfg.UserLogLevel)
return nil
}
}

// GetCall builds a Call that can be used to submit jobs to the agent.
func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
var c call
Expand All @@ -222,7 +237,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
opts = append(opts, a.callOpts...)

for _, o := range opts {
err := o(&c)
err := o(a.cfg, &c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -253,20 +268,17 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
}
c.Call.Config["FN_LISTENER"] = "unix:" + filepath.Join(iofsDockerMountDest, udsFilename)
c.Call.Config["FN_FORMAT"] = "http-stream" // TODO: remove this after fdk's forget what it means
// TODO we could set type here too, for now, or anything else not based in fn/app/trigger config

setupCtx(&c)

c.ct = a
if c.stderr == nil {
// TODO(reed): is line writer is vulnerable to attack?
// XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider.
c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call)
// this disables logs in driver (at container level)
c.stderr = common.NoopReadWriteCloser{}
}
if c.respWriter == nil {
// send function output to logs if no writer given (TODO no longer need w/o async?)
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
c.respWriter = c.stderr
// TODO: we could make this an error, up to us
c.respWriter = ioutil.Discard
}

return &c, nil
Expand All @@ -283,7 +295,7 @@ type call struct {

respWriter io.Writer
req *http.Request
stderr io.ReadWriteCloser
stderr io.WriteCloser
ct callTrigger
slots *slotQueue
requestState RequestState
Expand Down Expand Up @@ -322,10 +334,6 @@ func (c *call) ResponseWriter() http.ResponseWriter {
return c.respWriter.(http.ResponseWriter)
}

func (c *call) StdErr() io.ReadWriteCloser {
return c.stderr
}

func (c *call) AddUserExecutionTime(dur time.Duration) {
if c.userExecTime == nil {
c.userExecTime = new(time.Duration)
Expand Down Expand Up @@ -373,9 +381,6 @@ func (c *call) End(ctx context.Context, errIn error) error {
// ensure stats histogram is reasonably bounded
c.Call.Stats = stats.Decimate(240, c.Call.Stats)

// NOTE call this after InsertLog or the buffer will get reset
c.stderr.Close()

if err := c.ct.fireAfterCall(ctx, c.Model()); err != nil {
return err
}
Expand Down
17 changes: 6 additions & 11 deletions api/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"`
DisableReadOnlyRootFs bool `json:"disable_readonly_rootfs"`
DisableDebugUserLogs bool `json:"disable_debug_user_logs"`
UserLogLevel string `json:"user_log_level"`
IOFSEnableTmpfs bool `json:"iofs_enable_tmpfs"`
IOFSAgentPath string `json:"iofs_path"`
IOFSMountRoot string `json:"iofs_mount_root"`
Expand Down Expand Up @@ -73,16 +74,14 @@ const (
EnvHotPoll = "FN_HOT_POLL_MSECS"
// EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle
EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
// EnvHotStartTimeout is the timeout for a hot container to be created including docker-pull
// EnvHotPullTimeout is the timeout for a hot container to be created including docker-pull
EnvHotPullTimeout = "FN_HOT_PULL_TIMEOUT_MSECS"
// EnvHotStartTimeout is the timeout for a hot container to become available for use for requests after EnvHotStartTimeout
EnvHotStartTimeout = "FN_HOT_START_TIMEOUT_MSECS"
// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
// EnvHdrMaxResponseSize is the maximum number of bytes that a function may return in an invocation header
// EnvMaxHdrResponseSize is the maximum number of bytes that a function may return in an invocation header
EnvMaxHdrResponseSize = "FN_MAX_HDR_RESPONSE_SIZE"
// EnvMaxLogSize is the maximum size that a function's log may reach
EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES"
// EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers
EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS"
// EnvMaxTotalMemory is the maximum memory that will be reserved across all containers
Expand Down Expand Up @@ -121,6 +120,8 @@ const (
EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS"
// EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production.
EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS"
// EnvUserLogLevel is the logging level to use for user's function logs to be logged by fn. to disable explicitly, use FN_DISABLE_DEBUG_USER_LOGS
EnvUserLogLevel = "FN_USER_LOG_LEVEL"

// EnvIOFSEnableTmpfs enables creating a per-container tmpfs mount for the IOFS
EnvIOFSEnableTmpfs = "FN_IOFS_TMPFS"
Expand Down Expand Up @@ -155,7 +156,6 @@ const (
func NewConfig() (*Config, error) {
cfg := &Config{
MinDockerVersion: "17.10.0-ce",
MaxLogSize: 1 * 1024 * 1024,
PreForkImage: "busybox",
PreForkCmd: "tail -f /dev/null",
}
Expand All @@ -175,7 +175,6 @@ func NewConfig() (*Config, error) {
err = setEnvMsecs(err, EnvDetachedHeadroom, &cfg.DetachedHeadRoom, time.Duration(360)*time.Second)
err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize, nil)
err = setEnvUint(err, EnvMaxHdrResponseSize, &cfg.MaxHdrResponseSize, nil)
err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize, nil)
err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU, nil)
err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory, nil)
err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize, nil)
Expand All @@ -201,6 +200,7 @@ func NewConfig() (*Config, error) {
err = setEnvBool(err, EnvEnableNBResourceTracker, &cfg.EnableNBResourceTracker)
err = setEnvBool(err, EnvDisableReadOnlyRootFs, &cfg.DisableReadOnlyRootFs)
err = setEnvBool(err, EnvDisableDebugUserLogs, &cfg.DisableDebugUserLogs)
err = setEnvStr(err, EnvUserLogLevel, &cfg.UserLogLevel)
err = setEnvUint(err, EnvImageCleanMaxSize, &cfg.ImageCleanMaxSize, nil)
err = setEnvStr(err, EnvImageCleanExemptTags, &cfg.ImageCleanExemptTags)
err = setEnvBool(err, EnvImageEnableVolume, &cfg.ImageEnableVolume)
Expand All @@ -209,11 +209,6 @@ func NewConfig() (*Config, error) {
return cfg, err
}

if cfg.MaxLogSize > math.MaxInt64 {
// for safety during uint64 to int conversions in Write()/Read(), etc.
return cfg, fmt.Errorf("error invalid %s %v > %v", EnvMaxLogSize, cfg.MaxLogSize, math.MaxInt64)
}

return cfg, nil
}

Expand Down
Loading