Skip to content

Commit

Permalink
feature: use ringBuffer to make log not blocking and configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Nov 22, 2018
1 parent 4d421ec commit 32d02ed
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 79 deletions.
22 changes: 22 additions & 0 deletions daemon/containerio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/alibaba/pouch/daemon/logger"
"github.com/alibaba/pouch/daemon/logger/crilog"
"github.com/alibaba/pouch/daemon/logger/logbuffer"
"github.com/alibaba/pouch/pkg/multierror"
"github.com/alibaba/pouch/pkg/streams"

Expand Down Expand Up @@ -47,6 +48,9 @@ type IO struct {
logdriver logger.LogDriver
logcopier *logger.LogCopier
criLog *crilog.Log

nonBlock bool
maxBufferSize int64
}

// NewIO return IO instance.
Expand Down Expand Up @@ -87,6 +91,16 @@ func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) {
ctrio.logdriver = logdriver
}

// SetMaxBufferSize set the max size of buffer.
func (ctrio *IO) SetMaxBufferSize(maxBufferSize int64) {
ctrio.maxBufferSize = maxBufferSize
}

// SetNonBlock whether to cache the container's logs with buffer.
func (ctrio *IO) SetNonBlock(nonBlock bool) {
ctrio.nonBlock = nonBlock
}

// Stream is used to export the stream field.
func (ctrio *IO) Stream() *streams.Stream {
return ctrio.stream
Expand Down Expand Up @@ -188,6 +202,14 @@ func (ctrio *IO) startLogging() error {
return nil
}

if ctrio.nonBlock {
logDriver, err := logbuffer.NewLogBuffer(ctrio.logdriver, ctrio.maxBufferSize)
if err != nil {
return err
}
ctrio.logdriver = logDriver
}

ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{
"stdout": ctrio.stream.NewStdoutPipe(),
"stderr": ctrio.stream.NewStderrPipe(),
Expand Down
10 changes: 6 additions & 4 deletions pkg/ringbuffer/list.go → daemon/logger/logbuffer/list.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ringbuffer
package logbuffer

import (
"sync"

"github.com/alibaba/pouch/daemon/logger"
)

var elemPool = &sync.Pool{New: func() interface{} { return new(element) }}

type element struct {
next, prev *element
val interface{}
val *logger.LogMessage
}

func (e *element) reset() {
Expand All @@ -34,7 +36,7 @@ func (q *queue) size() int {
return q.count
}

func (q *queue) enqueue(val interface{}) {
func (q *queue) enqueue(val *logger.LogMessage) {
elem := elemPool.Get().(*element)
elem.val = val

Expand All @@ -47,7 +49,7 @@ func (q *queue) enqueue(val interface{}) {
q.count++
}

func (q *queue) dequeue() interface{} {
func (q *queue) dequeue() *logger.LogMessage {
if q.size() == 0 {
return nil
}
Expand Down
61 changes: 61 additions & 0 deletions daemon/logger/logbuffer/logbuff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package logbuffer

import (
"github.com/alibaba/pouch/daemon/logger"

"github.com/sirupsen/logrus"
)

// LogBuffer is uses to cache the container's logs with ringBuffer.
type LogBuffer struct {
ringBuffer *RingBuffer
logger logger.LogDriver
}

// NewLogBuffer return a new BufferLog.
func NewLogBuffer(logDriver logger.LogDriver, maxBytes int64) (logger.LogDriver, error) {
bl := &LogBuffer{
logger: logDriver,
ringBuffer: NewRingBuffer(maxBytes),
}

// use a goroutine to write logs continuously with specified log driver
go bl.run()
return bl, nil
}

// Name return the log driver's name.
func (bl *LogBuffer) Name() string {
return bl.logger.Name()
}

// WriteLogMessage will write the LogMessage to the ringBuffer.
func (bl *LogBuffer) WriteLogMessage(msg *logger.LogMessage) error {
return bl.ringBuffer.Push(msg)
}

// Close close the ringBuffer and drain the messages.
func (bl *LogBuffer) Close() error {
bl.ringBuffer.Close()
for _, msg := range bl.ringBuffer.Drain() {
if err := bl.logger.WriteLogMessage(msg); err != nil {
logrus.Debugf("failed to write log %v when closing with log driver %s", msg, bl.logger.Name())
}
}

return bl.logger.Close()
}

// write logs continuously with specified log driver from ringBuffer.
func (bl *LogBuffer) run() {
for {
msg, err := bl.ringBuffer.Pop()
if err != nil {
return
}

if err := bl.logger.WriteLogMessage(msg); err != nil {
logrus.Debugf("failed to write log %v with log driver %s", msg, bl.logger.Name())
}
}
}
50 changes: 29 additions & 21 deletions pkg/ringbuffer/ringbuff.go → daemon/logger/logbuffer/ringbuff.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,75 @@
package ringbuffer
package logbuffer

import (
"fmt"
"sync"

"github.com/alibaba/pouch/daemon/logger"
)

// ErrClosed is used to indicate the ringbuffer has been closed.
var ErrClosed = fmt.Errorf("closed")

const defaultSize = 1024
const (
defaultMaxBytes = 1e6 //1MB
)

// RingBuffer implements a fixed-size buffer which will drop oldest data if full.
type RingBuffer struct {
mu sync.Mutex
wait *sync.Cond

cap int
closed bool
q *queue

maxBytes int64
currentBytes int64
}

// New creates new RingBuffer.
func New(cap int) *RingBuffer {
if cap <= 0 {
cap = defaultSize
// NewRingBuffer creates new RingBuffer.
func NewRingBuffer(maxBytes int64) *RingBuffer {
if maxBytes < 0 {
maxBytes = defaultMaxBytes
}

rb := &RingBuffer{
cap: cap,
closed: false,
q: newQueue(),
closed: false,
q: newQueue(),
maxBytes: maxBytes,
}
rb.wait = sync.NewCond(&rb.mu)
return rb
}

// Push pushes value into buffer and return whether it covers the oldest data
// or not.
func (rb *RingBuffer) Push(val interface{}) (bool, error) {
func (rb *RingBuffer) Push(val *logger.LogMessage) error {
rb.mu.Lock()
defer rb.mu.Unlock()

if rb.closed {
return false, ErrClosed
return ErrClosed
}

if val == nil {
return false, nil
return nil
}

// drop the oldest element if covered
covered := (rb.q.size() == rb.cap)
if covered {
rb.q.dequeue()
msgLength := int64(len(val.Line))
if (rb.currentBytes + msgLength) > rb.maxBytes {
rb.wait.Broadcast()
return nil
}

rb.q.enqueue(val)
rb.wait.Broadcast()
return covered, nil
return nil
}

// Pop pops the value in the buffer.
//
// NOTE: it returns ErrClosed if the buffer has been closed.
func (rb *RingBuffer) Pop() (interface{}, error) {
func (rb *RingBuffer) Pop() (*logger.LogMessage, error) {
rb.mu.Lock()
for rb.q.size() == 0 && !rb.closed {
rb.wait.Wait()
Expand All @@ -75,23 +81,25 @@ func (rb *RingBuffer) Pop() (interface{}, error) {
}

val := rb.q.dequeue()
rb.currentBytes -= int64(len(val.Line))
rb.mu.Unlock()
return val, nil
}

// Drain returns all the data in the buffer.
//
// NOTE: it can be used after closed to make sure the data have been consumed.
func (rb *RingBuffer) Drain() []interface{} {
func (rb *RingBuffer) Drain() []*logger.LogMessage {
rb.mu.Lock()
defer rb.mu.Unlock()

size := rb.q.size()
vals := make([]interface{}, 0, size)
vals := make([]*logger.LogMessage, 0, size)

for i := 0; i < size; i++ {
vals = append(vals, rb.q.dequeue())
}
rb.currentBytes = 0
return vals
}

Expand Down
Loading

0 comments on commit 32d02ed

Please sign in to comment.