Skip to content

Commit

Permalink
Merge pull request #12 from owais/decouple-logrus
Browse files Browse the repository at this point in the history
Decoupled logrus and added a logger interface
  • Loading branch information
a8m authored Nov 21, 2018
2 parents 38d0242 + da1db9a commit 2b76ac6
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 30 deletions.
44 changes: 41 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
and using the same aggregation format that [KPL][kpl-url] use.

### Useful links
- [Documentation][godoc-url]
- [Aggregation format][aggregation-format-url]
- [Considerations When Using KPL Aggregation][kpl-aggregation]
- [Consumer De-aggregation][de-aggregation]
Expand All @@ -22,13 +23,11 @@ import (
)

func main() {
log := logrus.New()
client := kinesis.New(session.New(aws.NewConfig()))
pr := producer.New(&producer.Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: log,
Client: client
})

pr.Start()
Expand All @@ -55,6 +54,45 @@ func main() {
}
```

#### Specifying logger implementation
`producer.Config` takes an optional `logging.Logger` implementation.

##### Using a custom logger
```go
customLogger := &CustomLogger{}

&producer.Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: customLogger,
}
```

#### Using logrus

```go
import (
"github.com/sirupsen/logrus"
producer "github.com/a8m/kinesis-producer"
"github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: loggers.Logrus(log),
}
```

kinesis-producer ships with three logger implementations.

- `producer.Standard` used the standard library logger
- `loggers.Logrus` uses logrus logger
- `loggers.Zap` uses zap logger

### License
MIT
Expand Down
9 changes: 5 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package producer

import (
"log"
"os"
"time"

k "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
)

// Constants and default configuration take from:
Expand Down Expand Up @@ -54,8 +55,8 @@ type Config struct {
// Number of requests to sent concurrently. Default to 24.
MaxConnections int

// Logger is the logger used. Default to logrus.Log.
Logger logrus.FieldLogger
// Logger is the logger used. Default to producer.Logger.
Logger Logger

// Enabling verbose logging. Default to false.
Verbose bool
Expand All @@ -67,7 +68,7 @@ type Config struct {
// defaults for configuration
func (c *Config) defaults() {
if c.Logger == nil {
c.Logger = logrus.New()
c.Logger = &StdLogger{log.New(os.Stdout, "", log.LstdFlags)}
}
if c.BatchCount == 0 {
c.BatchCount = maxRecordsPerRequest
Expand Down
11 changes: 6 additions & 5 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package producer

import (
"log"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
)

func ExampleSimple() {
log := logrus.New()
logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)}
client := kinesis.New(session.New(aws.NewConfig()))
pr := New(&Config{
StreamName: "test",
BacklogCount: 2000,
Client: client,
Logger: log,
Logger: logger,
})

pr.Start()
Expand All @@ -25,15 +26,15 @@ func ExampleSimple() {
go func() {
for r := range pr.NotifyFailures() {
// r contains `Data`, `PartitionKey` and `Error()`
log.Error(r)
logger.Error("detected put failure", r.error)
}
}()

go func() {
for i := 0; i < 5000; i++ {
err := pr.Put([]byte("foo"), "bar")
if err != nil {
log.WithError(err).Fatal("error producing")
logger.Error("error producing", err)
}
}
}()
Expand Down
46 changes: 46 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package producer

import (
"fmt"
"log"
"strings"
)

// Logger represents a simple interface used by kinesis-producer to handle logging
type Logger interface {
Info(msg string, values ...LogValue)
Error(msg string, err error, values ...LogValue)
}

// LogValue represents a key:value pair used by the Logger interface
type LogValue struct {
Name string
Value interface{}
}

func (v LogValue) String() string {
return fmt.Sprintf(" %s=%s", v.Name, v.Value)
}

// StdLogger implements the Logger interface using standard library loggers
type StdLogger struct {
Logger *log.Logger
}

// Info prints log message
func (l *StdLogger) Info(msg string, values ...LogValue) {
l.Logger.Print(msg, l.valuesToString(values...))
}

// Error prints log message
func (l *StdLogger) Error(msg string, err error, values ...LogValue) {
l.Logger.Print(msg, l.valuesToString(values...), err)
}

func (l *StdLogger) valuesToString(values ...LogValue) string {
parts := make([]string, len(values))
for i, v := range values {
parts[i] = fmt.Sprint(v)
}
return strings.Join(parts, ", ")
}
29 changes: 29 additions & 0 deletions loggers/kplogrus/logrus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kplogrus

import (
producer "github.com/a8m/kinesis-producer"
"github.com/sirupsen/logrus"
)

// Logger implements a logurs.Logger logger for kinesis-producer
type Logger struct {
Logger *logrus.Logger
}

// Info logs a message
func (l *Logger) Info(msg string, args ...producer.LogValue) {
l.Logger.WithFields(l.valuesToFields(args...)).Info(msg)
}

// Error logs an error
func (l *Logger) Error(msg string, err error, args ...producer.LogValue) {
l.Logger.WithError(err).WithFields(l.valuesToFields(args...)).Error(msg)
}

func (l *Logger) valuesToFields(values ...producer.LogValue) logrus.Fields {
fields := logrus.Fields{}
for _, v := range values {
fields[v.Name] = v.Value
}
return fields
}
32 changes: 32 additions & 0 deletions loggers/kpzap/zap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kpzap

import (
"go.uber.org/zap"

producer "github.com/a8m/kinesis-producer"
)

// Logger implements a zap.Logger logger for kinesis-producer
type Logger struct {
Logger *zap.Logger
}

// Info logs a message
func (l *Logger) Info(msg string, values ...producer.LogValue) {
l.Logger.Info(msg, l.valuesToFields(values)...)
}

// Error logs an error
func (l *Logger) Error(msg string, err error, values ...producer.LogValue) {
fields := l.valuesToFields(values)
fields = append(fields, zap.Error(err))
l.Logger.Info(msg, fields...)
}

func (l *Logger) valuesToFields(values []producer.LogValue) []zap.Field {
fields := make([]zap.Field, len(values))
for i, v := range values {
fields[i] = zap.Any(v.Name, v.Value)
}
return fields
}
36 changes: 18 additions & 18 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ package producer

import (
"errors"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/jpillora/backoff"
)
Expand Down Expand Up @@ -92,7 +92,7 @@ func (p *Producer) Put(data []byte, partitionKey string) error {
p.Lock()
if needToDrain {
if record, err = p.aggregator.Drain(); err != nil {
p.Logger.WithError(err).Error("drain aggregator")
p.Logger.Error("drain aggregator", err)
}
}
p.aggregator.Put(data, partitionKey)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (p *Producer) NotifyFailures() <-chan *FailureRecord {

// Start the producer
func (p *Producer) Start() {
p.Logger.WithField("stream", p.StreamName).Info("starting producer")
p.Logger.Info("starting producer", LogValue{"stream", p.StreamName})
go p.loop()
}

Expand All @@ -138,7 +138,7 @@ func (p *Producer) Stop() {
p.Lock()
p.stopped = true
p.Unlock()
p.Logger.WithField("backlog", len(p.records)).Info("stopping producer")
p.Logger.Info("stopping producer", LogValue{"backlog", len(p.records)})

// drain
if record, ok := p.drainIfNeed(); ok {
Expand Down Expand Up @@ -225,7 +225,7 @@ func (p *Producer) drainIfNeed() (*kinesis.PutRecordsRequestEntry, bool) {
record, err := p.aggregator.Drain()
p.Unlock()
if err != nil {
p.Logger.WithError(err).Error("drain aggregator")
p.Logger.Error("drain aggregator", err)
} else {
return record, true
}
Expand All @@ -243,14 +243,14 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin
defer p.semaphore.release()

for {
p.Logger.WithField("reason", reason).Infof("flush %v records", len(records))
p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)})
out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{
StreamName: &p.StreamName,
Records: records,
})

if err != nil {
p.Logger.WithError(err).Error("flush")
p.Logger.Error("flush", err)
p.RLock()
notify := p.notify
p.RUnlock()
Expand All @@ -262,15 +262,15 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin

if p.Verbose {
for i, r := range out.Records {
fields := make(logrus.Fields)
values := make([]LogValue, 2)
if r.ErrorCode != nil {
fields["ErrorCode"] = *r.ErrorCode
fields["ErrorMessage"] = *r.ErrorMessage
values[0] = LogValue{"ErrorCode", *r.ErrorCode}
values[1] = LogValue{"ErrorMessage", *r.ErrorMessage}
} else {
fields["ShardId"] = *r.ShardId
fields["SequenceNumber"] = *r.SequenceNumber
values[0] = LogValue{"ShardId", *r.ShardId}
values[1] = LogValue{"SequenceNumber", *r.SequenceNumber}
}
p.Logger.WithFields(fields).Infof("Result[%d]", i)
p.Logger.Info(fmt.Sprintf("Result[%d]", i), values...)
}
}

Expand All @@ -281,11 +281,11 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin

duration := b.Duration()

p.Logger.WithFields(logrus.Fields{
"failures": failed,
"backoff": duration.String(),
}).Warn("put failures")

p.Logger.Info(
"put failures",
LogValue{"failures", failed},
LogValue{"backoff", duration.String()},
)
time.Sleep(duration)

// change the logging state for the next itertion
Expand Down

0 comments on commit 2b76ac6

Please sign in to comment.