Improved and batched logs translation #2694

Closed
14 changes: 14 additions & 0 deletions internal/stanza/config.go
@@ -15,6 +15,8 @@
package stanza

import (
"time"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"go.opentelemetry.io/collector/config/configmodels"
"gopkg.in/yaml.v2"
@@ -24,13 +26,25 @@ import (
type BaseConfig struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
Operators OperatorConfigs `mapstructure:"operators"`
Converter ConverterConfig `mapstructure:"converter"`
}

// OperatorConfigs is an alias that allows for unmarshaling outside of mapstructure.
// Stanza operators will be migrated to mapstructure for greater compatibility,
// but this allows a temporary solution.
type OperatorConfigs []map[string]interface{}

// ConverterConfig controls how the internal entry.Entry to pdata.Logs converter
// works.
type ConverterConfig struct {
// MaxFlushCount defines the maximum number of entries that can be
// accumulated before flushing them for further processing.
MaxFlushCount uint `mapstructure:"max_flush_count"`
// FlushInterval defines how often to flush the converted and accumulated
// log entries.
FlushInterval time.Duration `mapstructure:"flush_interval"`
}

// InputConfig is an alias that allows unmarshaling outside of mapstructure
// This is meant to be used only for the input operator
type InputConfig map[string]interface{}
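
For context, here is a minimal sketch of how the receiver code (in the same stanza package) might translate these settings into the converter options added in converter.go below. The helper name buildConverterOptions is hypothetical; the actual wiring is not part of this diff.

// buildConverterOptions is a hypothetical helper mapping ConverterConfig onto
// the functional options defined in converter.go. Zero values fall back to
// the converter's defaults.
func buildConverterOptions(cfg ConverterConfig) []ConverterOption {
	var opts []ConverterOption
	if cfg.MaxFlushCount > 0 {
		opts = append(opts, WithMaxFlushCount(cfg.MaxFlushCount))
	}
	if cfg.FlushInterval > 0 {
		opts = append(opts, WithFlushInterval(cfg.FlushInterval))
	}
	return opts
}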
273 changes: 255 additions & 18 deletions internal/stanza/converter.go
@@ -15,49 +15,286 @@
package stanza

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

const (
DefaultFlushInterval = 100 * time.Millisecond
DefaultMaxFlushCount = 100
)

// Converter converts entry.Entry into pdata.Logs aggregating translated
// entries into logs coming from the same Resource.
// Logs are being sent out based on the flush interval and/or the maximum
// batch size.
type Converter struct {
Review comment (Member): Can you please add comments that describe the design and why we have a separate flushChan? It is not obvious when reading the code.
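// Design note (addressing the review comment above; see also the author's
// reply on queueForFlush further down): conversion and delivery are
// decoupled. Batch() aggregates incoming entries per Resource in the data
// map, convertBuffer() turns them into pdata.Logs, and the resulting batches
// are handed over flushChan to the goroutine started in Start(), which
// writes them to pLogsChan. Keeping flushChan separate from pLogsChan (and
// queueing flushes in their own goroutine) keeps callers of Batch() from
// blocking on a slow consumer of pLogsChan.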

// pLogsChan is the channel on which batched logs will be sent.
pLogsChan chan pdata.Logs

stopOnce sync.Once
stopChan chan struct{}

// flushInterval defines how often we flush the aggregated log entries.
flushInterval time.Duration
// maxFlushCount defines the number of entries in the buffer that will
// trigger a flush of log entries.
maxFlushCount uint
// flushChan is an internal channel used for transporting batched pdata.Logs.
flushChan chan []pdata.Logs

// data is the internal cache which is flushed regularly, either when the
// flushInterval ticker ticks or when the total number of buffered entries
// reaches maxFlushCount.
data map[string][]*entry.Entry
dataMutex sync.RWMutex
dataCount uint

// wg is a WaitGroup that ensures the spun-up goroutines exit when Stop()
// is called.
wg sync.WaitGroup

logger *zap.Logger
}

type ConverterOption interface {
apply(*Converter)
}

type optionFunc func(*Converter)

func (f optionFunc) apply(c *Converter) {
f(c)
}

func WithFlushInterval(interval time.Duration) ConverterOption {
return optionFunc(func(c *Converter) {
c.flushInterval = interval
})
}

func WithMaxFlushCount(count uint) ConverterOption {
return optionFunc(func(c *Converter) {
c.maxFlushCount = count
})
}

func WithLogger(logger *zap.Logger) ConverterOption {
return optionFunc(func(c *Converter) {
c.logger = logger
})
}

func NewConverter(opts ...ConverterOption) *Converter {
c := &Converter{
pLogsChan: make(chan pdata.Logs),
stopChan: make(chan struct{}),
logger: zap.NewNop(),
flushChan: make(chan []pdata.Logs),
flushInterval: DefaultFlushInterval,
maxFlushCount: DefaultMaxFlushCount,
data: make(map[string][]*entry.Entry),
wg: sync.WaitGroup{},
}

for _, opt := range opts {
opt.apply(c)
}

return c
}

func (c *Converter) Start() {
c.logger.Debug("Starting log converter")

c.wg.Add(1)
go func(c *Converter) {
defer c.wg.Done()

ticker := time.NewTicker(c.flushInterval)
defer ticker.Stop()

for {
select {
case <-c.stopChan:
return

case pLogs := <-c.flushChan:
if err := c.flush(context.Background(), pLogs); err != nil {
c.logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
// NOTE:
// Since we've received a flush signal independently of the flush
// ticker, do we want to reset the ticker?

case <-ticker.C:
c.dataMutex.RLock()
count := c.dataCount
c.dataMutex.RUnlock()
if count > 0 {
pLogs := c.convertBuffer()
go c.queueForFlush(pLogs)
Review comment (Member): It is not clear why queueForFlush needs to be in a separate goroutine.

Reply (Contributor, author): This was my attempt to not introduce backpressure for the caller of Batch().

We might take a different approach, introducing a configurable number of workers that would consume the logs queued for flushing in round-robin fashion, but this would introduce backpressure to the caller whenever flushing stalls.

}
}
}
}(c)
}
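
To make the alternative mentioned in the author's reply above concrete, here is a rough sketch, not part of this PR, of a configurable pool of flush workers draining flushChan; the method name startFlushWorkers and the worker count are made up for illustration. In such a design, Batch() would presumably send to flushChan synchronously instead of spawning a goroutine, which is where the backpressure mentioned above would come from.

// startFlushWorkers is a hypothetical alternative to the single flush loop
// in Start(): n workers drain flushChan until the converter is stopped.
func (c *Converter) startFlushWorkers(n int) {
	for i := 0; i < n; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			for {
				select {
				case <-c.stopChan:
					return
				case pLogs := <-c.flushChan:
					if err := c.flush(context.Background(), pLogs); err != nil {
						c.logger.Debug("Problem sending log entries", zap.Error(err))
					}
				}
			}
		}()
	}
}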

func (c *Converter) Stop() {
c.stopOnce.Do(func() {
close(c.stopChan)
c.wg.Wait()
close(c.pLogsChan)
Review comment (Member): Do we need to close this channel? How are external readers expected to drain any items pending in pLogsChan? You can't drain until you call Stop because more data may be arriving, and you can't drain after you call Stop since the channel is now closed. Is this intentional?

Reply (Contributor, author): In the current implementation pLogsChan is unbuffered, so there won't be any logs to flush from it.

My reasoning here was as follows:

  • we close the stopChan, which signals the internal goroutines to return
  • we wait for them to return (to prevent leaving dangling goroutines)
  • we close the pLogsChan to signal the consumers that they can stop consuming (gracefully stopping downstream consumers/pipelines)

If I missed anything, or there are reasons to introduce buffering to the channel, please let me know.

})
}
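
To illustrate the shutdown semantics discussed above, a minimal usage sketch (hypothetical, assuming a caller in the same package): the consumer ranges over OutChannel(), and the loop ends once Stop() has closed pLogsChan. Note that entries still buffered inside the converter when Stop() is called do not get flushed first, which is the trade-off raised in the thread above.

// exampleUsage is a hypothetical end-to-end sketch of the Converter API.
func exampleUsage(entries []*entry.Entry) {
	c := NewConverter(
		WithMaxFlushCount(256),
		WithFlushInterval(200*time.Millisecond),
	)
	c.Start()

	// Consumer side: drain converted logs until Stop() closes the channel.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for pLogs := range c.OutChannel() {
			_ = pLogs // forward to the next consumer in the pipeline
		}
	}()

	// Producer side: feed entries into the converter.
	for _, e := range entries {
		if err := c.Batch(e); err != nil {
			// handle/log the error
		}
	}

	c.Stop()
	<-done
}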

// OutChannel returns the channel on which converted entries will be sent.
func (c *Converter) OutChannel() <-chan pdata.Logs {
return c.pLogsChan
}

// flush flushes the provided pdata.Logs entries onto the output channel,
// honoring context cancellation and converter shutdown.
func (c *Converter) flush(ctx context.Context, pLogs []pdata.Logs) error {
doneChan := ctx.Done()

for _, pLog := range pLogs {
select {
case <-doneChan:
return fmt.Errorf("flushing log entries interrupted, err: %v", ctx.Err())

case c.pLogsChan <- pLog:

// The converter has been stopped so bail the flush.
case <-c.stopChan:
return nil
}
}

return nil
}

// Batch takes in an entry.Entry and aggregates it with other entries
// that came from the same Resource.
// If the maxFlushCount has been reached then trigger a flush via the flushChan.
func (c *Converter) Batch(e *entry.Entry) error {
b, err := json.Marshal(e.Resource)
if err != nil {
return err
}

resource := string(b)

c.dataMutex.Lock()

resourceEntries, ok := c.data[resource]
if !ok {
// If we don't have any log entries for this Resource yet, create
// an entry in the cache for it.
resourceEntries = make([]*entry.Entry, 0, 1)
}

c.data[resource] = append(resourceEntries, e)
c.dataCount++

needToFlush := c.dataCount >= c.maxFlushCount
c.dataMutex.Unlock()

// Flush max size has been reached: schedule a log flush.
if needToFlush {
pLogs := c.convertBuffer()
go c.queueForFlush(pLogs)
}

return nil
}
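
As a small illustration of the grouping key (values below are hypothetical, not from this PR): json.Marshal sorts map keys, so entries carrying an identical Resource map produce the same string key and are aggregated together.

e1 := &entry.Entry{Resource: map[string]string{"host": "a"}, Record: "first"}
e2 := &entry.Entry{Resource: map[string]string{"host": "a"}, Record: "second"}
e3 := &entry.Entry{Resource: map[string]string{"host": "b"}, Record: "third"}
// After Batch(e1), Batch(e2) and Batch(e3) the internal cache would hold
// two groups:
//   {"host":"a"} -> [e1, e2]
//   {"host":"b"} -> [e3]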

// convertBuffer converts the accumulated entries in the buffer and empties it.
func (c *Converter) convertBuffer() []pdata.Logs {
c.dataMutex.Lock()
pLogs := make([]pdata.Logs, 0, len(c.data))
for h, entries := range c.data {
pLogs = append(pLogs, convertEntries(entries))
delete(c.data, h)
}
c.dataCount = 0
c.dataMutex.Unlock()

return pLogs
}

// queueForFlush queues the provided slice of pdata.Logs for flushing.
func (c *Converter) queueForFlush(pLogs []pdata.Logs) {
select {
case c.flushChan <- pLogs:
case <-c.stopChan:
}
}

// convertEntries takes in a slice of entries coming from the same Resource
// and converts them into a pdata.Logs.
func convertEntries(entries []*entry.Entry) pdata.Logs {
out := pdata.NewLogs()
if len(entries) == 0 {
return out
}

logs := out.ResourceLogs()
logs.Resize(1)
rls := logs.At(0)

// NOTE: This assumes that passed in entries all come from the same Resource.
if len(entries[0].Resource) > 0 {
resource := rls.Resource()
resourceAtts := resource.Attributes()
for k, v := range entries[0].Resource {
resourceAtts.InsertString(k, v)
}
}

rls.InstrumentationLibraryLogs().Resize(1)
ills := rls.InstrumentationLibraryLogs().At(0)
ills.Logs().Resize(len(entries))

for i := 0; i < len(entries); i++ {
ent := entries[i]
convertInto(ent, ills.Logs().At(i))
}

return out
}

// convert converts one entry.Entry into pdata.LogRecord allocating it.
func convert(ent *entry.Entry) pdata.LogRecord {
dest := pdata.NewLogRecord()
convertInto(ent, dest)
return dest
}

// convertInto converts entry.Entry into provided pdata.LogRecord.
func convertInto(ent *entry.Entry, dest pdata.LogRecord) {
dest.SetTimestamp(pdata.TimestampFromTime(ent.Timestamp))

sevText, sevNum := convertSeverity(ent.Severity)
dest.SetSeverityText(sevText)
dest.SetSeverityNumber(sevNum)

if len(ent.Attributes) > 0 {
attributes := dest.Attributes()
for k, v := range ent.Attributes {
attributes.InsertString(k, v)
}
}

insertToAttributeVal(ent.Record, dest.Body())
}

func insertToAttributeVal(value interface{}, dest pdata.AttributeValue) {