Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add docker logs support to the Elastic Log Driver #19531

Merged
merged 14 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions x-pack/dockerlogbeat/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package main

import (
"encoding/json"
"io"
"net/http"

"github.com/docker/docker/daemon/logger"

"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemanager"

"github.com/docker/docker/pkg/ioutils"
"github.com/pkg/errors"
)

Expand All @@ -26,6 +28,26 @@ type StopLoggingRequest struct {
File string
}

// CapabilitiesResponse represents the response to a capabilities request
type CapabilitiesResponse struct {
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
Err string
Cap logger.Capability
}

// LogsRequest represents the request object we get from a `docker logs` call
type LogsRequest struct {
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
Info logger.Info
Config logger.ReadConfig
}

func reportCaps() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(&CapabilitiesResponse{
Cap: logger.Capability{ReadLogs: true},
})
}
}

// This gets called when a container starts that requests the log driver
func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -81,6 +103,28 @@ func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respons
} // end func
}

func readLogHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var logReq LogsRequest
err := json.NewDecoder(r.Body).Decode(&logReq)
if err != nil {
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}

pm.Logger.Infof("Got logging request for container %s\n", logReq.Info.ContainerName)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
stream, err := pm.CreateReaderForContainer(logReq.Info, logReq.Config)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
http.Error(w, errors.Wrap(err, "error creating log reader").Error(), http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/x-json-stream")
wf := ioutils.NewWriteFlusher(w)
io.Copy(wf, stream)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

} //end func
}

// For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well.
func respondOK(w http.ResponseWriter) {
res := struct {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func main() {
// Create handlers for startup and shutdown of the log driver
sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.Capabilities", reportCaps())
sdkHandler.HandleFunc("/LogDriver.ReadLogs", readLogHandler(pipelines))

err = sdkHandler.ServeUnix("beatSocket", 0)
if err != nil {
Expand Down
39 changes: 31 additions & 8 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/daemon/logger"

"github.com/docker/docker/api/types/backend"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
helper "github.com/elastic/beats/v7/libbeat/common/docker"
Expand All @@ -27,12 +28,13 @@ type ClientLogger struct {
client beat.Client
pipelineHash uint64
closer chan struct{}
containerMeta logger.Info
ContainerMeta logger.Info
logger *logp.Logger
logSpool logger.Logger
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
}
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
Expand All @@ -49,7 +51,13 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade

clientLogger.Debugf("Created new logger for %d", hash)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
return &ClientLogger{logFile: inputFile,
client: client,
pipelineHash: hash,
closer: make(chan struct{}),
ContainerMeta: info,
logSpool: localLog,
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
logger: clientLogger}, nil
}

// Close closes the pipeline client and reader
Expand All @@ -63,7 +71,6 @@ func (cl *ClientLogger) Close() error {
// ConsumePipelineAndSend consumes events from the FIFO pipe and sends them to the pipeline client
func (cl *ClientLogger) ConsumePipelineAndSend() {
publishWriter := make(chan logdriver.LogEntry, 500)

go cl.publishLoop(publishWriter)
// Clean up the reader after we're done
defer func() {
Expand Down Expand Up @@ -95,18 +102,19 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
return
}

cl.logSpool.Log(constructLogSpoolMsg(entry))
line := strings.TrimSpace(string(entry.Line))

cl.client.Publish(beat.Event{
Timestamp: time.Unix(0, entry.TimeNano),
Fields: common.MapStr{
"message": line,
"container": common.MapStr{
"labels": helper.DeDotLabels(cl.containerMeta.ContainerLabels, true),
"id": cl.containerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.containerMeta.ContainerName}),
"labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
"id": cl.ContainerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
"image": common.MapStr{
"name": cl.containerMeta.ContainerImageName,
"name": cl.ContainerMeta.ContainerImageName,
},
},
},
Expand All @@ -115,3 +123,18 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
}

}

func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
var msg logger.Message

msg.Line = line.Line
msg.Source = line.Source
msg.Timestamp = time.Unix(0, line.TimeNano)
if line.PartialLogMetadata != nil {
msg.PLogMetaData = &backend.PartialLogMetaData{}
msg.PLogMetaData.ID = line.PartialLogMetadata.Id
msg.PLogMetaData.Last = line.PartialLogMetadata.Last
msg.PLogMetaData.Ordinal = int(line.PartialLogMetadata.Ordinal)
}
return &msg
}
16 changes: 15 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
package pipelinemanager

import (
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/docker/docker/daemon/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock"
Expand Down Expand Up @@ -85,7 +89,17 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
require.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject)
info := logger.Info{
ContainerID: "b87d3b0379f816a5f2f7070f28cc05e2f564a3fb549a67c64ec30fc5b04142ed",
LogPath: filepath.Join("/tmp/dockerbeattest/", string(time.Now().Unix())),
}

err = os.MkdirAll(filepath.Dir(info.LogPath), 0755)
assert.NoError(t, err)
localLog, err := jsonfilelog.New(info)
assert.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
require.NoError(t, err)

return client
Expand Down
2 changes: 1 addition & 1 deletion x-pack/dockerlogbeat/pipelinemanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
newCfg := ContainerOutputConfig{}
endpoint, ok := input["hosts"]
if !ok {
return newCfg, errors.New("An endpoint flag is required")
return newCfg, errors.New("A hosts flag is required")
}

endpointList := strings.Split(endpoint, ",")
Expand Down
Loading