Disk buffers #109

Merged: 4 commits, Sep 9, 2020
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Added
- Disk buffer for output operators
### Changed
- Split buffers into buffers and flushers for better modularity
- New memory buffer design for a uniform interface between disk and memory buffers

## [0.9.14] - 2020-08-31
### Fixed
- Rendering issue with the `kubernetes_events` plugin
1 change: 1 addition & 0 deletions commands/graph.go
@@ -49,6 +49,7 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) {
}

buildContext := pg.BuildContext{
Database: operator.NewStubDatabase(),
PluginRegistry: pluginRegistry,
Logger: logger,
}
8 changes: 5 additions & 3 deletions commands/offsets.go
@@ -42,7 +42,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
db, err := agent.OpenDatabase(rootFlags.DatabaseFile)
exitOnErr("Failed to open database", err)
defer db.Close()
- defer db.Sync()
+ defer func() { _ = db.Sync() }()

if all {
if len(args) != 0 {
@@ -94,7 +94,7 @@ func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command {
exitOnErr("Failed to open database", err)
defer db.Close()

- db.View(func(tx *bbolt.Tx) error {
+ err = db.View(func(tx *bbolt.Tx) error {
offsetBucket := tx.Bucket(helper.OffsetsBucket)
if offsetBucket == nil {
return nil
@@ -105,7 +105,9 @@
return nil
})
})

+ if err != nil {
+ exitOnErr("Failed to read database", err)
+ }
},
}

1 change: 0 additions & 1 deletion commands/offsets_test.go
@@ -70,5 +70,4 @@ func TestOffsets(t *testing.T) {
err = offsetsList.Execute()
require.NoError(t, err)
require.Equal(t, "$.testoperatorid1\n", buf.String())

}
17 changes: 17 additions & 0 deletions docs/operators/elastic_output.md
@@ -14,6 +14,8 @@ The `elastic_output` operator will send entries to an Elasticsearch instance
| `api_key` | | Base64-encoded token for authorization. If set, overrides username and password |
| `index_field` | default | A [field](/docs/types/field.md) that indicates which index to send the log entry to |
| `id_field` | | A [field](/docs/types/field.md) that contains an id for the entry. If unset, a unique id is generated |
| `buffer` | | A [buffer](/docs/types/buffer.md) block indicating how to buffer entries before flushing |
| `flusher` | | A [flusher](/docs/types/flusher.md) block configuring flushing behavior |


### Example Configurations
@@ -27,3 +29,18 @@ Configuration:
    - "http://localhost:9200"
  api_key: <my_api_key>
```

#### Configuration with non-default buffer and flusher params

Configuration:
```yaml
- type: elastic_output
  addresses:
    - "http://localhost:9200"
  api_key: <my_api_key>
  buffer:
    type: disk
    path: /tmp/stanza_buffer
  flusher:
    max_concurrent: 8
```
16 changes: 16 additions & 0 deletions docs/operators/google_cloud_output.md
@@ -16,6 +16,8 @@ The `google_cloud_output` operator will send entries to Google Cloud Logging.
| `span_id_field` | | A [field](/docs/types/field.md) for the span_id on the log entry |
| `use_compression` | `true` | Whether to compress the log entry payloads with gzip before sending to Google Cloud |
| `timeout` | 10s | A [duration](/docs/types/duration.md) indicating how long to wait for the API to respond before timing out |
| `buffer` | | A [buffer](/docs/types/buffer.md) block indicating how to buffer entries before flushing |
| `flusher` | | A [flusher](/docs/types/flusher.md) block configuring flushing behavior |

If both `credentials` and `credentials_file` are left empty, the agent will attempt to find
[Application Default Credentials](https://cloud.google.com/docs/authentication/production) from the environment.
@@ -30,3 +32,17 @@ Configuration:
  project_id: sample_project
  credentials_file: /tmp/credentials.json
```

#### Configuration with non-default buffer and flusher params

Configuration:
```yaml
- type: google_cloud_output
  project_id: sample_project
  credentials_file: /tmp/credentials.json
  buffer:
    type: disk
    path: /tmp/stanza_buffer
  flusher:
    max_concurrent: 8
```
61 changes: 61 additions & 0 deletions docs/types/buffer.md
@@ -0,0 +1,61 @@
# Buffers

Buffers are used to temporarily store log entries until they can be flushed to their final destination.

There are two types of buffers: `memory` buffers and `disk` buffers.

## Memory Buffers

Memory buffers keep log entries in memory until they are flushed, which makes them very fast. However, because
entries are only stored in memory, they will be lost if the agent is shut down uncleanly. If the agent is shut down
cleanly, they will be saved to the agent's database.

### Memory Buffer Configuration

Memory buffers are configured by setting the `type` field of the `buffer` block on an output to `memory`. The only other
configurable field is `max_entries`, which is the maximum number of entries that will be held in memory before blocking and
waiting for some entries to be flushed. The default value of `max_entries` is `1048576` (2^20).

Example:
```yaml
- type: google_cloud_output
  project_id: my_project_id
  buffer:
    type: memory
    max_entries: 10000
```


## Disk Buffers

Disk buffers store all log entries on disk until they have been successfully flushed to their destination. This means
that, even in the case of an unclean shutdown (kill signal or power loss), no entries will be lost. However, this comes at the cost of
some performance.

By default, a disk buffer can handle roughly 10,000 logs per second. This number depends heavily on the specs of the
machine running the agent, so if exact numbers are important, we'd advise running your own tests.

If you'd like better performance and power loss is not a concern, disabling sync writes improves performance to
(roughly) 100,000 entries per second. The tradeoff is that, in the event of a power failure, logs may be lost or the
database may be corrupted.
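
For instance, a throughput-oriented disk buffer with sync writes disabled might look like the following sketch (the output type and path are illustrative):

```yaml
- type: google_cloud_output
  project_id: my_project_id
  buffer:
    type: disk
    path: /tmp/stanza_buffer
    sync: false # faster writes, but entries may be lost on power failure
```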

### Disk Buffer Configuration

Disk buffers are configured by setting the `type` field of the `buffer` block on an output to `disk`. Other fields are described below:

| Field | Default | Description |
| --- | --- | --- |
| `max_size` | `4294967296` (4GiB) | The maximum size of the disk buffer file in bytes |
| `path` | required | The path to the directory which will contain the disk buffer data |
| `sync` | `true` | Whether to open the database files with the O_SYNC flag. Disabling this improves performance, but relaxes guarantees about log delivery. |

> **Review thread on `max_size`:**
>
> **Member:** Although this is documented well here, it might be slightly clearer to call this `max_bytes`, since many users will only encounter this in a config file and would either have to make assumptions about the units, or dig up the docs.
>
> **Contributor (author):** I initially thought the same thing, but I would love to implement something like this as a future feature. Would it still make sense to do `max_bytes` if the value is 8GB?
>
> **Member:** That's a good point. I guess it comes down to how likely we are to implement that feature and how soon it would happen. If we're going to end up living with the current implementation for a while, then I think we should consider using the more explicit term now and deprecating it later. We could pretty easily support both and just require at most one of the two be specified.
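
A minimal sketch of that last suggestion, assuming a hypothetical config struct (names and defaults here are illustrative, not the actual stanza implementation):

```go
package buffer

import "errors"

// DiskBufferConfig is a hypothetical config struct showing how a legacy
// max_size field and a more explicit max_bytes alias could coexist.
type DiskBufferConfig struct {
	MaxSize  int64  `yaml:"max_size,omitempty"`  // legacy name
	MaxBytes int64  `yaml:"max_bytes,omitempty"` // explicit alias
	Path     string `yaml:"path"`
	Sync     bool   `yaml:"sync"`
}

// maxBytes resolves the two aliases, requiring at most one to be set.
func (c DiskBufferConfig) maxBytes() (int64, error) {
	if c.MaxSize != 0 && c.MaxBytes != 0 {
		return 0, errors.New("only one of max_size or max_bytes may be set")
	}
	if c.MaxBytes != 0 {
		return c.MaxBytes, nil
	}
	if c.MaxSize != 0 {
		return c.MaxSize, nil
	}
	return 1 << 32, nil // default: 4294967296 bytes (4GiB)
}
```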

Example:
```yaml
- type: google_cloud_output
  project_id: my_project_id
  buffer:
    type: disk
    max_size: 10000000 # 10MB
    path: /tmp/stanza_buffer
    sync: true
```
23 changes: 23 additions & 0 deletions docs/types/flusher.md
@@ -0,0 +1,23 @@
# Flushers

Flushers handle reading entries from buffers in chunks, flushing them to their final destination, and retrying on failure.

In most cases, the default options will work well, but they may need tuning for optimal performance or to reduce load
on the destination API.

For example, if you hit an API limit on the number of requests per second, consider decreasing `max_concurrent` and
increasing `max_chunk_entries`. This will make fewer, larger requests which should increase efficiency at the cost of
some latency.

Or, if you have low load and can tolerate higher latency, consider increasing `max_wait` so that entries are sent
less often in larger requests.

## Flusher configuration

Flushers are configured with the `flusher` block on output plugins.

| Field | Default | Description |
| --- | --- | --- |
| `max_concurrent` | `16` | The maximum number of goroutines flushing entries concurrently |
| `max_wait` | 1s | The maximum amount of time to wait for a chunk to fill before flushing it. Higher values can reduce load, but also increase delivery latency. |
| `max_chunk_entries` | 1000 | The maximum number of entries to flush in a single chunk. |
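
For example, to reduce pressure on a rate-limited API as described above, a flusher block might be tuned like the following sketch (the output type and values are illustrative, not recommendations):

```yaml
- type: elastic_output
  addresses:
    - "http://localhost:9200"
  flusher:
    max_concurrent: 4       # fewer simultaneous requests
    max_chunk_entries: 5000 # larger chunks per request
    max_wait: 5s            # wait longer for chunks to fill
```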
19 changes: 13 additions & 6 deletions entry/field.go
@@ -5,6 +5,12 @@ import (
"fmt"
)

const (
labelsPrefix = "$labels"
resourcePrefix = "$resource"
recordPrefix = "$record"
)

// Field represents a potential field on an entry.
// It is used to get, set, and delete values at this field.
// It is deserialized from JSON dot notation.
@@ -49,17 +55,17 @@ func fieldFromString(s string) (Field, error) {
}

switch split[0] {
case "$labels":
case labelsPrefix:
if len(split) != 2 {
return Field{}, fmt.Errorf("labels cannot be nested")
}
return Field{LabelField{split[1]}}, nil
case "$resource":
case resourcePrefix:
if len(split) != 2 {
return Field{}, fmt.Errorf("resource fields cannot be nested")
}
return Field{ResourceField{split[1]}}, nil
case "$record", "$":
case recordPrefix, "$":
return Field{RecordField{split[1:]}}, nil
default:
return Field{RecordField{split}}, nil
@@ -127,12 +133,13 @@ func splitField(s string) ([]string, error) {
}
state = OUT_BRACKET
case OUT_BRACKET:
- if c == '.' {
+ switch c {
+ case '.':
state = IN_UNBRACKETED_TOKEN
tokenStart = i + 1
- } else if c == '[' {
+ case '[':
state = IN_BRACKET
- } else {
+ default:
return nil, fmt.Errorf("bracketed access must be followed by a dot or another bracketed access")
}
case IN_UNBRACKETED_TOKEN:
4 changes: 0 additions & 4 deletions entry/label_field_test.go
@@ -50,7 +50,6 @@ func TestLabelFieldGet(t *testing.T) {
require.Equal(t, tc.expected, val)
})
}

}

func TestLabelFieldDelete(t *testing.T) {
@@ -103,7 +102,6 @@ func TestLabelFieldDelete(t *testing.T) {
require.Equal(t, tc.expected, val)
})
}

}

func TestLabelFieldSet(t *testing.T) {
@@ -172,7 +170,6 @@ func TestLabelFieldSet(t *testing.T) {
require.Equal(t, tc.expected, entry.Labels)
})
}

}

func TestLabelFieldString(t *testing.T) {
@@ -198,5 +195,4 @@
require.Equal(t, tc.expected, tc.field.String())
})
}

}
7 changes: 3 additions & 4 deletions entry/record_field.go
@@ -197,7 +197,7 @@ func (f RecordField) MarshalYAML() (interface{}, error) {
func fromJSONDot(value string) RecordField {
keys := strings.Split(value, ".")

if keys[0] == "$" || keys[0] == "$record" {
if keys[0] == "$" || keys[0] == recordPrefix {
keys = keys[1:]
}

@@ -207,7 +207,7 @@ func fromJSONDot(value string) RecordField {
// toJSONDot returns the JSON dot notation for a field.
func toJSONDot(field RecordField) string {
if field.isRoot() {
return "$record"
return recordPrefix
}

containsDots := false
Expand All @@ -219,7 +219,7 @@ func toJSONDot(field RecordField) string {

var b strings.Builder
if containsDots {
b.WriteString("$record")
b.WriteString(recordPrefix)
for _, key := range field.Keys {
b.WriteString(`['`)
b.WriteString(key)
@@ -232,7 +232,6 @@
}
b.WriteString(key)
}

}

return b.String()
2 changes: 1 addition & 1 deletion entry/record_field_test.go
@@ -250,7 +250,7 @@ func TestRecordFieldSet(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
entry := New()
entry.Record = tc.record
- entry.Set(tc.field, tc.setTo)
+ require.NoError(t, entry.Set(tc.field, tc.setTo))
assert.Equal(t, tc.expectedVal, entry.Record)
})
}
4 changes: 0 additions & 4 deletions entry/resource_field_test.go
@@ -50,7 +50,6 @@ func TestResourceFieldGet(t *testing.T) {
require.Equal(t, tc.expected, val)
})
}

}

func TestResourceFieldDelete(t *testing.T) {
@@ -103,7 +102,6 @@ func TestResourceFieldDelete(t *testing.T) {
require.Equal(t, tc.expected, val)
})
}

}

func TestResourceFieldSet(t *testing.T) {
@@ -172,7 +170,6 @@ func TestResourceFieldSet(t *testing.T) {
require.Equal(t, tc.expected, entry.Resource)
})
}

}

func TestResourceFieldString(t *testing.T) {
@@ -198,5 +195,4 @@
require.Equal(t, tc.expected, tc.field.String())
})
}

}
16 changes: 10 additions & 6 deletions go.mod
@@ -3,36 +3,40 @@ module github.com/observiq/stanza
go 1.14

require (
cloud.google.com/go v0.46.3 // indirect
cloud.google.com/go/logging v1.0.0
github.com/antonmedv/expr v1.8.2
github.com/cenkalti/backoff/v4 v4.0.2
github.com/elastic/go-elasticsearch/v7 v7.7.0
github.com/golang/protobuf v1.4.2
github.com/golangci/golangci-lint v1.30.0 // indirect
github.com/googleapis/gax-go v1.0.3
github.com/google/go-cmp v0.5.0 // indirect
github.com/hashicorp/go-uuid v1.0.2
github.com/influxdata/go-syslog/v3 v3.0.0 // indirect
github.com/json-iterator/go v1.1.9
github.com/kardianos/service v1.0.0
github.com/kr/pretty v0.2.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/observiq/ctimefmt v1.0.0
github.com/observiq/go-syslog/v3 v3.0.2
github.com/onsi/ginkgo v1.13.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
github.com/vektra/mockery v1.1.2 // indirect
go.etcd.io/bbolt v1.3.4
go.uber.org/zap v1.15.0
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200625001655-4c5254603344
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
golang.org/x/text v0.3.3
golang.org/x/tools v0.0.0-20200724022722-7017fd6b1305 // indirect
gonum.org/v1/gonum v0.6.2
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200304201815-d429ff31ee6c
google.golang.org/grpc v1.27.1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6