Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compression support #86

Merged
merged 3 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Google Cloud Output failure when sent a field of type uint16
### Added
- Added a default function to plugin templates
- Add a host metadata operator that adds hostname to entries
- Add a host metadata operator that adds hostname and IP to entries
- Google Cloud Output option to enable gzip compression

## [0.9.7] - 2020-08-05
### Changed
Expand Down
1 change: 1 addition & 0 deletions docs/operators/google_cloud_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The `google_cloud_output` operator will send entries to Google Cloud Logging.
| `severity_field` | | A [field](/docs/types/field.md) for the severity on the log entry |
| `trace_field` | | A [field](/docs/types/field.md) for the trace on the log entry |
| `span_id_field` | | A [field](/docs/types/field.md) for the span_id on the log entry |
| `use_compression` | `false` | Whether to compress the log entry payloads with gzip before sending to Google Cloud |
| `timeout` | 10s | A [duration](/docs/types/duration.md) indicating how long to wait for the API to respond before timing out |

If both `credentials` and `credentials_file` are left empty, the agent will attempt to find
Expand Down
2 changes: 1 addition & 1 deletion operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewConfig() Config {
BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB
BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB
BufferedByteLimit: 500 * 1024 * 1024 * 1024, // 500MB
HandlerLimit: 32,
HandlerLimit: 16,
Retry: NewRetryConfig(),
}
}
Expand Down
21 changes: 15 additions & 6 deletions operator/builtin/output/google_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
sev "google.golang.org/genproto/googleapis/logging/type"
logpb "google.golang.org/genproto/googleapis/logging/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)

func init() {
Expand All @@ -30,9 +32,10 @@ func init() {

func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig {
return &GoogleCloudOutputConfig{
OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"),
BufferConfig: buffer.NewConfig(),
Timeout: operator.Duration{Duration: 10 * time.Second},
OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"),
BufferConfig: buffer.NewConfig(),
Timeout: operator.Duration{Duration: 30 * time.Second},
UseCompression: false,
}
}

Expand All @@ -48,6 +51,7 @@ type GoogleCloudOutputConfig struct {
TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"`
SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"`
Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"`
}

// Build will build a google cloud output operator.
Expand All @@ -72,6 +76,7 @@ func (c GoogleCloudOutputConfig) Build(buildContext operator.BuildContext) (oper
traceField: c.TraceField,
spanIDField: c.SpanIDField,
timeout: c.Timeout.Raw(),
useCompression: c.UseCompression,
}

newBuffer.SetHandler(googleCloudOutput)
Expand All @@ -88,9 +93,10 @@ type GoogleCloudOutput struct {
credentialsFile string
projectID string

logNameField *entry.Field
traceField *entry.Field
spanIDField *entry.Field
logNameField *entry.Field
traceField *entry.Field
spanIDField *entry.Field
useCompression bool

client *vkit.Client
timeout time.Duration
Expand Down Expand Up @@ -136,6 +142,9 @@ func (p *GoogleCloudOutput) Start() error {
options := make([]option.ClientOption, 0, 2)
options = append(options, option.WithCredentials(credentials))
options = append(options, option.WithUserAgent("CarbonLogAgent/"+version.GetVersion()))
if p.useCompression {
options = append(options, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))))
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

Expand Down
23 changes: 21 additions & 2 deletions operator/builtin/output/google_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ func startServer() (*grpc.ClientConn, chan *logpb.WriteLogEntriesRequest, func()
}

type googleCloudOutputBenchmark struct {
name string
entry *entry.Entry
name string
entry *entry.Entry
configMod func(*GoogleCloudOutputConfig)
}

func (g *googleCloudOutputBenchmark) Run(b *testing.B) {
Expand All @@ -306,6 +307,9 @@ func (g *googleCloudOutputBenchmark) Run(b *testing.B) {

cfg := NewGoogleCloudOutputConfig(g.name)
cfg.ProjectID = "test_project_id"
if g.configMod != nil {
g.configMod(cfg)
}
op, err := cfg.Build(testutil.NewBuildContext(b))
require.NoError(b, err)
op.(*GoogleCloudOutput).client = client
Expand Down Expand Up @@ -345,27 +349,31 @@ func BenchmarkGoogleCloudOutput(b *testing.B) {
Timestamp: t,
Record: "test",
},
nil,
},
{
"MapRecord",
&entry.Entry{
Timestamp: t,
Record: mapOfSize(1, 0),
},
nil,
},
{
"LargeMapRecord",
&entry.Entry{
Timestamp: t,
Record: mapOfSize(30, 0),
},
nil,
},
{
"DeepMapRecord",
&entry.Entry{
Timestamp: t,
Record: mapOfSize(1, 10),
},
nil,
},
{
"Labels",
Expand All @@ -376,6 +384,17 @@ func BenchmarkGoogleCloudOutput(b *testing.B) {
"test": "val",
},
},
nil,
},
{
"Compression",
&entry.Entry{
Timestamp: t,
Record: "compressiblecompressiblecompressiblecompressiblecompressiblecompressiblecompressible",
},
func(cfg *GoogleCloudOutputConfig) {
cfg.UseCompression = true
},
},
}

Expand Down