From aa87954a422c95bace9af533ab43e92dd8c1c9cd Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Tue, 11 Aug 2020 11:59:34 -0400 Subject: [PATCH 1/3] Add compression support --- CHANGELOG.md | 1 + docs/operators/google_cloud_output.md | 1 + operator/buffer/buffer.go | 2 +- operator/builtin/output/google_cloud.go | 21 +++++++++++++++------ 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fbed79679..b197bf349 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added a default function to plugin templates - Add a host metadata operator that adds hostname to entries +- Google Cloud Output option to enable gzip compression ## [0.9.7] - 2020-08-05 ### Changed diff --git a/docs/operators/google_cloud_output.md b/docs/operators/google_cloud_output.md index 90f08835b..05bfca367 100644 --- a/docs/operators/google_cloud_output.md +++ b/docs/operators/google_cloud_output.md @@ -14,6 +14,7 @@ The `google_cloud_output` operator will send entries to Google Cloud Logging. | `severity_field` | | A [field](/docs/types/field.md) for the severity on the log entry | | `trace_field` | | A [field](/docs/types/field.md) for the trace on the log entry | | `span_id_field` | | A [field](/docs/types/field.md) for the span_id on the log entry | +| `use_compression` | `false` | Whether to compress the log entry payloads with gzip before sending to Google Cloud | | `timeout` | 10s | A [duration](/docs/types/duration.md) indicating how long to wait for the API to respond before timing out | If both `credentials` and `credentials_file` are left empty, the agent will attempt to find diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index cb7c6913d..b4d2fa8fc 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -27,7 +27,7 @@ func NewConfig() Config { BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB BufferedByteLimit: 500 * 1024 * 1024 * 1024, // 500MB - HandlerLimit: 32, + HandlerLimit: 16, Retry: NewRetryConfig(), } } diff --git a/operator/builtin/output/google_cloud.go b/operator/builtin/output/google_cloud.go index 446b4603e..5aaa762c5 100644 --- a/operator/builtin/output/google_cloud.go +++ b/operator/builtin/output/google_cloud.go @@ -22,6 +22,8 @@ import ( mrpb "google.golang.org/genproto/googleapis/api/monitoredres" sev "google.golang.org/genproto/googleapis/logging/type" logpb "google.golang.org/genproto/googleapis/logging/v2" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" ) func init() { @@ -30,9 +32,10 @@ func init() { func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig { return &GoogleCloudOutputConfig{ - OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"), - BufferConfig: buffer.NewConfig(), - Timeout: operator.Duration{Duration: 10 * time.Second}, + OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"), + BufferConfig: buffer.NewConfig(), + Timeout: operator.Duration{Duration: 30 * time.Second}, + UseCompression: false, } } @@ -48,6 +51,7 @@ type GoogleCloudOutputConfig struct { TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` } // Build will build a google cloud output operator. @@ -72,6 +76,7 @@ func (c GoogleCloudOutputConfig) Build(buildContext operator.BuildContext) (oper traceField: c.TraceField, spanIDField: c.SpanIDField, timeout: c.Timeout.Raw(), + useCompression: c.UseCompression, } newBuffer.SetHandler(googleCloudOutput) @@ -88,9 +93,10 @@ type GoogleCloudOutput struct { credentialsFile string projectID string - logNameField *entry.Field - traceField *entry.Field - spanIDField *entry.Field + logNameField *entry.Field + traceField *entry.Field + spanIDField *entry.Field + useCompression bool client *vkit.Client timeout time.Duration @@ -136,6 +142,9 @@ func (p *GoogleCloudOutput) Start() error { options := make([]option.ClientOption, 0, 2) options = append(options, option.WithCredentials(credentials)) options = append(options, option.WithUserAgent("CarbonLogAgent/"+version.GetVersion())) + if p.useCompression { + options = append(options, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))) + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From 29ffc008b640f84290d5518d104968951f13a288 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Tue, 11 Aug 2020 12:08:27 -0400 Subject: [PATCH 2/3] Add benchmark for compression --- operator/builtin/output/google_cloud_test.go | 23 ++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/operator/builtin/output/google_cloud_test.go b/operator/builtin/output/google_cloud_test.go index 3fe48b8a2..52ca8daa9 100644 --- a/operator/builtin/output/google_cloud_test.go +++ b/operator/builtin/output/google_cloud_test.go @@ -292,8 +292,9 @@ func startServer() (*grpc.ClientConn, chan *logpb.WriteLogEntriesRequest, func() } type googleCloudOutputBenchmark struct { - name string - entry *entry.Entry + name string + entry *entry.Entry + configMod func(*GoogleCloudOutputConfig) } func (g *googleCloudOutputBenchmark) Run(b *testing.B) { @@ -306,6 +307,9 @@ func (g *googleCloudOutputBenchmark) Run(b *testing.B) { cfg := NewGoogleCloudOutputConfig(g.name) cfg.ProjectID = "test_project_id" + if g.configMod != nil { + g.configMod(cfg) + } op, err := cfg.Build(testutil.NewBuildContext(b)) require.NoError(b, err) op.(*GoogleCloudOutput).client = client @@ -345,6 +349,7 @@ func BenchmarkGoogleCloudOutput(b *testing.B) { Timestamp: t, Record: "test", }, + nil, }, { "MapRecord", @@ -352,6 +357,7 @@ func BenchmarkGoogleCloudOutput(b *testing.B) { Timestamp: t, Record: mapOfSize(1, 0), }, + nil, }, { "LargeMapRecord", @@ -359,6 +365,7 @@ func BenchmarkGoogleCloudOutput(b *testing.B) { Timestamp: t, Record: mapOfSize(30, 0), }, + nil, }, { "DeepMapRecord", @@ -366,6 +373,7 @@ func BenchmarkGoogleCloudOutput(b *testing.B) { Timestamp: t, Record: mapOfSize(1, 10), }, + nil, }, { "Labels", @@ -376,6 +384,17 @@ func BenchmarkGoogleCloudOutput(b *testing.B) { "test": "val", }, }, + nil, + }, + { + "Compression", + &entry.Entry{ + Timestamp: t, + Record: "compressiblecompressiblecompressiblecompressiblecompressiblecompressiblecompressible", + }, + func(cfg *GoogleCloudOutputConfig) { + cfg.UseCompression = true + }, }, } From 65387fa304d6d6f8c22534298890d82ba651fd75 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Wed, 12 Aug 2020 16:26:14 -0400 Subject: [PATCH 3/3] Add IP to changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b197bf349..5f4fbd89f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Google Cloud Output failure when sent a field of type uint16 ### Added - Added a default function to plugin templates -- Add a host metadata operator that adds hostname to entries +- Add a host metadata operator that adds hostname and IP to entries - Google Cloud Output option to enable gzip compression ## [0.9.7] - 2020-08-05