diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go
index 435838caa12..405fc7cc432 100644
--- a/libbeat/publisher/pipeline/output.go
+++ b/libbeat/publisher/pipeline/output.go
@@ -40,7 +40,6 @@ type netClientWorker struct {
 
 	batchSize  int
 	batchSizer func() int
-	logger     *logp.Logger
 }
 
 func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
@@ -86,24 +85,24 @@ func (w *netClientWorker) run() {
 			batch.Cancelled()
 
 			if w.closed.Load() {
-				w.logger.Infof("Closed connection to %v", w.client)
+				logp.Info("Closed connection to %v", w.client)
 				return
 			}
 
 			if reconnectAttempts > 0 {
-				w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
+				logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
 			} else {
-				w.logger.Infof("Connecting to %v", w.client)
+				logp.Info("Connecting to %v", w.client)
 			}
 
 			err := w.client.Connect()
 			if err != nil {
-				w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
+				logp.Err("Failed to connect to %v: %v", w.client, err)
 				reconnectAttempts++
 				continue
 			}
 
-			w.logger.Infof("Connection to %v established", w.client)
+			logp.Info("Connection to %v established", w.client)
 			reconnectAttempts = 0
 			break
 		}
@@ -119,7 +118,7 @@ func (w *netClientWorker) run() {
 
 			err := w.client.Publish(batch)
 			if err != nil {
-				w.logger.Errorf("Failed to publish events: %v", err)
+				logp.Err("Failed to publish events: %v", err)
 				// on error return to connect loop
 				break
 			}
diff --git a/libbeat/publisher/pipeline/stress/gen.go b/libbeat/publisher/pipeline/stress/gen.go
index 149278304ea..e5a18ceaef6 100644
--- a/libbeat/publisher/pipeline/stress/gen.go
+++ b/libbeat/publisher/pipeline/stress/gen.go
@@ -24,11 +24,10 @@ import (
 	"sync"
 	"time"
 
-	"github.com/elastic/beats/v7/libbeat/logp"
-
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/common/atomic"
+	"github.com/elastic/beats/v7/libbeat/logp"
 )
 
 type generateConfig struct {
@@ -66,10 +65,9 @@ func generate(
 		WaitClose: config.WaitClose,
 	}
 
-	logger := logp.NewLogger("publisher_pipeline_stress_generate")
 	if config.ACK {
 		settings.ACKCount = func(n int) {
-			logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
+			logp.Info("Pipeline client (%v) ACKS; %v", id, n)
 		}
 	}
 
@@ -91,7 +89,7 @@ func generate(
 		panic(err)
 	}
 
-	defer logger.Infof("client (%v) closed: %v", id, time.Now())
+	defer logp.Info("client (%v) closed: %v", id, time.Now())
 
 	done := make(chan struct{})
 	defer close(done)
@@ -138,8 +136,8 @@ func generate(
 		})
 	}
 
-	logger.Infof("start (%v) generator: %v", id, time.Now())
-	defer logger.Infof("stop (%v) generator: %v", id, time.Now())
+	logp.Info("start (%v) generator: %v", id, time.Now())
+	defer logp.Info("stop (%v) generator: %v", id, time.Now())
 
 	for cs.Active() {
 		event := beat.Event{
diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go
index 80e61146cc3..8a46489ae6c 100644
--- a/libbeat/publisher/pipeline/stress/run.go
+++ b/libbeat/publisher/pipeline/stress/run.go
@@ -84,9 +84,9 @@ func RunTests(
 		return fmt.Errorf("loading pipeline failed: %+v", err)
 	}
 	defer func() {
-		log.Info("Stop pipeline")
+		logp.Info("Stop pipeline")
 		pipeline.Close()
-		log.Info("pipeline closed")
+		logp.Info("pipeline closed")
 	}()
 
 	cs := newCloseSignaler()
@@ -100,7 +100,7 @@ func RunTests(
 		withWG(&genWG, func() {
 			err := generate(cs, pipeline, config.Generate, i, errors)
 			if err != nil {
-				log.Errorf("Generator failed with: %v", err)
+				logp.Err("Generator failed with: %v", err)
 			}
 		})
 	}
diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go
index e994eef48cc..0125008c39f 100644
--- a/libbeat/publisher/processing/processors.go
+++ b/libbeat/publisher/processing/processors.go
@@ -41,7 +41,6 @@ type processorFn struct {
 }
 
 func newGeneralizeProcessor(keepNull bool) *processorFn {
-	logger := logp.NewLogger("publisher_processing")
 	return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
 		// Filter out empty events. Empty events are still reported by ACK callbacks.
 		if len(event.Fields) == 0 {
@@ -51,7 +50,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn {
 		g := common.NewGenericEventConverter(keepNull)
 		fields := g.Convert(event.Fields)
 		if fields == nil {
-			logger.Error("fail to convert to generic event")
+			logp.Err("fail to convert to generic event")
 			return nil, nil
 		}
 
diff --git a/libbeat/reader/multiline/multiline.go b/libbeat/reader/multiline/multiline.go
index ec20362fc17..a6054c76550 100644
--- a/libbeat/reader/multiline/multiline.go
+++ b/libbeat/reader/multiline/multiline.go
@@ -52,7 +52,6 @@ type Reader struct {
 	err          error // last seen error
 	state        func(*Reader) (reader.Message, error)
 	message      reader.Message
-	logger       *logp.Logger
 }
 
 const (
@@ -144,7 +143,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) {
 				continue
 			}
 
-			mlr.logger.Debug("Multiline event flushed because timeout reached.")
+			logp.Debug("multiline", "Multiline event flushed because timeout reached.")
 
 			// pass error to caller (next layer) for handling
 			return message, err
@@ -173,7 +172,7 @@ func (mlr *Reader) readNext() (reader.Message, error) {
 					continue
 				}
 
-				mlr.logger.Debug("Multiline event flushed because timeout reached.")
+				logp.Debug("multiline", "Multiline event flushed because timeout reached.")
 
 				// return collected multiline event and
 				// empty buffer for new multiline event
diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go
index 1c801ae672f..21d902c6eb8 100644
--- a/libbeat/reader/readfile/line.go
+++ b/libbeat/reader/readfile/line.go
@@ -41,7 +41,6 @@ type LineReader struct {
 	inOffset   int // input buffer read offset
 	byteCount  int // number of bytes decoded from input buffer into output buffer
 	decoder    transform.Transformer
-	logger     *logp.Logger
 }
 
 // New creates a new reader object
@@ -87,15 +86,15 @@ func (r *LineReader) Next() ([]byte, int, error) {
 
 		// This can happen if something goes wrong during decoding
 		if len(buf) == 0 {
-			r.logger.Error("Empty buffer returned by advance")
+			logp.Err("Empty buffer returned by advance")
 			continue
 		}
 
 		if bytes.HasSuffix(buf, r.decodedNl) {
 			break
 		} else {
-			r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1])
-			r.logger.Debugf("In %s", string(buf))
+			logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
+			logp.Debug("line", "In %s", string(buf))
 		}
 	}
 
@@ -152,7 +151,7 @@ func (r *LineReader) advance() error {
 	// -> decode input sequence into outBuffer
 	sz, err := r.decode(idx + len(r.nl))
 	if err != nil {
-		r.logger.Errorf("Error decoding line: %s", err)
+		logp.Err("Error decoding line: %s", err)
 		// In case of error increase size by unencoded length
 		sz = idx + len(r.nl)
 	}
diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go
index 6d587a55aa8..95c98bddbb8 100644
--- a/libbeat/reader/readjson/docker_json.go
+++ b/libbeat/reader/readjson/docker_json.go
@@ -46,8 +46,6 @@ type DockerJSONReader struct {
 	parseLine func(message *reader.Message, msg *logLine) error
 
 	stripNewLine func(msg *reader.Message)
-
-	logger *logp.Logger
 }
 
 type logLine struct {
@@ -200,7 +198,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
 		var logLine logLine
 		err = p.parseLine(&message, &logLine)
 		if err != nil {
-			p.logger.Errorf("Parse line error: %v", err)
+			logp.Err("Parse line error: %v", err)
 			return message, reader.ErrLineUnparsable
 		}
 
@@ -217,7 +215,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
 			}
 			err = p.parseLine(&next, &logLine)
 			if err != nil {
-				p.logger.Errorf("Parse line error: %v", err)
+				logp.Err("Parse line error: %v", err)
 				return message, reader.ErrLineUnparsable
 			}
 			message.Content = append(message.Content, next.Content...)
diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go
index a404d611056..3567c372ed8 100644
--- a/libbeat/reader/readjson/json.go
+++ b/libbeat/reader/readjson/json.go
@@ -34,7 +34,6 @@ import (
 type JSONReader struct {
 	reader reader.Reader
 	cfg    *Config
-	logger *logp.Logger
 }
 
 // NewJSONReader creates a new reader that can decode JSON.
@@ -50,7 +49,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) {
 	err := unmarshal(text, &jsonFields)
 	if err != nil || jsonFields == nil {
 		if !r.cfg.IgnoreDecodingError {
-			r.logger.Errorf("Error decoding JSON: %v", err)
+			logp.Err("Error decoding JSON: %v", err)
 		}
 		if r.cfg.AddErrorKey {
 			jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}