diff --git a/daemon/containerio/io.go b/daemon/containerio/io.go index 30e5a9b0c4..e1915cfbb5 100644 --- a/daemon/containerio/io.go +++ b/daemon/containerio/io.go @@ -6,6 +6,7 @@ import ( "github.com/alibaba/pouch/daemon/logger" "github.com/alibaba/pouch/daemon/logger/crilog" + "github.com/alibaba/pouch/daemon/logger/logbuffer" "github.com/alibaba/pouch/pkg/multierror" "github.com/alibaba/pouch/pkg/streams" @@ -47,6 +48,9 @@ type IO struct { logdriver logger.LogDriver logcopier *logger.LogCopier criLog *crilog.Log + + nonBlock bool + maxBufferSize int64 } // NewIO return IO instance. @@ -87,6 +91,16 @@ func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) { ctrio.logdriver = logdriver } +// SetMaxBufferSize set the max size of buffer. +func (ctrio *IO) SetMaxBufferSize(maxBufferSize int64) { + ctrio.maxBufferSize = maxBufferSize +} + +// SetNonBlock whether to cache the container's logs with buffer. +func (ctrio *IO) SetNonBlock(nonBlock bool) { + ctrio.nonBlock = nonBlock +} + // Stream is used to export the stream field. func (ctrio *IO) Stream() *streams.Stream { return ctrio.stream @@ -188,6 +202,14 @@ func (ctrio *IO) startLogging() error { return nil } + if ctrio.nonBlock { + logDriver, err := logbuffer.NewLogBuffer(ctrio.logdriver, ctrio.maxBufferSize) + if err != nil { + return err + } + ctrio.logdriver = logDriver + } + ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{ "stdout": ctrio.stream.NewStdoutPipe(), "stderr": ctrio.stream.NewStderrPipe(), diff --git a/pkg/ringbuffer/list.go b/daemon/logger/logbuffer/list.go similarity index 80% rename from pkg/ringbuffer/list.go rename to daemon/logger/logbuffer/list.go index 64157cdb7b..e7873f26bc 100644 --- a/pkg/ringbuffer/list.go +++ b/daemon/logger/logbuffer/list.go @@ -1,14 +1,16 @@ -package ringbuffer +package logbuffer import ( "sync" + + "github.com/alibaba/pouch/daemon/logger" ) var elemPool = &sync.Pool{New: func() interface{} { return new(element) }} type element struct { next, prev *element - val interface{} + val *logger.LogMessage } func (e *element) reset() { @@ -34,7 +36,7 @@ func (q *queue) size() int { return q.count } -func (q *queue) enqueue(val interface{}) { +func (q *queue) enqueue(val *logger.LogMessage) { elem := elemPool.Get().(*element) elem.val = val @@ -47,7 +49,7 @@ func (q *queue) enqueue(val interface{}) { q.count++ } -func (q *queue) dequeue() interface{} { +func (q *queue) dequeue() *logger.LogMessage { if q.size() == 0 { return nil } diff --git a/daemon/logger/logbuffer/logbuff.go b/daemon/logger/logbuffer/logbuff.go new file mode 100644 index 0000000000..342b375f10 --- /dev/null +++ b/daemon/logger/logbuffer/logbuff.go @@ -0,0 +1,65 @@ +package logbuffer + +import ( + "github.com/alibaba/pouch/daemon/logger" + + "github.com/sirupsen/logrus" +) + +// LogBuffer is uses to cache the container's logs with ringBuffer. +type LogBuffer struct { + ringBuffer *RingBuffer + logger logger.LogDriver +} + +// NewLogBuffer return a new BufferLog. +func NewLogBuffer(logDriver logger.LogDriver, maxBytes int64) (logger.LogDriver, error) { + bl := &LogBuffer{ + logger: logDriver, + ringBuffer: New(-1, maxBytes), + } + + // use a goroutine to write logs continuously with specified log driver + go bl.run() + return bl, nil +} + +// Name return the log driver's name. +func (bl *LogBuffer) Name() string { + return bl.logger.Name() +} + +// WriteLogMessage will write the LogMessage to the ringBuffer. +func (bl *LogBuffer) WriteLogMessage(msg *logger.LogMessage) error { + covered, err := bl.ringBuffer.Push(msg) + if covered { + logrus.Debugf("data coverage occurs: %v", msg) + } + return err +} + +// Close close the ringBuffer and drain the messages. +func (bl *LogBuffer) Close() error { + bl.ringBuffer.Close() + for _, msg := range bl.ringBuffer.Drain() { + if err := bl.logger.WriteLogMessage(msg); err != nil { + logrus.Debugf("failed to write log %v when closing with log driver %s", msg, bl.logger.Name()) + } + } + + return bl.logger.Close() +} + +// write logs continuously with specified log driver from ringBuffer. +func (bl *LogBuffer) run() { + for { + msg, err := bl.ringBuffer.Pop() + if err != nil { + return + } + + if err := bl.logger.WriteLogMessage(msg); err != nil { + logrus.Debugf("failed to write log %v with log driver %s", msg, bl.logger.Name()) + } + } +} diff --git a/pkg/ringbuffer/ringbuff.go b/daemon/logger/logbuffer/ringbuff.go similarity index 67% rename from pkg/ringbuffer/ringbuff.go rename to daemon/logger/logbuffer/ringbuff.go index f0c3a91e47..d0d606f55c 100644 --- a/pkg/ringbuffer/ringbuff.go +++ b/daemon/logger/logbuffer/ringbuff.go @@ -1,14 +1,19 @@ -package ringbuffer +package logbuffer import ( "fmt" "sync" + + "github.com/alibaba/pouch/daemon/logger" ) // ErrClosed is used to indicate the ringbuffer has been closed. var ErrClosed = fmt.Errorf("closed") -const defaultSize = 1024 +const ( + defaultSize = 1024 + defaultMaxBytes = 1e6 //1MB +) // RingBuffer implements a fixed-size buffer which will drop oldest data if full. type RingBuffer struct { @@ -18,18 +23,25 @@ type RingBuffer struct { cap int closed bool q *queue + + maxBytes int64 + currentBytes int64 } // New creates new RingBuffer. -func New(cap int) *RingBuffer { +func New(cap int, maxBytes int64) *RingBuffer { if cap <= 0 { cap = defaultSize } + if maxBytes < 0 { + maxBytes = defaultMaxBytes + } rb := &RingBuffer{ - cap: cap, - closed: false, - q: newQueue(), + cap: cap, + closed: false, + q: newQueue(), + maxBytes: maxBytes, } rb.wait = sync.NewCond(&rb.mu) return rb @@ -37,7 +49,7 @@ func New(cap int) *RingBuffer { // Push pushes value into buffer and return whether it covers the oldest data // or not. -func (rb *RingBuffer) Push(val interface{}) (bool, error) { +func (rb *RingBuffer) Push(val *logger.LogMessage) (bool, error) { rb.mu.Lock() defer rb.mu.Unlock() @@ -49,6 +61,12 @@ func (rb *RingBuffer) Push(val interface{}) (bool, error) { return false, nil } + msgLength := int64(len(val.Line)) + if (rb.currentBytes + msgLength) > rb.maxBytes { + rb.wait.Broadcast() + return false, nil + } + // drop the oldest element if covered covered := (rb.q.size() == rb.cap) if covered { @@ -63,7 +81,7 @@ func (rb *RingBuffer) Push(val interface{}) (bool, error) { // Pop pops the value in the buffer. // // NOTE: it returns ErrClosed if the buffer has been closed. -func (rb *RingBuffer) Pop() (interface{}, error) { +func (rb *RingBuffer) Pop() (*logger.LogMessage, error) { rb.mu.Lock() for rb.q.size() == 0 && !rb.closed { rb.wait.Wait() @@ -75,6 +93,7 @@ func (rb *RingBuffer) Pop() (interface{}, error) { } val := rb.q.dequeue() + rb.currentBytes -= int64(len(val.Line)) rb.mu.Unlock() return val, nil } @@ -82,16 +101,17 @@ func (rb *RingBuffer) Pop() (interface{}, error) { // Drain returns all the data in the buffer. // // NOTE: it can be used after closed to make sure the data have been consumed. -func (rb *RingBuffer) Drain() []interface{} { +func (rb *RingBuffer) Drain() []*logger.LogMessage { rb.mu.Lock() defer rb.mu.Unlock() size := rb.q.size() - vals := make([]interface{}, 0, size) + vals := make([]*logger.LogMessage, 0, size) for i := 0; i < size; i++ { vals = append(vals, rb.q.dequeue()) } + rb.currentBytes = 0 return vals } diff --git a/pkg/ringbuffer/ringbuff_test.go b/daemon/logger/logbuffer/ringbuff_test.go similarity index 58% rename from pkg/ringbuffer/ringbuff_test.go rename to daemon/logger/logbuffer/ringbuff_test.go index ebf403596a..ad2aedd9e7 100644 --- a/pkg/ringbuffer/ringbuff_test.go +++ b/daemon/logger/logbuffer/ringbuff_test.go @@ -1,47 +1,77 @@ -package ringbuffer +package logbuffer import ( "reflect" + "strconv" "sync" "testing" "time" + + "github.com/alibaba/pouch/daemon/logger" ) func TestPushNormal(t *testing.T) { count := 5 - rb := New(count) + rb := New(count, defaultMaxBytes) // make the buffer full for i := 0; i < count; i++ { - covered, err := rb.Push(i) + covered, err := rb.Push(wrapLogWithInt(i)) assertHelper(t, false, covered, "unexpected to drop data") assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) } // continue to push new data for i := 0; i < count; i++ { - covered, err := rb.Push(i + count) + covered, err := rb.Push(wrapLogWithInt(i + count)) assertHelper(t, true, covered, "expected to drop data, but not") assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) } // check the buffer data - expectedDump := make([]interface{}, 0, count) + expectedDump := make([]*logger.LogMessage, 0, count) for i := 0; i < count; i++ { - expectedDump = append(expectedDump, count+i) + expectedDump = append(expectedDump, wrapLogWithInt(count+i)) } got := rb.Drain() assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got) } +func TestPushMaxBytes(t *testing.T) { + count := 5 + rb := New(count, 1024) + + b := make([]byte, 1024) + extraB := []byte{1} + + // push bytes of max size + _, err := rb.Push(wrapLogWithByte(b)) + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) + + // continue to push new data + _, err = rb.Push(wrapLogWithByte(extraB)) + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) + + // get data + logMsg, err := rb.Pop() + expectedDump := wrapLogWithByte(b) + assertHelper(t, nil, err, "unexpected error during pop: %v", err) + assertHelper(t, expectedDump, logMsg, "expected return %v, but got %v", expectedDump, logMsg) + + // get drain data + got := rb.Drain() + expectedLogs := []*logger.LogMessage{wrapLogWithByte(extraB)} + assertHelper(t, expectedLogs, got, "expected return %v, but got %v", expectedLogs, got) +} + func TestPopNormal(t *testing.T) { count := 5 - rb := New(count) + rb := New(count, defaultMaxBytes) // make the buffer full for i := 0; i < count; i++ { - covered, err := rb.Push(i) + covered, err := rb.Push(wrapLogWithInt(i)) assertHelper(t, false, covered, "unexpected to drop data") assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) } @@ -49,7 +79,7 @@ func TestPopNormal(t *testing.T) { for i := 0; i < count; i++ { val, err := rb.Pop() assertHelper(t, nil, err, "unexpected error during pop: %v", err) - assertHelper(t, i, val, "expected to have %v, but got %v", i, val) + assertHelper(t, wrapLogWithInt(i), val, "expected to have %v, but got %v", i, val) } assertHelper(t, 0, rb.q.size(), "expected to have empty queue, but got %d size of queue", rb.q.size()) @@ -59,36 +89,36 @@ func TestPopNormal(t *testing.T) { func TestPushAndPop(t *testing.T) { count := 5 - rb := New(count) + rb := New(count, defaultMaxBytes) for _, v := range []int{1, 3, 5} { - rb.Push(v) + rb.Push(wrapLogWithInt(v)) } { // get 1 without error val, err := rb.Pop() - assertHelper(t, val, 1, "expected to get 1, but got %v", val) + assertHelper(t, val, wrapLogWithInt(1), "expected to get 1, but got %v", val) assertHelper(t, nil, err, "unexpected error during pop: %v", err) } // push 4, [3, 5, 4] - rb.Push(4) + rb.Push(wrapLogWithInt(4)) { // get 3 without error val, err := rb.Pop() - assertHelper(t, val, 3, "expected to get 3, but got %v", val) + assertHelper(t, val, wrapLogWithInt(3), "expected to get 3, but got %v", val) assertHelper(t, nil, err, "unexpected error during pop: %v", err) } // push 2, [5, 4, 2] - rb.Push(2) + rb.Push(wrapLogWithInt(2)) { // get 5 without error val, err := rb.Pop() - assertHelper(t, val, 5, "expected to get 5, but got %v", val) + assertHelper(t, val, wrapLogWithInt(5), "expected to get 5, but got %v", val) assertHelper(t, nil, err, "unexpected error during pop: %v", err) } @@ -96,19 +126,19 @@ func TestPushAndPop(t *testing.T) { { // get error if push data into closed buffer - _, err := rb.Push(0) + _, err := rb.Push(wrapLogWithInt(0)) assertHelper(t, ErrClosed, err, "expected to get error(%v) when push data into closed buffer, but got error(%v)", ErrClosed, err) } // check the buffer data - expectedDump, got := []interface{}{4, 2}, rb.Drain() + expectedDump, got := []*logger.LogMessage{wrapLogWithInt(4), wrapLogWithInt(2)}, rb.Drain() assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got) } func TestPopWaitWhenNotData(t *testing.T) { count := 5 - rb := New(count) + rb := New(count, defaultMaxBytes) var ( wg sync.WaitGroup @@ -144,3 +174,15 @@ func assertHelper(t *testing.T, expected, got interface{}, format string, args . t.FailNow() } } + +func wrapLogWithInt(num int) *logger.LogMessage { + return &logger.LogMessage{ + Line: []byte(strconv.Itoa(num)), + } +} + +func wrapLogWithByte(bytes []byte) *logger.LogMessage { + return &logger.LogMessage{ + Line: bytes, + } +} diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index b4eda4a6cf..b904eb7a4f 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/cgroups" containerdtypes "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/mount" + "github.com/docker/go-units" "github.com/docker/libnetwork" "github.com/go-openapi/strfmt" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -42,6 +43,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + // LogNonBlocking means to use buffer to make logs non blocking. + LogNonBlocking = "non-blocking" +) + // ContainerMgr as an interface defines all operations against container. // ContainerMgr's functionality could be divided into three parts: // 1. regular container management; @@ -1582,10 +1588,22 @@ func (mgr *ContainerManager) initLogDriverBeforeStart(c *Container) error { } } - logDriver, err := logOptionsForContainerio(c, mgr.convContainerToLoggerInfo(c)) + logInfo := mgr.convContainerToLoggerInfo(c) + logDriver, err := logOptionsForContainerio(c, logInfo) if err != nil { return err } + + if logInfo.LogConfig["mode"] == LogNonBlocking { + if maxBufferSize, ok := logInfo.LogConfig["max-buffer-size"]; ok { + maxBytes, err := units.RAMInBytes(maxBufferSize) + if err != nil { + return errors.Wrap(err, "failed to parse option max-buffer-size") + } + cntrio.SetMaxBufferSize(maxBytes) + cntrio.SetNonBlock(true) + } + } cntrio.SetLogDriver(logDriver) return nil }