Skip to content

Commit

Permalink
Decoupled logrus and added a logger interface
Browse files Browse the repository at this point in the history
- `producer.Config` now takes a producer.Logger interface instead of
a Logrus instance.
- If not logger is provided to config, producer uses a standard library
logger by default (`producer.StandardLogger`)
- `loggers` sub package ships implementations for zap and logrus.
  • Loading branch information
owais committed Nov 21, 2018
1 parent 38d0242 commit de2fc25
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 30 deletions.
24 changes: 21 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
and using the same aggregation format that [KPL][kpl-url] use.

### Useful links
- [Documentation][godoc-url]
- [Aggregation format][aggregation-format-url]
- [Considerations When Using KPL Aggregation][kpl-aggregation]
- [Consumer De-aggregation][de-aggregation]
Expand All @@ -22,13 +23,11 @@ import (
)

func main() {
log := logrus.New()
client := kinesis.New(session.New(aws.NewConfig()))
pr := producer.New(&producer.Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: log,
Client: client
})

pr.Start()
Expand All @@ -55,6 +54,25 @@ func main() {
}
```

#### Using custom logger
`producer.Config` takes an optional `logging.Logger` implementation.

```go
customLogger := &CustomLogger{}

&producer.Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: customLogger,
}
```

kinesis-producer ships with three logger implementations.

- `producer.Standard` used the standard library logger
- `loggers.Logrus` uses logrus logger
- `loggers.Zap` uses zap logger

### License
MIT
Expand Down
9 changes: 5 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package producer

import (
"log"
"os"
"time"

k "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
)

// Constants and default configuration take from:
Expand Down Expand Up @@ -54,8 +55,8 @@ type Config struct {
// Number of requests to sent concurrently. Default to 24.
MaxConnections int

// Logger is the logger used. Default to logrus.Log.
Logger logrus.FieldLogger
// Logger is the logger used. Default to producer.Logger.
Logger Logger

// Enabling verbose logging. Default to false.
Verbose bool
Expand All @@ -67,7 +68,7 @@ type Config struct {
// defaults for configuration
func (c *Config) defaults() {
if c.Logger == nil {
c.Logger = logrus.New()
c.Logger = &StandardLogger{log.New(os.Stdout, "", log.LstdFlags)}
}
if c.BatchCount == 0 {
c.BatchCount = maxRecordsPerRequest
Expand Down
12 changes: 7 additions & 5 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package producer

import (
"log"
"os"
"time"

"github.com/a8m/kinesis-producer/logging"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
)

func ExampleSimple() {
log := logrus.New()
logger := &logging.Standard{log.New(os.Stdout, "", log.LstdFlags)}
client := kinesis.New(session.New(aws.NewConfig()))
pr := New(&Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: log,
Logger: logger,
})

pr.Start()
Expand All @@ -25,15 +27,15 @@ func ExampleSimple() {
go func() {
for r := range pr.NotifyFailures() {
// r contains `Data`, `PartitionKey` and `Error()`
log.Error(r)
logger.Error("detected put failure", r.error)
}
}()

go func() {
for i := 0; i < 5000; i++ {
err := pr.Put([]byte("foo"), "bar")
if err != nil {
log.WithError(err).Fatal("error producing")
logger.Error("error producing", err)
}
}
}()
Expand Down
21 changes: 21 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package producer

import (
"fmt"
)

// Logger represents a simple interface used by kinesis-producer to handle logging
type Logger interface {
Info(msg string, values ...Value)
Error(msg string, err error, values ...Value)
}

// Value represents a key:value pair used by the Logger interface
type Value struct {
Name string
Value interface{}
}

func (v Value) String() string {
return fmt.Sprintf(" %s=%s", v.Name, v.Value)
}
29 changes: 29 additions & 0 deletions loggers/logrus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package loggers

import (
producer "github.com/a8m/kinesis-producer"
"github.com/sirupsen/logrus"
)

// Logrus implements a logurs.Logger for kinesis-producer
type Logrus struct {
Logger *logrus.Logger
}

// Info logs a message
func (l *Logrus) Info(msg string, args ...producer.Value) {
l.Logger.WithFields(l.valuesToFields(args...)).Info(msg)
}

// Error logs an error
func (l *Logrus) Error(msg string, err error, args ...producer.Value) {
l.Logger.WithError(err).WithFields(l.valuesToFields(args...)).Error(msg)
}

func (l *Logrus) valuesToFields(values ...producer.Value) logrus.Fields {
fields := logrus.Fields{}
for _, v := range values {
fields[v.Name] = v.Value
}
return fields
}
32 changes: 32 additions & 0 deletions loggers/zap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package loggers

import (
"go.uber.org/zap"

producer "github.com/a8m/kinesis-producer"
)

// Zap implements a logurs.Logger for kinesis-producer
type Zap struct {
Logger *zap.Logger
}

// Info logs a message
func (l *Zap) Info(msg string, values ...producer.Value) {
l.Logger.Info(msg, l.valuesToFields(values)...)
}

// Error logs an error
func (l *Zap) Error(msg string, err error, values ...producer.Value) {
fields := l.valuesToFields(values)
fields = append(fields, zap.Error(err))
l.Logger.Info(msg, fields...)
}

func (l *Zap) valuesToFields(values []producer.Value) []zap.Field {
fields := make([]zap.Field, len(values))
for i, v := range values {
fields[i] = zap.Any(v.Name, v.Value)
}
return fields
}
36 changes: 18 additions & 18 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ package producer

import (
"errors"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/jpillora/backoff"
)
Expand Down Expand Up @@ -92,7 +92,7 @@ func (p *Producer) Put(data []byte, partitionKey string) error {
p.Lock()
if needToDrain {
if record, err = p.aggregator.Drain(); err != nil {
p.Logger.WithError(err).Error("drain aggregator")
p.Logger.Error("drain aggregator", err)
}
}
p.aggregator.Put(data, partitionKey)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (p *Producer) NotifyFailures() <-chan *FailureRecord {

// Start the producer
func (p *Producer) Start() {
p.Logger.WithField("stream", p.StreamName).Info("starting producer")
p.Logger.Info("starting producer", Value{"stream", p.StreamName})
go p.loop()
}

Expand All @@ -138,7 +138,7 @@ func (p *Producer) Stop() {
p.Lock()
p.stopped = true
p.Unlock()
p.Logger.WithField("backlog", len(p.records)).Info("stopping producer")
p.Logger.Info("stopping producer", Value{"backlog", len(p.records)})

// drain
if record, ok := p.drainIfNeed(); ok {
Expand Down Expand Up @@ -225,7 +225,7 @@ func (p *Producer) drainIfNeed() (*kinesis.PutRecordsRequestEntry, bool) {
record, err := p.aggregator.Drain()
p.Unlock()
if err != nil {
p.Logger.WithError(err).Error("drain aggregator")
p.Logger.Error("drain aggregator", err)
} else {
return record, true
}
Expand All @@ -243,14 +243,14 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin
defer p.semaphore.release()

for {
p.Logger.WithField("reason", reason).Infof("flush %v records", len(records))
p.Logger.Info("flushing records", Value{"reason", reason}, Value{"records", len(records)})
out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{
StreamName: &p.StreamName,
Records: records,
})

if err != nil {
p.Logger.WithError(err).Error("flush")
p.Logger.Error("flush", err)
p.RLock()
notify := p.notify
p.RUnlock()
Expand All @@ -262,15 +262,15 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin

if p.Verbose {
for i, r := range out.Records {
fields := make(logrus.Fields)
values := make([]Value, 2)
if r.ErrorCode != nil {
fields["ErrorCode"] = *r.ErrorCode
fields["ErrorMessage"] = *r.ErrorMessage
values[0] = Value{"ErrorCode", *r.ErrorCode}
values[1] = Value{"ErrorMessage", *r.ErrorMessage}
} else {
fields["ShardId"] = *r.ShardId
fields["SequenceNumber"] = *r.SequenceNumber
values[0] = Value{"ShardId", *r.ShardId}
values[1] = Value{"SequenceNumber", *r.SequenceNumber}
}
p.Logger.WithFields(fields).Infof("Result[%d]", i)
p.Logger.Info(fmt.Sprintf("Result[%d]", i), values...)
}
}

Expand All @@ -281,11 +281,11 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin

duration := b.Duration()

p.Logger.WithFields(logrus.Fields{
"failures": failed,
"backoff": duration.String(),
}).Warn("put failures")

p.Logger.Info(
"put failures",
Value{"failures", failed},
Value{"backoff", duration.String()},
)
time.Sleep(duration)

// change the logging state for the next itertion
Expand Down
30 changes: 30 additions & 0 deletions standard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package producer

import (
"fmt"
"log"
"strings"
)

// StandardLogger implements the Logger interface using standard library loggers
type StandardLogger struct {
Logger *log.Logger
}

// Info prints log message to screen
func (l *StandardLogger) Info(msg string, values ...Value) {
l.Logger.Print(msg, l.valuesToString(values...))
}

// Error prints log message to screen
func (l *StandardLogger) Error(msg string, err error, values ...Value) {
l.Logger.Print(msg, l.valuesToString(values...), err)
}

func (l *StandardLogger) valuesToString(values ...Value) string {
parts := make([]string, len(values))
for i, v := range values {
parts[i] = fmt.Sprint(v)
}
return strings.Join(parts, ", ")
}

0 comments on commit de2fc25

Please sign in to comment.