Skip to content

Commit

Permalink
Add a docker plugin - dockerlogbeat (#13761)
Browse files Browse the repository at this point in the history
* init commit of dockerlogbeat
  • Loading branch information
fearful-symmetry authored Oct 9, 2019
1 parent dd08ed5 commit 8206e04
Show file tree
Hide file tree
Showing 385 changed files with 63,848 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ beat.db
mage_output_file.go
x-pack/functionbeat/*/fields.yml
x-pack/functionbeat/provider/*/functionbeat-*
x-pack/dockerlogbeat/temproot.tar

# Editor swap files
*.swp
Expand Down
2 changes: 2 additions & 0 deletions x-pack/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
*beat/build
*beat/logs
*beat/data
dockerlogbeat/rootfs
dockerlogbeat/vendor/github.com/elastic/beats

# Files
*beat/*beat
Expand Down
18 changes: 18 additions & 0 deletions x-pack/dockerlogbeat/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
ARG versionString
FROM golang:${versionString} as builder

WORKDIR /go/src/github.com/elastic/beats/x-pack/dockerlogbeat
COPY . ../..

ENV GOPATH=/go
ARG GOOS=linux
ARG GOARCH=amd64
ARG GOARM=

RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o dockerlogbeat


FROM alpine:3.7 as final
RUN apk --no-cache add ca-certificates
RUN mkdir /contmount
COPY --from=builder /go/src/github.com/elastic/beats/x-pack/dockerlogbeat/dockerlogbeat /usr/bin/dockerlogbeat
24 changes: 24 additions & 0 deletions x-pack/dockerlogbeat/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"description": "A beat for docker logs",
"documentation": "https://docs.docker.com/engine/extend/plugin_api/",
"entrypoint": ["/usr/bin/dockerlogbeat"],
"network": {
"type": "host"
},
"interface": {
"types": ["docker.logdriver/1.0"],
"socket": "beatSocket.sock"
},
"env":[
{
"description": "libbeat env hack",
"name": "BEAT_STRICT_PERMS",
"value": "false"
},
{
"description": "config for dockerlogbeat",
"name": "BEAT_UNIX_SOCK_PATH",
"value": "/contmount/controller.sock"
}
]
}
95 changes: 95 additions & 0 deletions x-pack/dockerlogbeat/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"encoding/json"
"net/http"
"os"
"path/filepath"

"github.com/docker/engine/daemon/logger"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemanager"

"github.com/pkg/errors"
)

// StartLoggingRequest represents the request object we get on a call to //LogDriver.StartLogging
type StartLoggingRequest struct {
File string
Info logger.Info
}

// StopLoggingRequest represents the request object we get on a call to //LogDriver.StopLogging
type StopLoggingRequest struct {
File string
}

// ProcessorPipeline handles a single output pipeline
type ProcessorPipeline struct {
Pipeline *pipeline.Pipeline
}

// This gets called when a container starts that requests the log driver
func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var startReq StartLoggingRequest
err := json.NewDecoder(r.Body).Decode(&startReq)
if err != nil {
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}

pm.Logger.Debugf("Homepath: %v\n", filepath.Dir(os.Args[0]))
pm.Logger.Debugf("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels)
pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config)

cl, err := pm.CreateClientWithConfig(startReq.Info.Config, startReq.File)
if err != nil {
http.Error(w, errors.Wrap(err, "error creating client").Error(), http.StatusBadRequest)
return
}

go cl.ConsumeAndSendLogs()

respondOK(w)
} // end func
}

// This gets called when a container using the log driver stops
func stopLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var stopReq StopLoggingRequest
err := json.NewDecoder(r.Body).Decode(&stopReq)
if err != nil {
http.Error(w, errors.Wrap(err, "error decoding json request").Error(), http.StatusBadRequest)
return
}
pm.Logger.Infof(" Got stop request object %#v\n", stopReq)
// Run the stop async, since nothing 'depends' on it,
// and we can break people's docker automation if this times out.
go func() {
err = pm.CloseClientWithFile(stopReq.File)
if err != nil {
pm.Logger.Infof(" Got stop request error %#v\n", err)
}
}()

respondOK(w)
} // end func
}

// For the start/stop handler, the daemon expects back an error object. If the body is empty, then all is well.
func respondOK(w http.ResponseWriter) {
res := struct {
Err string
}{
"",
}

json.NewEncoder(w).Encode(&res)
}
101 changes: 101 additions & 0 deletions x-pack/dockerlogbeat/magefile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build mage

package main

import (
"os"

"github.com/elastic/beats/dev-tools/mage"
"github.com/magefile/mage/mg"
"github.com/magefile/mage/sh"

"github.com/pkg/errors"
)

var hubID = "ossifrage"
var pluginVersion = "0.0.1"
var name = "dockerlogbeat"
var containerName = name + "_container"
var dockerPluginName = hubID + "/" + name
var dockerPlugin = dockerPluginName + ":" + pluginVersion

// Build builds docker rootfs container root
func Build() error {
mg.Deps(Clean)

dockerLogBeatDir, err := os.Getwd()
if err != nil {
return errors.Wrap(err, "error getting work dir")
}

err = os.Chdir("../..")
if err != nil {
return errors.Wrap(err, "error changing directory")
}

gv, err := mage.GoVersion()
if err != nil {
return errors.Wrap(err, "error determining go version")
}

err = sh.RunV("docker", "build", "--build-arg", "versionString"+"="+gv, "--target", "final", "-t", "rootfsimage", "-f", "x-pack/dockerlogbeat/Dockerfile", ".")
if err != nil {
return errors.Wrap(err, "error building final container image")
}

err = os.Chdir(dockerLogBeatDir)
if err != nil {
return errors.Wrap(err, "error returning to dockerlogbeat dir")
}

os.Mkdir("rootfs", 0755)

err = sh.RunV("docker", "create", "--name", containerName, "rootfsimage", "true")
if err != nil {
return errors.Wrap(err, "error creating container")
}

err = sh.RunV("docker", "export", containerName, "-o", "temproot.tar")
if err != nil {
return errors.Wrap(err, "error exporting container")
}

return sh.RunV("tar", "-xf", "temproot.tar", "-C", "rootfs")
}

// Clean removes working objects and containers
func Clean() error {

sh.RunV("docker", "rm", "-vf", containerName)
sh.RunV("docker", "rmi", "rootfsimage")
sh.Rm("temproot.tar")
sh.Rm("rootfs")
sh.RunV("docker", "plugin", "disable", "-f", dockerPlugin)
sh.RunV("docker", "plugin", "rm", "-f", dockerPlugin)

return nil
}

// Install installs the beat
func Install() error {
err := sh.RunV("docker", "plugin", "create", dockerPlugin, ".")
if err != nil {
return errors.Wrap(err, "error creating plugin")
}

err = sh.RunV("docker", "plugin", "enable", dockerPlugin)
if err != nil {
return errors.Wrap(err, "error enabling plugin")
}

return nil
}

// Create builds and creates a docker plugin
func Create() {
mg.SerialDeps(Build, Install)
}
67 changes: 67 additions & 0 deletions x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"fmt"
"os"

"github.com/docker/go-plugins-helpers/sdk"
"github.com/elastic/beats/libbeat/common"
logpcfg "github.com/elastic/beats/libbeat/logp/configure"
_ "github.com/elastic/beats/libbeat/outputs/console"
_ "github.com/elastic/beats/libbeat/outputs/elasticsearch"
_ "github.com/elastic/beats/libbeat/outputs/fileout"
_ "github.com/elastic/beats/libbeat/outputs/logstash"
_ "github.com/elastic/beats/libbeat/publisher/queue/memqueue"
_ "github.com/elastic/beats/libbeat/publisher/queue/spool"
"github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/x-pack/dockerlogbeat/pipelinemanager"
)

// genNewMonitoringConfig is a hacked-in function to enable a debug stderr logger
func genNewMonitoringConfig() (*common.Config, error) {
cfgObject := make(map[string]string)
cfgObject["level"] = "debug"
cfgObject["to_stderr"] = "true"

cfg, err := common.NewConfigFrom(cfgObject)
if err != nil {
return nil, err
}
return cfg, nil
}

func fatal(format string, vs ...interface{}) {
fmt.Fprintf(os.Stderr, format, vs...)
os.Exit(1)
}

func main() {
service.BeforeRun()
defer service.Cleanup()

logcfg, err := genNewMonitoringConfig()
if err != nil {
fatal("error starting config: %s", err)
}

err = logpcfg.Logging("dockerbeat", logcfg)
if err != nil {
fatal("error starting log handler: %s", err)
}

pipelines := pipelinemanager.NewPipelineManager(logcfg)

sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
// Create handlers for startup and shutdown of the log driver
sdkHandler.HandleFunc("/LogDriver.StartLogging", startLoggingHandler(pipelines))
sdkHandler.HandleFunc("/LogDriver.StopLogging", stopLoggingHandler(pipelines))

err = sdkHandler.ServeUnix("beatSocket", 0)
if err != nil {
fatal("Error in socket handler: %s", err)
}
}
Loading

0 comments on commit 8206e04

Please sign in to comment.