Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch output support for gzip compressed content-encoding #1835

Merged
merged 3 commits into from
Jun 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ output.elasticsearch:
# IPv6 addresses should always be defined as: https://[2001:db8::1]:9200
hosts: ["localhost:9200"]

# Set gzip compression level.
#compression_level: 0

# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "admin"
Expand Down
3 changes: 3 additions & 0 deletions libbeat/_beat/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ output.elasticsearch:
# IPv6 addresses should always be defined as: https://[2001:db8::1]:9200
hosts: ["localhost:9200"]

# Set gzip compression level.
#compression_level: 0

# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "admin"
Expand Down
9 changes: 8 additions & 1 deletion libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ output.elasticsearch:
In the previous example, the Elasticsearch nodes are available at `https://10.45.3.2:9220/elasticsearch` and
`https://10.45.3.1:9230/elasticsearch`.

===== compression_level

The gzip compression level. Setting this value to 0 disables compression.
The compression level must be in the range of 1 (best speed) to 9 (best compression).

The default value is 0.

===== worker

The number of workers per configured host publishing events to Elasticsearch. This
Expand Down Expand Up @@ -363,7 +370,7 @@ is used as the default port number.

===== compression_level

The gzip compression level. Setting this value to values less than or equal to 0 disables compression.
The gzip compression level. Setting this value to 0 disables compression.
The compression level must be in the range of 1 (best speed) to 9 (best compression).

The default value is 3.
Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,9 @@ func newTestClient(url string) *Client {
}

func newTestClientAuth(url, user, pass string) *Client {
return NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, nil)
client, err := NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, 3, nil)
if err != nil {
panic(err)
}
return client
}
66 changes: 15 additions & 51 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package elasticsearch

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
Expand All @@ -25,18 +24,10 @@ type bulkRequest struct {
requ *http.Request
}

type bulkBody interface {
Reader() io.Reader
}

type bulkResult struct {
raw []byte
}

type jsonBulkBody struct {
buf *bytes.Buffer
}

// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
Expand All @@ -60,12 +51,13 @@ func (conn *Connection) BulkWith(
return nil, nil
}

bulkBody := newJSONBulkBody(nil)
if err := bulkEncode(bulkBody, metaBuilder, body); err != nil {
enc := conn.encoder
enc.Reset()
if err := bulkEncode(enc, metaBuilder, body); err != nil {
return nil, err
}

requ, err := newBulkRequest(conn.URL, index, docType, params, bulkBody)
requ, err := newBulkRequest(conn.URL, index, docType, params, enc)
if err != nil {
return nil, err
}
Expand All @@ -81,7 +73,7 @@ func newBulkRequest(
urlStr string,
index, docType string,
params map[string]string,
body bulkBody,
body bodyEncoder,
) (*bulkRequest, error) {
path, err := makePath(index, docType, "_bulk")
if err != nil {
Expand All @@ -100,12 +92,16 @@ func newBulkRequest(
return nil, err
}

if body != nil {
body.AddHeader(&requ.Header)
}

return &bulkRequest{
requ: requ,
}, nil
}

func (r *bulkRequest) Reset(body bulkBody) {
func (r *bulkRequest) Reset(body bodyEncoder) {
bdy := body.Reader()

rc, ok := bdy.(io.ReadCloser)
Expand All @@ -124,6 +120,8 @@ func (r *bulkRequest) Reset(body bulkBody) {

r.requ.Header = http.Header{}
r.requ.Body = rc

body.AddHeader(&r.requ.Header)
}

func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
Expand All @@ -140,53 +138,19 @@ func readBulkResult(obj []byte) (bulkResult, error) {
return bulkResult{obj}, nil
}

func newJSONBulkBody(buf *bytes.Buffer) *jsonBulkBody {
if buf == nil {
buf = bytes.NewBuffer(nil)
}
return &jsonBulkBody{buf}
}

func (b *jsonBulkBody) Reset() {
b.buf.Reset()
}

func (b *jsonBulkBody) Reader() io.Reader {
return b.buf
}

func (b *jsonBulkBody) AddRaw(raw interface{}) error {
enc := json.NewEncoder(b.buf)
return enc.Encode(raw)
}

func (b *jsonBulkBody) Add(meta, obj interface{}) error {
enc := json.NewEncoder(b.buf)
pos := b.buf.Len()

if err := enc.Encode(meta); err != nil {
b.buf.Truncate(pos)
return err
}
if err := enc.Encode(obj); err != nil {
b.buf.Truncate(pos)
return err
}
return nil
}
func bulkEncode(out *jsonBulkBody, metaBuilder MetaBuilder, body []interface{}) error {
func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
if metaBuilder == nil {
for _, obj := range body {
if err := out.AddRaw(obj); err != nil {
debug("Failed to encode message: %s", err)
debugf("Failed to encode message: %s", err)
return err
}
}
} else {
for _, obj := range body {
meta := metaBuilder(obj)
if err := out.Add(meta, obj); err != nil {
debug("Failed to encode message: %s", err)
debugf("Failed to encode event (dropping event): %s", err)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/outputs/elasticsearch/bulkapi_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,15 @@ func TestBulkMoreOperations(t *testing.T) {
{
"field1": "value1",
},

{
"delete": map[string]interface{}{
"_index": index,
"_type": "type1",
"_id": "2",
},
},

{
"create": map[string]interface{}{
"_index": index,
Expand All @@ -119,6 +121,7 @@ func TestBulkMoreOperations(t *testing.T) {
{
"field1": "value3",
},

{
"update": map[string]interface{}{
"_id": "1",
Expand Down
Loading