From de2fc25d57123fda3fa94435d1966eac485db166 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Wed, 21 Nov 2018 20:13:59 +0530 Subject: [PATCH] Decoupled logrus and added a logger interface - `producer.Config` now takes a producer.Logger interface instead of a Logrus instance. - If not logger is provided to config, producer uses a standard library logger by default (`producer.StandardLogger`) - `loggers` sub package ships implementations for zap and logrus. --- README.md | 24 +++++++++++++++++++++--- config.go | 9 +++++---- example_test.go | 12 +++++++----- logger.go | 21 +++++++++++++++++++++ loggers/logrus.go | 29 +++++++++++++++++++++++++++++ loggers/zap.go | 32 ++++++++++++++++++++++++++++++++ producer.go | 36 ++++++++++++++++++------------------ standard.go | 30 ++++++++++++++++++++++++++++++ 8 files changed, 163 insertions(+), 30 deletions(-) create mode 100644 logger.go create mode 100644 loggers/logrus.go create mode 100644 loggers/zap.go create mode 100644 standard.go diff --git a/README.md b/README.md index 63ccb49..d8cac7b 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ and using the same aggregation format that [KPL][kpl-url] use. ### Useful links +- [Documentation][godoc-url] - [Aggregation format][aggregation-format-url] - [Considerations When Using KPL Aggregation][kpl-aggregation] - [Consumer De-aggregation][de-aggregation] @@ -22,13 +23,11 @@ import ( ) func main() { - log := logrus.New() client := kinesis.New(session.New(aws.NewConfig())) pr := producer.New(&producer.Config{ StreamName: "test", BacklogCount: 2000, - Client: client, - Logger: log, + Client: client }) pr.Start() @@ -55,6 +54,25 @@ func main() { } ``` +#### Using custom logger +`producer.Config` takes an optional `logging.Logger` implementation. + +```go +customLogger := &CustomLogger{} + +&producer.Config{ + StreamName: "test", + BacklogCount: 2000, + Client: client, + Logger: customLogger, +} +``` + +kinesis-producer ships with three logger implementations. + +- `producer.Standard` used the standard library logger +- `loggers.Logrus` uses logrus logger +- `loggers.Zap` uses zap logger ### License MIT diff --git a/config.go b/config.go index f65ae58..e77ebaf 100644 --- a/config.go +++ b/config.go @@ -1,10 +1,11 @@ package producer import ( + "log" + "os" "time" k "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/sirupsen/logrus" ) // Constants and default configuration take from: @@ -54,8 +55,8 @@ type Config struct { // Number of requests to sent concurrently. Default to 24. MaxConnections int - // Logger is the logger used. Default to logrus.Log. - Logger logrus.FieldLogger + // Logger is the logger used. Default to producer.Logger. + Logger Logger // Enabling verbose logging. Default to false. Verbose bool @@ -67,7 +68,7 @@ type Config struct { // defaults for configuration func (c *Config) defaults() { if c.Logger == nil { - c.Logger = logrus.New() + c.Logger = &StandardLogger{log.New(os.Stdout, "", log.LstdFlags)} } if c.BatchCount == 0 { c.BatchCount = maxRecordsPerRequest diff --git a/example_test.go b/example_test.go index 0e6abcc..a65f9f9 100644 --- a/example_test.go +++ b/example_test.go @@ -1,22 +1,24 @@ package producer import ( + "log" + "os" "time" + "github.com/a8m/kinesis-producer/logging" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/sirupsen/logrus" ) func ExampleSimple() { - log := logrus.New() + logger := &logging.Standard{log.New(os.Stdout, "", log.LstdFlags)} client := kinesis.New(session.New(aws.NewConfig())) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, Client: client, - Logger: log, + Logger: logger, }) pr.Start() @@ -25,7 +27,7 @@ func ExampleSimple() { go func() { for r := range pr.NotifyFailures() { // r contains `Data`, `PartitionKey` and `Error()` - log.Error(r) + logger.Error("detected put failure", r.error) } }() @@ -33,7 +35,7 @@ func ExampleSimple() { for i := 0; i < 5000; i++ { err := pr.Put([]byte("foo"), "bar") if err != nil { - log.WithError(err).Fatal("error producing") + logger.Error("error producing", err) } } }() diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..4467d35 --- /dev/null +++ b/logger.go @@ -0,0 +1,21 @@ +package producer + +import ( + "fmt" +) + +// Logger represents a simple interface used by kinesis-producer to handle logging +type Logger interface { + Info(msg string, values ...Value) + Error(msg string, err error, values ...Value) +} + +// Value represents a key:value pair used by the Logger interface +type Value struct { + Name string + Value interface{} +} + +func (v Value) String() string { + return fmt.Sprintf(" %s=%s", v.Name, v.Value) +} diff --git a/loggers/logrus.go b/loggers/logrus.go new file mode 100644 index 0000000..ba4601c --- /dev/null +++ b/loggers/logrus.go @@ -0,0 +1,29 @@ +package loggers + +import ( + producer "github.com/a8m/kinesis-producer" + "github.com/sirupsen/logrus" +) + +// Logrus implements a logurs.Logger for kinesis-producer +type Logrus struct { + Logger *logrus.Logger +} + +// Info logs a message +func (l *Logrus) Info(msg string, args ...producer.Value) { + l.Logger.WithFields(l.valuesToFields(args...)).Info(msg) +} + +// Error logs an error +func (l *Logrus) Error(msg string, err error, args ...producer.Value) { + l.Logger.WithError(err).WithFields(l.valuesToFields(args...)).Error(msg) +} + +func (l *Logrus) valuesToFields(values ...producer.Value) logrus.Fields { + fields := logrus.Fields{} + for _, v := range values { + fields[v.Name] = v.Value + } + return fields +} diff --git a/loggers/zap.go b/loggers/zap.go new file mode 100644 index 0000000..74e7279 --- /dev/null +++ b/loggers/zap.go @@ -0,0 +1,32 @@ +package loggers + +import ( + "go.uber.org/zap" + + producer "github.com/a8m/kinesis-producer" +) + +// Zap implements a logurs.Logger for kinesis-producer +type Zap struct { + Logger *zap.Logger +} + +// Info logs a message +func (l *Zap) Info(msg string, values ...producer.Value) { + l.Logger.Info(msg, l.valuesToFields(values)...) +} + +// Error logs an error +func (l *Zap) Error(msg string, err error, values ...producer.Value) { + fields := l.valuesToFields(values) + fields = append(fields, zap.Error(err)) + l.Logger.Info(msg, fields...) +} + +func (l *Zap) valuesToFields(values []producer.Value) []zap.Field { + fields := make([]zap.Field, len(values)) + for i, v := range values { + fields[i] = zap.Any(v.Name, v.Value) + } + return fields +} diff --git a/producer.go b/producer.go index c821f81..f1a9da3 100644 --- a/producer.go +++ b/producer.go @@ -8,10 +8,10 @@ package producer import ( "errors" + "fmt" "sync" "time" - "github.com/sirupsen/logrus" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/jpillora/backoff" ) @@ -92,7 +92,7 @@ func (p *Producer) Put(data []byte, partitionKey string) error { p.Lock() if needToDrain { if record, err = p.aggregator.Drain(); err != nil { - p.Logger.WithError(err).Error("drain aggregator") + p.Logger.Error("drain aggregator", err) } } p.aggregator.Put(data, partitionKey) @@ -129,7 +129,7 @@ func (p *Producer) NotifyFailures() <-chan *FailureRecord { // Start the producer func (p *Producer) Start() { - p.Logger.WithField("stream", p.StreamName).Info("starting producer") + p.Logger.Info("starting producer", Value{"stream", p.StreamName}) go p.loop() } @@ -138,7 +138,7 @@ func (p *Producer) Stop() { p.Lock() p.stopped = true p.Unlock() - p.Logger.WithField("backlog", len(p.records)).Info("stopping producer") + p.Logger.Info("stopping producer", Value{"backlog", len(p.records)}) // drain if record, ok := p.drainIfNeed(); ok { @@ -225,7 +225,7 @@ func (p *Producer) drainIfNeed() (*kinesis.PutRecordsRequestEntry, bool) { record, err := p.aggregator.Drain() p.Unlock() if err != nil { - p.Logger.WithError(err).Error("drain aggregator") + p.Logger.Error("drain aggregator", err) } else { return record, true } @@ -243,14 +243,14 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin defer p.semaphore.release() for { - p.Logger.WithField("reason", reason).Infof("flush %v records", len(records)) + p.Logger.Info("flushing records", Value{"reason", reason}, Value{"records", len(records)}) out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{ StreamName: &p.StreamName, Records: records, }) if err != nil { - p.Logger.WithError(err).Error("flush") + p.Logger.Error("flush", err) p.RLock() notify := p.notify p.RUnlock() @@ -262,15 +262,15 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin if p.Verbose { for i, r := range out.Records { - fields := make(logrus.Fields) + values := make([]Value, 2) if r.ErrorCode != nil { - fields["ErrorCode"] = *r.ErrorCode - fields["ErrorMessage"] = *r.ErrorMessage + values[0] = Value{"ErrorCode", *r.ErrorCode} + values[1] = Value{"ErrorMessage", *r.ErrorMessage} } else { - fields["ShardId"] = *r.ShardId - fields["SequenceNumber"] = *r.SequenceNumber + values[0] = Value{"ShardId", *r.ShardId} + values[1] = Value{"SequenceNumber", *r.SequenceNumber} } - p.Logger.WithFields(fields).Infof("Result[%d]", i) + p.Logger.Info(fmt.Sprintf("Result[%d]", i), values...) } } @@ -281,11 +281,11 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin duration := b.Duration() - p.Logger.WithFields(logrus.Fields{ - "failures": failed, - "backoff": duration.String(), - }).Warn("put failures") - + p.Logger.Info( + "put failures", + Value{"failures", failed}, + Value{"backoff", duration.String()}, + ) time.Sleep(duration) // change the logging state for the next itertion diff --git a/standard.go b/standard.go new file mode 100644 index 0000000..bfd0d63 --- /dev/null +++ b/standard.go @@ -0,0 +1,30 @@ +package producer + +import ( + "fmt" + "log" + "strings" +) + +// StandardLogger implements the Logger interface using standard library loggers +type StandardLogger struct { + Logger *log.Logger +} + +// Info prints log message to screen +func (l *StandardLogger) Info(msg string, values ...Value) { + l.Logger.Print(msg, l.valuesToString(values...)) +} + +// Error prints log message to screen +func (l *StandardLogger) Error(msg string, err error, values ...Value) { + l.Logger.Print(msg, l.valuesToString(values...), err) +} + +func (l *StandardLogger) valuesToString(values ...Value) string { + parts := make([]string, len(values)) + for i, v := range values { + parts[i] = fmt.Sprint(v) + } + return strings.Join(parts, ", ") +}