Skip to content

Commit

Permalink
Merge pull request #140 from urso/doc/lumberjack
Browse files Browse the repository at this point in the history
Doc lumberjack
  • Loading branch information
monicasarbu committed Oct 13, 2015
2 parents d33ae45 + 8a82974 commit ef3c679
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 13 deletions.
126 changes: 122 additions & 4 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ If geoip.paths is missing from config file the default paths /usr/share/GeoIP/Ge
[[configuration-output]]
=== Outputs

Starting with Beat version 0.3.0, multiple outputs can be configured for
exporting the correlated transactions. Currently the following output types are
supported:
Multiple outputs can be configured for exporting the correlated transactions.
Currently the following output types are supported:

* Elasticsearch
* Lumberjack (Logstash)
* Redis
* File

Expand Down Expand Up @@ -244,6 +244,10 @@ Basic authentication password for connecting to Elasticsearch.
The index root name where to write events to. The default is `packetbeat` and
generates `[packetbeat-]YYYY.MM.DD` indexes (e.g. `packetbeat-2015.04.26`).

The index root name where to write events to. The default is the beat its name.
For example `packetbeat` generates `[packetbeat-]YYYY.MM.DD` indexes (e.g. `packetbeat-2015.04.26`).


===== max_retries

The number of times a particular Elasticsearch index operation is attempted. If
Expand Down Expand Up @@ -299,6 +303,120 @@ accepted. In this mode TLS based connections are susceptible to
man-in-the-middle attacks. Use only for testing.


[[lumberjack-output]]
==== Lumberjack (Logstash) Output

The lumberjack output sends the events directly to logstash using the lumberjack
protocol. The logstash-input-lumberjack plugin must be installed and configured
in logstash. Logstash will allow for additional processing and routing of
generated events.

Every event send to logstash contains additional meta data for indexing and filtering:

[source,json]
------------------------------------------------------------------------------
{
...
@metadata: {
"index": "<beat>-<date>",
"type": "<event type>"
}
}
------------------------------------------------------------------------------

Logstash elasticsearch output can be configured to use metadata and event type
for indexing.

[source,logstash]
------------------------------------------------------------------------------
...
output {
elasticsearch {
host => "localhost"
port => "9200"
protocol => "http"
index => "%{[@metadata][index]}"
document_type => "%{[@metadata][type]}"
}
...
}
------------------------------------------------------------------------------

Example configuration:
[source,yaml]
------------------------------------------------------------------------------
output:
lumberjack:
enabled: true
hosts:
- logstash1:12345
- logstash2:12345
- logstash3:12345
# configure index prefix name
index: mybeat
# configure lumberjack to loadbalance events between logstash instances
loadbalance: true
tls:
# disable tls for testing (TLS must be disabled in logstash too)
disabled: true
------------------------------------------------------------------------------

===== enabled

Boolean option that enables lumberjack output. The default is true.

===== hosts

List of known logstash servers to connect to. All entries in this list can
contain a port number. If no port number is given, the port options value is
used as default port number.

===== loadbalance

If set to true and multiple logstash hosts are configured, The output plugin will
load balance published events onto all logstash hosts. If set to false,
the output plugin will send all events to only one host (determined by random)
switching to another host if selected one becomes unresponsive.
The default value is false.

===== port

Default port to use if port number not given in hosts. The default port number
is 10200.

===== index

The index root name where to write events to. The default is the beat its name.
For example `packetbeat` generates `[packetbeat-]YYYY.MM.DD` indexes (e.g. `packetbeat-2015.04.26`).

===== tls

TLS configuration like root CA for the lumberjack connections. See
<<configuration-tls>> for TLS options. If missing or tls.disabled is set to
true, a TCP only connection is assumed. Logstash must also be configured to use
TCP for lumberjack input.

===== timeout

Lumberjack connection timeout waiting for responses from logstash server.

===== max_retries

The number of times a particular logstash send attempted is tried. If
the send operation doesn't succeed after this many retries, the events are
dropped. The default is 3.

It is up to the beat to decide to drop the event or try again sending the event
if it was dropped by the output plugin. If send operation doesn't succeed after
max_retries, the beat optionally will be notified about it.


[[redis-output]]
==== Redis Output

Expand All @@ -307,7 +425,7 @@ TODO: I think besides the list option, PUB-SUB is also supported here (there
was a pull request some time ago. But that's not documented yet.
////

Inserts the transaction in a Redis list. This output plugin is compatibile with
Inserts the events in a Redis list. This output plugin is compatibile with
http://logstash.net/docs/1.4.2/inputs/redis[Redis input plugin] from Logstash,
so Redis can be used as queue between the Beat shippers and Logstash.

Expand Down
11 changes: 9 additions & 2 deletions outputs/lumberjack/lumberjack.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type lumberjack struct {
}

const (
lumberjackDefaultPort = 10200

lumberjackDefaultTimeout = 5 * time.Second
defaultSendRetries = 3
)
Expand All @@ -66,6 +68,11 @@ func (lj *lumberjack) init(
timeout = time.Duration(config.Timeout) * time.Second
}

defaultPort := lumberjackDefaultPort
if config.Port != 0 {
defaultPort = config.Port
}

var clients []mode.ProtocolClient
var err error
if useTLS {
Expand All @@ -77,12 +84,12 @@ func (lj *lumberjack) init(

clients, err = makeClients(config, timeout,
func(host string) (TransportClient, error) {
return newTLSClient(host, tlsConfig)
return newTLSClient(host, defaultPort, tlsConfig)
})
} else {
clients, err = makeClients(config, timeout,
func(host string) (TransportClient, error) {
return newTCPClient(host)
return newTCPClient(host, defaultPort)
})
}
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions outputs/lumberjack/lumberjack_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const (
lumberjackDefaultHost = "localhost"
lumberjackDefaultPort = "12345"
lumberjackDefaultHost = "localhost"
lumberjackTestDefaultPort = "12345"

elasticsearchDefaultHost = "localhost"
elasticsearchDefaultPort = "9200"
Expand Down Expand Up @@ -50,7 +50,7 @@ func getenv(name, defaultValue string) string {
func getLumberjackHost() string {
return fmt.Sprintf("%v:%v",
getenv("LS_HOST", lumberjackDefaultHost),
getenv("LS_LUMBERJACK_TCP_PORT", lumberjackDefaultPort),
getenv("LS_LUMBERJACK_TCP_PORT", lumberjackTestDefaultPort),
)
}

Expand Down
23 changes: 19 additions & 4 deletions outputs/lumberjack/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package lumberjack

import (
"crypto/tls"
"fmt"
"math/rand"
"net"
"strings"
"time"

"github.com/elastic/libbeat/logp"
Expand All @@ -27,8 +29,8 @@ type tlsClient struct {
tls tls.Config
}

func newTCPClient(host string) (*tcpClient, error) {
return &tcpClient{hostport: host}, nil
func newTCPClient(host string, defaultPort int) (*tcpClient, error) {
return &tcpClient{hostport: fullAddress(host, defaultPort)}, nil
}

func (c *tcpClient) Connect(timeout time.Duration) error {
Expand Down Expand Up @@ -85,9 +87,9 @@ func (c *tcpClient) Close() error {
return err
}

func newTLSClient(host string, tls *tls.Config) (*tlsClient, error) {
func newTLSClient(host string, defaultPort int, tls *tls.Config) (*tlsClient, error) {
c := tlsClient{}
c.hostport = host
c.hostport = fullAddress(host, defaultPort)
c.tls = *tls
return &c, nil
}
Expand Down Expand Up @@ -129,3 +131,16 @@ func (c *tlsClient) onFail(err error) error {
c.connected = false
return err
}

func fullAddress(host string, defaultPort int) string {
if _, _, err := net.SplitHostPort(host); err == nil {
return host
}

idx := strings.Index(host, ":")
if idx >= 0 {
// IPv6 address detected
return fmt.Sprintf("[%v]:%v", host, defaultPort)
}
return fmt.Sprintf("%v:%v", host, defaultPort)
}

0 comments on commit ef3c679

Please sign in to comment.