Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: use ringBuffer to make log not blocking and make it configurable #2428

Merged
merged 1 commit into from
Nov 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions daemon/containerio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/alibaba/pouch/daemon/logger"
"github.com/alibaba/pouch/daemon/logger/crilog"
"github.com/alibaba/pouch/daemon/logger/logbuffer"
"github.com/alibaba/pouch/pkg/multierror"
"github.com/alibaba/pouch/pkg/streams"

Expand Down Expand Up @@ -47,6 +48,9 @@ type IO struct {
logdriver logger.LogDriver
logcopier *logger.LogCopier
criLog *crilog.Log

nonBlock bool
maxBufferSize int64
}

// NewIO return IO instance.
Expand Down Expand Up @@ -87,6 +91,16 @@ func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) {
ctrio.logdriver = logdriver
}

// SetMaxBufferSize set the max size of buffer.
func (ctrio *IO) SetMaxBufferSize(maxBufferSize int64) {
ctrio.maxBufferSize = maxBufferSize
}

// SetNonBlock whether to cache the container's logs with buffer.
func (ctrio *IO) SetNonBlock(nonBlock bool) {
ctrio.nonBlock = nonBlock
}

// Stream is used to export the stream field.
func (ctrio *IO) Stream() *streams.Stream {
return ctrio.stream
Expand Down Expand Up @@ -188,6 +202,14 @@ func (ctrio *IO) startLogging() error {
return nil
}

if ctrio.nonBlock {
logDriver, err := logbuffer.NewLogBuffer(ctrio.logdriver, ctrio.maxBufferSize)
if err != nil {
return err
}
ctrio.logdriver = logDriver
}

ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{
"stdout": ctrio.stream.NewStdoutPipe(),
"stderr": ctrio.stream.NewStderrPipe(),
Expand Down
10 changes: 6 additions & 4 deletions pkg/ringbuffer/list.go → daemon/logger/logbuffer/list.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ringbuffer
package logbuffer

import (
"sync"

"github.com/alibaba/pouch/daemon/logger"
)

var elemPool = &sync.Pool{New: func() interface{} { return new(element) }}

type element struct {
next, prev *element
val interface{}
val *logger.LogMessage
}

func (e *element) reset() {
Expand All @@ -34,7 +36,7 @@ func (q *queue) size() int {
return q.count
}

func (q *queue) enqueue(val interface{}) {
func (q *queue) enqueue(val *logger.LogMessage) {
elem := elemPool.Get().(*element)
elem.val = val

Expand All @@ -47,7 +49,7 @@ func (q *queue) enqueue(val interface{}) {
q.count++
}

func (q *queue) dequeue() interface{} {
func (q *queue) dequeue() *logger.LogMessage {
if q.size() == 0 {
return nil
}
Expand Down
61 changes: 61 additions & 0 deletions daemon/logger/logbuffer/logbuff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package logbuffer

import (
"github.com/alibaba/pouch/daemon/logger"

"github.com/sirupsen/logrus"
)

// LogBuffer is uses to cache the container's logs with ringBuffer.
type LogBuffer struct {
ringBuffer *RingBuffer
logger logger.LogDriver
}

// NewLogBuffer return a new BufferLog.
func NewLogBuffer(logDriver logger.LogDriver, maxBytes int64) (logger.LogDriver, error) {
bl := &LogBuffer{
logger: logDriver,
ringBuffer: NewRingBuffer(maxBytes),
}

// use a goroutine to write logs continuously with specified log driver
go bl.run()
return bl, nil
}

// Name return the log driver's name.
func (bl *LogBuffer) Name() string {
return bl.logger.Name()
}

// WriteLogMessage will write the LogMessage to the ringBuffer.
func (bl *LogBuffer) WriteLogMessage(msg *logger.LogMessage) error {
return bl.ringBuffer.Push(msg)
}

// Close close the ringBuffer and drain the messages.
func (bl *LogBuffer) Close() error {
bl.ringBuffer.Close()
for _, msg := range bl.ringBuffer.Drain() {
if err := bl.logger.WriteLogMessage(msg); err != nil {
logrus.Debugf("failed to write log %v when closing with log driver %s", msg, bl.logger.Name())
}
}

return bl.logger.Close()
}

// write logs continuously with specified log driver from ringBuffer.
func (bl *LogBuffer) run() {
for {
msg, err := bl.ringBuffer.Pop()
if err != nil {
return
}

if err := bl.logger.WriteLogMessage(msg); err != nil {
logrus.Debugf("failed to write log %v with log driver %s", msg, bl.logger.Name())
}
}
}
50 changes: 29 additions & 21 deletions pkg/ringbuffer/ringbuff.go → daemon/logger/logbuffer/ringbuff.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,75 @@
package ringbuffer
package logbuffer

import (
"fmt"
"sync"

"github.com/alibaba/pouch/daemon/logger"
)

// ErrClosed is used to indicate the ringbuffer has been closed.
var ErrClosed = fmt.Errorf("closed")

const defaultSize = 1024
const (
defaultMaxBytes = 1e6 //1MB
)

// RingBuffer implements a fixed-size buffer which will drop oldest data if full.
type RingBuffer struct {
mu sync.Mutex
wait *sync.Cond

cap int
closed bool
q *queue

maxBytes int64
currentBytes int64
}

// New creates new RingBuffer.
func New(cap int) *RingBuffer {
if cap <= 0 {
cap = defaultSize
// NewRingBuffer creates new RingBuffer.
func NewRingBuffer(maxBytes int64) *RingBuffer {
if maxBytes < 0 {
maxBytes = defaultMaxBytes
}

rb := &RingBuffer{
cap: cap,
closed: false,
q: newQueue(),
closed: false,
q: newQueue(),
maxBytes: maxBytes,
}
rb.wait = sync.NewCond(&rb.mu)
return rb
}

// Push pushes value into buffer and return whether it covers the oldest data
// or not.
func (rb *RingBuffer) Push(val interface{}) (bool, error) {
func (rb *RingBuffer) Push(val *logger.LogMessage) error {
rb.mu.Lock()
defer rb.mu.Unlock()

if rb.closed {
return false, ErrClosed
return ErrClosed
}

if val == nil {
return false, nil
return nil
}

// drop the oldest element if covered
covered := (rb.q.size() == rb.cap)
if covered {
rb.q.dequeue()
msgLength := int64(len(val.Line))
if (rb.currentBytes + msgLength) > rb.maxBytes {
rb.wait.Broadcast()
return nil
}

rb.q.enqueue(val)
rb.wait.Broadcast()
return covered, nil
return nil
}

// Pop pops the value in the buffer.
//
// NOTE: it returns ErrClosed if the buffer has been closed.
func (rb *RingBuffer) Pop() (interface{}, error) {
func (rb *RingBuffer) Pop() (*logger.LogMessage, error) {
rb.mu.Lock()
for rb.q.size() == 0 && !rb.closed {
rb.wait.Wait()
Expand All @@ -75,23 +81,25 @@ func (rb *RingBuffer) Pop() (interface{}, error) {
}

val := rb.q.dequeue()
rb.currentBytes -= int64(len(val.Line))
rb.mu.Unlock()
return val, nil
}

// Drain returns all the data in the buffer.
//
// NOTE: it can be used after closed to make sure the data have been consumed.
func (rb *RingBuffer) Drain() []interface{} {
func (rb *RingBuffer) Drain() []*logger.LogMessage {
rb.mu.Lock()
defer rb.mu.Unlock()

size := rb.q.size()
vals := make([]interface{}, 0, size)
vals := make([]*logger.LogMessage, 0, size)

for i := 0; i < size; i++ {
vals = append(vals, rb.q.dequeue())
}
rb.currentBytes = 0
return vals
}

Expand Down
Loading